Activities & Background Workflows
Activities provide structured request/response communication for long-running or distributed work, enabling apps to execute background tasks across multiple handlers with flexible routing and execution strategies.
Overview
The activity system allows applications to define, register, and execute structured workflows with coordinated responses. Unlike hooks, which are event-driven, activities follow a request/response pattern: a routing strategy determines which handlers receive the work, and an execution model determines how their results are aggregated.
Core Concepts
Activity Definition
An activity is a named unit of work that applications can handle:
Activity Name: generate-report
Request: { reportType, dateRange, format }
Handlers: [worker-1, worker-2, worker-3]
Routing: SingleNode (first registered handler)
Execution: FirstMatch (first successful response)
Routing Strategies
Activities support different routing strategies for distributing work:
Code Reference: pkg/v2/domain/activity/routing_strategy.go:4
ROUTING_STRATEGY_SINGLE_NODE (Default)
- Routes request to a single handler instance
- Current implementation selects the first registered handler; there is no load balancing yet
- Most efficient for request/response patterns
Use Case: Simple RPC-style calls where any handler can fulfill the request
Example: get-user-profile, calculate-price, validate-data
ROUTING_STRATEGY_BROADCAST
- Routes request to all registered handlers
- Each handler processes the request independently
- Responses are aggregated based on execution model
Use Case: Distributed processing, parallel validation, multi-region queries
Example: search-all-shards, validate-across-regions, fan-out-queries
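The two strategies can be pictured as a function from the registered handler list to the set of handlers that receive the request. The following is a minimal sketch only: the type, constant, and function names are illustrative and are not taken from routing_strategy.go.

```go
package main

import "errors"

// RoutingStrategy mirrors the two strategies described above.
// The Go identifiers are illustrative, not the project's actual names.
type RoutingStrategy int

const (
	SingleNode RoutingStrategy = iota // default strategy
	Broadcast
)

// ErrNoHandlers signals that no handler is registered for the activity.
var ErrNoHandlers = errors.New("no handlers registered")

// SelectTargets returns the IDs of the handlers that should receive a request.
func SelectTargets(strategy RoutingStrategy, handlerIDs []string) ([]string, error) {
	if len(handlerIDs) == 0 {
		return nil, ErrNoHandlers
	}
	switch strategy {
	case Broadcast:
		// Copy so the caller cannot mutate the registry's slice.
		return append([]string(nil), handlerIDs...), nil
	default:
		// Single node: the first registered handler, with no load balancing.
		return []string{handlerIDs[0]}, nil
	}
}
```

With handlers worker-1, worker-2, and worker-3 registered in that order, SingleNode yields only worker-1 while Broadcast yields all three.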
Execution Models
When using broadcast routing, execution models determine how responses are handled:
Code Reference: pkg/v2/application/activity/execution_engine.go:12
EXECUTION_MODEL_FIRST_MATCH
- Returns on first successful handler response
- Subsequent handlers are ignored
- Fastest response wins
Use Case: Distributed cache lookup where any cache hit is sufficient
EXECUTION_MODEL_ALL_MUST_SUCCEED
- Requires all handlers to succeed
- Fails if any handler fails
- Transactional-like behavior
Use Case: Multi-step validation where all steps must pass
EXECUTION_MODEL_BEST_EFFORT
- Waits for all handlers
- Aggregates all results (success and failure)
- Succeeds if at least one handler succeeds
Use Case: Distributed queries where partial results are acceptable
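The success rule of each model can be written as a small decision function over the collected handler results. This is a sketch under assumptions: HandlerResult and Succeeded are hypothetical names, and the real engine returns early for first-match rather than evaluating a complete result set.

```go
package main

// ExecutionModel mirrors the three models described above (illustrative names).
type ExecutionModel int

const (
	FirstMatch ExecutionModel = iota
	AllMustSucceed
	BestEffort
)

// HandlerResult is a simplified, hypothetical view of one handler's response.
type HandlerResult struct {
	HandlerID string
	Success   bool
}

// Succeeded reports whether the overall activity succeeds for the given results.
func Succeeded(model ExecutionModel, results []HandlerResult) bool {
	successes := 0
	for _, r := range results {
		if r.Success {
			successes++
		}
	}
	switch model {
	case AllMustSucceed:
		// Every handler must have succeeded; an empty result set is a failure.
		return len(results) > 0 && successes == len(results)
	default:
		// FirstMatch and BestEffort both need at least one success.
		return successes > 0
	}
}
```

The difference between first-match and best-effort lies in timing, not in the success rule: first-match stops at the first success, while best-effort waits for every handler and reports all results.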
gRPC Streaming Protocol
Activities use bidirectional streaming for registration and communication:
Code Reference: pkg/v2/presentation/grpc/activity_server.go:44
Handler Registration
Applications register as activity handlers via Activity.RegisterActivity streaming RPC:
service Activity {
rpc RegisterActivity(stream StreamingActivityRequest) returns (stream StreamingActivityResponse);
rpc RequestActivity(ActivityRequest) returns (ActivityResponse);
}
Client Flow:
const stream = activityClient.RegisterActivity();
// Send registration
stream.write({
type: 'REGISTER',
registerRequest: {
name: 'generate-report'
}
});
// Receive activity requests
stream.on('data', async (message) => {
if (message.activityRequest) {
const result = await handleActivity(message.activityRequest);
// Send response
stream.write({
id: message.id, // Correlation ID
activityResponse: {
requestId: message.activityRequest.requestId,
success: true,
activityResult: { data: result }
}
});
}
});
Code Reference: easy.proto/v2/protos/services.proto:267
Authentication
Activity streams are authenticated via gRPC metadata:
Metadata Headers:
- X-App-Name: Application identifier
- X-App-Timestamp: RFC3339 timestamp (e.g., 2025-01-24T15:30:00Z)
- X-App-Signature: RSA-SHA256 signature of request
Validation:
1. Extract app certificate from database
2. Verify RSA signature matches (signed payload: METHOD\nPATH\nTIMESTAMP\nAPPNAME)
3. Check timestamp within replay window (30s)
4. Populate AuthContext for permission checks
Only apps (not users) can register as activity handlers.
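The validation steps above could look roughly like the sketch below. Several details are assumptions rather than facts from the source: PKCS#1 v1.5 padding is assumed (the document says only "RSA-SHA256"), the replay window is treated as symmetric around the server clock, and all function names are hypothetical. Certificate lookup and AuthContext population are omitted.

```go
package main

import (
	"crypto"
	"crypto/rsa"
	"crypto/sha256"
	"errors"
	"strings"
	"time"
)

// replayWindow matches the 30s window stated above.
const replayWindow = 30 * time.Second

// ErrStaleTimestamp is returned for malformed or out-of-window timestamps.
var ErrStaleTimestamp = errors.New("timestamp outside replay window")

// SignedPayload builds METHOD\nPATH\nTIMESTAMP\nAPPNAME.
func SignedPayload(method, path, timestamp, appName string) []byte {
	return []byte(strings.Join([]string{method, path, timestamp, appName}, "\n"))
}

// WithinReplayWindow reports whether an RFC3339 timestamp lies within
// replayWindow of now (in either direction; this symmetry is an assumption).
func WithinReplayWindow(timestamp string, now time.Time) bool {
	ts, err := time.Parse(time.RFC3339, timestamp)
	if err != nil {
		return false
	}
	delta := now.Sub(ts)
	if delta < 0 {
		delta = -delta
	}
	return delta <= replayWindow
}

// VerifyAt checks the timestamp first, then the RSA-SHA256 signature.
// PKCS#1 v1.5 padding is assumed here.
func VerifyAt(pub *rsa.PublicKey, method, path, timestamp, appName string, sig []byte, now time.Time) error {
	if !WithinReplayWindow(timestamp, now) {
		return ErrStaleTimestamp
	}
	digest := sha256.Sum256(SignedPayload(method, path, timestamp, appName))
	return rsa.VerifyPKCS1v15(pub, crypto.SHA256, digest[:], sig)
}

// Verify is VerifyAt evaluated against the current server time.
func Verify(pub *rsa.PublicKey, method, path, timestamp, appName string, sig []byte) error {
	return VerifyAt(pub, method, path, timestamp, appName, sig, time.Now())
}
```

Checking the timestamp before the signature rejects replayed requests without paying for an RSA verification.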
Activity Execution
Requesting an activity initiates execution across registered handlers:
Code Reference: pkg/v2/application/activity/activity_service.go:12
Request Structure
message ActivityRequest {
string name = 1; // Activity name
bytes data = 2; // Request payload (single blob)
string request_id = 3; // Correlation ID
RoutingStrategy routing_strategy = 4;
ExecutionModel execution_model = 5;
int64 timeout_ms = 6; // Max execution time
map<string, string> metadata = 7; // Additional context
}
Execution Flow
1. Client calls Activity.RequestActivity(request)
↓
2. Permission check: Can requester execute this activity?
↓
3. Query registered handlers from stream manager
↓
4. Create pending request for correlation
↓
5. Route request based on routing strategy
↓
6. Wait for response(s) with timeout
↓
7. Aggregate results based on execution model
↓
8. Return ActivityResponse
Single Node Execution
For single-node routing:
// Select one handler (currently the first registered; no load balancing yet)
selectedHandler := handlers[0]
// Send request to the selected handler via its stream
streamManager.SendRequest(selectedHandler.ID, activityRequest)
// Wait for single response or timeout
select {
case response := <-responseChan:
return buildResponse(response)
case <-timeoutTimer.C:
return ErrTimeout
}
Code Reference: pkg/v2/application/activity/execution_engine.go:92
Broadcast Execution
For broadcast routing with first-match model:
// Send to all handlers
for _, handler := range handlers {
streamManager.SendRequest(handler.ID, activityRequest)
}
// Wait for first successful response
for {
select {
case response := <-responseChan:
if response.Success {
return response // First match wins
}
case <-timeoutTimer.C:
return ErrTimeout
}
}
Code Reference: pkg/v2/application/activity/execution_engine.go:250
For the all-must-succeed model, all responses are collected, and the failure of any handler fails the overall request.
For the best-effort model, all responses are collected, and the request succeeds if at least one handler succeeds.
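Both all-must-succeed and best-effort depend on collecting a response from every handler before deciding the outcome. A minimal sketch of that collection loop follows; Response, ErrTimeout, and CollectAll are simplified stand-ins, not the engine's actual types.

```go
package main

import (
	"errors"
	"time"
)

// Response is a simplified, hypothetical handler response.
type Response struct {
	HandlerID string
	Success   bool
}

// ErrTimeout is returned when not all handlers answer in time.
var ErrTimeout = errors.New("activity timed out")

// CollectAll waits until `expected` responses have arrived or the timeout
// fires. On timeout it returns the partial results alongside ErrTimeout.
func CollectAll(responseChan <-chan Response, expected int, timeout time.Duration) ([]Response, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	results := make([]Response, 0, expected)
	for len(results) < expected {
		select {
		case r := <-responseChan:
			results = append(results, r)
		case <-timer.C:
			return results, ErrTimeout
		}
	}
	return results, nil
}
```

The caller then applies the model's rule to the returned slice: every entry must be successful for all-must-succeed, and at least one for best-effort. Returning partial results on timeout lets a best-effort caller still use whatever arrived.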
Stream Manager
Manages active handler streams and routes requests:
Code Reference: pkg/v2/application/activity/stream_manager.go
Handler Registration
type StreamManager struct {
// activityName -> handlerID -> handler
handlers map[string]map[string]*ActivityHandler
// handlerID -> handler (quick lookup)
handlersByID map[string]*ActivityHandler
// appName -> activityName -> handlerID
appHandlers map[string]map[string]string
}
// Register new handler
func (sm *StreamManager) RegisterHandler(
stream interface{},
appName string,
activityName string,
) (string, error) {
handlerID := uuid.New().String()
handler := &ActivityHandler{
ID: handlerID,
ActivityName: activityName,
AppName: appName,
Stream: stream,
RegisteredAt: time.Now(),
LastHeartbeat: time.Now(),
}
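// Create the per-activity map on first registration; writing to a nil map panics
if sm.handlers[activityName] == nil {
sm.handlers[activityName] = make(map[string]*ActivityHandler)
}
sm.handlers[activityName][handlerID] = handler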
return handlerID, nil
}
// Get all handlers for an activity
func (sm *StreamManager) GetHandlers(activityName string) []*ActivityHandler {
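// Copy the map values into a slice to match the return type.
// Map iteration order is unspecified in Go, so the slice order is not stable.
result := make([]*ActivityHandler, 0, len(sm.handlers[activityName]))
for _, handler := range sm.handlers[activityName] {
result = append(result, handler)
}
return result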
}
Heartbeat Monitoring
The stream manager exposes a cleanup loop that drops handlers whose LastHeartbeat timestamp is older than the configured timeout. Timestamps are refreshed when handlers send responses on their streams.
ActivityService.UpdateHandlerHeartbeat is server-internal only (pkg/v2/application/activity/activity_service.go:88-95) and no gRPC RPC endpoint exists for clients to call this method (easy.proto/v2/protos/services.proto:265-313). Handler liveness is currently tracked via stream activity (responses) rather than dedicated heartbeat messages.
StartHeartbeatMonitor must be invoked manually by the host; it is not started automatically in server/services.go.
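A cleanup loop of this kind can be sketched as follows. Registry, Touch, DropStale, and RunCleanup are hypothetical names for illustration and do not reflect the actual StreamManager or StartHeartbeatMonitor signatures.

```go
package main

import (
	"sync"
	"time"
)

// Handler holds only the fields the cleanup loop needs (a simplified stand-in).
type Handler struct {
	ID            string
	LastHeartbeat time.Time
}

// Registry is a hypothetical, minimal substitute for the stream manager.
type Registry struct {
	mu       sync.Mutex
	handlers map[string]*Handler
}

func NewRegistry() *Registry {
	return &Registry{handlers: make(map[string]*Handler)}
}

// Touch registers a handler or refreshes its LastHeartbeat, as happens
// when a response arrives on the handler's stream.
func (r *Registry) Touch(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if h, ok := r.handlers[id]; ok {
		h.LastHeartbeat = time.Now()
		return
	}
	r.handlers[id] = &Handler{ID: id, LastHeartbeat: time.Now()}
}

// Len returns the number of registered handlers.
func (r *Registry) Len() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.handlers)
}

// DropStale removes handlers whose LastHeartbeat is older than timeout
// and returns the removed IDs.
func (r *Registry) DropStale(timeout time.Duration) []string {
	cutoff := time.Now().Add(-timeout)
	r.mu.Lock()
	defer r.mu.Unlock()
	var removed []string
	for id, h := range r.handlers {
		if h.LastHeartbeat.Before(cutoff) {
			delete(r.handlers, id) // deleting during range is safe in Go
			removed = append(removed, id)
		}
	}
	return removed
}

// RunCleanup calls DropStale every interval until stop is closed.
// The host must start it explicitly, e.g. go reg.RunCleanup(...).
func (r *Registry) RunCleanup(interval, timeout time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			r.DropStale(timeout)
		case <-stop:
			return
		}
	}
}
```

Because liveness is derived from responses, an idle but healthy handler looks stale once the timeout elapses, so the timeout should comfortably exceed the longest expected gap between requests.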
Request Correlator
Manages request/response correlation for in-flight activities:
Code Reference: pkg/v2/application/activity/request_correlator.go
Correlation Management
type RequestCorrelator struct {
// requestID -> pending request
pendingRequests map[string]*PendingRequest
}
type PendingRequest struct {
RequestID string
ActivityName string
RequestedBy string
RoutingStrategy RoutingStrategy
ExecutionModel ExecutionModel
ResponseChan chan *Response
ErrorChan chan error
Timeout time.Time
ExpectedHandlerCount int // For broadcast
CreatedAt time.Time
}
// Register pending request before execution
correlator.AddPendingRequest(pendingReq)
// Handler sends response
response := <-responseChan
// Cleanup after completion
correlator.RemovePendingRequest(requestID)
Response Correlation
When a handler sends a response:
1. Extract request ID from response
2. Look up pending request in correlator
3. Send response to pending request's channel
4. Execution engine receives and processes response
This decouples response handling from stream management.
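The four steps above can be sketched as a small correlator keyed by request ID. This is an illustrative reduction, not the contents of request_correlator.go: the Correlator methods and error values are assumed names, and PendingRequest is collapsed to a single buffered channel.

```go
package main

import (
	"errors"
	"sync"
)

// Response is a simplified, hypothetical handler response.
type Response struct {
	RequestID string
	Success   bool
}

var (
	ErrUnknownRequest = errors.New("no pending request for response")
	ErrChannelFull    = errors.New("pending request already has all expected responses")
)

// Correlator maps request IDs to the channel awaiting their responses.
type Correlator struct {
	mu      sync.Mutex
	pending map[string]chan Response
}

func NewCorrelator() *Correlator {
	return &Correlator{pending: make(map[string]chan Response)}
}

// Add registers a pending request. The channel is buffered to the number of
// expected responses so that Deliver never blocks the stream reader.
func (c *Correlator) Add(requestID string, expected int) <-chan Response {
	ch := make(chan Response, expected)
	c.mu.Lock()
	c.pending[requestID] = ch
	c.mu.Unlock()
	return ch
}

// Deliver looks up the pending request and forwards the response to it.
func (c *Correlator) Deliver(resp Response) error {
	c.mu.Lock()
	ch, ok := c.pending[resp.RequestID]
	c.mu.Unlock()
	if !ok {
		// Late response: the request already completed or timed out.
		return ErrUnknownRequest
	}
	select {
	case ch <- resp:
		return nil
	default:
		return ErrChannelFull
	}
}

// Remove drops the pending request after completion or timeout.
func (c *Correlator) Remove(requestID string) {
	c.mu.Lock()
	delete(c.pending, requestID)
	c.mu.Unlock()
}
```

The non-blocking send in Deliver keeps one slow or abandoned request from stalling the goroutine that reads a handler's stream.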
Permission Enforcement
Activity operations require permissions:
Register Permission
Tuple: (app:worker, register, activity:generate-report)
Check during handler registration
Execute Permission
Tuple: (app:backend, request, activity:generate-report)
Check before allowing execution
Code Reference: pkg/v2/infrastructure/authz/permission_checker.go:10
Permission Caching
Activity permissions are cached:
- Check once on stream registration
- Cache hit: < 1ms
- Cache miss: Query OpenFGA (~10ms)
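The cache-hit and cache-miss paths can be illustrated with a small wrapper around a backend check. This is a sketch under assumptions: Tuple, CheckFunc, and CachedChecker are hypothetical names, the backend function stands in for the OpenFGA query, and no expiry or invalidation is shown because the source does not describe one.

```go
package main

import "sync"

// Tuple is a (subject, relation, object) permission tuple,
// e.g. (app:worker, register, activity:generate-report).
type Tuple struct {
	Subject  string
	Relation string
	Object   string
}

// CheckFunc is the slow backend check (a stand-in for an OpenFGA query).
type CheckFunc func(Tuple) (bool, error)

// CachedChecker memoizes backend decisions per tuple.
type CachedChecker struct {
	mu    sync.Mutex
	cache map[Tuple]bool
	check CheckFunc
}

func NewCachedChecker(check CheckFunc) *CachedChecker {
	return &CachedChecker{cache: make(map[Tuple]bool), check: check}
}

// Allowed returns the cached decision if present; otherwise it queries the
// backend and stores the result. Backend errors are not cached.
func (c *CachedChecker) Allowed(t Tuple) (bool, error) {
	c.mu.Lock()
	allowed, hit := c.cache[t]
	c.mu.Unlock()
	if hit {
		return allowed, nil
	}

	// The lock is released during the backend call so that other
	// lookups are not serialized behind a slow query.
	allowed, err := c.check(t)
	if err != nil {
		return false, err
	}
	c.mu.Lock()
	c.cache[t] = allowed
	c.mu.Unlock()
	return allowed, nil
}
```

Without expiry, a revoked permission stays cached for the lifetime of the process; a production cache would need a TTL or explicit invalidation.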
Activity Payload
Activity payloads are arbitrary bytes:
message ActivityRequest {
string name = 1;
bytes data = 2; // JSON, Protobuf, or custom format
// remaining fields as shown under Request Structure
}
JSON Payload Example
{
"activityType": "generate-report",
"parameters": {
"reportType": "sales",
"dateRange": {
"start": "2025-01-01",
"end": "2025-01-31"
},
"format": "pdf"
},
"metadata": {
"requestedBy": "user@example.com",
"priority": "high"
}
}
Protobuf Payload Example
message ReportRequest {
string report_type = 1;
DateRange date_range = 2;
string format = 3;
}
// Serialize to bytes
// Serialize the message to bytes before placing it in ActivityRequest.data,
// for example with proto.Marshal(request) in Go
Activity vs Hook vs Event Bus
Understanding when to use each communication pattern:
Use Activities When:
- Structured request/response needed
- Long-running work with results
- Work must be routed to one of several handlers or fanned out to all of them (single-node routing does not load balance yet)
- Timeout and retry logic important
Use Hooks When:
- Synchronous event notification
- Multiple subscribers to same event
- Permission-controlled communication
- Simple trigger/response pattern
Use Event Bus When:
- Asynchronous fire-and-forget
- Platform-level events
- No response required
- Broadcasting to many subscribers
Related: Hooks Architecture, Event-Driven Architecture
Monitoring and Debugging
Activity Metrics
- Request count per activity
- Average execution time
- Timeout rate
- Error rate per handler
- Active handler count per activity
- Pending request count
Distributed Tracing
Activity requests are traced end-to-end:
Trace: activity.request
├─ Span: permission.check
├─ Span: correlator.register
├─ Span: routing.select_handler(s)
├─ Span: stream.send_request
├─ Span: wait.response (with timeout)
└─ Span: aggregate.results
Logging
Each request logs its routing decision, handler responses, and final outcome:
INFO: Activity requested: generate-report by app:backend
DEBUG: Found 3 handlers: [worker-1, worker-2, worker-3]
DEBUG: Routing strategy: SINGLE_NODE (selected: worker-1)
INFO: Handler worker-1 responded in 1250ms: success
INFO: Activity completed: success
Error Scenarios
No Handlers Available
Problem: Activity not registered or all handlers offline
Behavior:
- RequestActivity returns ErrNoHandlers
- No execution attempted
Solution:
- Ensure at least one handler is registered
- Check handler connectivity
- Implement fallback logic
Handler Timeout
Problem: Handler doesn't respond within timeout
Behavior:
- Correlator waits up to timeout
- Returns ErrTimeout
- Other handlers unaffected (broadcast)
Solution:
- Increase timeout if legitimate
- Investigate slow handler
- Consider async processing with callback
Partial Failure (ALL_MUST_SUCCEED)
Problem: One handler fails when using ALL_MUST_SUCCEED
Behavior:
- Overall request fails
- All results are included in response
- Client sees aggregated error
Solution:
- Implement compensation logic
- Use BEST_EFFORT instead
- Review handler implementation
Handler Disconnection
Problem: Handler stream disconnects during request
Behavior:
- Handler is auto-unregistered
- Pending requests time out
- Subsequent requests route to remaining handlers
Solution:
- Implement handler reconnection
- Use multiple handlers for redundancy
- Monitor handler health
Best Practices
For Activity Handlers
Respond Quickly:
stream.on('data', async (message) => {
if (message.activityRequest) {
try {
const result = await processActivity(message.activityRequest);
// Send response promptly
stream.write({
id: message.id,
activityResponse: {
requestId: message.activityRequest.requestId,
success: true,
activityResult: { data: result }
}
});
} catch (err) {
// Always send response, even on error
stream.write({
id: message.id,
activityResponse: {
requestId: message.activityRequest.requestId,
success: false,
errorStatus: {
code: grpc.status.INTERNAL,
message: err.message
}
}
});
}
}
});
Handler Liveness:
Handlers stay alive as long as they actively process requests and send responses. The LastHeartbeat timestamp is updated automatically when responses are sent on the stream. There is no separate heartbeat RPC available for client applications.
Handle Idempotency:
const processed = await isAlreadyProcessed(requestID);
if (processed) {
return getCachedResult(requestID);
}
const result = await processActivity(request);
await cacheResult(requestID, result);
return result;
For Activity Requesters
Set Reasonable Timeouts:
Quick activities: 1-5 seconds
Normal activities: 10-30 seconds
Long-running: 60-300 seconds
Choose Appropriate Strategy:
// Simple request/response - use single node
const response = await activityClient.RequestActivity({
name: 'get-user',
routingStrategy: RoutingStrategy.SINGLE_NODE,
executionModel: ExecutionModel.FIRST_MATCH,
timeoutMs: 5000
});
// Parallel processing - use broadcast
const searchResponse = await activityClient.RequestActivity({
name: 'search-all-shards',
routingStrategy: RoutingStrategy.BROADCAST,
executionModel: ExecutionModel.BEST_EFFORT,
timeoutMs: 30000
});
Handle Errors Gracefully:
try {
const response = await activityClient.RequestActivity(request);
if (!response.success) {
logger.error('Activity failed', response.errorStatus);
// Implement fallback or retry logic
}
} catch (err) {
if (err.code === grpc.status.DEADLINE_EXCEEDED) {
// Timeout - consider retry or alternative approach
}
}
Related Concepts
- Hooks Architecture - Event-driven communication pattern
- Event-Driven Architecture - Async event bus
- Permission Model - Activity permission enforcement
- gRPC Services - Streaming protocol details
- Health Monitoring & Recovery - Handler health monitoring
- Developer Platform & SDK - SDK activity helpers
Further Reading
- Getting Started: Building Apps - Implementing activities in your app
- API Reference: gRPC Services - Complete RPC documentation