Event-Driven Architecture
Easy AppServer uses event-driven patterns to enable loose coupling between applications, ensure eventual consistency, and coordinate distributed operations across the platform.
Overview
The platform implements a publish-subscribe (pub/sub) architecture using RabbitMQ as the event bus. Applications can:
- Publish domain events when their state changes
- Subscribe to events from other applications and platform services
- Implement eventual consistency patterns across distributed components
- Trigger cache invalidation across all platform instances
- React to system-wide state changes asynchronously
Code Reference: pkg/v2/domain/event/bus.go:10
Event Bus Architecture
Bus Interface
The platform defines a vendor-neutral event bus interface:
Code Reference: pkg/v2/domain/event/bus.go:10
// Bus provides centralized event distribution for domain events
type Bus interface {
// Publish publishes a domain event to all subscribers
Publish(ctx context.Context, event Event) error
// Subscribe subscribes to a specific event type
Subscribe(eventType string, handler Handler) (<-chan struct{}, error)
// Unsubscribe removes a subscription
Unsubscribe(eventType string, handler Handler) error
// Close closes the event bus and cleans up resources
Close() error
}
This interface can be implemented by different backends:
- RabbitMQ - Production event bus with persistence and durability
- In-Memory - Development/testing bus without external dependencies
- Kafka - Future high-throughput implementation
event.Bus.Subscribe matches exact event types. Wildcard topic bindings only exist within the RabbitMQ subscriber (for example, when binding app.# directly through AMQP); the domain bus requires the concrete event type.
Event Interface
All events implement a common interface:
type Event interface {
EventID() uuid.UUID // Unique event identifier
EventType() string // Event type (e.g., "app.installed")
OccurredAt() time.Time // Event timestamp
AggregateID() uuid.UUID // ID of the aggregate that generated the event
AggregateType() string // Type of aggregate (e.g., "app", "user")
Topic() string // RabbitMQ routing topic
SourceAppID() *uuid.UUID // Optional app that triggered event
SourceAppName() *string // Optional app name
Metadata() map[string]interface{} // Optional metadata
Payload() []byte // Serialized event payload
Version() int // Event schema version
}
Handler Function
Event handlers process events asynchronously:
type Handler func(ctx context.Context, event Event) error
Handlers should be:
- Idempotent - Safe to run multiple times
- Fast - Process quickly or delegate to background workers
- Error-Resilient - Handle failures gracefully
RabbitMQ Integration
Production Event Bus
The AppServer integrates with RabbitMQ for production deployments.
Code Reference: pkg/v2/server/eventbus.go:12
func createEventBus(cfg *config.Config, logger telemetry.Logger) (event.Bus, error) {
if !cfg.EventBus.Enabled {
logger.Info("using in-memory event bus (RabbitMQ disabled)")
return event.NewInMemoryBus(), nil
}
var rabbitmqConfig *eventbus.RabbitMQConfig
if cfg.EventBus.TestMode {
// Test-optimized config (exclusive queues, auto-delete)
rabbitmqConfig = eventbus.TestModeRabbitMQConfig(cfg.EventBus.URL)
} else {
// Production config (durable queues, persistent)
rabbitmqConfig = eventbus.DefaultRabbitMQConfig(cfg.EventBus.URL)
}
// Create RabbitMQ event bus
bus, err := eventbus.NewRabbitMQBus(rabbitmqConfig, logger)
if err != nil {
return nil, fmt.Errorf("failed to create RabbitMQ event bus: %w", err)
}
return bus, nil
}
RabbitMQ Features
Durable Message Queues:
- Messages persisted to disk
- Survive RabbitMQ restarts
- Guaranteed delivery (at-least-once)
Topic-Based Routing:
- Events routed by topic patterns
- Subscribers can bind with wildcard patterns at the AMQP level (the domain Bus interface matches exact event types only)
- Flexible subscription filtering
Dead Letters (planned):
- Queues are declared with DLQ bindings, but the current subscriber requeues messages on error instead of routing them to the DLQ
- Manual inspection/replay tooling has not been implemented yet
Message Prefetch:
- Control consumer throughput
- Configurable prefetch count
- Balance latency vs throughput
In-Memory Event Bus
For development and testing, an in-memory event bus is available:
Features:
- Enabled when APPSERVER_EVENTBUS_ENABLED=false
- No external dependencies (no RabbitMQ required)
- Events not persisted (lost on restart)
- No cross-instance communication (single instance only)
- Synchronous delivery (no message broker delay)
Use Cases:
- Local development
- Unit testing
- Integration testing
- CI/CD pipelines
Domain Events
Platform Events
Events published by the AppServer itself for system-wide coordination:
Application Lifecycle Events
Code Reference: pkg/v2/domain/event/app_events.go
EventTypeAppRegistered - Application registered with platform
EventTypeAppInstalled - Application successfully installed
EventTypeAppUninstalled - Application removed
EventTypeAppDeregistered - Application deregistered from platform
Event Payload Example:
{
"eventID": "550e8400-e29b-41d4-a716-446655440000",
"eventType": "app.installed",
"occurredAt": "2025-01-24T15:30:00Z",
"aggregateID": "app-uuid",
"aggregateType": "app",
"payload": {
"appID": "app-uuid",
"appName": "de.easy-m.todos",
"version": "1.0.0",
"installedAt": "2025-01-24T15:30:00Z"
}
}
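A consumer that receives this JSON outside the Go domain types could decode it in two passes: first the envelope, then the type-specific payload. The sketch below assumes the field names shown in the example above; the envelope and appInstalledPayload structs and the decodeAppInstalled helper are hypothetical and not part of the platform.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// envelope mirrors the top-level JSON fields from the example; the payload is
// kept raw until the event type is known.
type envelope struct {
	EventID       string          `json:"eventID"`
	EventType     string          `json:"eventType"`
	OccurredAt    time.Time       `json:"occurredAt"`
	AggregateID   string          `json:"aggregateID"`
	AggregateType string          `json:"aggregateType"`
	Payload       json.RawMessage `json:"payload"`
}

// appInstalledPayload mirrors the nested payload of an app.installed event.
type appInstalledPayload struct {
	AppID       string    `json:"appID"`
	AppName     string    `json:"appName"`
	Version     string    `json:"version"`
	InstalledAt time.Time `json:"installedAt"`
}

const sampleEvent = `{
  "eventID": "550e8400-e29b-41d4-a716-446655440000",
  "eventType": "app.installed",
  "occurredAt": "2025-01-24T15:30:00Z",
  "aggregateID": "app-uuid",
  "aggregateType": "app",
  "payload": {
    "appID": "app-uuid",
    "appName": "de.easy-m.todos",
    "version": "1.0.0",
    "installedAt": "2025-01-24T15:30:00Z"
  }
}`

// decodeAppInstalled decodes the envelope, checks the event type, and then
// decodes the payload.
func decodeAppInstalled(raw []byte) (envelope, appInstalledPayload, error) {
	var env envelope
	var p appInstalledPayload
	if err := json.Unmarshal(raw, &env); err != nil {
		return env, p, fmt.Errorf("decode envelope: %w", err)
	}
	if env.EventType != "app.installed" {
		return env, p, fmt.Errorf("unexpected event type %q", env.EventType)
	}
	if err := json.Unmarshal(env.Payload, &p); err != nil {
		return env, p, fmt.Errorf("decode payload: %w", err)
	}
	return env, p, nil
}

func main() {
	env, p, err := decodeAppInstalled([]byte(sampleEvent))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(env.EventType, p.AppName, p.Version)
}
```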
Settings & Permission Events
Code Reference:
- Settings: pkg/v2/domain/event/settings_events.go
- Permission cache: pkg/v2/domain/event/permission_events.go
EventTypeSettingsRegistered - Settings schema registered
EventTypeSettingsUpdated - Application settings changed
EventTypeSettingsValidationFailed - Settings validation failed
EventTypeSettingsDeleted - Settings schema deleted
permission.invalidate - Invalidate permission caches for an app/pattern
Settings Updated Event:
func NewSettingsUpdated(appID uuid.UUID, appName string, changedKeys []string, updatedBy *uuid.UUID) Event {
payload := SettingsUpdatedPayload{
AppID: appID,
AppName: appName,
ChangedKeys: changedKeys,
UpdatedBy: updatedBy,
}
// ... create and return event
}
This event triggers cache invalidation across all platform instances.
Permission Events
Code Reference: pkg/v2/domain/event/permission_events.go:20
PermissionInvalidateEventType - Permission cache invalidation
Permission Invalidation Event:
func NewPermissionInvalidateEvent(appName, pattern string) *PermissionInvalidateEvent {
baseEvent := NewBaseEvent(
PermissionInvalidateEventType,
uuid.Nil,
"permission",
)
// Set metadata with app name and pattern
return &PermissionInvalidateEvent{BaseEvent: baseEvent}
}
User Events
EventTypeUserCreated - New user account created
EventTypeUserUpdated - User profile updated
EventTypeUserDeleted - User account deleted
Session Events
EventTypeSessionCreated - User session created
EventTypeSessionRevoked - User session revoked
EventTypeSessionExpired - User session expired
Application-Specific Events
Current: Event bus integration is server-side only. The Go services (pkg/v2/) have full access to the event bus for publishing and subscribing to events.
Planned: Node.js SDK event bus helpers (publishEvent, eventBus.subscribe) are not yet implemented. Applications currently communicate via Hooks (for event-driven patterns with responses) and Activities (for request/response workflows).
See Hooks Architecture for inter-app event communication today.
Applications running as Go services can publish custom domain events directly via the event bus:
Example: Publishing Events from Go:
// From within a Go service
// (the variable is named evt so it does not shadow the event package)
evt := event.NewBaseEvent(
	"invoice.created",
	invoiceID,
	"invoice",
)
evt.SetPayload(map[string]interface{}{
	"invoiceID":  invoiceID.String(),
	"customerID": "cust-456",
	"amount":     1000.00,
	"currency":   "EUR",
})
if err := eventBus.Publish(ctx, evt); err != nil {
	return fmt.Errorf("failed to publish event: %w", err)
}
Event Subscriptions
Applications subscribe to events they care about. Currently, event subscriptions are available for server-side Go services that integrate directly with the platform's event bus.
Subscription Patterns (Go Services)
Exact Match
Subscribe to a specific event type:
// Subscribe to exact event type
_, err := eventBus.Subscribe(event.EventTypeAppInstalled, func(ctx context.Context, evt event.Event) error {
appEvt := evt.(*event.AppInstalled)
log.Info("App installed", "appName", appEvt.AppName)
return sendNotification(appEvt)
})
Wildcard Patterns (RabbitMQ Only)
RabbitMQ supports topic-based wildcards when binding queues directly:
Patterns:
- invoice.* - All invoice events (created, updated, paid, etc.)
- *.created - All creation events across all entities
- app.# - All application events (multi-level wildcard)
- # - All events (use with caution, high volume)
Note: The domain event.Bus interface requires exact event types. Wildcard subscriptions work at the RabbitMQ infrastructure level but are not exposed through the standard Bus interface.
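To make the wildcard semantics concrete: in AMQP topic exchanges, routing keys are dot-separated words, * matches exactly one word, and # matches zero or more words. The matcher below is a self-contained sketch of those rules for reasoning about bindings; matchTopic is a hypothetical helper, not part of the platform or of RabbitMQ's client API.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether routingKey matches an AMQP-style topic pattern.
func matchTopic(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

// matchWords compares the pattern and key word by word.
func matchWords(pattern, key []string) bool {
	if len(pattern) == 0 {
		return len(key) == 0
	}
	switch pattern[0] {
	case "#":
		// "#" may match zero words, or consume one word and stay active.
		if matchWords(pattern[1:], key) {
			return true
		}
		return len(key) > 0 && matchWords(pattern, key[1:])
	case "*":
		// "*" must consume exactly one word.
		return len(key) > 0 && matchWords(pattern[1:], key[1:])
	default:
		return len(key) > 0 && pattern[0] == key[0] && matchWords(pattern[1:], key[1:])
	}
}

func main() {
	keys := []string{"invoice.created", "invoice.payment.failed", "app.installed"}
	for _, key := range keys {
		fmt.Printf("%-24s invoice.*=%v app.#=%v #=%v\n", key,
			matchTopic("invoice.*", key), matchTopic("app.#", key), matchTopic("#", key))
	}
}
```

One consequence worth noting: invoice.* does not match a three-word key such as invoice.payment.failed; a binding like invoice.# would be needed for that.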
Server-Side Subscription Example
Platform services subscribe during initialization:
// Example from settings service
func (s *SettingsService) Start(ctx context.Context) error {
// Subscribe to app lifecycle events
_, err := s.eventBus.Subscribe(event.EventTypeAppUninstalled, func(ctx context.Context, evt event.Event) error {
appEvt := evt.(*event.AppUninstalled)
return s.handleAppUninstalled(ctx, appEvt.AppID)
})
if err != nil {
return fmt.Errorf("failed to subscribe: %w", err)
}
return nil
}
Planned: Node.js SDK event subscription APIs (eventBus.subscribe) for application-level event handling. For now, applications should use:
- Hooks - For subscribing to events from other apps with synchronous responses
- Activities - For request/response patterns between apps
Event Handling
Handler Implementation (Go Services)
Event handlers process events asynchronously:
func handleUserCreated(ctx context.Context, evt event.Event) error {
userEvt := evt.(*event.UserCreated)
// Extract payload
logger.Info("New user created",
"userID", userEvt.UserID,
"email", userEvt.Email)
// Create user profile in local database
if err := db.UserProfiles.Create(ctx, &UserProfile{
UserID: userEvt.UserID,
Email: userEvt.Email,
CreatedAt: userEvt.OccurredAt(),
}); err != nil {
return fmt.Errorf("failed to create profile: %w", err)
}
// Send welcome email
if err := emailService.Send(ctx, WelcomeEmail{
To: userEvt.Email,
Template: "welcome",
}); err != nil {
logger.Warn("Failed to send welcome email", telemetry.Error(err))
// Don't fail the handler for non-critical operations
}
// Publish follow-up event
profileEvent := event.NewBaseEvent("profile.initialized", userEvt.UserID, "user")
if err := eventBus.Publish(ctx, profileEvent); err != nil {
logger.Warn("Failed to publish follow-up event", telemetry.Error(err))
}
return nil
}
Error Handling
Handlers should handle errors gracefully:
func handleInvoiceCreated(ctx context.Context, evt event.Event) error {
invoiceEvt := evt.(*event.InvoiceCreated)
// Process event
if err := processInvoice(ctx, invoiceEvt.InvoiceID); err != nil {
// Log error
logger.Error("Failed to process invoice event",
"eventID", evt.EventID(),
"invoiceID", invoiceEvt.InvoiceID,
telemetry.Error(err))
// Publish error event for monitoring
errorEvent := event.NewBaseEvent("invoice.processing.failed", invoiceEvt.InvoiceID, "invoice")
errorEvent.SetPayload(map[string]interface{}{
"originalEventID": evt.EventID(),
"error": err.Error(),
})
if pubErr := eventBus.Publish(ctx, errorEvent); pubErr != nil {
logger.Warn("Failed to publish error event", telemetry.Error(pubErr))
}
// Return error to trigger retry (RabbitMQ will requeue)
return err
}
return nil
}
Retry Strategies
Automatic Retry (RabbitMQ):
- Failed messages are requeued
- Configurable retry count
- Exponential backoff (via TTL + DLQ; depends on the planned DLQ routing)
Manual Retry (Go):
func handleEventWithRetry(ctx context.Context, evt event.Event) error {
	const maxRetries = 3
	var err error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		if err = processEvent(ctx, evt); err == nil {
			return nil // Success
		}
		if attempt == maxRetries {
			break
		}
		// Exponential backoff (2s, then 4s); stop early if the context is cancelled
		backoff := time.Duration(1<<attempt) * time.Second
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	// Max retries exceeded - return the error so the bus can requeue the message
	logger.Error("Max retries exceeded",
		"eventID", evt.EventID(),
		"attempts", maxRetries,
		telemetry.Error(err))
	return err
}
Dead Letter Queues
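The intended design routes repeatedly failing messages to dead letter queues for manual intervention. As noted under RabbitMQ Features, the current subscriber requeues messages on error instead, so the DLQ step below is not yet active: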
Message Processing Flow:
├─ Attempt 1: Process message
│ └─ Failure: Requeue
├─ Attempt 2: Process message
│ └─ Failure: Requeue
├─ Attempt 3: Process message
│ └─ Failure: Move to DLQ
└─ Dead Letter Queue:
├─ Manual inspection
├─ Fix root cause
└─ Replay or discard
Idempotency
Event handlers should be idempotent (safe to run multiple times):
func handleUserCreatedIdempotent(ctx context.Context, evt event.Event) error {
userEvt := evt.(*event.UserCreated)
// Check if already processed (using event ID)
processed, err := db.ProcessedEvents.Exists(ctx, evt.EventID())
if err != nil {
return fmt.Errorf("failed to check processed events: %w", err)
}
if processed {
logger.Info("Event already processed, skipping", "eventID", evt.EventID())
return nil
}
// Check if user profile already exists
existingProfile, err := db.UserProfiles.FindByUserID(ctx, userEvt.UserID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("failed to check existing profile: %w", err)
}
if existingProfile != nil {
logger.Info("User profile already exists, skipping", "userID", userEvt.UserID)
// Mark as processed
if err := db.ProcessedEvents.Create(ctx, evt.EventID(), time.Now()); err != nil {
return fmt.Errorf("failed to mark as processed: %w", err)
}
return nil
}
// Create user profile
if err := db.UserProfiles.Create(ctx, &UserProfile{
UserID: userEvt.UserID,
Email: userEvt.Email,
}); err != nil {
return fmt.Errorf("failed to create profile: %w", err)
}
// Mark as processed
if err := db.ProcessedEvents.Create(ctx, evt.EventID(), time.Now()); err != nil {
logger.Warn("Failed to mark event as processed", telemetry.Error(err))
// Don't fail - profile was created successfully
}
return nil
}
Cache Invalidation
The event bus is the primary mechanism for distributed cache invalidation.
Invalidation Flow
Cache Invalidation Pattern:
├─ 1. Service updates data in database
│ (e.g., marketplace service updates app state)
├─ 2. Service publishes event
│ (e.g., app.installed event)
├─ 3. Event bus distributes to all instances
│ (RabbitMQ routes to all subscribers)
├─ 4. Each instance receives event
│ (Including the publishing instance)
├─ 5. Subscribers invalidate cache entries
│ (cache.Invalidate(appID))
└─ 6. Next read fetches fresh data from database
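The flow above can be sketched with a read-through cache per instance. Everything here is a simplified illustration: appCache and its methods are hypothetical, the database is a map, and the loop over instances stands in for the event bus delivering the event to every subscriber.

```go
package main

import (
	"fmt"
	"sync"
)

// appCache is a read-through cache: misses are loaded from the backing store.
type appCache struct {
	mu      sync.Mutex
	entries map[string]string
	load    func(appID string) string
}

func newAppCache(load func(string) string) *appCache {
	return &appCache{entries: make(map[string]string), load: load}
}

// Get returns the cached value, loading it on a miss.
func (c *appCache) Get(appID string) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if v, ok := c.entries[appID]; ok {
		return v
	}
	v := c.load(appID)
	c.entries[appID] = v
	return v
}

// Invalidate drops the entry so the next Get reloads it.
func (c *appCache) Invalidate(appID string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.entries, appID)
}

func main() {
	db := map[string]string{"app-1": "registered"}
	load := func(appID string) string { return db[appID] }
	instances := []*appCache{newAppCache(load), newAppCache(load)}

	for _, c := range instances {
		c.Get("app-1") // warm every instance's cache
	}

	db["app-1"] = "installed" // step 1: update the database
	for _, c := range instances {
		c.Invalidate("app-1") // steps 2-5: every instance handles the event
	}
	for i, c := range instances {
		// step 6: the next read goes back to the database
		fmt.Printf("instance %d sees %s\n", i, c.Get("app-1"))
	}
}
```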
Settings Cache Invalidation
Code Reference: pkg/v2/application/settings/settings_service.go:47
// After updating settings, publish event
if err := s.eventBus.Publish(ctx, event.NewSettingsUpdated(appID, appName, changedKeys, updatedBy)); err != nil {
s.logger.Warn("Failed to publish settings.updated event", telemetry.Error(err))
}
// Subscribers invalidate cache
s.eventBus.Subscribe(event.EventTypeSettingsUpdated, func(ctx context.Context, evt event.Event) error {
settingsEvt := evt.(*event.SettingsUpdated)
s.cache.Invalidate(settingsEvt.AppID)
return nil
})
Permission Cache Invalidation
Code Reference: pkg/v2/infrastructure/permission/cache/cache.go:12
// Subscribe to permission invalidation events
s.eventBus.Subscribe(event.PermissionInvalidateEventType, func(ctx context.Context, evt event.Event) error {
permEvt := evt.(*event.PermissionInvalidateEvent)
// Invalidate L1 cache (in-memory)
s.l1Cache.InvalidatePattern(permEvt.Pattern)
// Invalidate L2 cache (Redis)
s.l2Cache.DeleteByPattern(permEvt.Pattern)
return nil
})
The permission.invalidate event type is defined in pkg/v2/domain/event/permission_events.go. Some consumers (like the UI service) still subscribe to the legacy string permission.invalidated, so those handlers will not fire until they are updated.
Asset Cache Invalidation
When lifecycle events occur, the UI service clears asset caches. It subscribes
to the wildcard topic app.* (works with RabbitMQ bindings; the in-memory bus
does not support wildcards) and drops cache entries for the affected app.
_, err := s.eventBus.Subscribe("app.*", s.handleAppLifecycleEvent)
if err != nil {
return err
}
GraphQL Subscriptions
GraphQL subscriptions bridge the event bus to real-time UI updates.
Code Reference: pkg/v2/presentation/graphql/resolvers/subscriptions.resolvers.go:20
AppStateChanged Subscription
GraphQL Subscription Flow:
├─ 1. Client subscribes via WebSocket
│ subscription { appStateChanged { name state } }
├─ 2. Resolver subscribes to event bus
│ Subscribe to: app.registered, app.installed, app.uninstalled
├─ 3. Event published to event bus
│ Event: app.installed (appName: "de.easy-m.todos")
├─ 4. Event handler receives event
│ Extract appName, check filters
├─ 5. Fetch full app data
│ Query marketplace service for complete app object
├─ 6. Convert to GraphQL type
│ Transform domain.App → generated.App
├─ 7. Send to GraphQL channel
│ Channel pushes to WebSocket
└─ 8. Client receives real-time update
UI updates automatically
Implementation:
func (r *subscriptionResolver) AppStateChanged(ctx context.Context, filter *generated.AppStateFilter) (<-chan *generated.App, error) {
// Create channel for sending app updates
appChan := make(chan *generated.App, 10)
// Determine which event types to subscribe to
eventTypes := []string{
event.EventTypeAppRegistered,
event.EventTypeAppInstalled,
event.EventTypeAppUninstalled,
}
// Create event handler
handler := func(eventCtx context.Context, evt event.Event) error {
// Extract app name from event
var appName string
switch e := evt.(type) {
case *event.AppInstalled:
appName = e.AppName
}
// Apply filters
if filter != nil && filter.AppNames != nil {
if !contains(filter.AppNames, appName) {
return nil // Skip filtered app
}
}
// Fetch full app data
domainApp, err := r.marketplaceService.GetApp(eventCtx, marketplace.GetAppQuery{AppName: &appName})
if err != nil {
return fmt.Errorf("failed to fetch app %q: %w", appName, err)
}
// Convert and send to channel
gqlApp := convertAppToGraphQL(domainApp)
select {
case appChan <- gqlApp:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// Subscribe to all relevant event types
for _, eventType := range eventTypes {
r.eventBus.Subscribe(eventType, handler)
}
// Cleanup on context cancellation
go func() {
<-ctx.Done()
close(appChan)
}()
return appChan, nil
}
Client Usage
Frontend Subscription:
import { useSubscription, gql } from '@apollo/client';
const APP_STATE_SUBSCRIPTION = gql`
subscription OnAppStateChanged {
appStateChanged {
name
state
version
installedAt
}
}
`;
function AppList() {
const { data, loading } = useSubscription(APP_STATE_SUBSCRIPTION, {
onData: ({ data }) => {
console.log('App state changed:', data.appStateChanged);
// UI automatically re-renders
}
});
// ...
}
Event Ordering
Per-Partition Ordering
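RabbitMQ preserves publish order within a single queue. The guarantee weakens when a queue has several consumers or when messages are requeued after a failure, because a redelivered message can arrive after newer ones: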
Queue: app.lifecycle
├─ Event 1: app.registered (app-a) - Delivered first
├─ Event 2: app.installed (app-a) - Delivered second
└─ Event 3: app.uninstalled (app-a) - Delivered third
Order preserved within same queue/topic
Global Ordering
For strict global ordering across all event types:
Use Case: Audit log reconstruction
Solution: Single queue with all events (sacrifices parallelism)
Global Event Queue:
├─ Event 1: app.registered (app-a)
├─ Event 2: user.created (user-1)
├─ Event 3: app.installed (app-a)
├─ Event 4: settings.updated (app-a)
└─ Event 5: user.updated (user-1)
All events ordered by occurrence time
Idempotency Requirements
Due to potential redelivery, all handlers must be idempotent:
// Use event IDs to detect duplicates (in-memory example)
type EventTracker struct {
processed sync.Map
}
func (t *EventTracker) handleEvent(ctx context.Context, evt event.Event) error {
eventID := evt.EventID().String()
// Check if already processed
if _, exists := t.processed.LoadOrStore(eventID, true); exists {
logger.Info("Event already processed, skipping", "eventID", eventID)
return nil // Already processed
}
// Do work
if err := doWork(ctx, evt); err != nil {
t.processed.Delete(eventID) // Remove on failure to allow retry
return err
}
return nil
}
Eventual Consistency
The platform embraces eventual consistency for distributed operations.
Saga Pattern
Multi-step distributed transactions coordinated via events:
Order Fulfillment Saga:
├─ 1. Order Service: Publish order.placed
├─ 2. Inventory Service: Reserve stock → Publish inventory.reserved
├─ 3. Payment Service: Charge customer → Publish payment.captured
├─ 4. Shipping Service: Create shipment → Publish shipment.created
└─ 5. Order Service: Mark order complete → Publish order.fulfilled
If any step fails:
├─ Publish compensation events
├─ inventory.reservation.cancelled
├─ payment.refunded
└─ order.cancelled
Implementation (Go):
type OrderSaga struct {
eventBus event.Bus
logger telemetry.Logger
}
func (s *OrderSaga) Execute(ctx context.Context, orderID uuid.UUID) error {
// Step 1: Reserve inventory
if err := s.publishCommand(ctx, "inventory.reserve", orderID); err != nil {
return s.compensate(ctx, orderID, fmt.Errorf("reserve failed: %w", err))
}
if err := s.waitForEvent(ctx, "inventory.reserved", orderID, 30*time.Second); err != nil {
return s.compensate(ctx, orderID, fmt.Errorf("inventory timeout: %w", err))
}
// Step 2: Capture payment
if err := s.publishCommand(ctx, "payment.capture", orderID); err != nil {
return s.compensate(ctx, orderID, fmt.Errorf("payment failed: %w", err))
}
if err := s.waitForEvent(ctx, "payment.captured", orderID, 30*time.Second); err != nil {
return s.compensate(ctx, orderID, fmt.Errorf("payment timeout: %w", err))
}
// Step 3: Create shipment
if err := s.publishCommand(ctx, "shipment.create", orderID); err != nil {
return s.compensate(ctx, orderID, fmt.Errorf("shipment failed: %w", err))
}
if err := s.waitForEvent(ctx, "shipment.created", orderID, 30*time.Second); err != nil {
return s.compensate(ctx, orderID, fmt.Errorf("shipment timeout: %w", err))
}
// Success: Mark order fulfilled
evt := event.NewBaseEvent("order.fulfilled", orderID, "order")
return s.eventBus.Publish(ctx, evt)
}
func (s *OrderSaga) compensate(ctx context.Context, orderID uuid.UUID, reason error) error {
s.logger.Warn("Compensating saga", "orderID", orderID, telemetry.Error(reason))
// Compensate in reverse order
_ = s.publishCommand(ctx, "payment.refund", orderID)
_ = s.publishCommand(ctx, "inventory.unreserve", orderID)
cancelEvt := event.NewBaseEvent("order.cancelled", orderID, "order")
_ = s.eventBus.Publish(ctx, cancelEvt)
return reason
}
Read Models
Maintain denormalized read models updated via events:
Write Model (Source of Truth):
├─ Invoice created → DB: invoices table
└─ Publish: invoice.created event
Read Model (Optimized for Queries):
├─ Subscribe to: invoice.created
├─ Update: customer_invoices_view
└─ Faster customer-specific queries
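A read-model projection for the example above might look like the following sketch. The invoiceCreated struct and customerInvoicesView type are hypothetical; in practice the view would usually live in a database table rather than in memory.

```go
package main

import "fmt"

// invoiceCreated carries the fields the projection needs from the event payload.
type invoiceCreated struct {
	InvoiceID  string
	CustomerID string
	Amount     float64
}

// customerInvoicesView is a denormalized read model keyed by customer.
type customerInvoicesView struct {
	seen       map[string]bool
	byCustomer map[string][]string
	totals     map[string]float64
}

func newCustomerInvoicesView() *customerInvoicesView {
	return &customerInvoicesView{
		seen:       make(map[string]bool),
		byCustomer: make(map[string][]string),
		totals:     make(map[string]float64),
	}
}

// Apply projects one invoice.created event into the view. Duplicate invoice
// IDs are ignored so that redelivered events do not double count.
func (v *customerInvoicesView) Apply(evt invoiceCreated) {
	if v.seen[evt.InvoiceID] {
		return
	}
	v.seen[evt.InvoiceID] = true
	v.byCustomer[evt.CustomerID] = append(v.byCustomer[evt.CustomerID], evt.InvoiceID)
	v.totals[evt.CustomerID] += evt.Amount
}

func main() {
	view := newCustomerInvoicesView()
	view.Apply(invoiceCreated{InvoiceID: "inv-1", CustomerID: "cust-456", Amount: 1000})
	view.Apply(invoiceCreated{InvoiceID: "inv-2", CustomerID: "cust-456", Amount: 500})
	view.Apply(invoiceCreated{InvoiceID: "inv-1", CustomerID: "cust-456", Amount: 1000}) // redelivery

	fmt.Println(view.byCustomer["cust-456"], view.totals["cust-456"])
}
```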
Configuration
Event bus configuration via environment variables:
RabbitMQ Configuration
# Enable RabbitMQ event bus
APPSERVER_EVENTBUS_ENABLED=true
# RabbitMQ connection
APPSERVER_RABBITMQ_URL=amqp://user:pass@localhost:5672/
APPSERVER_RABBITMQ_VHOST=/
# Connection retry
APPSERVER_RABBITMQ_MAX_RETRIES=5
APPSERVER_RABBITMQ_RETRY_DELAY=5s
# Consumer settings
APPSERVER_RABBITMQ_PREFETCH_COUNT=10 # Messages fetched at once
APPSERVER_RABBITMQ_CONSUMER_TIMEOUT=30s
# Queue settings
APPSERVER_RABBITMQ_QUEUE_DURABLE=true
APPSERVER_RABBITMQ_QUEUE_AUTO_DELETE=false
Test Mode
For testing with ephemeral queues:
# Test mode (exclusive, auto-delete queues)
APPSERVER_EVENTBUS_TEST_MODE=true
In-Memory Fallback
Disable RabbitMQ for local development:
# Use in-memory event bus
APPSERVER_EVENTBUS_ENABLED=false
Best Practices
Event Design
Use Past Tense:
Good:
- user.created
- invoice.paid
- order.shipped
Bad:
- create.user
- pay.invoice
- ship.order
Include Correlation IDs:
evt := event.NewBaseEvent("order.placed", orderID, "order")
evt.SetPayload(map[string]interface{}{
"orderID": orderID.String(),
})
// Set metadata with correlation tracking
evt.SetMetadata(map[string]interface{}{
"correlationID": requestID, // Track request flow
"causationID": parentEventID, // Event that caused this event
})
if err := eventBus.Publish(ctx, evt); err != nil {
return fmt.Errorf("failed to publish: %w", err)
}
Keep Payloads Small:
// Good: Reference IDs
payload := map[string]interface{}{
"invoiceID": "inv-123",
"paymentID": "pay-456",
"amount": 1000.00,
}
// Bad: Full objects (avoid this)
payload := map[string]interface{}{
"invoice": fullInvoiceObject, // 500 fields
"payment": fullPaymentObject, // 300 fields
"customer": fullCustomerObject, // 200 fields
}
Version Event Schemas:
type UserCreatedPayloadV2 struct {
UserID string `json:"userID"`
Email string `json:"email"`
Name string `json:"name"` // Added in v2
PhoneNumber string `json:"phoneNumber"` // Added in v2
}
evt := event.NewBaseEvent("user.created", userID, "user")
evt.SetVersion(2) // Incremented when schema changes
evt.SetPayload(UserCreatedPayloadV2{
UserID: userID.String(),
Email: "user@example.com",
Name: "John Doe",
PhoneNumber: "+1234567890",
})
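On the consuming side, the version number lets a handler accept older payloads while producers migrate. The sketch below assumes that version 1 carried only userID and email (the fields not marked as added in v2); userCreatedV1, userCreatedV2, and decodeUserCreated are hypothetical names for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// userCreatedV1 is the assumed original schema.
type userCreatedV1 struct {
	UserID string `json:"userID"`
	Email  string `json:"email"`
}

// userCreatedV2 adds name and phoneNumber.
type userCreatedV2 struct {
	UserID      string `json:"userID"`
	Email       string `json:"email"`
	Name        string `json:"name"`
	PhoneNumber string `json:"phoneNumber"`
}

// decodeUserCreated upgrades any supported version to the v2 shape, leaving
// fields that did not exist in v1 empty.
func decodeUserCreated(version int, payload []byte) (userCreatedV2, error) {
	switch version {
	case 1:
		var v1 userCreatedV1
		if err := json.Unmarshal(payload, &v1); err != nil {
			return userCreatedV2{}, fmt.Errorf("decode v1: %w", err)
		}
		return userCreatedV2{UserID: v1.UserID, Email: v1.Email}, nil
	case 2:
		var v2 userCreatedV2
		if err := json.Unmarshal(payload, &v2); err != nil {
			return userCreatedV2{}, fmt.Errorf("decode v2: %w", err)
		}
		return v2, nil
	default:
		return userCreatedV2{}, fmt.Errorf("unsupported user.created version %d", version)
	}
}

func main() {
	old, err := decodeUserCreated(1, []byte(`{"userID":"u-1","email":"user@example.com"}`))
	fmt.Printf("%+v %v\n", old, err)

	_, err = decodeUserCreated(3, []byte(`{}`))
	fmt.Println(err)
}
```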
Performance
Batch Publishing:
// Build the events up front, then publish them in a single pass
events := make([]event.Event, len(orders))
for i, order := range orders {
events[i] = createOrderEvent(order)
}
// The Bus interface has no batch method, so each event is published individually
for _, evt := range events {
if err := eventBus.Publish(ctx, evt); err != nil {
logger.Warn("Failed to publish event", telemetry.Error(err))
}
}
// Alternative: publish in parallel for throughput (relative order is not preserved)
var wg sync.WaitGroup
for _, order := range orders {
wg.Add(1)
go func(o Order) {
defer wg.Done()
evt := createOrderEvent(o)
_ = eventBus.Publish(ctx, evt)
}(order)
}
wg.Wait()
Tune Prefetch Count:
Low prefetch (1-5):
- Better fairness across consumers
- Lower memory usage
- Higher latency
High prefetch (50-100):
- Better throughput
- Higher memory usage
- More unacknowledged messages redelivered when a consumer fails (or lost, if auto-ack is enabled)
Monitor Queue Depths:
Healthy: Queue depth < 1000
Warning: Queue depth > 10000
Critical: Queue depth > 100000 (backlog growing)
Security
Validate Payloads:
func handleUserCreated(ctx context.Context, evt event.Event) error {
userEvt := evt.(*event.UserCreated)
// Validate required fields
if userEvt.Email == "" || userEvt.UserID == uuid.Nil {
logger.Error("Invalid event payload - missing required fields",
"eventID", evt.EventID())
return nil // Skip invalid events (or return error to requeue)
}
// Sanitize data
sanitized := sanitizeEmail(userEvt.Email)
return processUser(ctx, userEvt.UserID, sanitized)
}
Check Permissions:
func handleSensitiveEvent(ctx context.Context, evt event.Event) error {
// Verify event source has permission
if evt.SourceAppName() == nil {
logger.Warn("Event missing source app", "eventID", evt.EventID())
return nil
}
hasPermission, err := checkPermission(ctx,
*evt.SourceAppName(),
"publish",
evt.EventType(),
)
if err != nil {
return fmt.Errorf("permission check failed: %w", err)
}
if !hasPermission {
logger.Warn("Unauthorized event source",
"source", *evt.SourceAppName(),
"type", evt.EventType())
return nil // Skip unauthorized events
}
return processEvent(ctx, evt)
}
Audit Sensitive Events:
func handlePaymentCaptured(ctx context.Context, evt event.Event) error {
paymentEvt := evt.(*event.PaymentCaptured)
// Audit log
if err := auditLog.Create(ctx, &AuditEntry{
EventType: evt.EventType(),
EventID: evt.EventID(),
Timestamp: evt.OccurredAt(),
Actor: evt.SourceAppName(),
Action: "payment.captured",
Resource: paymentEvt.PaymentID,
Amount: paymentEvt.Amount,
}); err != nil {
logger.Error("Failed to create audit log", telemetry.Error(err))
// Continue processing even if audit fails
}
return processPayment(ctx, paymentEvt)
}
Monitoring
Key Metrics
Queue Metrics:
- Queue depth (messages waiting)
- Message rate (messages/second)
- Consumer lag (time since oldest message)
- Dead letter queue size
Processing Metrics:
- Event processing latency (ms)
- Handler success rate (%)
- Handler error rate (errors/minute)
- Retry count
System Metrics:
- RabbitMQ connection status
- Consumer count per queue
- Memory usage
- Disk usage (message persistence)
Alerts
Critical Alerts:
- RabbitMQ connection down
- Dead letter queue size > 1000
- Queue depth growing continuously for more than 10 minutes
- Handler error rate > 10%
Warning Alerts:
- Queue depth > 1000
- Processing latency > 1s
- Consumer count = 0 (no consumers)
- Disk usage > 80%
Dashboards
Monitor event bus health in real-time:
RabbitMQ Dashboard:
├─ Queue depths (graph over time)
├─ Message rates (in/out per second)
├─ Consumer counts per queue
├─ Connection status
└─ Memory/disk usage
Application Dashboard:
├─ Event publish rate by type
├─ Handler success/error rates
├─ Processing latency percentiles (p50, p95, p99)
└─ Dead letter queue size
Troubleshooting
Messages Not Being Delivered
Problem: Published events not received by subscribers
Diagnosis:
1. Check RabbitMQ connection: Is RabbitMQ running?
2. Verify subscription: Is handler registered for event type?
3. Check routing: Does event topic match subscription pattern?
4. Review logs: Are there subscription errors?
Solution:
- Restart RabbitMQ if connection lost
- Re-register subscription after reconnect
- Fix topic/pattern mismatch
- Check firewall/network connectivity
Dead Letter Queue Growing
Problem: Failed messages accumulating in DLQ
Diagnosis:
1. Check DLQ messages: What events are failing?
2. Review handler logs: What errors are occurring?
3. Identify pattern: Is it a specific event type?
4. Check dependencies: Are downstream services healthy?
Solution:
- Fix handler bug causing failures
- Restore downstream service if unavailable
- Replay DLQ messages after fix
- Update event schema if incompatible
High Queue Depth
Problem: Queue depth growing, messages not processing fast enough
Diagnosis:
1. Check consumer count: Are enough consumers running?
2. Review processing latency: Are handlers slow?
3. Check resource usage: CPU/memory bottleneck?
4. Verify prefetch count: Is it configured appropriately?
Solution:
- Scale up consumer count (horizontal scaling)
- Optimize slow handlers (async processing)
- Increase resources (CPU/memory)
- Tune prefetch count for better throughput
Related Concepts
- Hooks Architecture - Event-driven hooks with responses
- Caching Strategy - Event-based cache invalidation
- GraphQL API & Subscriptions - Real-time UI updates
- Settings Management - Settings update events
Further Reading
- RabbitMQ Documentation - Message broker
- Event-Driven Architecture - Martin Fowler
- Saga Pattern - Distributed transactions
- Event Sourcing - Event-driven persistence