Activities & Background Workflows

Activities provide structured request/response communication for long-running or distributed work, enabling apps to execute background tasks across multiple handlers with flexible routing and execution strategies.

Overview

The activity system allows applications to define, register, and execute structured workflows with coordinated responses. Unlike hooks, which are event-driven, activities follow a request/response pattern: a routing strategy determines which handlers receive the work, and an execution model determines how their results are aggregated.

Core Concepts

Activity Definition

An activity is a named unit of work that applications can handle:

Activity Name: generate-report
Request: { reportType, dateRange, format }
Handlers: [worker-1, worker-2, worker-3]
Routing: SingleNode (first registered handler)
Execution: FirstMatch (first successful response)

Routing Strategies

Activities support different routing strategies for distributing work:

Code Reference: pkg/v2/domain/activity/routing_strategy.go:4

ROUTING_STRATEGY_SINGLE_NODE (Default)

  • Routes request to a single handler instance
  • Current implementation selects the first registered handler; there is no load balancing yet
  • Most efficient for request/response patterns
Use Case: Simple RPC-style calls where any handler can fulfill the request
Example: get-user-profile, calculate-price, validate-data

ROUTING_STRATEGY_BROADCAST

  • Routes request to all registered handlers
  • Each handler processes the request independently
  • Responses are aggregated based on execution model
Use Case: Distributed processing, parallel validation, multi-region queries
Example: search-all-shards, validate-across-regions, fan-out-queries
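The difference between the two strategies can be sketched as a small target-selection function. This is an illustrative sketch only, not the code in routing_strategy.go: the type name, the constants, and selectTargets are hypothetical, and handler IDs are assumed to arrive in registration order.

```go
package main

// RoutingStrategy mirrors the two strategies described above (hypothetical type).
type RoutingStrategy int

const (
	SingleNode RoutingStrategy = iota // default strategy
	Broadcast
)

// selectTargets returns the handler IDs a request would be sent to.
// handlerIDs is assumed to be in registration order.
func selectTargets(strategy RoutingStrategy, handlerIDs []string) []string {
	if len(handlerIDs) == 0 {
		return nil
	}
	switch strategy {
	case Broadcast:
		// Copy so callers cannot mutate the caller-owned slice.
		return append([]string(nil), handlerIDs...)
	default:
		// Single node: the first registered handler, with no load balancing.
		return handlerIDs[:1]
	}
}
```

With three registered workers, single-node routing would pick only worker-1, while broadcast would return all three.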

Execution Models

When using broadcast routing, execution models determine how responses are handled:

Code Reference: pkg/v2/application/activity/execution_engine.go:12

EXECUTION_MODEL_FIRST_MATCH

  • Returns on first successful handler response
  • Subsequent handlers are ignored
  • Fastest response wins
Use Case: Distributed cache lookup where any cache hit is sufficient

EXECUTION_MODEL_ALL_MUST_SUCCEED

  • Requires all handlers to succeed
  • Fails if any handler fails
  • Transactional-like behavior
Use Case: Multi-step validation where all steps must pass

EXECUTION_MODEL_BEST_EFFORT

  • Waits for all handlers
  • Aggregates all results (success and failure)
  • Succeeds if at least one handler succeeds
Use Case: Distributed queries where partial results are acceptable
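The three models differ only in how a set of handler results is turned into one outcome. The sketch below illustrates that decision under stated assumptions: HandlerResult, ErrActivityFailed, and aggregate are hypothetical names, results are assumed to be in arrival order, and an empty result set is treated as a failure for every model.

```go
package main

import "errors"

// ExecutionModel mirrors the three models described above (hypothetical type).
type ExecutionModel int

const (
	FirstMatch ExecutionModel = iota
	AllMustSucceed
	BestEffort
)

// HandlerResult is a hypothetical per-handler outcome.
type HandlerResult struct {
	HandlerID string
	Success   bool
}

// ErrActivityFailed is a hypothetical sentinel for an overall failure.
var ErrActivityFailed = errors.New("activity failed")

// aggregate decides the overall outcome from results in arrival order.
func aggregate(model ExecutionModel, results []HandlerResult) ([]HandlerResult, error) {
	succeeded := 0
	for i, r := range results {
		if !r.Success {
			continue
		}
		if model == FirstMatch {
			// The first success wins; later results are ignored.
			return results[i : i+1], nil
		}
		succeeded++
	}
	switch {
	case model == AllMustSucceed && len(results) > 0 && succeeded == len(results):
		return results, nil
	case model == BestEffort && succeeded > 0:
		return results, nil
	}
	// Failures are returned alongside the error so callers can inspect them.
	return results, ErrActivityFailed
}
```

For results of (fail, success, success), this sketch returns the second result under first-match, an error under all-must-succeed, and all three results under best-effort.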

gRPC Streaming Protocol

Activities use bidirectional streaming for registration and communication:

Code Reference: pkg/v2/presentation/grpc/activity_server.go:44

Handler Registration

Applications register as activity handlers via Activity.RegisterActivity streaming RPC:

service Activity {
rpc RegisterActivity(stream StreamingActivityRequest) returns (stream StreamingActivityResponse);
rpc RequestActivity(ActivityRequest) returns (ActivityResponse);
}

Client Flow:

const stream = activityClient.RegisterActivity();

// Send registration
stream.write({
  type: 'REGISTER',
  registerRequest: {
    name: 'generate-report'
  }
});

// Receive activity requests (the callback is async so the handler can be awaited)
stream.on('data', async (message) => {
  if (message.activityRequest) {
    const result = await handleActivity(message.activityRequest);

    // Send response
    stream.write({
      id: message.id, // Correlation ID
      activityResponse: {
        requestId: message.activityRequest.requestId,
        success: true,
        activityResult: { data: result }
      }
    });
  }
});

Code Reference: easy.proto/v2/protos/services.proto:267

Authentication

Activity streams are authenticated via gRPC metadata:

Metadata Headers:
- X-App-Name: Application identifier
- X-App-Timestamp: RFC3339 timestamp (e.g., 2025-01-24T15:30:00Z)
- X-App-Signature: RSA-SHA256 signature of request

Validation:
1. Extract app certificate from database
2. Verify RSA signature matches (signed payload: METHOD\nPATH\nTIMESTAMP\nAPPNAME)
3. Check timestamp within replay window (30s)
4. Populate AuthContext for permission checks

Only apps (not users) can register as activity handlers.
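The signing and validation steps can be sketched with the Go standard library. Several details here are assumptions, since the text above specifies only RSA-SHA256 and the payload layout: PKCS#1 v1.5 padding, a replay window applied symmetrically around the server clock, and the example method, path, and app name. The function names are hypothetical, and certificate lookup and gRPC metadata handling are left out.

```go
package main

import (
	"crypto"
	"crypto/rand"
	"crypto/rsa"
	"crypto/sha256"
	"errors"
	"strings"
	"time"
)

// replayWindow is the 30s window from the validation steps above.
const replayWindow = 30 * time.Second

// signingPayload joins the four fields as METHOD\nPATH\nTIMESTAMP\nAPPNAME.
func signingPayload(method, path, timestamp, appName string) []byte {
	return []byte(strings.Join([]string{method, path, timestamp, appName}, "\n"))
}

// sign produces an RSA-SHA256 signature (PKCS#1 v1.5 padding is an assumption).
func sign(key *rsa.PrivateKey, payload []byte) ([]byte, error) {
	digest := sha256.Sum256(payload)
	return rsa.SignPKCS1v15(rand.Reader, key, crypto.SHA256, digest[:])
}

// verify checks the timestamp window first, then the signature.
func verify(pub *rsa.PublicKey, payload, sig []byte, timestamp string, now time.Time) error {
	ts, err := time.Parse(time.RFC3339, timestamp)
	if err != nil {
		return err
	}
	if age := now.Sub(ts); age > replayWindow || age < -replayWindow {
		return errors.New("timestamp outside replay window")
	}
	digest := sha256.Sum256(payload)
	return rsa.VerifyPKCS1v15(pub, crypto.SHA256, digest[:], sig)
}

// roundTrip signs a request with a throwaway key and verifies it twice:
// immediately (expected to pass) and one minute later (expected to fail).
func roundTrip() error {
	key, err := rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		return err
	}
	now := time.Now().UTC()
	ts := now.Format(time.RFC3339)
	payload := signingPayload("POST", "/example/path", ts, "worker")
	sig, err := sign(key, payload)
	if err != nil {
		return err
	}
	if err := verify(&key.PublicKey, payload, sig, ts, now); err != nil {
		return err
	}
	if verify(&key.PublicKey, payload, sig, ts, now.Add(time.Minute)) == nil {
		return errors.New("stale timestamp was accepted")
	}
	return nil
}
```

Checking the timestamp before the signature keeps the cheap rejection ahead of the RSA operation; the real server may order these steps differently.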

Activity Execution

Requesting an activity initiates execution across registered handlers:

Code Reference: pkg/v2/application/activity/activity_service.go:12

Request Structure

message ActivityRequest {
string name = 1; // Activity name
bytes data = 2; // Request payload (single blob)
string request_id = 3; // Correlation ID
RoutingStrategy routing_strategy = 4;
ExecutionModel execution_model = 5;
int64 timeout_ms = 6; // Max execution time
map<string, string> metadata = 7; // Additional context
}

Execution Flow

1. Client calls Activity.RequestActivity(request)

2. Permission check: Can requester execute this activity?

3. Query registered handlers from stream manager

4. Create pending request for correlation

5. Route request based on routing strategy

6. Wait for response(s) with timeout

7. Aggregate results based on execution model

8. Return ActivityResponse

Single Node Execution

For single-node routing:

// Select one handler (currently the first registered; no load balancing yet)
selectedHandler := handlers[0]

// Send request to the selected handler via its stream
streamManager.SendRequest(selectedHandler.ID, activityRequest)

// Wait for single response or timeout
select {
case response := <-responseChan:
	return buildResponse(response)
case <-timeoutTimer.C:
	return ErrTimeout
}

Code Reference: pkg/v2/application/activity/execution_engine.go:92

Broadcast Execution

For broadcast routing with first-match model:

// Send to all handlers
for _, handler := range handlers {
	streamManager.SendRequest(handler.ID, activityRequest)
}

// Wait for first successful response
for {
	select {
	case response := <-responseChan:
		if response.Success {
			return response // First match wins
		}
	case <-timeoutTimer.C:
		return ErrTimeout
	}
}

Code Reference: pkg/v2/application/activity/execution_engine.go:250

For the all-must-succeed model, all responses are collected, and the failure of any handler causes the overall request to fail.

For the best-effort model, all responses are collected, and the request succeeds if at least one handler succeeds.
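Both collecting models need to read a known number of responses while respecting the request timeout. A minimal sketch of that loop follows; Response and collectResponses are hypothetical names, and the timeout is passed in milliseconds to mirror the timeout_ms field of the request.

```go
package main

import "time"

// Response is a hypothetical handler response.
type Response struct {
	HandlerID string
	Success   bool
}

// collectResponses reads up to expected responses from ch.
// It stops early and reports timedOut when the timeout elapses first.
func collectResponses(ch <-chan Response, expected int, timeoutMs int64) (got []Response, timedOut bool) {
	timer := time.NewTimer(time.Duration(timeoutMs) * time.Millisecond)
	defer timer.Stop()
	for len(got) < expected {
		select {
		case r := <-ch:
			got = append(got, r)
		case <-timer.C:
			// Return whatever arrived so far; the caller decides how to treat it.
			return got, true
		}
	}
	return got, false
}
```

Under all-must-succeed, a timed-out collection would count as a failure because some handler never answered. Under best-effort, the partial results could still be returned if any of them succeeded.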

Stream Manager

Manages active handler streams and routes requests:

Code Reference: pkg/v2/application/activity/stream_manager.go

Handler Registration

type StreamManager struct {
	// activityName -> handlerID -> handler
	handlers map[string]map[string]*ActivityHandler

	// handlerID -> handler (quick lookup)
	handlersByID map[string]*ActivityHandler

	// appName -> activityName -> handlerID
	appHandlers map[string]map[string]string
}

// Register new handler
func (sm *StreamManager) RegisterHandler(
	stream interface{},
	appName string,
	activityName string,
) (string, error) {
	handlerID := uuid.New().String()
	now := time.Now()
	handler := &ActivityHandler{
		ID:            handlerID,
		ActivityName:  activityName,
		AppName:       appName,
		Stream:        stream,
		RegisteredAt:  now,
		LastHeartbeat: now,
	}
	// Create the per-activity map on first use; writing to a nil map panics
	if sm.handlers[activityName] == nil {
		sm.handlers[activityName] = make(map[string]*ActivityHandler)
	}
	sm.handlers[activityName][handlerID] = handler
	return handlerID, nil
}

// Get all handlers for an activity
func (sm *StreamManager) GetHandlers(activityName string) []*ActivityHandler {
	// Copy the map values into a slice; map iteration order is unspecified
	result := make([]*ActivityHandler, 0, len(sm.handlers[activityName]))
	for _, h := range sm.handlers[activityName] {
		result = append(result, h)
	}
	return result
}

Heartbeat Monitoring

The stream manager exposes a cleanup loop that drops handlers whose LastHeartbeat timestamp is older than the configured timeout. Timestamps are refreshed when handlers send responses on their streams.

Implementation Status

ActivityService.UpdateHandlerHeartbeat is server-internal only (pkg/v2/application/activity/activity_service.go:88-95) and no gRPC RPC endpoint exists for clients to call this method (easy.proto/v2/protos/services.proto:265-313). Handler liveness is currently tracked via stream activity (responses) rather than dedicated heartbeat messages.

StartHeartbeatMonitor must be invoked manually by the host; it is not started automatically in server/services.go.
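The cleanup loop can be pictured as a periodic eviction pass over last-seen timestamps. The following is a sketch under assumptions, not the StreamManager implementation: the liveness type and its methods are hypothetical, and times are kept as Unix milliseconds so the eviction rule can be exercised without a real clock.

```go
package main

import (
	"context"
	"sync"
	"time"
)

// liveness tracks the last time each handler sent a response.
type liveness struct {
	mu       sync.Mutex
	lastSeen map[string]int64 // handlerID -> last response time (Unix ms)
}

func newLiveness() *liveness {
	return &liveness{lastSeen: make(map[string]int64)}
}

// touch records activity for a handler, e.g. when it sends a response.
func (l *liveness) touch(handlerID string, nowMs int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.lastSeen[handlerID] = nowMs
}

// evictStale removes and returns handlers idle for longer than timeoutMs.
func (l *liveness) evictStale(nowMs, timeoutMs int64) []string {
	l.mu.Lock()
	defer l.mu.Unlock()
	var evicted []string
	for id, seen := range l.lastSeen {
		if nowMs-seen > timeoutMs {
			delete(l.lastSeen, id)
			evicted = append(evicted, id)
		}
	}
	return evicted
}

// monitor runs evictStale on every tick until ctx is cancelled.
// Like the real monitor, it has to be started explicitly by the host.
func (l *liveness) monitor(ctx context.Context, interval time.Duration, timeoutMs int64) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			l.evictStale(now.UnixMilli(), timeoutMs)
		}
	}
}
```

Because liveness is refreshed only by responses, a handler that is connected but idle for longer than the timeout would be evicted by a loop like this one.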

Request Correlator

Manages request/response correlation for in-flight activities:

Code Reference: pkg/v2/application/activity/request_correlator.go

Correlation Management

type RequestCorrelator struct {
// requestID -> pending request
pendingRequests map[string]*PendingRequest
}

type PendingRequest struct {
RequestID string
ActivityName string
RequestedBy string
RoutingStrategy RoutingStrategy
ExecutionModel ExecutionModel
ResponseChan chan *Response
ErrorChan chan error
Timeout time.Time
ExpectedHandlerCount int // For broadcast
CreatedAt time.Time
}

// Register pending request before execution
correlator.AddPendingRequest(pendingReq)

// Block until a handler's response is delivered
response := <-pendingReq.ResponseChan

// Clean up after completion
correlator.RemovePendingRequest(pendingReq.RequestID)

Response Correlation

When a handler sends a response:

1. Extract request ID from response
2. Look up pending request in correlator
3. Send response to pending request's channel
4. Execution engine receives and processes response

This decouples response handling from stream management.
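The four steps above amount to a map lookup followed by a channel send. A minimal sketch, assuming a buffered channel per request so the stream reader never blocks; the correlator type, its methods, and the Response shape are hypothetical and much simpler than the PendingRequest struct shown earlier.

```go
package main

import "sync"

// Response is a hypothetical handler response carrying its request ID.
type Response struct {
	RequestID string
	HandlerID string
	Success   bool
}

// correlator maps request IDs to the channel the execution engine waits on.
type correlator struct {
	mu      sync.Mutex
	pending map[string]chan Response
}

func newCorrelator() *correlator {
	return &correlator{pending: make(map[string]chan Response)}
}

// add registers a pending request; the buffer holds one slot per expected handler.
func (c *correlator) add(requestID string, expected int) <-chan Response {
	ch := make(chan Response, expected)
	c.mu.Lock()
	c.pending[requestID] = ch
	c.mu.Unlock()
	return ch
}

// deliver routes a response to its pending request.
// It returns false for unknown or already-removed requests, or when the buffer is full.
func (c *correlator) deliver(resp Response) bool {
	c.mu.Lock()
	ch, ok := c.pending[resp.RequestID]
	c.mu.Unlock()
	if !ok {
		return false
	}
	select {
	case ch <- resp:
		return true
	default:
		return false
	}
}

// remove drops the pending request once execution has finished.
func (c *correlator) remove(requestID string) {
	c.mu.Lock()
	delete(c.pending, requestID)
	c.mu.Unlock()
}
```

A response that arrives after remove has run is simply dropped, which is one way late replies from timed-out requests could be handled.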

Permission Enforcement

Activity operations require permissions:

Register Permission

Tuple: (app:worker, register, activity:generate-report)

Check during handler registration

Execute Permission

Tuple: (app:backend, request, activity:generate-report)

Check before allowing execution

Code Reference: pkg/v2/infrastructure/authz/permission_checker.go:10

Permission Caching

Activity permissions are cached:

  • Check once on stream registration
  • Cache hit: < 1ms
  • Cache miss: Query OpenFGA (~10ms)
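A cache in front of the authorization backend might look like the sketch below. Everything here is illustrative: tuple, cachedChecker, and the backend function are hypothetical stand-ins (the backend would be the OpenFGA query), and the sketch has no expiry or invalidation because the text above does not describe one.

```go
package main

import "sync"

// tuple is a hypothetical (subject, relation, object) permission tuple.
type tuple struct{ Subject, Relation, Object string }

// cachedChecker remembers backend decisions per tuple.
type cachedChecker struct {
	mu      sync.Mutex
	cache   map[tuple]bool
	backend func(tuple) (bool, error) // stand-in for the slow authorization query
}

func newCachedChecker(backend func(tuple) (bool, error)) *cachedChecker {
	return &cachedChecker{cache: make(map[tuple]bool), backend: backend}
}

// allowed answers from the cache when possible and asks the backend otherwise.
// The lock is held during the backend call, which keeps the sketch simple
// at the cost of serializing cache misses.
func (c *cachedChecker) allowed(t tuple) (bool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if ok, hit := c.cache[t]; hit {
		return ok, nil
	}
	ok, err := c.backend(t)
	if err != nil {
		return false, err // errors are not cached, so the next call retries
	}
	c.cache[t] = ok
	return ok, nil
}
```

Checking (app:worker, register, activity:generate-report) twice would reach the backend only on the first call.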

Activity Payload

Activity payloads are arbitrary bytes:

message ActivityRequest {
string name = 1;
bytes data = 2; // JSON, Protobuf, or custom format
}

JSON Payload Example

{
"activityType": "generate-report",
"parameters": {
"reportType": "sales",
"dateRange": {
"start": "2025-01-01",
"end": "2025-01-31"
},
"format": "pdf"
},
"metadata": {
"requestedBy": "user@example.com",
"priority": "high"
}
}
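Since the data field carries raw bytes, a requester has to serialize the JSON document before sending it, and the handler has to parse it on receipt. A sketch in Go using encoding/json; the struct and function names are hypothetical and simply mirror the example document above.

```go
package main

import "encoding/json"

type dateRange struct {
	Start string `json:"start"`
	End   string `json:"end"`
}

type reportParameters struct {
	ReportType string    `json:"reportType"`
	DateRange  dateRange `json:"dateRange"`
	Format     string    `json:"format"`
}

// reportPayload mirrors the JSON payload example (hypothetical type).
type reportPayload struct {
	ActivityType string            `json:"activityType"`
	Parameters   reportParameters  `json:"parameters"`
	Metadata     map[string]string `json:"metadata,omitempty"`
}

// encodePayload serializes the payload into bytes for the request's data field.
func encodePayload(p reportPayload) ([]byte, error) {
	return json.Marshal(p)
}

// decodePayload is what a handler would run on the received bytes.
func decodePayload(data []byte) (reportPayload, error) {
	var p reportPayload
	err := json.Unmarshal(data, &p)
	return p, err
}
```

Requester and handler have to agree on the format out of band, because nothing in the bytes field itself identifies the encoding.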

Protobuf Payload Example

message ReportRequest {
string report_type = 1;
DateRange date_range = 2;
string format = 3;
}

// Serialize to bytes (Go, using google.golang.org/protobuf/proto)
payload, err := proto.Marshal(request)

Activity vs Hook vs Event Bus

Understanding when to use each communication pattern:

Use Activities When:

  • Structured request/response needed
  • Long-running work with results
  • Work must be distributed across multiple registered handlers
  • Timeout and retry logic important

Use Hooks When:

  • Synchronous event notification
  • Multiple subscribers to same event
  • Permission-controlled communication
  • Simple trigger/response pattern

Use Event Bus When:

  • Asynchronous fire-and-forget
  • Platform-level events
  • No response required
  • Broadcasting to many subscribers

Related: Hooks Architecture, Event-Driven Architecture

Monitoring and Debugging

Activity Metrics

- Request count per activity
- Average execution time
- Timeout rate
- Error rate per handler
- Active handler count per activity
- Pending request count

Distributed Tracing

Activity requests are traced end-to-end:

Trace: activity.request
├─ Span: permission.check
├─ Span: correlator.register
├─ Span: routing.select_handler(s)
├─ Span: stream.send_request
├─ Span: wait.response (with timeout)
└─ Span: aggregate.results

Logging

Comprehensive logging for debugging:

INFO: Activity requested: generate-report by app:backend
DEBUG: Found 3 handlers: [worker-1, worker-2, worker-3]
DEBUG: Routing strategy: SINGLE_NODE (selected: worker-1)
INFO: Handler worker-1 responded in 1250ms: success
INFO: Activity completed: success

Error Scenarios

No Handlers Available

Problem: Activity not registered or all handlers offline

Behavior:
- RequestActivity returns ErrNoHandlers
- No execution attempted

Solution:
- Ensure at least one handler is registered
- Check handler connectivity
- Implement fallback logic

Handler Timeout

Problem: Handler doesn't respond within timeout

Behavior:
- Correlator waits up to timeout
- Returns ErrTimeout
- Other handlers unaffected (broadcast)

Solution:
- Increase timeout if legitimate
- Investigate slow handler
- Consider async processing with callback

Partial Failure (ALL_MUST_SUCCEED)

Problem: One handler fails when using ALL_MUST_SUCCEED

Behavior:
- Overall request fails
- All results are included in response
- Client sees aggregated error

Solution:
- Implement compensation logic
- Use BEST_EFFORT instead
- Review handler implementation

Handler Disconnection

Problem: Handler stream disconnects during request

Behavior:
- Handler is auto-unregistered
- Pending requests time out
- Subsequent requests route to remaining handlers

Solution:
- Implement handler reconnection
- Use multiple handlers for redundancy
- Monitor handler health
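Handler reconnection is usually paired with a capped exponential backoff so that a restarting server is not flooded with registration attempts. The sketch below shows one way to do that; backoffDelay, reconnect, and ErrStreamClosed are hypothetical names, and connect stands in for whatever re-opens the RegisterActivity stream and re-sends the registration message.

```go
package main

import (
	"errors"
	"time"
)

// ErrStreamClosed is a hypothetical error for a dropped handler stream.
var ErrStreamClosed = errors.New("stream closed")

// backoffDelay doubles baseMs once per attempt and caps the result at maxMs.
func backoffDelay(attempt int, baseMs, maxMs int64) int64 {
	d := baseMs
	for i := 0; i < attempt && d < maxMs; i++ {
		d *= 2
	}
	if d > maxMs {
		d = maxMs
	}
	return d
}

// reconnect calls connect until it succeeds or maxAttempts is used up,
// sleeping with exponential backoff between attempts.
func reconnect(maxAttempts int, baseMs, maxMs int64, connect func() error) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = connect(); err == nil {
			return nil
		}
		if attempt < maxAttempts-1 {
			delay := backoffDelay(attempt, baseMs, maxMs)
			time.Sleep(time.Duration(delay) * time.Millisecond)
		}
	}
	return err // the last connection error
}
```

With a 500 ms base and a 30 s cap, the delays would run 500 ms, 1 s, 2 s, 4 s and so on until they reach the cap. Adding random jitter is a common refinement that this sketch omits.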

Best Practices

For Activity Handlers

Respond Quickly:

stream.on('data', async (message) => {
  if (message.activityRequest) {
    try {
      const result = await processActivity(message.activityRequest);

      // Send response promptly
      stream.write({
        id: message.id,
        activityResponse: {
          requestId: message.activityRequest.requestId,
          success: true,
          activityResult: { data: result }
        }
      });
    } catch (err) {
      // Always send response, even on error
      stream.write({
        id: message.id,
        activityResponse: {
          requestId: message.activityRequest.requestId,
          success: false,
          errorStatus: {
            code: grpc.status.INTERNAL,
            message: err.message
          }
        }
      });
    }
  }
});

Handler Liveness:

Handlers stay alive as long as they actively process requests and send responses. The LastHeartbeat timestamp is updated automatically when responses are sent on the stream. There is no separate heartbeat RPC available for client applications.

Handle Idempotency:

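async function handleIdempotently(request) {
  // Return the cached result if this request was already processed
  const processed = await isAlreadyProcessed(request.requestId);
  if (processed) {
    return getCachedResult(request.requestId);
  }

  const result = await processActivity(request);
  await cacheResult(request.requestId, result);
  return result;
}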

For Activity Requesters

Set Reasonable Timeouts:

Quick activities: 1-5 seconds
Normal activities: 10-30 seconds
Long-running: 60-300 seconds

Choose Appropriate Strategy:

// Simple request/response - use single node
const userResponse = await activityClient.RequestActivity({
  name: 'get-user',
  routingStrategy: RoutingStrategy.SINGLE_NODE,
  executionModel: ExecutionModel.FIRST_MATCH,
  timeoutMs: 5000
});

// Parallel processing - use broadcast
const searchResponse = await activityClient.RequestActivity({
  name: 'search-all-shards',
  routingStrategy: RoutingStrategy.BROADCAST,
  executionModel: ExecutionModel.BEST_EFFORT,
  timeoutMs: 30000
});

Handle Errors Gracefully:

try {
  const response = await activityClient.RequestActivity(request);
  if (!response.success) {
    logger.error('Activity failed', response.error);
    // Implement fallback or retry logic
  }
} catch (err) {
  // gRPC status codes are numeric, so compare against the grpc.status enum
  if (err.code === grpc.status.DEADLINE_EXCEEDED) {
    // Timeout - consider retry or alternative approach
  }
}

Further Reading