Hooks Architecture

Hooks provide event-driven communication between applications, enabling loosely coupled integrations and workflow automation.

Overview

The hooks system allows applications to publish and subscribe to events in a type-safe, permission-controlled manner. Unlike traditional message queues, hooks support multiple execution models and provide rich metadata for event correlation.

Core Concepts

Hook Definition

A hook is a named event that applications can trigger or listen to:

Hook Name: user.created
Payload: { userId, email, timestamp }
Trigger App: auth-service
Listeners: [welcome-email, analytics, crm-sync]

Execution Models

Hooks support different execution strategies:

Code Reference: pkg/v2/application/hooks/execution_engine.go:36

EXECUTION_MODEL_UNSPECIFIED (Default)

  • Executes all listeners
  • Collects both successes and failures
  • Returns aggregate response
Use Case: Broadcast notifications where multiple subscribers need the same event

EXECUTION_MODEL_FIRST_MATCH

  • Stops after first successful listener
  • Returns that listener's response
  • Subsequent listeners not invoked
Use Case: Request/response pattern where any listener can satisfy the request

EXECUTION_MODEL_ALL_MUST_SUCCEED

  • Requires all listeners to succeed
  • Fails if any listener fails
  • Transactional-like behavior
Use Case: Critical workflows where all steps must complete
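
As a rough illustration of how the three models differ, the sketch below runs a slice of listeners in order and applies each model's rule. The Go names (ExecutionModel constants, Result, Listener, Run) are hypothetical and not taken from execution_engine.go. Two behaviors are assumptions the text does not confirm: FIRST_MATCH returns an error when no listener succeeds, and ALL_MUST_SUCCEED still invokes every listener before reporting failure.

```go
package main

import "errors"

// ExecutionModel mirrors the three models described above. These Go names are
// illustrative stand-ins for the generated proto enum.
type ExecutionModel int

const (
	ModelUnspecified    ExecutionModel = iota // default: run all, collect everything
	ModelFirstMatch                           // stop after the first success
	ModelAllMustSucceed                       // any failure fails the trigger
)

// Result is a hypothetical per-listener outcome.
type Result struct {
	Listener string
	Success  bool
}

// Listener is a hypothetical stand-in for invoking one listener stream.
type Listener func() Result

var (
	ErrNoMatch        = errors.New("no listener succeeded")
	ErrListenerFailed = errors.New("at least one listener failed")
)

// Run invokes listeners one after another and applies the execution model.
func Run(model ExecutionModel, listeners []Listener) ([]Result, error) {
	var results []Result
	failed := false
	for _, invoke := range listeners {
		r := invoke()
		results = append(results, r)
		if !r.Success {
			failed = true
		}
		if model == ModelFirstMatch && r.Success {
			return []Result{r}, nil // later listeners are never invoked
		}
	}
	switch {
	case model == ModelFirstMatch:
		return results, ErrNoMatch // assumption: no match is reported as an error
	case model == ModelAllMustSucceed && failed:
		return results, ErrListenerFailed
	}
	return results, nil // default model: successes and failures returned together
}
```

With listeners a (fails), b (succeeds), c (succeeds), FIRST_MATCH returns only b's result and never calls c, while the default model returns all three results without an error.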

gRPC Streaming Protocol

Hooks use bidirectional streaming for real-time communication:

Code Reference: pkg/v2/application/hooks/hooks_service.go:16

Listener Registration

Applications register as listeners via the Hooks.On streaming RPC:

service Hooks {
  rpc On(stream HookMessage) returns (stream HookMessage);
}

Client Flow:

const stream = hooksClient.On();

// Send registration
stream.write({
  type: 'REGISTER',
  hookName: 'user.created',
  metadata: { appName: 'welcome-email' }
});

// Receive events
stream.on('data', (message) => {
  if (message.type === 'TRIGGER') {
    handleEvent(message.payload);

    // Send response
    stream.write({
      type: 'RESPONSE',
      correlationId: message.correlationId,
      result: { success: true }
    });
  }
});

Code Reference: pkg/v2/presentation/grpc/hooks_server.go:89

Authentication

Hook streams are authenticated via gRPC metadata:

Metadata Headers:
- X-App-Name: Application identifier
- X-App-Timestamp: RFC3339 timestamp (e.g., 2025-01-24T15:30:00Z)
- X-App-Signature: RSA-SHA256 signature of request

Validation:
1. Extract app certificate from database
2. Verify RSA signature matches (signed payload: METHOD\nPATH\nTIMESTAMP\nAPPNAME)
3. Check timestamp within replay window (30s)
4. Populate AuthContext for permission checks

Code Reference: pkg/v2/presentation/grpc/grpcauth/authenticator.go:85

Hook Triggering

Triggering a hook initiates event delivery to all registered listeners:

Code Reference: easy.proto/v2/protos/services.proto:29

Trigger Request

message TriggerHookRequest {
  string name = 1;                     // Hook name
  bytes payload = 2;                   // Event data
  ExecutionModel execution_model = 3;  // How to execute
  int32 timeout_seconds = 4;           // Max execution time
}

Trigger Flow

1. Client calls Hooks.TriggerHook(request)

2. Permission check: Can app trigger this hook?

3. Query registered listeners from stream manager

4. Create trigger correlator

5. Send event to all listeners via their streams

6. Wait for responses (with timeout)

7. Aggregate results based on execution model

8. Return TriggerHookResponse

Trigger Correlator

The correlator manages request/response matching:

Code Reference: pkg/v2/application/hooks/trigger_correlator.go

type TriggerCorrelator struct {
	pending map[string]*pendingTrigger
	mu      sync.RWMutex
}

// Typical usage inside the trigger path:

// Create correlation ID
correlationId := uuid.New().String()

// Register pending trigger
correlator.Register(correlationId, numListeners, timeout)

// Wait for responses
responses := correlator.Wait(correlationId)

// Cleanup
correlator.Complete(correlationId)

Stream Manager

Manages active listener streams and routes events:

Code Reference: pkg/v2/application/hooks/stream_manager.go

Stream Registration

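type StreamManager struct {
	// hookName -> appName -> stream
	streams map[string]map[string]HookStream
}

// Register adds a listener, creating the per-hook map on first use
// (writing to a nil inner map would panic).
func (sm *StreamManager) Register(hookName, appName string, stream HookStream) {
	if sm.streams == nil {
		sm.streams = make(map[string]map[string]HookStream)
	}
	if sm.streams[hookName] == nil {
		sm.streams[hookName] = make(map[string]HookStream)
	}
	sm.streams[hookName][appName] = stream
}

// GetListeners returns the streams registered for a hook as a slice.
func (sm *StreamManager) GetListeners(hookName string) []HookStream {
	listeners := make([]HookStream, 0, len(sm.streams[hookName]))
	for _, stream := range sm.streams[hookName] {
		listeners = append(listeners, stream)
	}
	return listeners
}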

Stream Health

The stream manager tracks the last activity timestamp for each listener and periodically removes stale entries. There is no dedicated heartbeat RPC; the timestamp is only refreshed when listeners register or send hook responses, so apps must regularly respond to triggers to avoid cleanup.

Execution Engine

Orchestrates hook execution across listeners:

Code Reference: pkg/v2/application/hooks/execution_engine.go:36

Serial Execution

Listeners are invoked serially. The engine walks the slice returned by streamManager.GetListeners() and invokes each listener one after another, collecting results per execution model. There is no goroutine fan-out yet, so long-running listeners delay the entire trigger.

Timeout Handling

Each execution has a deadline:

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

select {
case result := <-resultChan:
	return result
case <-ctx.Done():
	return timeoutError()
}

Error Handling

Errors are captured and reported:

type HookResult struct {
	Success    bool
	Response   []byte
	Error      string
	Duration   time.Duration
	ListenerID string
}

Permission Enforcement

Hook operations require permissions:

Trigger Permission

Tuple: (app:backend, trigger, hook:user.created)

Checked before a trigger is allowed.

Listen Permission

Tuple: (app:notifier, listen, hook:user.created)

Checked during stream registration.

Code Reference: pkg/v2/infrastructure/authz/permission_checker.go:10

Permission Caching

Hook permissions are aggressively cached:

  • Check once on stream registration
  • Cache hit: < 1ms
  • Cache miss: Query OpenFGA (~10ms)
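
A minimal sketch of the cache-hit and cache-miss paths, assuming an in-memory map keyed by the permission tuple. The Tuple, Checker, and CachedChecker names are hypothetical, and the Checker function stands in for the OpenFGA query; this sketch has no expiry or invalidation, which a real cache would likely need.

```go
package main

import "sync"

// Tuple is a (subject, relation, object) permission tuple, for example
// (app:notifier, listen, hook:user.created).
type Tuple struct {
	Subject  string
	Relation string
	Object   string
}

// Checker is a hypothetical stand-in for the slow path (an OpenFGA query).
type Checker func(Tuple) (bool, error)

// CachedChecker remembers previous decisions in memory.
type CachedChecker struct {
	mu    sync.Mutex
	cache map[Tuple]bool
	next  Checker
}

func NewCachedChecker(next Checker) *CachedChecker {
	return &CachedChecker{cache: make(map[Tuple]bool), next: next}
}

// Check returns the cached decision when present; otherwise it asks the
// underlying checker and stores the answer. Errors are not cached.
func (c *CachedChecker) Check(t Tuple) (bool, error) {
	c.mu.Lock()
	allowed, ok := c.cache[t]
	c.mu.Unlock()
	if ok {
		return allowed, nil // cache hit
	}
	allowed, err := c.next(t) // cache miss: slow path
	if err != nil {
		return false, err
	}
	c.mu.Lock()
	c.cache[t] = allowed
	c.mu.Unlock()
	return allowed, nil
}
```

Denials are cached as well as grants in this sketch, so a permission granted later would not take effect until the entry is evicted; whether the real cache behaves this way is not stated in the source.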

Event Payload

Hook payloads are arbitrary bytes:

message TriggerHookRequest {
  string name = 1;
  bytes payload = 2;  // JSON, Protobuf, or custom format
  // execution_model (3) and timeout_seconds (4) omitted here
}

JSON Payload Example

{
  "eventType": "user.created",
  "data": {
    "userId": "123e4567-e89b-12d3-a456-426614174000",
    "email": "user@example.com",
    "createdAt": "2025-01-15T10:30:00Z"
  },
  "metadata": {
    "source": "auth-service",
    "version": "1.0"
  }
}

Protobuf Payload Example

import "google/protobuf/timestamp.proto";

message UserCreatedEvent {
  string user_id = 1;
  string email = 2;
  google.protobuf.Timestamp created_at = 3;
}

// Serialize to bytes (shown in Go, using google.golang.org/protobuf/proto)
payload, err := proto.Marshal(event)

Hook vs Event Bus

Understanding when to use hooks vs the event bus:

Use Hooks When:

  • Synchronous response needed
  • Request/response pattern
  • Permission-controlled communication
  • Direct app-to-app communication

Use Event Bus When:

  • Asynchronous fire-and-forget
  • Platform-level events (app.installed)
  • Broadcasting to many subscribers
  • No response required

Related: Event-Driven Architecture

Monitoring and Debugging

The current implementation exposes structured logs via telemetry.Logger. Hook executions emit INFO/WARN/ERROR lines for trigger attempts, listener outcomes, and correlation failures. Metrics and tracing hooks have not been implemented yet, so external observability must rely on log aggregation.

Best Practices

For Hook Publishers

Include Correlation IDs:

{
  "correlationId": "uuid",
  "eventType": "user.created",
  "data": { ... }
}

Use Versioned Payloads:

{
  "version": "1.0",
  "data": { ... }
}

Set Reasonable Timeouts:

Quick hooks: 1-2 seconds
Normal hooks: 5 seconds
Long-running: 30 seconds

For Hook Listeners

Respond Quickly:

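// Acknowledge first, then process asynchronously
stream.on('data', (message) => {
  // Acknowledge immediately, echoing the trigger's correlation ID
  stream.write({
    type: 'RESPONSE',
    correlationId: message.correlationId,
    result: { success: true }
  });

  // Process in background
  processInBackground(message.payload);
});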

Handle Errors Gracefully:

try {
  await processHook(payload);
  return { success: true };
} catch (err) {
  logger.error('Hook processing failed', err);
  return { success: false, error: err.message };
}

Implement Idempotency:

const processed = await isAlreadyProcessed(correlationId);
if (processed) {
  return { success: true, cached: true };
}

await processHook(payload);
await markProcessed(correlationId);

Error Scenarios

Listener Offline

Problem: Listener not connected when hook triggers

Behavior:
- Hook executes only on connected listeners
- Offline listeners don't receive event
- No automatic retry

Solution:
- Implement listener reconnection
- Use event bus for guaranteed delivery

Listener Timeout

Problem: Listener doesn't respond within timeout

Behavior:
- Correlator waits up to timeout
- Returns timeout error
- Other listeners unaffected

Solution:
- Increase timeout if legitimate
- Investigate slow listener
- Consider async processing

Partial Failure (ALL_MUST_SUCCEED)

Problem: One listener fails when using ALL_MUST_SUCCEED

Behavior:
- Overall trigger fails
- Successful listeners' work may need rollback
- Trigger returns error

Solution:
- Implement compensation logic
- Consider two-phase commit
- Use default model instead

Further Reading