Hooks & Activity Tracking
The hooks and activity systems provide event-driven and request-response patterns for cross-application communication and workflow orchestration.
Overview
Based on pkg/v2/application/hooks/ and pkg/v2/application/activity/:
- Hooks System: Pub-sub event-driven pattern with bidirectional gRPC streaming
- Activity System: Request-response pattern with routing strategies and execution models
- Execution Models: FirstMatch, AllMustSucceed, BestEffort for both systems
- Health Monitoring: Automatic stale listener/handler cleanup via heartbeats
- Correlation: Request/response matching with timeout management
Hooks System Architecture
Overview
The hooks system implements a publish-subscribe pattern where apps can:
- Register listeners on named hooks
- Trigger hooks to notify all registered listeners
- Execute synchronously or asynchronously based on listener type
Core Components
HooksService (pkg/v2/application/hooks/hooks_service.go)
- Main entry point for hook operations
- Coordinates StreamManager, TriggerCorrelator, and ExecutionEngine
- Provides
On()method for bidirectional streaming listener registration - Provides
Trigger()method to fire hooks
StreamManager (pkg/v2/application/hooks/stream_manager.go)
- Manages bidirectional gRPC streams for all registered listeners
- Data structure:
map[hookName]map[listenerID]*HookListener - Thread-safe listener registration/unregistration
- Automatic cleanup of stale listeners (default 2-minute timeout)
- Background heartbeat monitor (30-second intervals)
ExecutionEngine (pkg/v2/application/hooks/execution_engine.go)
- Executes hook triggers using configured execution models
- Invokes listeners sequentially or asynchronously
- Default timeout: 30 seconds per listener
TriggerCorrelator (pkg/v2/application/hooks/trigger_correlator.go)
- Maps in-flight hook triggers to response channels
- Tracks pending triggers with timeout information
- Background timeout monitor for cleanup
Hook Execution Models
Based on pkg/v2/application/hooks/models.go:9-21:
ExecutionModelFirstMatch (lines 13-14)
// Stops after first successful listener response
// Returns immediately when one listener succeeds
// Fails if no listener succeeds
ExecutionModelAllMustSucceed (lines 16-17)
// Requires ALL listeners to succeed
// If any listener fails, entire hook fails
// Used for critical operations requiring consensus
ExecutionModelBestEffort (lines 19-20)
// Executes all listeners regardless of success/failure
// Always returns success (collects individual results)
// Used for non-critical notifications
Hook Listener Types
Async Listeners (lines 48-49, models.go)
- Fire-and-forget pattern
- No response expected or waited for
- Trigger returns success immediately
Sync Listeners (default)
- Execution blocks until response received
- Response collected via
ResponseChan - Timeout enforced at listener level
Hook Trigger Flow
Trigger() Request
↓
ExecutionEngine.Execute()
↓
SelectExecutionModel()
↓
For each listener:
- Register PendingTrigger in correlator
- Send trigger request via stream
- If sync: wait for response on channel
- If async: fire-and-forget
↓
Correlate responses (TriggerCorrelator)
↓
Return TriggerHookResponse with all results
Hook Data Structures
HookListener (lines 37-62, models.go)
type HookListener struct {
ID string
HookName string
AppName string
Async bool
Stream interface{}
Data []byte
RegisteredAt time.Time
LastHeartbeat time.Time
}
TriggerHookRequest (lines 69-89, models.go)
type TriggerHookRequest struct {
HookName string
TriggeredBy string
Data []byte
ExecutionModel ExecutionModel
TimeoutMs int64
Metadata map[string]string
}
ListenerResult (lines 115-134, models.go)
type ListenerResult struct {
ListenerID string
AppName string
ResponseData []byte
Success bool
Error string
DurationMs int64
}
Hook Usage Example
Registering a Listener:
// App connects via bidirectional stream
stream := grpc.ServerStream
// Send listener registration
listenerReq := &StreamingHookListenerRequest{
MessageType: MessageTypeListenerRequest,
ListenerRequest: &ListenerRequest{
HookName: "user.created",
Async: false,
},
}
// Stream is kept open for receiving trigger requests
Triggering a Hook:
triggerReq := &TriggerHookRequest{
HookName: "user.created",
TriggeredBy: "auth-service",
Data: []byte(`{"userId":"123","email":"user@example.com"}`),
ExecutionModel: ExecutionModelBestEffort,
TimeoutMs: 5000,
}
response := hookService.Trigger(ctx, triggerReq)
// Response contains results from all listeners
Activity System Architecture
Overview
The activity system implements request-response patterns where apps can:
- Register handlers for named activities
- Execute activities with routing strategies (single-node or broadcast)
- Collect responses based on execution models
Core Components
ActivityService (pkg/v2/application/activity/activity_service.go)
- Main orchestrator for activity operations
- Coordinates StreamManager, RequestCorrelator, and ExecutionEngine
- Manages handler registration/unregistration
- Default timeout: 30 seconds
StreamManager (pkg/v2/application/activity/stream_manager.go)
- Manages bidirectional streams for activity handlers
- Multi-map structure:
map[activityName]map[handlerID]*ActivityHandlermap[handlerID]*ActivityHandlerfor quick lookupmap[appName]map[activityName]handlerIDfor cleanup
- Background heartbeat monitor with stale handler cleanup
ExecutionEngine (pkg/v2/application/activity/execution_engine.go)
- Routes requests based on RoutingStrategy
- Executes based on ExecutionModel
- Supports both single-node and broadcast execution paths
- Aggregates responses from multiple handlers
RequestCorrelator (pkg/v2/application/activity/request_correlator.go)
- Tracks pending activity requests
- Maps requestID to PendingRequest
- Manages request timeouts and cleanup
- Thread-safe operations
Routing Strategies
Based on pkg/v2/domain/activity/routing_strategy.go:
RoutingStrategySingleNode (line 13)
// Routes request to ONE handler
// Load-balanced across available handlers
// Default and most efficient
// Returns first successful response
RoutingStrategyBroadcast (line 18)
// Sends request to ALL registered handlers
// Useful for cache invalidation, state sync
// Combined with execution models for behavior control
// Aggregates responses from all handlers
Activity Execution Models
Same as hooks (pkg/v2/domain/activity/execution_model.go):
ExecutionModelFirstMatch (line 13)
- Stops after first successful handler
- Returns immediately on success
ExecutionModelAllMustSucceed (line 17)
- Requires ALL handlers to succeed
- Fails fast if any handler fails
- Critical operations
ExecutionModelBestEffort (line 21)
- Executes all handlers
- Succeeds if at least one succeeds
- Collects all results
Activity Execution Flow
ExecuteActivity() Request
↓
Create PendingRequest with channels
↓
Register in RequestCorrelator
↓
ExecutionEngine.ExecuteActivity()
↓
Route based on strategy:
- SingleNode: send to first handler, wait for response
- Broadcast: send to all handlers, collect responses
↓
Execute based on model:
- FirstMatch: return on first success
- AllMustSucceed: wait for all, fail on any error
- BestEffort: wait for all, succeed if >=1 succeed
↓
Correlate responses using RequestCorrelator
↓
Return Response with results and aggregated data
↓
Cleanup PendingRequest
Activity Data Structures
ActivityHandler (lines 7-32, handler.go)
type ActivityHandler struct {
ID string
ActivityName string
AppName string
Stream interface{}
RegisteredAt time.Time
LastHeartbeat time.Time
}
Request (domain, lines 15-27, request.go)
type Request struct {
ID string
ActivityID string
InputData []byte
RoutingStrategy RoutingStrategy
ExecutionModel ExecutionModel
TimeoutMs int64
Metadata map[string]string
RequestedBy string
Status RequestStatus
RequestedAt time.Time
CompletedAt *time.Time
}
Response (domain, lines 10-33, response.go)
type Response struct {
RequestID string
HandlerID string // For broadcast correlation
Data []byte // Aggregated data
Error string
Success bool
TotalDurationMs int64
Results []HandlerResult // Individual handler results
}
HandlerResult (lines 34-53, handler.go)
type HandlerResult struct {
HandlerID string
AppName string
Data []byte
Success bool
Error string
DurationMs int64
}
Activity Usage Example
Registering a Handler:
// App connects via bidirectional stream
stream := grpc.ServerStream
// Send handler registration
handlerReq := &StreamingActivityRequest{
MessageType: MessageTypeHandlerRequest,
HandlerRequest: &HandlerRequest{
ActivityName: "calculate.total",
},
}
// Stream is kept open for receiving activity requests
Executing an Activity (SingleNode):
activityReq := &ExecuteActivityRequest{
ActivityName: "calculate.total",
InputData: []byte(`{"items":[1,2,3]}`),
RoutingStrategy: RoutingStrategySingleNode,
ExecutionModel: ExecutionModelFirstMatch,
TimeoutMs: 3000,
}
response := activityService.ExecuteActivity(ctx, activityReq)
// Response contains data from selected handler
Executing an Activity (Broadcast):
activityReq := &ExecuteActivityRequest{
ActivityName: "cache.invalidate",
InputData: []byte(`{"key":"user:123"}`),
RoutingStrategy: RoutingStrategyBroadcast,
ExecutionModel: ExecutionModelBestEffort,
TimeoutMs: 5000,
}
response := activityService.ExecuteActivity(ctx, activityReq)
// Response contains results from ALL handlers
Lifecycle and Health Management
Hooks System
Registration:
- Stream opens → listener registers hook name → ID assigned
- Stored in StreamManager with current timestamp
Heartbeats:
- Updated on each message received
- Passive monitoring (no explicit heartbeat messages)
Cleanup:
- Stale check: 2-minute inactivity → automatic removal
- Cleanup interval: 1 minute background loop
- Stream close → immediate cleanup
Unregistration:
- Explicit via stream close
- Automatic via stale detection
Activity System
Registration:
- Handler registers via stream → ID assigned
- Stored in multi-map structure
Heartbeats:
- Updated on received messages
- Configurable via
StartHeartbeatMonitor()
Cleanup:
- Stale detection: default 2 minutes
- Background cleanup loop
- Per-app cleanup when needed
Unregistration:
- Manual via stream close
- Automatic via stale detection
Timeout Monitoring
Hooks:
// TriggerCorrelator.StartTimeoutMonitor() - checks every interval
// Expired triggers: Send timeout error to error channel, remove from map
Activities:
// RequestCorrelator.StartTimeoutMonitor() - similar pattern
// Expired requests: Send timeout error, cleanup
Error Handling
Hooks Errors
Based on pkg/v2/application/hooks/errors.go:
ErrHookNotFound // Hook name not registered
ErrListenerNotFound // Listener ID not found
ErrNoListeners // No listeners registered for hook
ErrListenerAlreadyRegistered // Listener already exists
ErrStreamClosed // Stream connection lost
ErrInvalidHookName // Invalid hook name format
ErrInvalidAppName // Invalid app name
ErrExecutionFailed // Hook execution failed
ErrAllListenersFailed // All listeners failed (FirstMatch model)
ErrTimeout // Listener timeout exceeded
Custom Error Types:
type HookError struct {
Code string
Message string
Details map[string]interface{}
}
type ListenerError struct {
ListenerID string
AppName string
Error error
}
Activity Errors
Based on pkg/v2/domain/activity/errors.go:
ErrActivityNotFound // Activity name not registered
ErrHandlerNotFound // Handler ID not found
ErrNoHandlers // No handlers registered
ErrTimeout // Handler timeout exceeded
ErrRequestNotFound // Request ID not found in correlator
ErrInvalidRequest // Invalid request format
ErrStreamClosed // Stream connection lost
ErrExecutionFailed // Activity execution failed
Custom Error Types:
type ActivityError struct {
Code string
Message string
Details map[string]interface{}
}
type HandlerError struct {
HandlerID string
AppName string
Error error
}
Concurrency and Thread Safety
Synchronization Mechanisms
Hooks System:
StreamManager:sync.RWMutexfor listener mapsTriggerCorrelator:sync.RWMutexfor pending triggers- Non-blocking channel sends with select default cases
Activity System:
StreamManager:sync.RWMutexfor handler mapsRequestCorrelator:sync.RWMutexfor pending requests- Similar non-blocking patterns
Channel Patterns
Both systems use:
- Response channels: 1-buffered (non-blocking send)
- Error channels: 1-buffered
- Default case in select: prevents deadlock on full channels
Request/Response Handling
Hooks Request/Response Flow
- Client sends
StreamingHookListenerRequestwith message type - Service processes:
ListenerRequest(register) orTriggerResponse(response) - For trigger: creates
StreamingHookListenerResponsewithTriggerRequest - Listener sends back
TriggerResponsewith result data - Correlator matches response to waiting channel
- ExecutionEngine collects result and continues
Activity Request/Response Flow
- Client registers handler via streaming gRPC
- Service sends
StreamingActivityResponsewithActivityRequest - Handler processes and sends back
ActivityResponse - Correlator matches response using requestID
- ExecutionEngine processes based on routing/execution model
- Response delivered to response channel
Comparison: Hooks vs Activity
| Aspect | Hooks | Activity |
|---|---|---|
| Pattern | Pub-sub events | Request-response |
| Listener/Handler | Passive subscribers | Active responders |
| Triggers | Named events fired | Named activities requested |
| Response Expected | Optional (sync/async) | Always expected |
| Correlation | TriggerCorrelator | RequestCorrelator |
| Routing | N/A (all registered) | SingleNode / Broadcast |
| Use Case | Event notifications | Service requests |
| Multiple Responses | Yes (from listeners) | Yes (from handlers) |
| Data Aggregation | Limited | Extensive (per model) |
Use Cases
Hooks System Use Cases
Event Notifications:
- user.created → notify analytics, email service, audit log
- app.installed → update UI, invalidate caches
- settings.changed → propagate to all instances
Async Workflows:
- order.placed → trigger fulfillment (async)
- payment.received → generate invoice (async)
Validation Chains:
- user.beforeDelete → check dependencies (AllMustSucceed)
- config.beforeUpdate → validate across services (AllMustSucceed)
Activity System Use Cases
Service Requests:
- calculate.total → aggregate cart totals
- format.address → standardize address format
- validate.input → run validation rules
Cache Invalidation (Broadcast):
- cache.invalidate → clear key from all instances
- cache.clear → flush entire cache cluster
Health Checks (Broadcast):
- health.check → verify all instances healthy
- status.get → collect status from all nodes
Code References
| Component | File | Key Lines |
|---|---|---|
| Hook Execution Models | hooks/models.go | 9-21, 23-35 |
| Hook Listener Struct | hooks/models.go | 37-62 |
| Hooks Service Main | hooks/hooks_service.go | 16-36 (NewService) |
| Hooks On() Entry Point | hooks/hooks_service.go | 48-208 |
| Hooks Trigger | hooks/hooks_service.go | 210-257 |
| Execution Engine Execute | hooks/execution_engine.go | 36-82 |
| First Match Execution | hooks/execution_engine.go | 84-129 |
| All Must Succeed | hooks/execution_engine.go | 131-178 |
| Best Effort Execution | hooks/execution_engine.go | 180-218 |
| Trigger Correlator | hooks/trigger_correlator.go | 46-227 |
| Activity Service | activity/activity_service.go | 157-260 |
| Activity Execution Engine | activity/execution_engine.go | 32-69 |
| Single Node Execution | activity/execution_engine.go | 91-188 |
| Broadcast Execution | activity/execution_engine.go | 190-248 |
| Broadcast First Match | activity/execution_engine.go | 250-323 |
| Broadcast All Must Succeed | activity/execution_engine.go | 325-430 |
| Broadcast Best Effort | activity/execution_engine.go | 432-540 |
| Request Correlator | activity/request_correlator.go | 13-194 |
Related Topics
- Event-Driven Architecture - Event patterns and pub/sub
- Platform Architecture - Application layer services
- Node.js SDK Hooks - SDK hook implementation
- gRPC Server - Streaming implementation