Event Store
PostgreSQL-based event store for event sourcing, audit trails, and event replay capabilities.
Overview
The event store provides persistent, immutable storage for all domain events in the system. It enables:
- Event Sourcing: Rebuild application state from historical events
- Audit Trails: Complete history of all changes with metadata
- Event Replay: Reconstruct read models or migrate data
- Temporal Queries: Query events by time ranges, types, or aggregates
- Integration: Works with RabbitMQ event bus for real-time pub/sub
Architecture
┌─────────────────────────────────────────────────────────┐
│                      Applications                       │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │
│  │ Marketplace  │  │    Hooks     │  │   Settings   │   │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘   │
│         │                 │                 │           │
└─────────┼─────────────────┼─────────────────┼───────────┘
          │                 │                 │
          ▼                 ▼                 ▼
      ┌──────────────────────────────────────────────┐
      │           PostgreSQL Event Store             │
      │  ┌────────────────────────────────────────┐  │
      │  │        Table: appserver.events         │  │
      │  │  - Append-only (immutable)             │  │
      │  │  - Indexed by type, topic, aggregate   │  │
      │  │  - JSONB metadata with GIN index       │  │
      │  └────────────────────────────────────────┘  │
      └──────────────────────────────────────────────┘
               │                        │
               │ Query/Replay           │ Retention
               ▼                        ▼
        ┌─────────────┐          ┌─────────────┐
        │   Replay    │          │  Retention  │
        │   Service   │          │   Manager   │
        └─────────────┘          └─────────────┘
The event store is separate from but complementary to the RabbitMQ event bus:
- Event Bus: Real-time event distribution to subscribers
- Event Store: Persistent event history for replay and auditing
Database Schema
CREATE TABLE appserver.events (
    id              UUID PRIMARY KEY,
    event_type      TEXT NOT NULL,
    topic           TEXT NOT NULL,
    source_app_id   UUID,
    source_app_name TEXT,
    aggregate_id    UUID NOT NULL,
    aggregate_type  TEXT NOT NULL,
    metadata        JSONB,
    payload_bytes   BYTEA,
    occurred_at     TIMESTAMP NOT NULL,
    version         INTEGER NOT NULL,
    created_at      TIMESTAMP DEFAULT NOW()
);
-- Indexes for common queries
CREATE INDEX idx_events_type ON appserver.events(event_type);
CREATE INDEX idx_events_topic ON appserver.events(topic);
CREATE INDEX idx_events_aggregate ON appserver.events(aggregate_id);
CREATE INDEX idx_events_occurred ON appserver.events(occurred_at);
CREATE INDEX idx_events_metadata ON appserver.events USING GIN(metadata);
-- Unique constraint for deduplication
ALTER TABLE appserver.events ADD CONSTRAINT events_id_unique UNIQUE (id);
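For ad-hoc auditing, the table can also be queried directly, bypassing the store's Go API. A minimal sketch using `database/sql` (the `*sql.DB` handle, driver setup, and the `auditTrail` helper are illustrative assumptions):

```go
import (
    "context"
    "database/sql"
    "fmt"
    "time"
)

// auditTrail prints every event for one aggregate in occurrence order;
// the WHERE clause is served by the idx_events_aggregate index.
func auditTrail(ctx context.Context, db *sql.DB, aggregateID string) error {
    rows, err := db.QueryContext(ctx, `
        SELECT event_type, topic, occurred_at
        FROM appserver.events
        WHERE aggregate_id = $1
        ORDER BY occurred_at ASC`, aggregateID)
    if err != nil {
        return err
    }
    defer rows.Close()

    for rows.Next() {
        var eventType, topic string
        var occurredAt time.Time
        if err := rows.Scan(&eventType, &topic, &occurredAt); err != nil {
            return err
        }
        fmt.Printf("%s  %s  %s\n", occurredAt.Format(time.RFC3339), eventType, topic)
    }
    return rows.Err()
}
```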
Core Operations
Append Events
Store new events in the event store (append-only, immutable):
event := &domain.Event{
    ID:            uuid.New(),
    Type:          "app.registered",
    Topic:         "app.lifecycle.registered",
    AggregateID:   appID,
    AggregateType: "app",
    Metadata: map[string]interface{}{
        "user_id":        userID,
        "correlation_id": correlationID,
    },
    Payload:    payloadBytes,
    OccurredAt: time.Now(),
    Version:    1,
}
err := eventStore.Append(ctx, event)
Deduplication: Events with duplicate IDs are rejected (constraint violation).
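Callers that retry appends can therefore treat a duplicate-ID error as success. A sketch of this idempotent pattern, assuming the store surfaces the underlying PostgreSQL error via the pgx driver (the `isUniqueViolation` helper and the `EventStore` interface name are illustrative, not part of the documented API):

```go
import (
    "context"
    "errors"

    "github.com/jackc/pgx/v5/pgconn"
)

// isUniqueViolation reports whether err is PostgreSQL error 23505
// (unique_violation), i.e. the event ID already exists.
func isUniqueViolation(err error) bool {
    var pgErr *pgconn.PgError
    return errors.As(err, &pgErr) && pgErr.Code == "23505"
}

// appendIdempotent makes retries safe: a duplicate ID means the event
// was already persisted, so the constraint violation is not an error.
func appendIdempotent(ctx context.Context, store EventStore, evt *domain.Event) error {
    if err := store.Append(ctx, evt); err != nil && !isUniqueViolation(err) {
        return err
    }
    return nil
}
```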
Query Events
Query events with flexible filtering:
filter := &event.EventFilter{
    EventTypes:  []string{"app.registered", "app.installed"},
    Topics:      []string{"app.lifecycle.#"}, // RabbitMQ wildcard
    AggregateID: &appID,
    From:        startTime,
    To:          endTime,
    Limit:       100,
    Offset:      0,
    OrderBy:     "occurred_at",
    Ascending:   false,
}
events, err := eventStore.Query(ctx, filter)
Supported Filters:
- `EventTypes` - Array of event type strings
- `Topics` - Array of topic patterns (supports RabbitMQ wildcards: `*` for one word, `#` for zero or more)
- `AggregateID` - Filter by specific aggregate UUID
- `AggregateType` - Filter by aggregate type string
- `From` / `To` - Time range filtering
- `Metadata` - JSONB contains queries
- `Limit` / `Offset` - Pagination (default limit: 100)
- `OrderBy` - Sort field (default: `occurred_at`)
- `Ascending` - Sort direction (default: false)
Stream Events
Stream events via channels for large result sets:
filter := &event.EventFilter{
    Topics: []string{"app.#"},
    From:   time.Now().Add(-24 * time.Hour),
}
eventChan, errChan := eventStore.Stream(ctx, filter)

for {
    select {
    case evt, ok := <-eventChan:
        if !ok {
            return // Stream closed
        }
        processEvent(evt)
    case err := <-errChan:
        handleError(err)
        return
    case <-ctx.Done():
        return
    }
}
Buffer Size: 100 events (configured in implementation)
Count Events
Count matching events without fetching:
filter := &event.EventFilter{
    EventTypes: []string{"app.installed"},
    From:       startOfMonth,
}
count, err := eventStore.Count(ctx, filter)
Delete Old Events
Delete events older than a timestamp:
cutoffTime := time.Now().Add(-90 * 24 * time.Hour) // 90 days ago
deleted, err := eventStore.Delete(ctx, cutoffTime)
Warning: Deletion is permanent and affects event replay capabilities.
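Because deletion is irreversible, a cautious pattern is a dry run: count the matching events with the same cutoff before deleting (a sketch using the store's own `Count` and `Delete`, with filter fields as in the query examples above):

```go
cutoffTime := time.Now().Add(-90 * 24 * time.Hour)

// Dry run: how many events fall before the cutoff?
count, err := eventStore.Count(ctx, &event.EventFilter{To: cutoffTime})
if err != nil {
    return err
}
logger.Info("Events eligible for deletion", "count", count)

// Delete only after the count has been sanity-checked.
deleted, err := eventStore.Delete(ctx, cutoffTime)
```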
Event Replay Service
The replay service reconstructs state by replaying historical events to handlers.
Use Cases
- Read Model Reconstruction: Rebuild projection databases
- Cache Warming: Populate caches from historical data
- Data Migration: Transform old events to new format
- Analytics: Process historical events for insights
- Testing: Replay production events in development
Configuration
config := eventstore.ReplayConfig{
    BatchSize:       100,   // Events per batch (default: 100)
    ContinueOnError: false, // Stop on first error (default: false)
}
replayService := eventstore.NewReplayService(eventStore, logger)
Replaying Events
// Define event handler
handler := func(ctx context.Context, event *domain.Event) error {
    switch event.Type {
    case "app.registered":
        return rebuildAppCache(event)
    case "app.installed":
        return updateInstallationMetrics(event)
    default:
        return nil
    }
}

// Replay events
filter := &event.EventFilter{
    EventTypes: []string{"app.registered", "app.installed"},
    From:       time.Time{}, // Zero value = all time
}
err := replayService.Replay(ctx, filter, handler)
Replay Metrics
The replay service exposes Prometheus metrics:
- `eventstore_replay_runs_total` - Total replay operations
- `eventstore_events_replayed_total` - Total events processed
- `eventstore_replay_failed_events_total` - Failed event count
- `eventstore_replay_last_run_timestamp` - Unix timestamp of last run
- `eventstore_replay_last_duration_seconds` - Duration of last replay
- `eventstore_replay_last_count` - Events processed in last run
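These follow standard Prometheus client conventions; for illustration, the first two counters could be declared with `prometheus/client_golang` roughly like this (the service's actual registration code may differ):

```go
import "github.com/prometheus/client_golang/prometheus"

var (
    replayRuns = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "eventstore_replay_runs_total",
        Help: "Total replay operations.",
    })
    eventsReplayed = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "eventstore_events_replayed_total",
        Help: "Total events processed during replay.",
    })
)

func init() {
    // Register with the default registry so /metrics exposes them.
    prometheus.MustRegister(replayRuns, eventsReplayed)
}
```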
Retention Manager
The retention manager automatically deletes old events to prevent unbounded storage growth.
Configuration
config := eventstore.RetentionConfig{
    RetentionPeriod: 90 * 24 * time.Hour, // Keep 90 days (default)
    CleanupInterval: 24 * time.Hour,      // Run daily (default)
}
retentionMgr := eventstore.NewRetentionManager(eventStore, config, logger)
Lifecycle
// Start background cleanup loop
retentionMgr.Start(ctx)
// Manually trigger cleanup
deleted, err := retentionMgr.Cleanup(ctx)
// Stop background loop
retentionMgr.Stop()
Retention Metrics
The retention manager exposes Prometheus metrics:
- `eventstore_retention_cleanup_runs_total` - Total cleanup operations
- `eventstore_retention_total_events_deleted` - Total events deleted
- `eventstore_retention_last_cleanup_timestamp` - Unix timestamp of last cleanup
- `eventstore_retention_last_cleanup_duration_seconds` - Duration of last cleanup
- `eventstore_retention_last_cleanup_deleted` - Events deleted in last cleanup
Retention Strategy
Default Policy: 90 days retention
- Events older than 90 days are automatically deleted
- Cleanup runs every 24 hours
- Adjust based on compliance requirements and storage capacity
Compliance Considerations:
- Financial data: May require 7+ years retention
- PII data: May require shorter retention (GDPR "right to be forgotten")
- Audit logs: Often require 1-3 years retention
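For example, a deployment holding financial events might simply widen the retention window in the same `RetentionConfig` shown above (the seven-year figure is illustrative, not a recommendation):

```go
config := eventstore.RetentionConfig{
    RetentionPeriod: 7 * 365 * 24 * time.Hour, // ~7 years for financial records
    CleanupInterval: 24 * time.Hour,           // cleanup cadence can stay daily
}
```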
Topic Wildcards
The event store supports RabbitMQ-style topic patterns in query filters, mirroring the event bus's subscription wildcards:
Wildcard Syntax:
- `*` - Matches exactly one word
- `#` - Matches zero or more words
- Words are separated by `.` (dot)
Examples:
// Match any lifecycle event
Topics: []string{"app.lifecycle.#"}
// Matches: app.lifecycle.registered, app.lifecycle.installed, etc.
// Match specific hook events
Topics: []string{"hook.triggered.*"}
// Matches: hook.triggered.order_created, hook.triggered.user_updated
// Match all app events
Topics: []string{"app.#"}
// Matches: app.registered, app.lifecycle.installed, app.settings.updated
// Multiple patterns
Topics: []string{"app.lifecycle.#", "hook.#", "activity.#"}
// Matches events from multiple categories
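One plausible way to evaluate such patterns is to translate them into anchored regular expressions and match them against stored topics. The helper below is a simplified illustration, not the store's actual implementation (for brevity, `#` here matches one or more words rather than RabbitMQ's zero or more):

```go
import (
    "regexp"
    "strings"
)

// topicPatternToRegexp converts a RabbitMQ-style topic pattern into a
// regular expression: "*" matches exactly one word, "#" matches one or
// more dot-separated words (simplified).
func topicPatternToRegexp(pattern string) *regexp.Regexp {
    words := strings.Split(pattern, ".")
    for i, w := range words {
        switch w {
        case "*":
            words[i] = `[^.]+` // exactly one word
        case "#":
            words[i] = `.+` // one or more words
        default:
            words[i] = regexp.QuoteMeta(w)
        }
    }
    return regexp.MustCompile(`^` + strings.Join(words, `\.`) + `$`)
}
```

For instance, `topicPatternToRegexp("app.lifecycle.#")` yields `^app\.lifecycle\..+$`, which matches `app.lifecycle.registered`.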
Metadata Queries
Query events by metadata using PostgreSQL JSONB operators:
// Find events with a specific user_id
filter := &event.EventFilter{
    Metadata: map[string]interface{}{
        "user_id": "123e4567-e89b-12d3-a456-426614174000",
    },
}

// Find events with a specific correlation_id (reusing the variable)
filter = &event.EventFilter{
    Metadata: map[string]interface{}{
        "correlation_id": "trace-abc-123",
    },
}
The metadata is stored as JSONB and indexed with a GIN index for fast lookups.
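Under the hood, such a filter maps naturally onto PostgreSQL's `@>` containment operator, which the GIN index accelerates. A sketch of that translation using `database/sql` (illustrative only; not necessarily how the store builds its SQL):

```go
import (
    "context"
    "database/sql"
    "encoding/json"
)

// countByMetadata counts events whose metadata contains the given
// key/value pairs, using a JSONB containment query.
func countByMetadata(ctx context.Context, db *sql.DB, meta map[string]interface{}) (int64, error) {
    doc, err := json.Marshal(meta)
    if err != nil {
        return 0, err
    }
    var n int64
    err = db.QueryRowContext(ctx,
        `SELECT count(*) FROM appserver.events WHERE metadata @> $1::jsonb`,
        string(doc)).Scan(&n)
    return n, err
}
```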
Performance Considerations
Query Performance
- Default Limit: 100 events per query (prevents large result sets)
- Pagination: Use `Limit` and `Offset` for large result sets
- Indexes: Automatic indexes on `event_type`, `topic`, `aggregate_id`, `occurred_at`
- JSONB Index: GIN index on metadata for fast JSON queries
- Stream API: Use for large result sets (100-event buffer)
Write Performance
- Append-Only: No updates or deletes (except retention cleanup)
- Batch Writes: Consider batching for high-throughput scenarios (see the sketch below)
- Deduplication: Unique constraint on event ID (slight write overhead)
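Continuing the Batch Writes point above, here is a sketch of appending many events in one transaction. It assumes direct access to the underlying `*sql.DB` (the store itself may not expose a batch API) and abbreviates the column list to the NOT NULL fields:

```go
import (
    "context"
    "database/sql"
)

// appendBatch inserts many events in one transaction, amortizing
// per-commit overhead for high-throughput producers.
func appendBatch(ctx context.Context, db *sql.DB, events []*domain.Event) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // no-op once Commit succeeds

    stmt, err := tx.PrepareContext(ctx, `
        INSERT INTO appserver.events
            (id, event_type, topic, aggregate_id, aggregate_type, occurred_at, version)
        VALUES ($1, $2, $3, $4, $5, $6, $7)
        ON CONFLICT (id) DO NOTHING`) // keep the dedup semantics
    if err != nil {
        return err
    }
    defer stmt.Close()

    for _, e := range events {
        if _, err := stmt.ExecContext(ctx, e.ID, e.Type, e.Topic,
            e.AggregateID, e.AggregateType, e.OccurredAt, e.Version); err != nil {
            return err
        }
    }
    return tx.Commit()
}
```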
Storage Considerations
- Event Size: Payload stored as BYTEA (can be large)
- Retention: Configure based on storage capacity
- Partitioning: Consider PostgreSQL partitioning for large tables (by date)
- Monitoring: Track table size and growth rate
Integration with Event Bus
The event store works alongside the RabbitMQ event bus:
// 1. Append to event store (persistence)
err := eventStore.Append(ctx, event)
if err != nil {
    return err
}

// 2. Publish to event bus (real-time)
err = eventBus.Publish(ctx, event.Topic, event)
if err != nil {
    logger.Error("Failed to publish event", "error", err)
    // Event is still persisted even if publish fails
}
Benefits of Dual Approach:
- Durability: Events persisted even if bus subscribers fail
- Replay: Can replay events even if original subscribers missed them
- Audit: Complete history for compliance and debugging
- Flexibility: Can add new projections by replaying old events
Best Practices
Event Design
- Use Descriptive Types: `app.registered`, not `reg`
- Include Metadata: Add `correlation_id`, `user_id`, `trace_id` for tracing
- Version Events: Include schema version for evolution
- Immutable Payloads: Never modify events after creation
- Idempotent Handlers: Handle duplicate events gracefully (see the sketch below)
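Since replay and at-least-once delivery can both hand the same event to a handler twice, handlers should detect duplicates. A minimal in-memory sketch (a production handler would persist processed IDs, e.g. alongside its read model):

```go
import (
    "context"
    "sync"

    "github.com/google/uuid"
)

// dedupHandler wraps a handler so that each event ID is processed
// at most once, making replays and redeliveries safe.
type dedupHandler struct {
    mu   sync.Mutex
    seen map[uuid.UUID]bool
    next func(context.Context, *domain.Event) error
}

func (d *dedupHandler) Handle(ctx context.Context, evt *domain.Event) error {
    d.mu.Lock()
    if d.seen[evt.ID] {
        d.mu.Unlock()
        return nil // duplicate delivery: already handled
    }
    d.seen[evt.ID] = true
    d.mu.Unlock()
    return d.next(ctx, evt)
}
```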
Topic Naming
Follow hierarchical naming convention:
<domain>.<entity>.<action>
Examples:
- `app.lifecycle.registered`
- `hook.trigger.completed`
- `activity.request.received`
- `settings.value.updated`
Error Handling
// Graceful degradation
err := eventStore.Append(ctx, event)
if err != nil {
    logger.Error("Failed to store event", "error", err)
    // Event still published to bus for real-time subscribers
    eventBus.Publish(ctx, event.Topic, event)
}
Monitoring
Monitor these key metrics:
- Event append rate (events/sec)
- Query latency (p50, p95, p99)
- Event store size (GB)
- Replay duration
- Retention cleanup rate
Code References
- pkg/v2/infrastructure/eventstore/postgres_store.go:40 - PostgresStore implementation
- pkg/v2/infrastructure/eventstore/replay_service.go - Replay service
- pkg/v2/infrastructure/eventstore/retention_manager.go - Retention manager
- pkg/v2/domain/event/event.go - Event interface and types
Related Topics
- Event Bus - RabbitMQ pub/sub for real-time events
- Platform Architecture - Event-driven architecture
- Hooks and Activity - Event-driven communication