Hooks & Activity Tracking

The hooks and activity systems provide event-driven and request-response patterns for cross-application communication and workflow orchestration.

Overview

Based on pkg/v2/application/hooks/ and pkg/v2/application/activity/:

  • Hooks System: Pub-sub event-driven pattern with bidirectional gRPC streaming
  • Activity System: Request-response pattern with routing strategies and execution models
  • Execution Models: FirstMatch, AllMustSucceed, BestEffort for both systems
  • Health Monitoring: Automatic stale listener/handler cleanup via heartbeats
  • Correlation: Request/response matching with timeout management

Hooks System Architecture

Overview

The hooks system implements a publish-subscribe pattern where apps can:

  • Register listeners on named hooks
  • Trigger hooks to notify all registered listeners
  • Execute synchronously or asynchronously based on listener type

Core Components

HooksService (pkg/v2/application/hooks/hooks_service.go)

  • Main entry point for hook operations
  • Coordinates StreamManager, TriggerCorrelator, and ExecutionEngine
  • Provides On() method for bidirectional streaming listener registration
  • Provides Trigger() method to fire hooks

StreamManager (pkg/v2/application/hooks/stream_manager.go)

  • Manages bidirectional gRPC streams for all registered listeners
  • Data structure: map[hookName]map[listenerID]*HookListener
  • Thread-safe listener registration/unregistration
  • Automatic cleanup of stale listeners (default 2-minute timeout)
  • Background heartbeat monitor (30-second intervals)
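The listener map described above can be sketched as a nested map guarded by a sync.RWMutex. This is only an illustration of the data structure, not the code in stream_manager.go: the type registry, the trimmed-down listener struct, and the methods add, remove, and count are hypothetical names chosen for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// listener is a hypothetical, trimmed-down stand-in for HookListener.
type listener struct {
	ID      string
	AppName string
}

// registry mirrors the documented shape map[hookName]map[listenerID]*listener.
type registry struct {
	mu        sync.RWMutex
	listeners map[string]map[string]*listener
}

func newRegistry() *registry {
	return &registry{listeners: make(map[string]map[string]*listener)}
}

// add registers a listener under a hook, creating the inner map on first use.
func (r *registry) add(hook string, l *listener) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.listeners[hook] == nil {
		r.listeners[hook] = make(map[string]*listener)
	}
	r.listeners[hook][l.ID] = l
}

// remove unregisters a listener and drops the hook entry once it is empty.
func (r *registry) remove(hook, id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.listeners[hook], id) // deleting from a nil map is a no-op
	if len(r.listeners[hook]) == 0 {
		delete(r.listeners, hook)
	}
}

// count takes only a read lock, so concurrent lookups do not block each other.
func (r *registry) count(hook string) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.listeners[hook])
}

func main() {
	r := newRegistry()
	r.add("user.created", &listener{ID: "l1", AppName: "analytics"})
	r.add("user.created", &listener{ID: "l2", AppName: "email"})
	r.remove("user.created", "l1")
	fmt.Println(r.count("user.created")) // 1
}
```

Writers take the exclusive lock while readers share the read lock, which suits a registry that is read on every trigger but modified only when streams open or close.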

ExecutionEngine (pkg/v2/application/hooks/execution_engine.go)

  • Executes hook triggers using configured execution models
  • Invokes listeners sequentially or asynchronously
  • Default timeout: 30 seconds per listener

TriggerCorrelator (pkg/v2/application/hooks/trigger_correlator.go)

  • Maps in-flight hook triggers to response channels
  • Tracks pending triggers with timeout information
  • Background timeout monitor for cleanup
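As a rough sketch of what such a correlator does, the example below maps a trigger ID to a 1-buffered response channel. The names correlator, register, and resolve are hypothetical, and a plain sync.Mutex is used for brevity; the real TriggerCorrelator may be organised differently.

```go
package main

import (
	"fmt"
	"sync"
)

// correlator is a hypothetical sketch: it maps a trigger ID to the channel
// on which the caller waits for that trigger's response.
type correlator struct {
	mu      sync.Mutex
	pending map[string]chan []byte
}

func newCorrelator() *correlator {
	return &correlator{pending: make(map[string]chan []byte)}
}

// register records an in-flight trigger and returns its response channel.
func (c *correlator) register(triggerID string) <-chan []byte {
	ch := make(chan []byte, 1)
	c.mu.Lock()
	c.pending[triggerID] = ch
	c.mu.Unlock()
	return ch
}

// resolve delivers a response to the waiting caller. It reports false when
// no trigger with that ID is pending (unknown, or already resolved).
func (c *correlator) resolve(triggerID string, data []byte) bool {
	c.mu.Lock()
	ch, ok := c.pending[triggerID]
	delete(c.pending, triggerID)
	c.mu.Unlock()
	if !ok {
		return false
	}
	ch <- data // cannot block: buffer of 1 and the entry was removed above
	return true
}

func main() {
	c := newCorrelator()
	ch := c.register("t-1")
	fmt.Println(c.resolve("t-1", []byte("ok")))   // true
	fmt.Println(string(<-ch))                     // ok
	fmt.Println(c.resolve("t-1", []byte("late"))) // false
}
```

Removing the entry before sending means a duplicate or late response is rejected rather than queued behind the first one.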

Hook Execution Models

Based on pkg/v2/application/hooks/models.go:9-21:

ExecutionModelFirstMatch (lines 13-14)

// Stops after first successful listener response
// Returns immediately when one listener succeeds
// Fails if no listener succeeds

ExecutionModelAllMustSucceed (lines 16-17)

// Requires ALL listeners to succeed
// If any listener fails, entire hook fails
// Used for critical operations requiring consensus

ExecutionModelBestEffort (lines 19-20)

// Executes all listeners regardless of success/failure
// Always returns success (collects individual results)
// Used for non-critical notifications
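To make the three outcomes concrete, the sketch below reduces a slice of per-listener success flags to an overall result. It only evaluates results that were already collected; per the descriptions above, the real engine would stop invoking listeners after the first success under FirstMatch. The type executionModel and the function outcome are hypothetical names for this example.

```go
package main

import "fmt"

type executionModel int

const (
	firstMatch executionModel = iota
	allMustSucceed
	bestEffort
)

// outcome reports overall hook success given per-listener success flags,
// following the three models as described in the text above.
func outcome(m executionModel, results []bool) bool {
	switch m {
	case firstMatch:
		// Succeeds as soon as one listener succeeded.
		for _, ok := range results {
			if ok {
				return true
			}
		}
		return false
	case allMustSucceed:
		// A single failure fails the whole hook.
		for _, ok := range results {
			if !ok {
				return false
			}
		}
		return true
	default: // bestEffort: individual failures never fail the hook
		return true
	}
}

func main() {
	results := []bool{false, true, false}
	fmt.Println(outcome(firstMatch, results))     // true
	fmt.Println(outcome(allMustSucceed, results)) // false
	fmt.Println(outcome(bestEffort, results))     // true
}
```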

Hook Listener Types

Async Listeners (lines 48-49, models.go)

  • Fire-and-forget pattern
  • No response expected or waited for
  • Trigger returns success immediately

Sync Listeners (default)

  • Execution blocks until response received
  • Response collected via ResponseChan
  • Timeout enforced at listener level

Hook Trigger Flow

  1. Trigger() request
  2. ExecutionEngine.Execute()
  3. SelectExecutionModel()
  4. For each listener:
     • Register PendingTrigger in correlator
     • Send trigger request via stream
     • If sync: wait for response on channel
     • If async: fire-and-forget
  5. Correlate responses (TriggerCorrelator)
  6. Return TriggerHookResponse with all results
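The per-listener step of this flow (send, then either return immediately or wait with a timeout) can be sketched as follows. The function dispatch, its send callback, and errTimeout are hypothetical stand-ins; the stream write and the real error values are not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a placeholder for the documented timeout error.
var errTimeout = errors.New("listener timeout exceeded")

// dispatch sends a trigger via the hypothetical send callback. Async
// listeners return immediately; sync listeners block until a response
// arrives on respCh or timeoutMs elapses.
func dispatch(async bool, send func(), respCh <-chan []byte, timeoutMs int64) ([]byte, error) {
	send()
	if async {
		return nil, nil // fire-and-forget: no response is awaited
	}
	timer := time.NewTimer(time.Duration(timeoutMs) * time.Millisecond)
	defer timer.Stop()
	select {
	case data := <-respCh:
		return data, nil
	case <-timer.C:
		return nil, errTimeout
	}
}

func main() {
	respCh := make(chan []byte, 1)
	reply := func() { respCh <- []byte("done") }
	data, err := dispatch(false, reply, respCh, 1000)
	fmt.Println(string(data), err) // done <nil>

	silent := func() {} // a listener that never answers
	_, err = dispatch(false, silent, make(chan []byte, 1), 10)
	fmt.Println(err) // listener timeout exceeded
}
```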

Hook Data Structures

HookListener (lines 37-62, models.go)

type HookListener struct {
	ID            string
	HookName      string
	AppName       string
	Async         bool
	Stream        interface{}
	Data          []byte
	RegisteredAt  time.Time
	LastHeartbeat time.Time
}

TriggerHookRequest (lines 69-89, models.go)

type TriggerHookRequest struct {
	HookName       string
	TriggeredBy    string
	Data           []byte
	ExecutionModel ExecutionModel
	TimeoutMs      int64
	Metadata       map[string]string
}

ListenerResult (lines 115-134, models.go)

type ListenerResult struct {
	ListenerID   string
	AppName      string
	ResponseData []byte
	Success      bool
	Error        string
	DurationMs   int64
}

Hook Usage Example

Registering a Listener:

// App connects via bidirectional stream
var stream grpc.ServerStream // provided by the gRPC layer

// Build the listener registration message (the send call is omitted here)
listenerReq := &StreamingHookListenerRequest{
	MessageType: MessageTypeListenerRequest,
	ListenerRequest: &ListenerRequest{
		HookName: "user.created",
		Async:    false,
	},
}

// Stream is kept open for receiving trigger requests

Triggering a Hook:

triggerReq := &TriggerHookRequest{
	HookName:       "user.created",
	TriggeredBy:    "auth-service",
	Data:           []byte(`{"userId":"123","email":"user@example.com"}`),
	ExecutionModel: ExecutionModelBestEffort,
	TimeoutMs:      5000,
}

response := hookService.Trigger(ctx, triggerReq)
// Response contains results from all listeners

Activity System Architecture

Overview

The activity system implements request-response patterns where apps can:

  • Register handlers for named activities
  • Execute activities with routing strategies (single-node or broadcast)
  • Collect responses based on execution models

Core Components

ActivityService (pkg/v2/application/activity/activity_service.go)

  • Main orchestrator for activity operations
  • Coordinates StreamManager, RequestCorrelator, and ExecutionEngine
  • Manages handler registration/unregistration
  • Default timeout: 30 seconds

StreamManager (pkg/v2/application/activity/stream_manager.go)

  • Manages bidirectional streams for activity handlers
  • Multi-map structure:
    • map[activityName]map[handlerID]*ActivityHandler
    • map[handlerID]*ActivityHandler for quick lookup
    • map[appName]map[activityName]handlerID for cleanup
  • Background heartbeat monitor with stale handler cleanup
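The value of keeping three indexes is that each operation has a direct lookup path, at the cost of keeping them consistent on every change. The sketch below shows one way to do that; indexes, add, and removeApp are hypothetical names, and locking is left out to keep the focus on index consistency.

```go
package main

import "fmt"

// handler is a hypothetical, trimmed-down stand-in for ActivityHandler.
type handler struct {
	ID, ActivityName, AppName string
}

// indexes is a sketch of the three maps listed above.
type indexes struct {
	byActivity map[string]map[string]*handler // activityName -> handlerID -> handler
	byID       map[string]*handler            // handlerID -> handler
	byApp      map[string]map[string]string   // appName -> activityName -> handlerID
}

func newIndexes() *indexes {
	return &indexes{
		byActivity: make(map[string]map[string]*handler),
		byID:       make(map[string]*handler),
		byApp:      make(map[string]map[string]string),
	}
}

// add inserts the handler into all three indexes.
func (ix *indexes) add(h *handler) {
	if ix.byActivity[h.ActivityName] == nil {
		ix.byActivity[h.ActivityName] = make(map[string]*handler)
	}
	ix.byActivity[h.ActivityName][h.ID] = h
	ix.byID[h.ID] = h
	if ix.byApp[h.AppName] == nil {
		ix.byApp[h.AppName] = make(map[string]string)
	}
	ix.byApp[h.AppName][h.ActivityName] = h.ID
}

// removeApp drops every handler an app registered, using byApp to find
// them without scanning the other maps. It returns the number removed.
func (ix *indexes) removeApp(app string) int {
	removed := 0
	for activity, id := range ix.byApp[app] {
		delete(ix.byActivity[activity], id)
		if len(ix.byActivity[activity]) == 0 {
			delete(ix.byActivity, activity)
		}
		delete(ix.byID, id)
		removed++
	}
	delete(ix.byApp, app)
	return removed
}

func main() {
	ix := newIndexes()
	ix.add(&handler{ID: "h1", ActivityName: "calculate.total", AppName: "billing"})
	ix.add(&handler{ID: "h2", ActivityName: "format.address", AppName: "billing"})
	ix.add(&handler{ID: "h3", ActivityName: "calculate.total", AppName: "cart"})
	fmt.Println(ix.removeApp("billing"), len(ix.byID)) // 2 1
}
```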

ExecutionEngine (pkg/v2/application/activity/execution_engine.go)

  • Routes requests based on RoutingStrategy
  • Executes based on ExecutionModel
  • Supports both single-node and broadcast execution paths
  • Aggregates responses from multiple handlers

RequestCorrelator (pkg/v2/application/activity/request_correlator.go)

  • Tracks pending activity requests
  • Maps requestID to PendingRequest
  • Manages request timeouts and cleanup
  • Thread-safe operations

Routing Strategies

Based on pkg/v2/domain/activity/routing_strategy.go:

RoutingStrategySingleNode (line 13)

// Routes request to ONE handler
// Load-balanced across available handlers
// Default and most efficient
// Returns first successful response

RoutingStrategyBroadcast (line 18)

// Sends request to ALL registered handlers
// Useful for cache invalidation, state sync
// Combined with execution models for behavior control
// Aggregates responses from all handlers
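The difference between the two strategies comes down to how many handlers are selected as targets. The sketch below illustrates that selection step. Round-robin is an assumption made for the example: the text says single-node routing is load-balanced but does not say which policy is used. The names router and targets are hypothetical, and the router is not safe for concurrent use as written.

```go
package main

import "fmt"

type routingStrategy int

const (
	singleNode routingStrategy = iota
	broadcast
)

// router picks target handler IDs for a request.
type router struct {
	next int // round-robin cursor (assumed policy)
}

// targets returns one handler for singleNode and all handlers for broadcast.
func (r *router) targets(s routingStrategy, handlers []string) []string {
	if len(handlers) == 0 {
		return nil
	}
	if s == broadcast {
		// Copy so callers cannot mutate the input slice.
		return append([]string(nil), handlers...)
	}
	h := handlers[r.next%len(handlers)]
	r.next++
	return []string{h}
}

func main() {
	r := &router{}
	handlers := []string{"h1", "h2", "h3"}
	fmt.Println(r.targets(singleNode, handlers)) // [h1]
	fmt.Println(r.targets(singleNode, handlers)) // [h2]
	fmt.Println(r.targets(broadcast, handlers))  // [h1 h2 h3]
}
```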

Activity Execution Models

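The same three model names as hooks (pkg/v2/domain/activity/execution_model.go). Note that BestEffort differs: here it succeeds only if at least one handler succeeds, whereas the hooks BestEffort always returns success.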

ExecutionModelFirstMatch (line 13)

  • Stops after first successful handler
  • Returns immediately on success

ExecutionModelAllMustSucceed (line 17)

  • Requires ALL handlers to succeed
  • Fails fast if any handler fails
  • Critical operations

ExecutionModelBestEffort (line 21)

  • Executes all handlers
  • Succeeds if at least one succeeds
  • Collects all results

Activity Execution Flow

  1. ExecuteActivity() request
  2. Create PendingRequest with channels
  3. Register in RequestCorrelator
  4. ExecutionEngine.ExecuteActivity()
  5. Route based on strategy:
     • SingleNode: send to one handler (load-balanced), wait for response
     • Broadcast: send to all handlers, collect responses
  6. Execute based on model:
     • FirstMatch: return on first success
     • AllMustSucceed: wait for all, fail on any error
     • BestEffort: wait for all, succeed if at least one succeeds
  7. Correlate responses using RequestCorrelator
  8. Return Response with results and aggregated data
  9. Clean up PendingRequest

Activity Data Structures

ActivityHandler (lines 7-32, handler.go)

type ActivityHandler struct {
	ID            string
	ActivityName  string
	AppName       string
	Stream        interface{}
	RegisteredAt  time.Time
	LastHeartbeat time.Time
}

Request (domain, lines 15-27, request.go)

type Request struct {
	ID              string
	ActivityID      string
	InputData       []byte
	RoutingStrategy RoutingStrategy
	ExecutionModel  ExecutionModel
	TimeoutMs       int64
	Metadata        map[string]string
	RequestedBy     string
	Status          RequestStatus
	RequestedAt     time.Time
	CompletedAt     *time.Time
}

Response (domain, lines 10-33, response.go)

type Response struct {
	RequestID       string
	HandlerID       string // For broadcast correlation
	Data            []byte // Aggregated data
	Error           string
	Success         bool
	TotalDurationMs int64
	Results         []HandlerResult // Individual handler results
}

HandlerResult (lines 34-53, handler.go)

type HandlerResult struct {
	HandlerID  string
	AppName    string
	Data       []byte
	Success    bool
	Error      string
	DurationMs int64
}

Activity Usage Example

Registering a Handler:

// App connects via bidirectional stream
var stream grpc.ServerStream // provided by the gRPC layer

// Build the handler registration message (the send call is omitted here)
handlerReq := &StreamingActivityRequest{
	MessageType: MessageTypeHandlerRequest,
	HandlerRequest: &HandlerRequest{
		ActivityName: "calculate.total",
	},
}

// Stream is kept open for receiving activity requests

Executing an Activity (SingleNode):

activityReq := &ExecuteActivityRequest{
	ActivityName:    "calculate.total",
	InputData:       []byte(`{"items":[1,2,3]}`),
	RoutingStrategy: RoutingStrategySingleNode,
	ExecutionModel:  ExecutionModelFirstMatch,
	TimeoutMs:       3000,
}

response := activityService.ExecuteActivity(ctx, activityReq)
// Response contains data from selected handler

Executing an Activity (Broadcast):

activityReq := &ExecuteActivityRequest{
	ActivityName:    "cache.invalidate",
	InputData:       []byte(`{"key":"user:123"}`),
	RoutingStrategy: RoutingStrategyBroadcast,
	ExecutionModel:  ExecutionModelBestEffort,
	TimeoutMs:       5000,
}

response := activityService.ExecuteActivity(ctx, activityReq)
// Response contains results from ALL handlers

Lifecycle and Health Management

Hooks System

Registration:

  1. Stream opens → listener registers hook name → ID assigned
  2. Stored in StreamManager with current timestamp

Heartbeats:

  • Updated on each message received
  • Passive monitoring (no explicit heartbeat messages)

Cleanup:

  • Stale check: 2-minute inactivity → automatic removal
  • Cleanup interval: 1 minute background loop
  • Stream close → immediate cleanup
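Passive heartbeats plus a periodic stale check can be sketched as below. The names tracker, touch, and sweep are hypothetical, locking is omitted, and a real monitor would presumably call sweep from a background loop (for example one driven by a time.Ticker) rather than once from main.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// staleAfter matches the 2-minute inactivity threshold from the text.
const staleAfter = 2 * time.Minute

// tracker records the last time each listener sent any message.
type tracker struct {
	lastSeen map[string]time.Time
}

func newTracker() *tracker {
	return &tracker{lastSeen: make(map[string]time.Time)}
}

// touch is the passive heartbeat: call it for every message received.
func (t *tracker) touch(id string, now time.Time) {
	t.lastSeen[id] = now
}

// sweep removes listeners idle for longer than staleAfter and returns
// their IDs in sorted order.
func (t *tracker) sweep(now time.Time) []string {
	var removed []string
	for id, seen := range t.lastSeen {
		if now.Sub(seen) > staleAfter {
			removed = append(removed, id)
			delete(t.lastSeen, id) // deleting during range is safe in Go
		}
	}
	sort.Strings(removed)
	return removed
}

func main() {
	start := time.Now()
	t := newTracker()
	t.touch("l1", start)
	t.touch("l2", start)
	t.touch("l2", start.Add(90*time.Second)) // l2 sent a message later
	fmt.Println(t.sweep(start.Add(3 * time.Minute))) // [l1]
}
```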

Unregistration:

  • Explicit via stream close
  • Automatic via stale detection

Activity System

Registration:

  1. Handler registers via stream → ID assigned
  2. Stored in multi-map structure

Heartbeats:

  • Updated on received messages
  • Configurable via StartHeartbeatMonitor()

Cleanup:

  • Stale detection: default 2 minutes
  • Background cleanup loop
  • Per-app cleanup when needed

Unregistration:

  • Manual via stream close
  • Automatic via stale detection

Timeout Monitoring

Hooks:

// TriggerCorrelator.StartTimeoutMonitor() - checks every interval
// Expired triggers: Send timeout error to error channel, remove from map

Activities:

// RequestCorrelator.StartTimeoutMonitor() - similar pattern
// Expired requests: Send timeout error, cleanup
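One pass of such a timeout monitor might look like the sketch below: every entry whose deadline has passed receives a timeout error and is removed from the map. The pending struct, the expire function, and errTimeout are hypothetical, and the locking and ticker loop around the pass are omitted.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a placeholder for the documented timeout error.
var errTimeout = errors.New("trigger timed out")

// pending is a hypothetical in-flight entry with its own deadline.
type pending struct {
	deadline time.Time
	errCh    chan error // 1-buffered so the monitor never blocks
}

// expire fails every entry whose deadline has passed, removes it from the
// map, and returns how many entries were expired.
func expire(entries map[string]*pending, now time.Time) int {
	n := 0
	for id, p := range entries {
		if now.After(p.deadline) {
			select {
			case p.errCh <- errTimeout:
			default: // an error is already queued for the receiver
			}
			delete(entries, id)
			n++
		}
	}
	return n
}

func main() {
	now := time.Now()
	entries := map[string]*pending{
		"t-1": {deadline: now.Add(-time.Second), errCh: make(chan error, 1)},
		"t-2": {deadline: now.Add(time.Minute), errCh: make(chan error, 1)},
	}
	late := entries["t-1"]
	fmt.Println(expire(entries, now), len(entries)) // 1 1
	fmt.Println(<-late.errCh)                       // trigger timed out
}
```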

Error Handling

Hooks Errors

Based on pkg/v2/application/hooks/errors.go:

ErrHookNotFound              // Hook name not registered
ErrListenerNotFound          // Listener ID not found
ErrNoListeners               // No listeners registered for hook
ErrListenerAlreadyRegistered // Listener already exists
ErrStreamClosed              // Stream connection lost
ErrInvalidHookName           // Invalid hook name format
ErrInvalidAppName            // Invalid app name
ErrExecutionFailed           // Hook execution failed
ErrAllListenersFailed        // All listeners failed (FirstMatch model)
ErrTimeout                   // Listener timeout exceeded

Custom Error Types:

type HookError struct {
	Code    string
	Message string
	Details map[string]interface{}
}

type ListenerError struct {
	ListenerID string
	AppName    string
	Error      error
}
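Callers typically want to ask two questions of a failure: was it a timeout, and which listener caused it. The sketch below shows how a sentinel error and a wrapping type can answer both through errors.Is and errors.As. It is an adaptation, not the project's code: the text does not say whether the real types implement the error interface, and the field is renamed Err here because a Go struct cannot have a field and a method that are both called Error. The helpers isTimeout and failedListener are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrTimeout is named after the sentinel listed above; the message is a placeholder.
var ErrTimeout = errors.New("listener timeout exceeded")

// ListenerError is adapted from the struct above, with the wrapped error
// stored in Err so the type can also define an Error method.
type ListenerError struct {
	ListenerID string
	AppName    string
	Err        error
}

func (e *ListenerError) Error() string {
	return fmt.Sprintf("listener %s (%s): %v", e.ListenerID, e.AppName, e.Err)
}

// Unwrap lets errors.Is and errors.As see the underlying cause.
func (e *ListenerError) Unwrap() error { return e.Err }

// isTimeout reports whether err is, or wraps, ErrTimeout.
func isTimeout(err error) bool { return errors.Is(err, ErrTimeout) }

// failedListener extracts the listener ID when err wraps a *ListenerError.
func failedListener(err error) (string, bool) {
	var le *ListenerError
	if errors.As(err, &le) {
		return le.ListenerID, true
	}
	return "", false
}

func main() {
	var err error = &ListenerError{ListenerID: "l1", AppName: "email", Err: ErrTimeout}
	fmt.Println(err)            // listener l1 (email): listener timeout exceeded
	fmt.Println(isTimeout(err)) // true
	id, ok := failedListener(err)
	fmt.Println(id, ok) // l1 true
}
```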

Activity Errors

Based on pkg/v2/domain/activity/errors.go:

ErrActivityNotFound // Activity name not registered
ErrHandlerNotFound  // Handler ID not found
ErrNoHandlers       // No handlers registered
ErrTimeout          // Handler timeout exceeded
ErrRequestNotFound  // Request ID not found in correlator
ErrInvalidRequest   // Invalid request format
ErrStreamClosed     // Stream connection lost
ErrExecutionFailed  // Activity execution failed

Custom Error Types:

type ActivityError struct {
	Code    string
	Message string
	Details map[string]interface{}
}

type HandlerError struct {
	HandlerID string
	AppName   string
	Error     error
}

Concurrency and Thread Safety

Synchronization Mechanisms

Hooks System:

  • StreamManager: sync.RWMutex for listener maps
  • TriggerCorrelator: sync.RWMutex for pending triggers
  • Non-blocking channel sends with select default cases

Activity System:

  • StreamManager: sync.RWMutex for handler maps
  • RequestCorrelator: sync.RWMutex for pending requests
  • Similar non-blocking patterns

Channel Patterns

Both systems use:

  • Response channels: 1-buffered (non-blocking send)
  • Error channels: 1-buffered
  • Default case in select: prevents deadlock on full channels
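The non-blocking send pattern named above is small enough to show in full. The helper trySend is a hypothetical name; the select with a default case is standard Go.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value was queued.
func trySend(ch chan<- string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // channel full: drop instead of blocking the sender
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(trySend(ch, "first"))  // true
	fmt.Println(trySend(ch, "second")) // false
	fmt.Println(<-ch)                  // first
}
```

With a buffer of one, the first result is always accepted even if the receiver has not started waiting yet, and a duplicate or late result is dropped rather than stalling the goroutine that produced it.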

Request/Response Handling

Hooks Request/Response Flow

  1. Client sends StreamingHookListenerRequest with message type
  2. Service processes: ListenerRequest (register) or TriggerResponse (response)
  3. For trigger: creates StreamingHookListenerResponse with TriggerRequest
  4. Listener sends back TriggerResponse with result data
  5. Correlator matches response to waiting channel
  6. ExecutionEngine collects result and continues

Activity Request/Response Flow

  1. Client registers handler via streaming gRPC
  2. Service sends StreamingActivityResponse with ActivityRequest
  3. Handler processes and sends back ActivityResponse
  4. Correlator matches response using requestID
  5. ExecutionEngine processes based on routing/execution model
  6. Response delivered to response channel

Comparison: Hooks vs Activity

Aspect | Hooks | Activity
------ | ----- | --------
Pattern | Pub-sub events | Request-response
Listener/Handler | Passive subscribers | Active responders
Triggers | Named events fired | Named activities requested
Response Expected | Optional (sync/async) | Always expected
Correlation | TriggerCorrelator | RequestCorrelator
Routing | N/A (all registered) | SingleNode / Broadcast
Use Case | Event notifications | Service requests
Multiple Responses | Yes (from listeners) | Yes (from handlers)
Data Aggregation | Limited | Extensive (per model)

Use Cases

Hooks System Use Cases

Event Notifications:

- user.created → notify analytics, email service, audit log
- app.installed → update UI, invalidate caches
- settings.changed → propagate to all instances

Async Workflows:

- order.placed → trigger fulfillment (async)
- payment.received → generate invoice (async)

Validation Chains:

- user.beforeDelete → check dependencies (AllMustSucceed)
- config.beforeUpdate → validate across services (AllMustSucceed)

Activity System Use Cases

Service Requests:

- calculate.total → aggregate cart totals
- format.address → standardize address format
- validate.input → run validation rules

Cache Invalidation (Broadcast):

- cache.invalidate → clear key from all instances
- cache.clear → flush entire cache cluster

Health Checks (Broadcast):

- health.check → verify all instances healthy
- status.get → collect status from all nodes

Code References

Component | File | Key Lines
--------- | ---- | ---------
Hook Execution Models | hooks/models.go | 9-21, 23-35
Hook Listener Struct | hooks/models.go | 37-62
Hooks Service Main | hooks/hooks_service.go | 16-36 (NewService)
Hooks On() Entry Point | hooks/hooks_service.go | 48-208
Hooks Trigger | hooks/hooks_service.go | 210-257
Execution Engine Execute | hooks/execution_engine.go | 36-82
First Match Execution | hooks/execution_engine.go | 84-129
All Must Succeed | hooks/execution_engine.go | 131-178
Best Effort Execution | hooks/execution_engine.go | 180-218
Trigger Correlator | hooks/trigger_correlator.go | 46-227
Activity Service | activity/activity_service.go | 157-260
Activity Execution Engine | activity/execution_engine.go | 32-69
Single Node Execution | activity/execution_engine.go | 91-188
Broadcast Execution | activity/execution_engine.go | 190-248
Broadcast First Match | activity/execution_engine.go | 250-323
Broadcast All Must Succeed | activity/execution_engine.go | 325-430
Broadcast Best Effort | activity/execution_engine.go | 432-540
Request Correlator | activity/request_correlator.go | 13-194