Event Bus (RabbitMQ)

The event bus provides reliable, distributed publish-subscribe messaging for cross-application communication and state synchronization across AppServer instances.

Overview

The implementation in pkg/v2/infrastructure/eventbus/ provides:

  • Distributed Messaging: RabbitMQ-based pub/sub for cross-instance synchronization
  • Event Persistence: Optional event store integration for event sourcing
  • Pattern Matching: Wildcard subscriptions (e.g., app.*, settings.*)
  • Multi-Cluster Support: Optional multi-cluster configuration
  • Health Monitoring: Connection health checks with auto-recovery
  • Metrics: Prometheus metrics for published/consumed events

Architecture

Core Components

RabbitMQBus (rabbitmq_provider.go):

  • Implements the domain/event.Bus interface
  • Manages connections via ConnectionManager
  • Handles publishing via Publisher
  • Manages subscriptions via Subscriber
  • Optional event store integration

ConnectionManager (connection_manager.go):

  • Manages persistent RabbitMQ connections
  • Auto-reconnect on connection failures
  • Dedicated channels for publishing and consuming

Publisher (publisher.go):

  • Thread-safe event publishing
  • Persistent delivery mode
  • Automatic channel recovery

Subscriber (subscriber.go):

  • Pattern-based subscriptions
  • Automatic message acknowledgment
  • Composite handler support (multiple handlers per event type)

Configuration

type RabbitMQConfig struct {
	Host         string
	Port         int
	User         string
	Password     string
	VHost        string
	ExchangeName string
	ExchangeType string // "topic" for pattern matching
	Durable      bool   // Survive broker restarts
}

Example Configuration

config := &RabbitMQConfig{
	Host:         "localhost",
	Port:         5672,
	User:         "guest",
	Password:     "guest",
	VHost:        "/",
	ExchangeName: "appserver.events",
	ExchangeType: "topic",
	Durable:      true,
}

eventBus, err := NewRabbitMQBus(config, logger)
if err != nil {
	return err
}
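
The connection fields above correspond to the parts of a standard AMQP URI. The following is a minimal sketch of that mapping. The amqpURI helper is hypothetical and is not taken from the eventbus package; the local RabbitMQConfig copies only the connection fields.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// RabbitMQConfig repeats only the connection fields of the struct above.
type RabbitMQConfig struct {
	Host     string
	Port     int
	User     string
	Password string
	VHost    string
}

// amqpURI is a hypothetical helper that renders the config as an AMQP URI.
func amqpURI(c RabbitMQConfig) string {
	// Userinfo.String escapes characters that are unsafe in credentials.
	userInfo := url.UserPassword(c.User, c.Password).String()
	hostPort := net.JoinHostPort(c.Host, strconv.Itoa(c.Port))
	// The vhost is a single path segment, so the default "/" becomes "%2F".
	return fmt.Sprintf("amqp://%s@%s/%s", userInfo, hostPort, url.PathEscape(c.VHost))
}

func main() {
	cfg := RabbitMQConfig{Host: "localhost", Port: 5672, User: "guest", Password: "guest", VHost: "/"}
	fmt.Println(amqpURI(cfg)) // prints amqp://guest:guest@localhost:5672/%2F
}
```

Escaping the vhost matters because the default vhost "/" would otherwise be read as a path separator.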

Publishing Events

// Publish event to all subscribers
evt := event.NewAppInstalled(appID, appName)
err := eventBus.Publish(ctx, evt)

Publishing Flow

1. Verify that the event bus has not been closed
2. Publish to the RabbitMQ topic exchange, routed by event.EventType()
3. Optionally persist the event to the event store (asynchronously)
4. Return immediately, without waiting for the event store write (non-blocking)

Event Store Integration

// Create event bus with persistent event store
eventBus, err := NewRabbitMQBusWithStore(config, eventStore, logger)

// Events are automatically persisted asynchronously
evt := event.NewAppInstalled(appID, appName)
eventBus.Publish(ctx, evt) // Publishes AND stores

Benefits:

  • Event sourcing capability
  • Audit trail
  • Event replay
  • Storage failures don't block publishing

Subscribing to Events

Exact Event Type

handler := func(ctx context.Context, evt event.Event) error {
	// Handle event
	return nil
}

ready, err := eventBus.Subscribe("app.installed", handler)
if err != nil {
	return err
}
<-ready // Wait for subscription to be ready

Pattern Matching

// Subscribe to all app events
appReady, err := eventBus.Subscribe("app.*", handler)

// Subscribe to all events
allReady, err := eventBus.Subscribe("#", handler)

Subscription Flow

1. Store the handler in the internal map
2. If this is the first handler for the event type:
   • Subscribe to the RabbitMQ topic
   • Create the composite handler
   • Signal ready once the RabbitMQ subscription is complete
3. If a handler is already registered for the event type:
   • Add the new handler to the existing subscription
   • Signal ready immediately
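
A minimal sketch of this flow, assuming an in-memory handler map and a bind callback standing in for the broker subscription. The registry type, the simplified Handler signature, and the single return value are assumptions for illustration; error handling for a failed bind is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// Handler is simplified here; the real handlers take a context and an event.
type Handler func(payload string) error

// registry is a hypothetical stand-in for the subscriber's handler map.
type registry struct {
	mu       sync.Mutex
	handlers map[string][]Handler
	bind     func(pattern string) // stands in for the broker subscription
}

// Subscribe stores the handler and returns a channel that is closed
// once the subscription can be considered active.
func (r *registry) Subscribe(pattern string, h Handler) <-chan struct{} {
	ready := make(chan struct{})

	r.mu.Lock()
	first := len(r.handlers[pattern]) == 0
	r.handlers[pattern] = append(r.handlers[pattern], h)
	r.mu.Unlock()

	if !first {
		// Additional handler: the broker subscription already exists.
		close(ready)
		return ready
	}
	// First handler: bind in the background, then signal readiness.
	go func() {
		r.bind(pattern)
		close(ready)
	}()
	return ready
}

func main() {
	binds := 0
	r := &registry{
		handlers: map[string][]Handler{},
		bind:     func(string) { binds++ },
	}
	noop := func(string) error { return nil }

	<-r.Subscribe("app.installed", noop) // first handler: waits for bind
	<-r.Subscribe("app.installed", noop) // second handler: ready at once
	fmt.Println("broker bindings:", binds, "handlers:", len(r.handlers["app.installed"]))
}
```

In this sketch a second subscriber that arrives while the first bind is still in flight is told it is ready before the broker is. A stricter version would share one ready channel per pattern.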

Composite Handlers

Multiple handlers can subscribe to the same event type:

// Handler 1
eventBus.Subscribe("app.installed", handler1)

// Handler 2
eventBus.Subscribe("app.installed", handler2)

// Both handlers will be called when event is published

Error Handling:

  • All handlers called even if one fails
  • Errors logged but don't block other handlers
  • First error returned

Pattern Syntax

Based on pattern.go:

Exact Match:

"app.installed"  // Only matches "app.installed"

Single Level Wildcard (*):

"app.*"  // Matches "app.installed", "app.uninstalled", etc.

Multi-Level Wildcard (#):

"app.#"  // Matches "app.installed", "app.settings.updated", etc.
"#"      // Matches ALL events

Pattern Examples

"app.*"            // app.installed, app.uninstalled
"app.settings.*"   // app.settings.updated, app.settings.deleted
"#"                // ALL events
"*.installed"      // app.installed, container.installed
"orchestration.#"  // All orchestration events
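
To make the wildcard rules concrete, here is a small matcher that follows the RabbitMQ topic-exchange convention: `*` stands for exactly one dot-separated word and `#` for zero or more words. It is a sketch for experimenting with patterns, and matchTopic is a hypothetical name; pattern.go may be implemented differently.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether a routing key matches a topic pattern.
func matchTopic(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// "#" either absorbs no word, or absorbs one word and tries again.
		if matchWords(p[1:], k) {
			return true
		}
		return len(k) > 0 && matchWords(p, k[1:])
	case "*":
		// "*" consumes exactly one word.
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

func main() {
	fmt.Println(matchTopic("app.*", "app.installed"))        // true
	fmt.Println(matchTopic("app.*", "app.settings.updated")) // false
	fmt.Println(matchTopic("app.#", "app.settings.updated")) // true
	fmt.Println(matchTopic("*.installed", "container.installed")) // true
}
```

The second call shows the practical difference between the two wildcards: "app.*" stops at one level, so multi-level keys need "app.#".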

Event Types

Common event types in the system:

App Lifecycle:

  • app.registered
  • app.installed
  • app.uninstalled
  • app.deregistered

Settings:

  • settings.registered
  • settings.updated
  • settings.validation_failed
  • settings.deleted

Orchestration:

  • orchestration.install.started
  • orchestration.install.completed
  • orchestration.install.failed
  • container.created
  • container.started
  • container.healthy
  • container.failed

Permissions:

  • permission.invalidated

Health Monitoring

Based on health_checker.go:

Health Check:

healthy, err := eventBus.HealthCheck(ctx)

Checks:

  • RabbitMQ connection status
  • Channel availability
  • Exchange existence

Auto-Recovery:

  • Automatic reconnection on connection loss
  • Channel recreation on channel errors
  • Subscription re-establishment after reconnect

Metrics

Based on metrics.go:

Published Events Counter:

event_bus_published_total{event_type="app.installed"}

Consumed Events Counter:

event_bus_consumed_total{event_type="app.installed"}

Handler Error Counter:

event_bus_handler_errors_total{event_type="app.installed"}

Multi-Cluster Support

Based on multicluster_manager.go and multicluster_config.go:

Configuration:

type MultiClusterConfig struct {
	Clusters []ClusterConfig
}

type ClusterConfig struct {
	Name   string
	Config *RabbitMQConfig
}

Usage:

// Publish to all clusters
multiClusterBus.PublishToAll(ctx, evt)

// Subscribe from specific cluster
multiClusterBus.SubscribeFromCluster(clusterName, eventType, handler)

Best Practices

Event Publishing

DO:

  • Publish events after state changes are persisted
  • Use descriptive event types
  • Include relevant data in event payload
  • Handle publish errors gracefully (log warnings)

DON'T:

  • Block on event publishing
  • Publish events before state changes
  • Include sensitive data in event payloads (use event IDs for lookups)

Event Subscription

DO:

  • Make handlers idempotent (events may be delivered multiple times)
  • Use pattern matching to subscribe to related events
  • Handle errors gracefully (don't crash on event processing errors)
  • Wait for ready signal before considering subscription active

DON'T:

  • Perform long-running operations in handlers (use async workers)
  • Assume event order (events from different publishers may arrive out of order)
  • Subscribe to # (all events) unless necessary

Error Handling

DO:

  • Log errors with context (event type, event ID)
  • Continue processing after non-critical errors
  • Use dead letter queues for failed events

DON'T:

  • Swallow errors silently
  • Retry indefinitely (cap the number of attempts and use exponential backoff)

Code References

Component          File                     Purpose
RabbitMQBus        rabbitmq_provider.go     Main event bus implementation
ConnectionManager  connection_manager.go    Connection lifecycle management
Publisher          publisher.go             Event publishing
Subscriber         subscriber.go            Event subscription
Pattern Matching   pattern.go               Wildcard pattern support
Health Checker     health_checker.go        Connection health monitoring
Metrics            metrics.go               Prometheus metrics
Multi-Cluster      multicluster_manager.go  Multi-cluster support