Event Bus (RabbitMQ)
The event bus provides reliable, distributed publish-subscribe messaging for cross-application communication and state synchronization across AppServer instances.
Overview
The implementation in pkg/v2/infrastructure/eventbus/ provides:
- Distributed Messaging: RabbitMQ-based pub/sub for cross-instance synchronization
- Event Persistence: Optional event store integration for event sourcing
- Pattern Matching: Wildcard subscriptions (e.g., app.*, settings.*)
- Multi-Cluster Support: Optional multi-cluster configuration
- Health Monitoring: Connection health checks with auto-recovery
- Metrics: Prometheus metrics for published/consumed events
Architecture
Core Components
RabbitMQBus (rabbitmq_provider.go):
- Implements the domain/event.Bus interface
- Manages connections via ConnectionManager
- Handles publishing via Publisher
- Manages subscriptions via Subscriber
- Optional event store integration
ConnectionManager (connection_manager.go):
- Manages persistent RabbitMQ connections
- Auto-reconnect on connection failures
- Dedicated channels for publishing and consuming
Publisher (publisher.go):
- Thread-safe event publishing
- Persistent delivery mode
- Automatic channel recovery
Subscriber (subscriber.go):
- Pattern-based subscriptions
- Automatic message acknowledgment
- Composite handler support (multiple handlers per event type)
Configuration
type RabbitMQConfig struct {
    Host         string
    Port         int
    User         string
    Password     string
    VHost        string
    ExchangeName string
    ExchangeType string // "topic" for pattern matching
    Durable      bool   // Survive broker restarts
}
Example Configuration
config := &RabbitMQConfig{
    Host:         "localhost",
    Port:         5672,
    User:         "guest",
    Password:     "guest",
    VHost:        "/",
    ExchangeName: "appserver.events",
    ExchangeType: "topic",
    Durable:      true,
}
eventBus, err := NewRabbitMQBus(config, logger)
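For illustration, the connection fields map naturally onto an AMQP URI. The sketch below is an assumption about how such a URI could be assembled; `amqpURI` is a hypothetical helper and the struct is a trimmed copy of the configuration, neither of which is confirmed to exist in the eventbus package in this form.

```go
package main

import (
	"fmt"
	"net/url"
)

// RabbitMQConfig is a trimmed copy holding only the connection fields.
type RabbitMQConfig struct {
	Host     string
	Port     int
	User     string
	Password string
	VHost    string
}

// amqpURI is a hypothetical helper that renders the config as an AMQP URI.
// The vhost is percent-encoded, so the default vhost "/" becomes "%2F".
func amqpURI(c RabbitMQConfig) string {
	creds := url.UserPassword(c.User, c.Password).String()
	return fmt.Sprintf("amqp://%s@%s:%d/%s", creds, c.Host, c.Port, url.PathEscape(c.VHost))
}

func main() {
	cfg := RabbitMQConfig{Host: "localhost", Port: 5672, User: "guest", Password: "guest", VHost: "/"}
	fmt.Println(amqpURI(cfg)) // amqp://guest:guest@localhost:5672/%2F
}
```

Encoding the vhost matters because the default vhost is itself a slash, which would otherwise be read as a path separator.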
Publishing Events
// Publish event to all subscribers
evt := event.NewAppInstalled(appID, appName)
err := eventBus.Publish(ctx, evt)
Publishing Flow
1. Validate event bus not closed
2. Publish to RabbitMQ topic (based on event.EventType())
3. Optionally persist to event store (asynchronous)
4. Return immediately (non-blocking)
Event Store Integration
// Create event bus with persistent event store
eventBus, err := NewRabbitMQBusWithStore(config, eventStore, logger)
// Events are automatically persisted asynchronously
evt := event.NewAppInstalled(appID, appName)
eventBus.Publish(ctx, evt) // Publishes AND stores
Benefits:
- Event sourcing capability
- Audit trail
- Event replay
- Storage failures don't block publishing
Subscribing to Events
Exact Event Type
handler := func(ctx context.Context, evt event.Event) error {
    // Handle event
    return nil
}
ready, err := eventBus.Subscribe("app.installed", handler)
<-ready // Wait for subscription to be ready
Pattern Matching
// Subscribe to all app events
appReady, err := eventBus.Subscribe("app.*", handler)
// Subscribe to all events
allReady, err := eventBus.Subscribe("#", handler)
Subscription Flow
1. Store handler in internal map
2. If first handler for event type:
- Subscribe to RabbitMQ topic
- Create composite handler
- Signal ready when RabbitMQ subscription complete
3. If additional handler:
- Add to existing subscription
- Signal ready immediately
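The bookkeeping in this flow can be sketched with a map and a ready channel. The names `registry`, `newRegistry`, and `bind` are hypothetical, the handler signature is simplified to a string payload, and the real Subscriber may synchronise differently.

```go
package main

import (
	"fmt"
	"sync"
)

type Handler func(payload string) error

// registry is a hypothetical model of the subscription bookkeeping.
type registry struct {
	mu       sync.Mutex
	handlers map[string][]Handler
	bind     func(pattern string) // stands in for the RabbitMQ binding call
}

func newRegistry(bind func(string)) *registry {
	return &registry{handlers: make(map[string][]Handler), bind: bind}
}

// Subscribe stores the handler and returns a channel that is closed once
// the subscription is usable.
func (r *registry) Subscribe(pattern string, h Handler) <-chan struct{} {
	ready := make(chan struct{})
	r.mu.Lock()
	first := len(r.handlers[pattern]) == 0
	r.handlers[pattern] = append(r.handlers[pattern], h)
	r.mu.Unlock()

	if !first {
		close(ready) // a binding already exists: ready immediately
		return ready
	}
	go func() {
		r.bind(pattern) // first handler: bind, then signal
		close(ready)
	}()
	return ready
}

func main() {
	binds := 0
	r := newRegistry(func(string) { binds++ })
	noop := func(string) error { return nil }
	<-r.Subscribe("app.installed", noop)
	<-r.Subscribe("app.installed", noop)
	fmt.Println("broker bindings:", binds) // 1
}
```

Only the first handler for a pattern pays the cost of a broker round trip; later handlers reuse the existing binding.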
Composite Handlers
Multiple handlers can subscribe to the same event type:
// Handler 1
eventBus.Subscribe("app.installed", handler1)
// Handler 2
eventBus.Subscribe("app.installed", handler2)
// Both handlers will be called when event is published
Error Handling:
- Every handler is called, even if an earlier one fails
- Errors are logged and do not block the remaining handlers
- The first error encountered is returned
Pattern Syntax
Based on pattern.go:
Exact Match:
"app.installed" // Only matches "app.installed"
Single Level Wildcard (*):
"app.*" // Matches "app.installed" and "app.uninstalled", but not "app.settings.updated"
Multi-Level Wildcard (#):
"app.#" // Matches "app.installed", "app.settings.updated", etc.
"#" // Matches ALL events
Pattern Examples
"app.*" // app.installed, app.uninstalled
"app.settings.*" // app.settings.updated, app.settings.deleted
"#" // ALL events
"*.installed" // app.installed, container.installed
"orchestration.#" // All orchestration events
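The matching rules can be expressed as a small recursive function. This sketch follows standard AMQP topic semantics, where `*` matches exactly one dot-separated word and `#` matches zero or more; `matchTopic` is a hypothetical name, and the logic in pattern.go may differ in edge cases.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether a routing key matches a topic pattern.
func matchTopic(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// "#" may absorb zero or more words, so try every split point.
		for i := 0; i <= len(k); i++ {
			if matchWords(p[1:], k[i:]) {
				return true
			}
		}
		return false
	case "*":
		// "*" consumes exactly one word.
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

func main() {
	for _, key := range []string{"app.installed", "app.settings.updated", "container.installed"} {
		fmt.Println(key, matchTopic("app.*", key), matchTopic("app.#", key), matchTopic("*.installed", key))
	}
}
```

Under these rules `app.*` rejects `app.settings.updated` because the key has one word too many, while `app.#` accepts it.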
Event Types
Common event types in the system:
App Lifecycle:
- app.registered
- app.installed
- app.uninstalled
- app.deregistered
Settings:
- settings.registered
- settings.updated
- settings.validation_failed
- settings.deleted
Orchestration:
- orchestration.install.started
- orchestration.install.completed
- orchestration.install.failed
- container.created
- container.started
- container.healthy
- container.failed
Permissions:
permission.invalidated
Health Monitoring
Based on health_checker.go:
Health Check:
healthy, err := eventBus.HealthCheck(ctx)
Checks:
- RabbitMQ connection status
- Channel availability
- Exchange existence
Auto-Recovery:
- Automatic reconnection on connection loss
- Channel recreation on channel errors
- Subscription re-establishment after reconnect
Metrics
Based on metrics.go:
Published Events Counter:
event_bus_published_total{event_type="app.installed"}
Consumed Events Counter:
event_bus_consumed_total{event_type="app.installed"}
Handler Error Counter:
event_bus_handler_errors_total{event_type="app.installed"}
Multi-Cluster Support
Based on multicluster_manager.go and multicluster_config.go:
Configuration:
type MultiClusterConfig struct {
    Clusters []ClusterConfig
}
type ClusterConfig struct {
    Name   string
    Config *RabbitMQConfig
}
Usage:
// Publish to all clusters
multiClusterBus.PublishToAll(ctx, evt)
// Subscribe from specific cluster
multiClusterBus.SubscribeFromCluster(clusterName, eventType, handler)
Best Practices
Event Publishing
✅ DO:
- Publish events after state changes are persisted
- Use descriptive event types
- Include relevant data in event payload
- Handle publish errors gracefully (log warnings)
❌ DON'T:
- Block on event publishing
- Publish events before state changes
- Include sensitive data in event payloads (use event IDs for lookups)
Event Subscription
✅ DO:
- Make handlers idempotent (events may be delivered multiple times)
- Use pattern matching to subscribe to related events
- Handle errors gracefully (don't crash on event processing errors)
- Wait for ready signal before considering subscription active
❌ DON'T:
- Perform long-running operations in handlers (use async workers)
- Assume event order (events from different publishers may arrive out of order)
- Subscribe to # (all events) unless necessary
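Idempotency is often achieved by remembering which event IDs have already been handled. The wrapper below is a minimal sketch: the `Event` struct with an `ID` field and the `idempotent` function are assumptions for illustration, and the in-memory map only deduplicates within one process (a shared store would be needed across AppServer instances).

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical event shape with a unique ID.
type Event struct{ ID, Type string }

type Handler func(Event) error

// idempotent wraps h so that each event ID is handled successfully at most once.
// The lock is held while h runs, which serialises calls through this wrapper.
func idempotent(h Handler) Handler {
	var mu sync.Mutex
	seen := make(map[string]bool)
	return func(e Event) error {
		mu.Lock()
		defer mu.Unlock()
		if seen[e.ID] {
			return nil // duplicate delivery: already handled
		}
		if err := h(e); err != nil {
			return err // not marked as seen, so a redelivery can retry
		}
		seen[e.ID] = true
		return nil
	}
}

func main() {
	count := 0
	h := idempotent(func(Event) error { count++; return nil })
	evt := Event{ID: "evt-1", Type: "app.installed"}
	_ = h(evt)
	_ = h(evt) // redelivery of the same event
	fmt.Println("handled:", count) // handled: 1
}
```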
Error Handling
✅ DO:
- Log errors with context (event type, event ID)
- Continue processing after non-critical errors
- Use dead letter queues for failed events
❌ DON'T:
- Swallow errors silently
- Retry indefinitely (use exponential backoff with a bounded number of attempts)
Code References
| Component | File | Purpose |
|---|---|---|
| RabbitMQBus | rabbitmq_provider.go | Main event bus implementation |
| ConnectionManager | connection_manager.go | Connection lifecycle management |
| Publisher | publisher.go | Event publishing |
| Subscriber | subscriber.go | Event subscription |
| Pattern Matching | pattern.go | Wildcard pattern support |
| Health Checker | health_checker.go | Connection health monitoring |
| Metrics | metrics.go | Prometheus metrics |
| Multi-Cluster | multicluster_manager.go | Multi-cluster support |
Related Topics
- Platform Architecture - Event-driven architecture
- Marketplace Feature - Event publishing examples
- Hooks & Activity - Event-driven patterns