Event-Driven Architecture

Easy AppServer uses event-driven patterns to enable loose coupling between applications, ensure eventual consistency, and coordinate distributed operations across the platform.

Overview

The platform implements a publish-subscribe (pub/sub) architecture using RabbitMQ as the event bus. Applications can:

  • Publish domain events when their state changes
  • Subscribe to events from other applications and platform services
  • Implement eventual consistency patterns across distributed components
  • Trigger cache invalidation across all platform instances
  • React to system-wide state changes asynchronously

Event Bus Architecture

Bus Interface

The platform defines a vendor-neutral event bus interface:

Code Reference: pkg/v2/domain/event/bus.go:10

// Bus provides centralized event distribution for domain events
type Bus interface {
    // Publish publishes a domain event to all subscribers
    Publish(ctx context.Context, event Event) error

    // Subscribe subscribes to a specific event type
    Subscribe(eventType string, handler Handler) (<-chan struct{}, error)

    // Unsubscribe removes a subscription
    Unsubscribe(eventType string, handler Handler) error

    // Close closes the event bus and cleans up resources
    Close() error
}

This interface can be implemented by different backends:

  • RabbitMQ - Production event bus with persistence and durability
  • In-Memory - Development/testing bus without external dependencies
  • Kafka - Future high-throughput implementation

