Event Store

PostgreSQL-based event store for event sourcing, audit trails, and event replay capabilities.

Overview

The event store provides persistent, immutable storage for all domain events in the system. It enables:

  • Event Sourcing: Rebuild application state from historical events
  • Audit Trails: Complete history of all changes with metadata
  • Event Replay: Reconstruct read models or migrate data
  • Temporal Queries: Query events by time ranges, types, or aggregates
  • Integration: Works with the RabbitMQ event bus for real-time pub/sub

Architecture

┌─────────────────────────────────────────────────────────┐
│                      Applications                       │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │
│  │ Marketplace  │  │    Hooks     │  │   Settings   │   │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘   │
└─────────┼─────────────────┼─────────────────┼───────────┘
          │                 │                 │
          ▼                 ▼                 ▼
     ┌──────────────────────────────────────────────┐
     │            PostgreSQL Event Store            │
     │  ┌────────────────────────────────────────┐  │
     │  │ Table: appserver.events                │  │
     │  │ - Append-only (immutable)              │  │
     │  │ - Indexed by type, topic, aggregate    │  │
     │  │ - JSONB metadata with GIN index        │  │
     │  └────────────────────────────────────────┘  │
     └──────────────────────────────────────────────┘
          │                          │
          │ Query/Replay             │ Retention
          ▼                          ▼
   ┌─────────────┐            ┌─────────────┐
   │   Replay    │            │  Retention  │
   │   Service   │            │   Manager   │
   └─────────────┘            └─────────────┘

The event store is separate from but complementary to the RabbitMQ event bus:

  • Event Bus: Real-time event distribution to subscribers
  • Event Store: Persistent event history for replay and auditing

Database Schema

CREATE TABLE appserver.events (
    id              UUID PRIMARY KEY,
    event_type      TEXT NOT NULL,
    topic           TEXT NOT NULL,
    source_app_id   UUID,
    source_app_name TEXT,
    aggregate_id    UUID NOT NULL,
    aggregate_type  TEXT NOT NULL,
    metadata        JSONB,
    payload_bytes   BYTEA,
    occurred_at     TIMESTAMP NOT NULL,
    version         INTEGER NOT NULL,
    created_at      TIMESTAMP DEFAULT NOW()
);

-- Indexes for common queries
CREATE INDEX idx_events_type ON appserver.events(event_type);
CREATE INDEX idx_events_topic ON appserver.events(topic);
CREATE INDEX idx_events_aggregate ON appserver.events(aggregate_id);
CREATE INDEX idx_events_occurred ON appserver.events(occurred_at);
CREATE INDEX idx_events_metadata ON appserver.events USING GIN(metadata);

-- Unique constraint for deduplication
ALTER TABLE appserver.events ADD CONSTRAINT events_id_unique UNIQUE (id);
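
For illustration, an append maps onto this schema roughly as follows. This is a minimal sketch using database/sql with numbered placeholders (the optional source_app_* columns are omitted); the actual PostgresStore implementation (see Code References) may differ in driver, column handling, and error wrapping:

import (
    "context"
    "database/sql"
    "encoding/json"
    "time"

    "github.com/google/uuid"
)

// Event is a simplified stand-in for domain.Event, mirroring the
// columns of appserver.events.
type Event struct {
    ID            uuid.UUID
    Type          string
    Topic         string
    AggregateID   uuid.UUID
    AggregateType string
    Metadata      map[string]interface{}
    Payload       []byte
    OccurredAt    time.Time
    Version       int
}

// appendEvent shows how an event maps onto the schema; metadata is
// marshaled into the JSONB column.
func appendEvent(ctx context.Context, db *sql.DB, e *Event) error {
    meta, err := json.Marshal(e.Metadata)
    if err != nil {
        return err
    }
    _, err = db.ExecContext(ctx, `
        INSERT INTO appserver.events
            (id, event_type, topic, aggregate_id, aggregate_type,
             metadata, payload_bytes, occurred_at, version)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
        e.ID, e.Type, e.Topic, e.AggregateID, e.AggregateType,
        meta, e.Payload, e.OccurredAt, e.Version)
    return err
}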

Core Operations

Append Events

Store new events in the event store (append-only, immutable):

event := &domain.Event{
    ID:            uuid.New(),
    Type:          "app.registered",
    Topic:         "app.lifecycle.registered",
    AggregateID:   appID,
    AggregateType: "app",
    Metadata: map[string]interface{}{
        "user_id":        userID,
        "correlation_id": correlationID,
    },
    Payload:    payloadBytes,
    OccurredAt: time.Now(),
    Version:    1,
}

err := eventStore.Append(ctx, event)

Deduplication: Events with duplicate IDs are rejected (constraint violation).
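
If a caller wants to treat a duplicate append as a no-op rather than a failure, it can inspect the error for PostgreSQL's unique-violation code. A sketch assuming the pgx driver surfaces *pgconn.PgError (SQLSTATE 23505); if the store wraps errors differently, adapt accordingly:

import (
    "errors"

    "github.com/jackc/pgx/v5/pgconn"
)

// isDuplicate reports whether err is a PostgreSQL unique-constraint
// violation (SQLSTATE 23505), i.e. the event was already stored.
func isDuplicate(err error) bool {
    var pgErr *pgconn.PgError
    return errors.As(err, &pgErr) && pgErr.Code == "23505"
}

// Usage: treat a duplicate as success (idempotent append).
if err := eventStore.Append(ctx, event); err != nil && !isDuplicate(err) {
    return err
}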

Query Events

Query events with flexible filtering:

filter := &event.EventFilter{
    EventTypes:  []string{"app.registered", "app.installed"},
    Topics:      []string{"app.lifecycle.#"}, // RabbitMQ wildcard
    AggregateID: &appID,
    From:        startTime,
    To:          endTime,
    Limit:       100,
    Offset:      0,
    OrderBy:     "occurred_at",
    Ascending:   false,
}

events, err := eventStore.Query(ctx, filter)

Supported Filters:

  • EventTypes - Array of event type strings
  • Topics - Array of topic patterns (supports RabbitMQ wildcards: * for one word, # for zero or more)
  • AggregateID - Filter by specific aggregate UUID
  • AggregateType - Filter by aggregate type string
  • From / To - Time range filtering
  • Metadata - JSONB contains queries
  • Limit / Offset - Pagination (default limit: 100); see the paginated query sketch after this list
  • OrderBy - Sort field (default: occurred_at)
  • Ascending - Sort direction (default: false)
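
For result sets larger than one page, a simple loop over the documented Query API advances the offset until a short page signals the end (sketch; the page size here is arbitrary):

const pageSize = 100

filter := &event.EventFilter{
    EventTypes: []string{"app.registered"},
    Limit:      pageSize,
    OrderBy:    "occurred_at",
    Ascending:  true,
}

for {
    events, err := eventStore.Query(ctx, filter)
    if err != nil {
        return err
    }
    for _, evt := range events {
        processEvent(evt)
    }
    if len(events) < pageSize {
        break // Short page: no more events
    }
    filter.Offset += pageSize
}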

Stream Events

Stream events via channels for large result sets:

filter := &event.EventFilter{
    Topics: []string{"app.#"},
    From:   time.Now().Add(-24 * time.Hour),
}

eventChan, errChan := eventStore.Stream(ctx, filter)

for {
    select {
    case evt, ok := <-eventChan:
        if !ok {
            return // Stream closed
        }
        processEvent(evt)
    case err := <-errChan:
        handleError(err)
        return
    case <-ctx.Done():
        return
    }
}

Buffer Size: 100 events (configured in implementation)

Count Events

Count matching events without fetching:

filter := &event.EventFilter{
    EventTypes: []string{"app.installed"},
    From:       startOfMonth,
}

count, err := eventStore.Count(ctx, filter)

Delete Old Events

Delete events older than a timestamp:

cutoffTime := time.Now().Add(-90 * 24 * time.Hour) // 90 days ago
deleted, err := eventStore.Delete(ctx, cutoffTime)

Warning: Deletion is permanent and affects event replay capabilities.

Event Replay Service

The replay service reconstructs state by replaying historical events to handlers.

Use Cases

  • Read Model Reconstruction: Rebuild projection databases
  • Cache Warming: Populate caches from historical data
  • Data Migration: Transform old events to new format
  • Analytics: Process historical events for insights
  • Testing: Replay production events in development

Configuration

config := eventstore.ReplayConfig{
    BatchSize:       100,   // Events per batch (default: 100)
    ContinueOnError: false, // Stop on first error (default: false)
}

replayService := eventstore.NewReplayService(eventStore, logger)

Replaying Events

// Define event handler
handler := func(ctx context.Context, event *domain.Event) error {
switch event.Type {
case "app.registered":
return rebuildAppCache(event)
case "app.installed":
return updateInstallationMetrics(event)
default:
return nil
}
}

// Replay events
filter := &event.EventFilter{
EventTypes: []string{"app.registered", "app.installed"},
From: time.Time{}, // All time
}

err := replayService.Replay(ctx, filter, handler)

Replay Metrics

The replay service exposes Prometheus metrics:

  • eventstore_replay_runs_total - Total replay operations
  • eventstore_events_replayed_total - Total events processed
  • eventstore_replay_failed_events_total - Failed event count
  • eventstore_replay_last_run_timestamp - Unix timestamp of last run
  • eventstore_replay_last_duration_seconds - Duration of last replay
  • eventstore_replay_last_count - Events processed in last run

Retention Manager

The retention manager automatically deletes old events to prevent unbounded storage growth.

Configuration

config := eventstore.RetentionConfig{
    RetentionPeriod: 90 * 24 * time.Hour, // Keep 90 days (default)
    CleanupInterval: 24 * time.Hour,      // Run daily (default)
}

retentionMgr := eventstore.NewRetentionManager(eventStore, config, logger)

Lifecycle

// Start background cleanup loop
retentionMgr.Start(ctx)

// Manually trigger cleanup
deleted, err := retentionMgr.Cleanup(ctx)

// Stop background loop
retentionMgr.Stop()

Retention Metrics

The retention manager exposes Prometheus metrics:

  • eventstore_retention_cleanup_runs_total - Total cleanup operations
  • eventstore_retention_total_events_deleted - Total events deleted
  • eventstore_retention_last_cleanup_timestamp - Unix timestamp of last cleanup
  • eventstore_retention_last_cleanup_duration_seconds - Duration of last cleanup
  • eventstore_retention_last_cleanup_deleted - Events deleted in last cleanup

Retention Strategy

Default Policy: 90 days retention

  • Events older than 90 days are automatically deleted
  • Cleanup runs every 24 hours
  • Adjust based on compliance requirements and storage capacity

Compliance Considerations:

  • Financial data: May require 7+ years retention
  • PII data: May require shorter retention (GDPR "right to be forgotten")
  • Audit logs: Often require 1-3 years retention
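
For stricter regimes, the same RetentionConfig expresses longer windows. For example, an approximately-seven-year audit retention (a sketch; verify the exact window against your actual legal requirements):

config := eventstore.RetentionConfig{
    RetentionPeriod: 7 * 365 * 24 * time.Hour, // ~7 years (ignores leap days)
    CleanupInterval: 24 * time.Hour,           // Still run daily
}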

Topic Wildcards

The event store supports RabbitMQ-style topic patterns for flexible event filtering:

Wildcard Syntax:

  • * - Matches exactly one word
  • # - Matches zero or more words
  • Words are separated by . (dot)

Examples:

// Match any lifecycle event
Topics: []string{"app.lifecycle.#"}
// Matches: app.lifecycle.registered, app.lifecycle.installed, etc.

// Match specific hook events
Topics: []string{"hook.triggered.*"}
// Matches: hook.triggered.order_created, hook.triggered.user_updated

// Match all app events
Topics: []string{"app.#"}
// Matches: app.registered, app.lifecycle.installed, app.settings.updated

// Multiple patterns
Topics: []string{"app.lifecycle.#", "hook.#", "activity.#"}
// Matches events from multiple categories
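
One common way to evaluate such patterns (for example, when translating them into a SQL regex predicate) is to compile them to a regular expression. A minimal sketch; the store's actual matching strategy may differ:

import (
    "regexp"
    "strings"
)

// topicPatternToRegexp compiles a RabbitMQ-style topic pattern into a
// regular expression: "*" matches exactly one dot-separated word and
// "#" matches a run of words. Simplification: the zero-word case for
// "#" (e.g. "app.#" matching bare "app") is not handled here.
func topicPatternToRegexp(pattern string) *regexp.Regexp {
    escaped := regexp.QuoteMeta(pattern)             // escapes "." and "*"
    re := strings.ReplaceAll(escaped, `\*`, `[^.]+`) // one word
    re = strings.ReplaceAll(re, `#`, `.*`)           // word run
    return regexp.MustCompile(`^` + re + `$`)
}

// topicPatternToRegexp("app.lifecycle.#").MatchString("app.lifecycle.registered") // true
// topicPatternToRegexp("hook.triggered.*").MatchString("hook.triggered.order_created") // true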

Metadata Queries

Query events by metadata using PostgreSQL JSONB operators:

// Find events with a specific user_id
filter := &event.EventFilter{
    Metadata: map[string]interface{}{
        "user_id": "123e4567-e89b-12d3-a456-426614174000",
    },
}

// Find events with a correlation_id
filter = &event.EventFilter{
    Metadata: map[string]interface{}{
        "correlation_id": "trace-abc-123",
    },
}

Metadata is stored as JSONB and indexed with a GIN index, so containment lookups stay fast even at large volumes.
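
Under the hood, a metadata filter typically becomes a JSONB containment predicate (`@>`), which the GIN index accelerates. A hypothetical sketch of the translation (the store's real query builder may differ):

import "encoding/json"

// metadataPredicate renders a Metadata filter as a JSONB containment
// predicate. "metadata @> $1" checks that every key/value pair in the
// filter appears in the stored metadata, using the GIN index.
func metadataPredicate(md map[string]interface{}) (string, []interface{}, error) {
    arg, err := json.Marshal(md)
    if err != nil {
        return "", nil, err
    }
    return "metadata @> $1::jsonb", []interface{}{string(arg)}, nil
}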

Performance Considerations

Query Performance

  • Default Limit: 100 events per query (prevents large result sets)
  • Pagination: Use Limit and Offset for large result sets
  • Indexes: Automatic indexes on event_type, topic, aggregate_id, occurred_at
  • JSONB Index: GIN index on metadata for fast JSON queries
  • Stream API: Use for large result sets (100-event buffer)

Write Performance

  • Append-Only: No updates or deletes (except retention cleanup)
  • Batch Writes: Consider batching for high-throughput scenarios (see the sketch after this list)
  • Deduplication: Unique constraint on event ID (slight write overhead)
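
One way to batch, assuming direct database access, is to write a page of events inside a single transaction so the commit cost is amortized. A sketch (hypothetical helper that bypasses the store API; it reuses the Event type and INSERT from the append sketch above):

// appendBatch writes a slice of events in one transaction, amortizing
// commit overhead for high-throughput producers.
func appendBatch(ctx context.Context, db *sql.DB, events []*Event) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // No-op after a successful Commit

    for _, e := range events {
        meta, err := json.Marshal(e.Metadata)
        if err != nil {
            return err
        }
        if _, err := tx.ExecContext(ctx, `
            INSERT INTO appserver.events
                (id, event_type, topic, aggregate_id, aggregate_type,
                 metadata, payload_bytes, occurred_at, version)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
            e.ID, e.Type, e.Topic, e.AggregateID, e.AggregateType,
            meta, e.Payload, e.OccurredAt, e.Version); err != nil {
            return err
        }
    }
    return tx.Commit()
}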

Storage Considerations

  • Event Size: Payload stored as BYTEA (can be large)
  • Retention: Configure based on storage capacity
  • Partitioning: Consider PostgreSQL partitioning for large tables (by date)
  • Monitoring: Track table size and growth rate

Integration with Event Bus

The event store works alongside the RabbitMQ event bus:

// 1. Append to event store (persistence)
err := eventStore.Append(ctx, event)
if err != nil {
    return err
}

// 2. Publish to event bus (real-time)
err = eventBus.Publish(ctx, event.Topic, event)
if err != nil {
    logger.Error("Failed to publish event", "error", err)
    // Event is still persisted even if publish fails
}

Benefits of Dual Approach:

  • Durability: Events persisted even if bus subscribers fail
  • Replay: Can replay events even if original subscribers missed them
  • Audit: Complete history for compliance and debugging
  • Flexibility: Can add new projections by replaying old events

Best Practices

Event Design

  1. Use Descriptive Types: app.registered not reg
  2. Include Metadata: Add correlation_id, user_id, trace_id for tracing
  3. Version Events: Include schema version for evolution
  4. Immutable Payloads: Never modify events after creation
  5. Idempotent Handlers: Handle duplicate events gracefully (a sketch follows this list)
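
For the last point, a simple way to make a handler idempotent is to record processed event IDs and skip repeats. An in-memory sketch using the simplified Event type from the append sketch above (a production system would persist the dedup set, e.g. in a table keyed by event ID):

import (
    "context"
    "sync"

    "github.com/google/uuid"
)

// dedupHandler wraps a handler so each event ID is processed at most
// once. IDs are recorded only after the handler succeeds, so failed
// events are retried; the set is in-memory and lost on restart.
func dedupHandler(next func(context.Context, *Event) error) func(context.Context, *Event) error {
    var mu sync.Mutex
    seen := make(map[uuid.UUID]struct{})
    return func(ctx context.Context, e *Event) error {
        mu.Lock()
        _, dup := seen[e.ID]
        mu.Unlock()
        if dup {
            return nil // Duplicate: already handled
        }
        if err := next(ctx, e); err != nil {
            return err
        }
        mu.Lock()
        seen[e.ID] = struct{}{}
        mu.Unlock()
        return nil
    }
}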

Topic Naming

Follow hierarchical naming convention:

<domain>.<entity>.<action>

Examples:

  • app.lifecycle.registered
  • hook.trigger.completed
  • activity.request.received
  • settings.value.updated

Error Handling

// Graceful degradation
err := eventStore.Append(ctx, event)
if err != nil {
    logger.Error("Failed to store event", "error", err)
    // Event still published to bus for real-time subscribers
    eventBus.Publish(ctx, event.Topic, event)
}

Monitoring

Monitor these key metrics (a minimal instrumentation sketch follows the list):

  • Event append rate (events/sec)
  • Query latency (p50, p95, p99)
  • Event store size (GB)
  • Replay duration
  • Retention cleanup rate
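
A client_golang sketch covering the first two items (append rate and query latency); the metric names here are illustrative, not built into the store:

import "github.com/prometheus/client_golang/prometheus"

var (
    // Illustrative metrics; names are not part of the event store.
    appendsTotal = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "eventstore_appends_total",
        Help: "Total events appended to the store.",
    })
    queryDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name:    "eventstore_query_duration_seconds",
        Help:    "Latency of event store queries.",
        Buckets: prometheus.DefBuckets,
    })
)

func init() {
    prometheus.MustRegister(appendsTotal, queryDuration)
}

// Usage around store calls:
//   appendsTotal.Inc()
//   timer := prometheus.NewTimer(queryDuration)
//   defer timer.ObserveDuration()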

Code References

  • pkg/v2/infrastructure/eventstore/postgres_store.go:40 - PostgresStore implementation
  • pkg/v2/infrastructure/eventstore/replay_service.go - Replay service
  • pkg/v2/infrastructure/eventstore/retention_manager.go - Retention manager
  • pkg/v2/domain/event/event.go - Event interface and types