event.Bus.Subscribe matches exact event types. Wildcard topic bindings only exist within the RabbitMQ subscriber (for example when binding app.# directly through AMQP), but the domain bus requires the concrete event type.

Event Interface

All events implement a common interface:

type Event interface {
    EventID() uuid.UUID               // Unique event identifier
    EventType() string                // Event type (e.g., "app.installed")
    OccurredAt() time.Time            // Event timestamp
    AggregateID() uuid.UUID           // ID of the aggregate that generated the event
    AggregateType() string            // Type of aggregate (e.g., "app", "user")
    Topic() string                    // RabbitMQ routing topic
    SourceAppID() *uuid.UUID          // Optional app that triggered event
    SourceAppName() *string           // Optional app name
    Metadata() map[string]interface{} // Optional metadata
    Payload() []byte                  // Serialized event payload
    Version() int                     // Event schema version
}

Handler Function

Event handlers process events asynchronously:

type Handler func(ctx context.Context, event Event) error

Handlers should be:

  • Idempotent - Safe to run multiple times
  • Fast - Process quickly or delegate to background workers
  • Error-Resilient - Handle failures gracefully

RabbitMQ Integration

Production Event Bus

The AppServer integrates with RabbitMQ for production deployments.

Code Reference: pkg/v2/server/eventbus.go:12

func createEventBus(cfg *config.Config, logger telemetry.Logger) (event.Bus, error) {
    if !cfg.EventBus.Enabled {
        logger.Info("using in-memory event bus (RabbitMQ disabled)")
        return event.NewInMemoryBus(), nil
    }

    var rabbitmqConfig *eventbus.RabbitMQConfig
    if cfg.EventBus.TestMode {
        // Test-optimized config (exclusive queues, auto-delete)
        rabbitmqConfig = eventbus.TestModeRabbitMQConfig(cfg.EventBus.URL)
    } else {
        // Production config (durable queues, persistent)
        rabbitmqConfig = eventbus.DefaultRabbitMQConfig(cfg.EventBus.URL)
    }

    // Create RabbitMQ event bus
    bus, err := eventbus.NewRabbitMQBus(rabbitmqConfig, logger)
    if err != nil {
        return nil, fmt.Errorf("failed to create RabbitMQ event bus: %w", err)
    }

    return bus, nil
}

RabbitMQ Features

Durable Message Queues:

  • Messages persisted to disk
  • Survive RabbitMQ restarts
  • Guaranteed delivery (at-least-once)

Topic-Based Routing:

  • Events routed by topic patterns
  • Subscribers use wildcard patterns
  • Flexible subscription filtering

Dead Letters (planned):

  • Queues are declared with DLQ bindings, but the current subscriber requeues messages on error instead of routing them to the DLQ
  • Manual inspection/replay tooling has not been implemented yet

Message Prefetch:

  • Control consumer throughput
  • Configurable prefetch count
  • Balance latency vs throughput

In-Memory Event Bus

For development and testing, an in-memory event bus is available:

Features:

  • Enabled when APPSERVER_EVENTBUS_ENABLED=false
  • No external dependencies (no RabbitMQ required)
  • Events not persisted (lost on restart)
  • No cross-instance communication (single instance only)
  • Synchronous delivery (no message broker delay)

Use Cases:

  • Local development
  • Unit testing
  • Integration testing
  • CI/CD pipelines

Domain Events

Platform Events

Events published by the AppServer itself for system-wide coordination:

Application Lifecycle Events

Code Reference: pkg/v2/domain/event/app_events.go

EventTypeAppRegistered   - Application registered with platform
EventTypeAppInstalled    - Application successfully installed
EventTypeAppUninstalled  - Application removed
EventTypeAppDeregistered - Application deregistered from platform

Event Payload Example:

{
  "eventID": "550e8400-e29b-41d4-a716-446655440000",
  "eventType": "app.installed",
  "occurredAt": "2025-01-24T15:30:00Z",
  "aggregateID": "app-uuid",
  "aggregateType": "app",
  "payload": {
    "appID": "app-uuid",
    "appName": "de.easy-m.todos",
    "version": "1.0.0",
    "installedAt": "2025-01-24T15:30:00Z"
  }
}

Settings & Permission Events

Code Reference:

  • Settings: pkg/v2/domain/event/settings_events.go
  • Permission cache: pkg/v2/domain/event/permission_events.go

EventTypeSettingsRegistered       - Settings schema registered
EventTypeSettingsUpdated          - Application settings changed
EventTypeSettingsValidationFailed - Settings validation failed
EventTypeSettingsDeleted          - Settings schema deleted
permission.invalidate             - Invalidate permission caches for an app/pattern

Settings Updated Event:

func NewSettingsUpdated(appID uuid.UUID, appName string, changedKeys []string, updatedBy *uuid.UUID) Event {
    payload := SettingsUpdatedPayload{
        AppID:       appID,
        AppName:     appName,
        ChangedKeys: changedKeys,
        UpdatedBy:   updatedBy,
    }
    // ... create and return event
}

This event triggers cache invalidation across all platform instances.

Permission Events

Code Reference: pkg/v2/domain/event/permission_events.go:20

PermissionInvalidateEventType - Permission cache invalidation

Permission Invalidation Event:

func NewPermissionInvalidateEvent(appName, pattern string) *PermissionInvalidateEvent {
    baseEvent := NewBaseEvent(
        PermissionInvalidateEventType,
        uuid.Nil,
        "permission",
    )
    // Set metadata with app name and pattern
    return &PermissionInvalidateEvent{BaseEvent: baseEvent}
}

User Events

EventTypeUserCreated - New user account created
EventTypeUserUpdated - User profile updated
EventTypeUserDeleted - User account deleted

Session Events

EventTypeSessionCreated - User session created
EventTypeSessionRevoked - User session revoked
EventTypeSessionExpired - User session expired

Application-Specific Events

Implementation Status

Current: Event bus integration is server-side only. The Go services (pkg/v2/) have full access to the event bus for publishing and subscribing to events.

Planned: Node.js SDK event bus helpers (publishEvent, eventBus.subscribe) are not yet implemented. Applications currently communicate via Hooks (for event-driven patterns with responses) and Activities (for request/response workflows).

See Hooks Architecture for inter-app event communication today.

Applications running as Go services can publish custom domain events directly via the event bus:

Example: Publishing Events from Go:

// From within a Go service
evt := event.NewBaseEvent(
    "invoice.created",
    invoiceID,
    "invoice",
)
evt.SetPayload(map[string]interface{}{
    "invoiceID":  "inv-123",
    "customerID": "cust-456",
    "amount":     1000.00,
    "currency":   "EUR",
})

if err := eventBus.Publish(ctx, evt); err != nil {
    return fmt.Errorf("failed to publish event: %w", err)
}

Event Subscriptions

Applications subscribe to events they care about. Currently, event subscriptions are available for server-side Go services that integrate directly with the platform's event bus.

Subscription Patterns (Go Services)

Exact Match

Subscribe to a specific event type:

// Subscribe to exact event type
_, err := eventBus.Subscribe(event.EventTypeAppInstalled, func(ctx context.Context, evt event.Event) error {
    appEvt := evt.(*event.AppInstalled)
    log.Info("App installed", "appName", appEvt.AppName)
    return sendNotification(appEvt)
})

Wildcard Patterns (RabbitMQ Only)

RabbitMQ supports topic-based wildcards when binding queues directly:

Patterns:

  • invoice.* - All invoice events (created, updated, paid, etc.)
  • *.created - All creation events across all entities
  • app.# - All application events (multi-level wildcard)
  • # - All events (use with caution, high volume)

Note: The domain event.Bus interface requires exact event types. Wildcard subscriptions work at the RabbitMQ infrastructure level but are not exposed through the standard Bus interface.

Server-Side Subscription Example

Platform services subscribe during initialization:

// Example from settings service
func (s *SettingsService) Start(ctx context.Context) error {
    // Subscribe to app lifecycle events
    _, err := s.eventBus.Subscribe(event.EventTypeAppUninstalled, func(ctx context.Context, evt event.Event) error {
        appEvt := evt.(*event.AppUninstalled)
        return s.handleAppUninstalled(ctx, appEvt.AppID)
    })
    if err != nil {
        return fmt.Errorf("failed to subscribe: %w", err)
    }
    return nil
}

Future Work

Planned: Node.js SDK event subscription APIs (eventBus.subscribe) for application-level event handling. For now, applications should use:

  • Hooks - For subscribing to events from other apps with synchronous responses
  • Activities - For request/response patterns between apps

Event Handling

Handler Implementation (Go Services)

Event handlers process events asynchronously:

func handleUserCreated(ctx context.Context, evt event.Event) error {
    userEvt := evt.(*event.UserCreated)

    // Extract payload
    logger.Info("New user created",
        "userID", userEvt.UserID,
        "email", userEvt.Email)

    // Create user profile in local database
    if err := db.UserProfiles.Create(ctx, &UserProfile{
        UserID:    userEvt.UserID,
        Email:     userEvt.Email,
        CreatedAt: userEvt.OccurredAt(),
    }); err != nil {
        return fmt.Errorf("failed to create profile: %w", err)
    }

    // Send welcome email
    if err := emailService.Send(ctx, WelcomeEmail{
        To:       userEvt.Email,
        Template: "welcome",
    }); err != nil {
        logger.Warn("Failed to send welcome email", telemetry.Error(err))
        // Don't fail the handler for non-critical operations
    }

    // Publish follow-up event
    profileEvent := event.NewBaseEvent("profile.initialized", userEvt.UserID, "user")
    if err := eventBus.Publish(ctx, profileEvent); err != nil {
        logger.Warn("Failed to publish follow-up event", telemetry.Error(err))
    }

    return nil
}

Error Handling

Handlers should handle errors gracefully:

func handleInvoiceCreated(ctx context.Context, evt event.Event) error {
    invoiceEvt := evt.(*event.InvoiceCreated)

    // Process event
    if err := processInvoice(ctx, invoiceEvt.InvoiceID); err != nil {
        // Log error
        logger.Error("Failed to process invoice event",
            "eventID", evt.EventID(),
            "invoiceID", invoiceEvt.InvoiceID,
            telemetry.Error(err))

        // Publish error event for monitoring
        errorEvent := event.NewBaseEvent("invoice.processing.failed", invoiceEvt.InvoiceID, "invoice")
        errorEvent.SetPayload(map[string]interface{}{
            "originalEventID": evt.EventID(),
            "error":           err.Error(),
        })
        if pubErr := eventBus.Publish(ctx, errorEvent); pubErr != nil {
            logger.Warn("Failed to publish error event", telemetry.Error(pubErr))
        }

        // Return error to trigger retry (RabbitMQ will requeue)
        return err
    }

    return nil
}

Retry Strategies

Automatic Retry (RabbitMQ):

  • Failed messages are requeued
  • Configurable retry count
  • Exponential backoff (via TTL + DLQ)

Manual Retry (Go):

func handleEventWithRetry(ctx context.Context, evt event.Event) error {
    maxRetries := 3
    attempt := 0

    for attempt < maxRetries {
        if err := processEvent(ctx, evt); err != nil {
            attempt++
            if attempt >= maxRetries {
                // Max retries exceeded - will move to DLQ
                logger.Error("Max retries exceeded",
                    "eventID", evt.EventID(),
                    "attempts", attempt)
                return err
            }
            // Exponential backoff
            backoff := time.Duration(math.Pow(2, float64(attempt))) * time.Second
            time.Sleep(backoff)
            continue
        }
        return nil // Success
    }
    return fmt.Errorf("unexpected retry loop exit")
}

Dead Letter Queues

Once DLQ routing is active (as noted under Dead Letters above, the current subscriber still requeues messages on error), failed messages are routed to dead letter queues for manual intervention:

Message Processing Flow:
├─ Attempt 1: Process message
│ └─ Failure: Requeue
├─ Attempt 2: Process message
│ └─ Failure: Requeue
├─ Attempt 3: Process message
│ └─ Failure: Move to DLQ
└─ Dead Letter Queue:
├─ Manual inspection
├─ Fix root cause
└─ Replay or discard

Idempotency

Event handlers should be idempotent (safe to run multiple times):

func handleUserCreatedIdempotent(ctx context.Context, evt event.Event) error {
    userEvt := evt.(*event.UserCreated)

    // Check if already processed (using event ID)
    processed, err := db.ProcessedEvents.Exists(ctx, evt.EventID())
    if err != nil {
        return fmt.Errorf("failed to check processed events: %w", err)
    }
    if processed {
        logger.Info("Event already processed, skipping", "eventID", evt.EventID())
        return nil
    }

    // Check if user profile already exists
    existingProfile, err := db.UserProfiles.FindByUserID(ctx, userEvt.UserID)
    if err != nil && !errors.Is(err, sql.ErrNoRows) {
        return fmt.Errorf("failed to check existing profile: %w", err)
    }
    if existingProfile != nil {
        logger.Info("User profile already exists, skipping", "userID", userEvt.UserID)
        // Mark as processed
        if err := db.ProcessedEvents.Create(ctx, evt.EventID(), time.Now()); err != nil {
            return fmt.Errorf("failed to mark as processed: %w", err)
        }
        return nil
    }

    // Create user profile
    if err := db.UserProfiles.Create(ctx, &UserProfile{
        UserID: userEvt.UserID,
        Email:  userEvt.Email,
    }); err != nil {
        return fmt.Errorf("failed to create profile: %w", err)
    }

    // Mark as processed
    if err := db.ProcessedEvents.Create(ctx, evt.EventID(), time.Now()); err != nil {
        logger.Warn("Failed to mark event as processed", telemetry.Error(err))
        // Don't fail - profile was created successfully
    }

    return nil
}

Cache Invalidation

The event bus is the primary mechanism for distributed cache invalidation.

Invalidation Flow

Cache Invalidation Pattern:
├─ 1. Service updates data in database
│ (e.g., marketplace service updates app state)
├─ 2. Service publishes event
│ (e.g., app.installed event)
├─ 3. Event bus distributes to all instances
│ (RabbitMQ routes to all subscribers)
├─ 4. Each instance receives event
│ (Including the publishing instance)
├─ 5. Subscribers invalidate cache entries
│ (cache.Invalidate(appID))
└─ 6. Next read fetches fresh data from database

Settings Cache Invalidation

Code Reference: pkg/v2/application/settings/settings_service.go:47

// After updating settings, publish event
if err := s.eventBus.Publish(ctx, event.NewSettingsUpdated(appID, appName, changedKeys, updatedBy)); err != nil {
    s.logger.Warn("Failed to publish settings.updated event", telemetry.Error(err))
}

// Subscribers invalidate cache
s.eventBus.Subscribe(event.EventTypeSettingsUpdated, func(ctx context.Context, evt event.Event) error {
    settingsEvt := evt.(*event.SettingsUpdated)
    s.cache.Invalidate(settingsEvt.AppID)
    return nil
})

Permission Cache Invalidation

Code Reference: pkg/v2/infrastructure/permission/cache/cache.go:12

// Subscribe to permission invalidation events
s.eventBus.Subscribe(event.PermissionInvalidateEventType, func(ctx context.Context, evt event.Event) error {
    permEvt := evt.(*event.PermissionInvalidateEvent)

    // Invalidate L1 cache (in-memory)
    s.l1Cache.InvalidatePattern(permEvt.Pattern)

    // Invalidate L2 cache (Redis)
    s.l2Cache.DeleteByPattern(permEvt.Pattern)

    return nil
})

The permission.invalidate event type is defined in pkg/v2/domain/event/permission_events.go. Some consumers (like the UI service) still subscribe to the legacy string permission.invalidated, so those handlers will not fire until they are updated.

Asset Cache Invalidation

When lifecycle events occur, the UI service clears asset caches. It subscribes to the wildcard topic app.* (works with RabbitMQ bindings; the in-memory bus does not support wildcards) and drops cache entries for the affected app.

_, err := s.eventBus.Subscribe("app.*", s.handleAppLifecycleEvent)
if err != nil {
    return err
}

GraphQL Subscriptions

GraphQL subscriptions bridge the event bus to real-time UI updates.

Code Reference: pkg/v2/presentation/graphql/resolvers/subscriptions.resolvers.go:20

AppStateChanged Subscription

GraphQL Subscription Flow:
├─ 1. Client subscribes via WebSocket
│ subscription { appStateChanged { name state } }
├─ 2. Resolver subscribes to event bus
│ Subscribe to: app.registered, app.installed, app.uninstalled
├─ 3. Event published to event bus
│ Event: app.installed (appName: "de.easy-m.todos")
├─ 4. Event handler receives event
│ Extract appName, check filters
├─ 5. Fetch full app data
│ Query marketplace service for complete app object
├─ 6. Convert to GraphQL type
│ Transform domain.App → generated.App
├─ 7. Send to GraphQL channel
│ Channel pushes to WebSocket
└─ 8. Client receives real-time update
UI updates automatically

Implementation:

func (r *subscriptionResolver) AppStateChanged(ctx context.Context, filter *generated.AppStateFilter) (<-chan *generated.App, error) {
    // Create channel for sending app updates
    appChan := make(chan *generated.App, 10)

    // Determine which event types to subscribe to
    eventTypes := []string{
        event.EventTypeAppRegistered,
        event.EventTypeAppInstalled,
        event.EventTypeAppUninstalled,
    }

    // Create event handler
    handler := func(eventCtx context.Context, evt event.Event) error {
        // Extract app name from event
        var appName string
        switch e := evt.(type) {
        case *event.AppInstalled:
            appName = e.AppName
            // ... other lifecycle event types are unwrapped the same way
        }

        // Apply filters
        if filter != nil && filter.AppNames != nil {
            if !contains(filter.AppNames, appName) {
                return nil // Skip filtered app
            }
        }

        // Fetch full app data
        domainApp, err := r.marketplaceService.GetApp(eventCtx, marketplace.GetAppQuery{AppName: &appName})
        if err != nil {
            return fmt.Errorf("failed to fetch app: %w", err)
        }

        // Convert and send to channel
        gqlApp := convertAppToGraphQL(domainApp)
        select {
        case appChan <- gqlApp:
        case <-ctx.Done():
            return ctx.Err()
        }

        return nil
    }

    // Subscribe to all relevant event types
    for _, eventType := range eventTypes {
        r.eventBus.Subscribe(eventType, handler)
    }

    // Cleanup on context cancellation
    go func() {
        <-ctx.Done()
        close(appChan)
    }()

    return appChan, nil
}

Client Usage

Frontend Subscription:

import { useSubscription, gql } from '@apollo/client';

const APP_STATE_SUBSCRIPTION = gql`
  subscription OnAppStateChanged {
    appStateChanged {
      name
      state
      version
      installedAt
    }
  }
`;

function AppList() {
  const { data, loading } = useSubscription(APP_STATE_SUBSCRIPTION, {
    onData: ({ data }) => {
      console.log('App state changed:', data.appStateChanged);
      // UI automatically re-renders
    }
  });

  // ...
}

Event Ordering

Per-Partition Ordering

RabbitMQ guarantees message order within a single queue:

Queue: app.lifecycle
├─ Event 1: app.registered (app-a) - Delivered first
├─ Event 2: app.installed (app-a) - Delivered second
└─ Event 3: app.uninstalled (app-a) - Delivered third

Order preserved within same queue/topic

Global Ordering

For strict global ordering across all event types:

Use Case: Audit log reconstruction

Solution: Single queue with all events (sacrifices parallelism)

Global Event Queue:
├─ Event 1: app.registered (app-a)
├─ Event 2: user.created (user-1)
├─ Event 3: app.installed (app-a)
├─ Event 4: settings.updated (app-a)
└─ Event 5: user.updated (user-1)

All events ordered by occurrence time

Idempotency Requirements

Due to potential redelivery, all handlers must be idempotent:

// Use event IDs to detect duplicates (in-memory example)
type EventTracker struct {
processed sync.Map
}

func (t *EventTracker) handleEvent(ctx context.Context, evt event.Event) error {
eventID := evt.EventID().String()

// Check if already processed
if _, exists := t.processed.LoadOrStore(eventID, true); exists {
logger.Info("Event already processed, skipping", "eventID", eventID)
return nil // Already processed
}

// Do work
if err := doWork(ctx, evt); err != nil {
t.processed.Delete(eventID) // Remove on failure to allow retry
return err
}

return nil
}

Eventual Consistency

The platform embraces eventual consistency for distributed operations.

Saga Pattern

Multi-step distributed transactions coordinated via events:

Order Fulfillment Saga:
├─ 1. Order Service: Publish order.placed
├─ 2. Inventory Service: Reserve stock → Publish inventory.reserved
├─ 3. Payment Service: Charge customer → Publish payment.captured
├─ 4. Shipping Service: Create shipment → Publish shipment.created
└─ 5. Order Service: Mark order complete → Publish order.fulfilled

If any step fails:
├─ Publish compensation events
├─ inventory.reservation.cancelled
├─ payment.refunded
└─ order.cancelled

Implementation (Go):

type OrderSaga struct {
    eventBus event.Bus
    logger   telemetry.Logger
}

func (s *OrderSaga) Execute(ctx context.Context, orderID uuid.UUID) error {
    // Step 1: Reserve inventory
    if err := s.publishCommand(ctx, "inventory.reserve", orderID); err != nil {
        return s.compensate(ctx, orderID, fmt.Errorf("reserve failed: %w", err))
    }
    if err := s.waitForEvent(ctx, "inventory.reserved", orderID, 30*time.Second); err != nil {
        return s.compensate(ctx, orderID, fmt.Errorf("inventory timeout: %w", err))
    }

    // Step 2: Capture payment
    if err := s.publishCommand(ctx, "payment.capture", orderID); err != nil {
        return s.compensate(ctx, orderID, fmt.Errorf("payment failed: %w", err))
    }
    if err := s.waitForEvent(ctx, "payment.captured", orderID, 30*time.Second); err != nil {
        return s.compensate(ctx, orderID, fmt.Errorf("payment timeout: %w", err))
    }

    // Step 3: Create shipment
    if err := s.publishCommand(ctx, "shipment.create", orderID); err != nil {
        return s.compensate(ctx, orderID, fmt.Errorf("shipment failed: %w", err))
    }
    if err := s.waitForEvent(ctx, "shipment.created", orderID, 30*time.Second); err != nil {
        return s.compensate(ctx, orderID, fmt.Errorf("shipment timeout: %w", err))
    }

    // Success: Mark order fulfilled
    evt := event.NewBaseEvent("order.fulfilled", orderID, "order")
    return s.eventBus.Publish(ctx, evt)
}

func (s *OrderSaga) compensate(ctx context.Context, orderID uuid.UUID, reason error) error {
    s.logger.Warn("Compensating saga", "orderID", orderID, telemetry.Error(reason))

    // Compensate in reverse order
    _ = s.publishCommand(ctx, "inventory.unreserve", orderID)
    _ = s.publishCommand(ctx, "payment.refund", orderID)

    cancelEvt := event.NewBaseEvent("order.cancelled", orderID, "order")
    _ = s.eventBus.Publish(ctx, cancelEvt)

    return reason
}

Read Models

Maintain denormalized read models updated via events:

Write Model (Source of Truth):
├─ Invoice created → DB: invoices table
└─ Publish: invoice.created event

Read Model (Optimized for Queries):
├─ Subscribe to: invoice.created
├─ Update: customer_invoices_view
└─ Faster customer-specific queries

Configuration

Event bus configuration via environment variables:

RabbitMQ Configuration

# Enable RabbitMQ event bus
APPSERVER_EVENTBUS_ENABLED=true

# RabbitMQ connection
APPSERVER_RABBITMQ_URL=amqp://user:pass@localhost:5672/
APPSERVER_RABBITMQ_VHOST=/

# Connection retry
APPSERVER_RABBITMQ_MAX_RETRIES=5
APPSERVER_RABBITMQ_RETRY_DELAY=5s

# Consumer settings
APPSERVER_RABBITMQ_PREFETCH_COUNT=10 # Messages fetched at once
APPSERVER_RABBITMQ_CONSUMER_TIMEOUT=30s

# Queue settings
APPSERVER_RABBITMQ_QUEUE_DURABLE=true
APPSERVER_RABBITMQ_QUEUE_AUTO_DELETE=false

Test Mode

For testing with ephemeral queues:

# Test mode (exclusive, auto-delete queues)
APPSERVER_EVENTBUS_TEST_MODE=true

In-Memory Fallback

Disable RabbitMQ for local development:

# Use in-memory event bus
APPSERVER_EVENTBUS_ENABLED=false

Best Practices

Event Design

Use Past Tense:

Good:
- user.created
- invoice.paid
- order.shipped

Bad:
- create.user
- pay.invoice
- ship.order

Include Correlation IDs:

evt := event.NewBaseEvent("order.placed", orderID, "order")
evt.SetPayload(map[string]interface{}{
    "orderID": orderID.String(),
})
// Set metadata with correlation tracking
evt.SetMetadata(map[string]interface{}{
    "correlationID": requestID,     // Track request flow
    "causationID":   parentEventID, // Event that caused this event
})
if err := eventBus.Publish(ctx, evt); err != nil {
    return fmt.Errorf("failed to publish: %w", err)
}

Keep Payloads Small:

// Good: Reference IDs
payload := map[string]interface{}{
    "invoiceID": "inv-123",
    "paymentID": "pay-456",
    "amount":    1000.00,
}

// Bad: Full objects (avoid this)
payload := map[string]interface{}{
    "invoice":  fullInvoiceObject,  // 500 fields
    "payment":  fullPaymentObject,  // 300 fields
    "customer": fullCustomerObject, // 200 fields
}

Version Event Schemas:

type UserCreatedPayloadV2 struct {
    UserID      string `json:"userID"`
    Email       string `json:"email"`
    Name        string `json:"name"`        // Added in v2
    PhoneNumber string `json:"phoneNumber"` // Added in v2
}

evt := event.NewBaseEvent("user.created", userID, "user")
evt.SetVersion(2) // Incremented when schema changes
evt.SetPayload(UserCreatedPayloadV2{
    UserID:      userID.String(),
    Email:       "user@example.com",
    Name:        "John Doe",
    PhoneNumber: "+1234567890",
})

Performance

Batch Publishing:

// Publish sequentially (a dedicated batch-publish API may not be
// implemented - check your event bus)
events := make([]event.Event, len(orders))
for i, order := range orders {
    events[i] = createOrderEvent(order)
}
for _, evt := range events {
    if err := eventBus.Publish(ctx, evt); err != nil {
        logger.Warn("Failed to publish event", telemetry.Error(err))
    }
}

// Or publish in parallel for better throughput
var wg sync.WaitGroup
for _, order := range orders {
    wg.Add(1)
    go func(o Order) {
        defer wg.Done()
        evt := createOrderEvent(o)
        _ = eventBus.Publish(ctx, evt)
    }(order)
}
wg.Wait()

Tune Prefetch Count:

Low prefetch (1-5):
- Better fairness across consumers
- Lower memory usage
- Higher latency

High prefetch (50-100):
- Better throughput
- Higher memory usage
- Potential message loss on failure

Monitor Queue Depths:

Healthy: Queue depth < 1000
Warning: Queue depth > 10000
Critical: Queue depth > 100000 (backlog growing)

Security

Validate Payloads:

func handleUserCreated(ctx context.Context, evt event.Event) error {
    userEvt := evt.(*event.UserCreated)

    // Validate required fields
    if userEvt.Email == "" || userEvt.UserID == uuid.Nil {
        logger.Error("Invalid event payload - missing required fields",
            "eventID", evt.EventID())
        return nil // Skip invalid events (or return error to requeue)
    }

    // Sanitize data
    sanitized := sanitizeEmail(userEvt.Email)

    return processUser(ctx, userEvt.UserID, sanitized)
}

Check Permissions:

func handleSensitiveEvent(ctx context.Context, evt event.Event) error {
    // Verify event source has permission
    if evt.SourceAppName() == nil {
        logger.Warn("Event missing source app", "eventID", evt.EventID())
        return nil
    }

    hasPermission, err := checkPermission(ctx,
        *evt.SourceAppName(),
        "publish",
        evt.EventType(),
    )
    if err != nil {
        return fmt.Errorf("permission check failed: %w", err)
    }

    if !hasPermission {
        logger.Warn("Unauthorized event source",
            "source", *evt.SourceAppName(),
            "type", evt.EventType())
        return nil // Skip unauthorized events
    }

    return processEvent(ctx, evt)
}

Audit Sensitive Events:

func handlePaymentCaptured(ctx context.Context, evt event.Event) error {
    paymentEvt := evt.(*event.PaymentCaptured)

    // Audit log
    if err := auditLog.Create(ctx, &AuditEntry{
        EventType: evt.EventType(),
        EventID:   evt.EventID(),
        Timestamp: evt.OccurredAt(),
        Actor:     evt.SourceAppName(),
        Action:    "payment.captured",
        Resource:  paymentEvt.PaymentID,
        Amount:    paymentEvt.Amount,
    }); err != nil {
        logger.Error("Failed to create audit log", telemetry.Error(err))
        // Continue processing even if audit fails
    }

    return processPayment(ctx, paymentEvt)
}

Monitoring

Key Metrics

Queue Metrics:

  • Queue depth (messages waiting)
  • Message rate (messages/second)
  • Consumer lag (time since oldest message)
  • Dead letter queue size

Processing Metrics:

  • Event processing latency (ms)
  • Handler success rate (%)
  • Handler error rate (errors/minute)
  • Retry count

System Metrics:

  • RabbitMQ connection status
  • Consumer count per queue
  • Memory usage
  • Disk usage (message persistence)

Alerts

Critical Alerts:
- RabbitMQ connection down
- Dead letter queue size > 1000
- Queue depth growing > 10 min
- Handler error rate > 10%

Warning Alerts:
- Queue depth > 1000
- Processing latency > 1s
- Consumer count = 0 (no consumers)
- Disk usage > 80%

Dashboards

Monitor event bus health in real-time:

RabbitMQ Dashboard:
├─ Queue depths (graph over time)
├─ Message rates (in/out per second)
├─ Consumer counts per queue
├─ Connection status
└─ Memory/disk usage

Application Dashboard:
├─ Event publish rate by type
├─ Handler success/error rates
├─ Processing latency percentiles (p50, p95, p99)
└─ Dead letter queue size

Troubleshooting

Messages Not Being Delivered

Problem: Published events not received by subscribers

Diagnosis:
1. Check RabbitMQ connection: Is RabbitMQ running?
2. Verify subscription: Is handler registered for event type?
3. Check routing: Does event topic match subscription pattern?
4. Review logs: Are there subscription errors?

Solution:
- Restart RabbitMQ if connection lost
- Re-register subscription after reconnect
- Fix topic/pattern mismatch
- Check firewall/network connectivity

Dead Letter Queue Growing

Problem: Failed messages accumulating in DLQ

Diagnosis:
1. Check DLQ messages: What events are failing?
2. Review handler logs: What errors are occurring?
3. Identify pattern: Is it a specific event type?
4. Check dependencies: Are downstream services healthy?

Solution:
- Fix handler bug causing failures
- Restore downstream service if unavailable
- Replay DLQ messages after fix
- Update event schema if incompatible

High Queue Depth

Problem: Queue depth growing, messages not processing fast enough

Diagnosis:
1. Check consumer count: Are enough consumers running?
2. Review processing latency: Are handlers slow?
3. Check resource usage: CPU/memory bottleneck?
4. Verify prefetch count: Is it configured appropriately?

Solution:
- Scale up consumer count (horizontal scaling)
- Optimize slow handlers (async processing)
- Increase resources (CPU/memory)
- Tune prefetch count for better throughput
