The orchestration engine manages workflow execution, state persistence, retries, and monitoring.
// Workflow execution states
type WorkflowState =
  | "pending" // Queued, waiting to start
  | "running" // Currently executing steps
  | "paused" // Waiting for HITL approval or external event
  | "completed" // All steps finished successfully
  | "failed" // Execution failed, may be retryable
  | "cancelled" // Manually cancelled by user

// State transitions
/**
 * Allowed next states for each workflow state.
 *
 * The explicit `Record<WorkflowState, ...>` annotation makes a missing or
 * misspelled state a compile error. It also types the terminal states' empty
 * arrays as `WorkflowState[]` rather than `never[]`, so
 * `validTransitions[state].includes(next)` type-checks for any `state`.
 */
const validTransitions: Readonly<Record<WorkflowState, readonly WorkflowState[]>> = {
  pending: ["running", "cancelled"],
  running: ["paused", "completed", "failed"],
  paused: ["running", "cancelled", "failed"],
  completed: [], // Terminal state
  failed: ["pending"], // Can retry
  cancelled: [] // Terminal state
}

/**
 * Whether moving an execution from `from` to `to` is permitted by
 * `validTransitions`.
 */
function canTransition(from: WorkflowState, to: WorkflowState): boolean {
  return validTransitions[from].includes(to)
}
// lib/workflows/orchestrator.ts
import { inngest } from "@/lib/inngest/client"
/**
 * Registers workflow definitions, exposes each one as an Inngest function,
 * and executes workflow steps in dependency order while persisting progress
 * to `db.workflowExecutions`.
 */
export class WorkflowOrchestrator {
  /** Registered workflow definitions, keyed by workflow id. */
  private workflows: Map<string, WorkflowDefinition> = new Map()

  /**
   * Inngest functions created by `register`, keyed by workflow id.
   * They are kept because Inngest only invokes functions that are handed to
   * its `serve()` handler; a created-but-discarded function never runs.
   */
  private inngestFunctions = new Map<string, ReturnType<typeof inngest.createFunction>>()

  /**
   * Register a workflow definition and create the Inngest function that runs
   * it whenever `workflow.trigger.event` is received.
   *
   * Re-registering the same id replaces both the stored definition and its
   * Inngest function.
   *
   * @returns the created Inngest function (also listed by
   *   `getInngestFunctions()`), to be passed to Inngest's `serve()`.
   */
  register(workflow: WorkflowDefinition) {
    this.workflows.set(workflow.id, workflow)

    // NOTE(review): `execute` does not wrap steps in Inngest `step.run`, so an
    // Inngest-level retry re-runs the whole workflow and creates a new
    // execution record each time.
    const fn = inngest.createFunction(
      { id: `workflow-${workflow.id}` },
      { event: workflow.trigger.event },
      async ({ event }) => this.execute(workflow.id, event.data)
    )
    this.inngestFunctions.set(workflow.id, fn)
    return fn
  }

  /** All Inngest functions created so far, e.g. for `serve({ functions })`. */
  getInngestFunctions() {
    return [...this.inngestFunctions.values()]
  }

  /**
   * Execute a workflow from its first step.
   *
   * Creates an execution record in state "running", runs the steps in
   * dependency order, and persists the context after every executed step:
   * - a step whose dependencies are not satisfied is skipped without a record;
   * - a step whose `condition` returns false is recorded as `{ skipped: true }`;
   * - a step returning "awaiting_approval" pauses the execution and returns;
   * - a step returning "failed" fails the whole execution.
   *
   * @param workflowId id the workflow was registered under
   * @param triggerData payload of the triggering event, stored as
   *   `context.trigger.data`
   * @returns the execution record
   * @throws Error when the workflow is not registered. Also throws when a step
   *   fails or the step graph contains a dependency cycle; in those cases the
   *   record is first marked "failed" and `onError` runs.
   */
  async execute(workflowId: string, triggerData: any): Promise<WorkflowExecution> {
    const workflow = this.workflows.get(workflowId)
    if (!workflow) throw new Error(`Workflow not found: ${workflowId}`)

    const execution = await db.workflowExecutions.create({
      workflowId,
      state: "running",
      triggerData,
      context: { trigger: { data: triggerData }, steps: {} },
      startedAt: new Date()
    })

    try {
      for (const stepDef of this.topologicalSort(workflow.steps)) {
        if (!this.checkDependencies(stepDef, execution.context)) continue

        if (stepDef.condition && !stepDef.condition(execution.context)) {
          execution.context.steps[stepDef.id] = { skipped: true }
          continue
        }

        const result = await this.executeStep(stepDef, execution)
        execution.context.steps[stepDef.id] = result
        await db.workflowExecutions.update(execution.id, {
          context: execution.context
        })

        // HITL step: stop here; the execution stays "paused".
        if (result.status === "awaiting_approval") {
          await db.workflowExecutions.update(execution.id, {
            state: "paused",
            pausedAt: new Date()
          })
          return execution
        }

        // executeStep reports failures as a result instead of throwing.
        // Without this check a failed step would be recorded and the
        // workflow would still be marked "completed".
        if (result.status === "failed") {
          throw new Error(`Step "${stepDef.id}" failed: ${result.error}`)
        }
      }

      await db.workflowExecutions.update(execution.id, {
        state: "completed",
        completedAt: new Date()
      })
    } catch (error) {
      await db.workflowExecutions.update(execution.id, {
        state: "failed",
        error: WorkflowOrchestrator.errorMessage(error),
        failedAt: new Date()
      })
      if (workflow.onError) {
        await workflow.onError(execution.context, error)
      }
      throw error
    }

    // Deliberately outside the try: "completed" is a terminal state, so an
    // error thrown by the onComplete hook must not flip the record to "failed".
    if (workflow.onComplete) {
      await workflow.onComplete(execution.context)
    }
    return execution
  }

  /**
   * Execute a single step.
   *
   * Never throws: any error (unknown agent, action failure after retries,
   * HITL handler rejection) is returned as a "failed" StepResult.
   */
  private async executeStep(
    stepDef: StepDefinition,
    execution: WorkflowExecution
  ): Promise<StepResult> {
    const startTime = Date.now()
    try {
      if (stepDef.type === "hitl") {
        // `return await`, not a bare `return`, so a rejected promise is caught
        // below and reported as a failed step.
        return await this.handleHITLStep(stepDef, execution)
      }

      // Fail fast when the step names an unregistered agent. The agent object
      // itself is not passed to the action.
      if (!(await agentRegistry.get(stepDef.agent))) {
        throw new Error(`Agent not found: ${stepDef.agent}`)
      }

      // Default retry policy. It uses the `strategy` key to match the
      // documented step `retries` config.
      // NOTE(review): confirm executeWithRetry reads `strategy` (it
      // previously received `backoff`).
      const output = await this.executeWithRetry(
        () => stepDef.action(execution.context),
        stepDef.retries ?? { maxAttempts: 3, strategy: "exponential" }
      )

      return {
        status: "completed",
        output,
        duration: Date.now() - startTime,
        completedAt: new Date()
      }
    } catch (error) {
      return {
        status: "failed",
        error: WorkflowOrchestrator.errorMessage(error),
        duration: Date.now() - startTime,
        failedAt: new Date()
      }
    }
  }

  /**
   * Order steps so that every step comes after the steps it `dependsOn`.
   * Original relative order is kept wherever dependencies allow.
   *
   * @throws Error when the dependencies form a cycle, naming the cycle path.
   *   Without this check a cycle silently ordered a dependent before its
   *   dependency, and the dependent was then skipped.
   */
  private topologicalSort(steps: StepDefinition[]): StepDefinition[] {
    // Index steps once instead of a linear `find` per dependency edge.
    // The first step with a given id wins, matching Array.prototype.find.
    const byId = new Map<string, StepDefinition>()
    for (const step of steps) {
      if (!byId.has(step.id)) byId.set(step.id, step)
    }

    const sorted: StepDefinition[] = []
    const done = new Set<string>()
    // Ids on the current DFS path; reaching one of them again means a cycle.
    const path: string[] = []

    const visit = (step: StepDefinition): void => {
      if (done.has(step.id)) return
      if (path.includes(step.id)) {
        throw new Error(`Circular step dependency: ${[...path, step.id].join(" -> ")}`)
      }
      path.push(step.id)
      for (const depId of step.dependsOn ?? []) {
        const dep = byId.get(depId)
        // Unknown ids are ignored here; checkDependencies decides at run time
        // whether the step may execute.
        if (dep) visit(dep)
      }
      path.pop()
      done.add(step.id)
      sorted.push(step)
    }

    for (const step of steps) visit(step)
    return sorted
  }

  /** Human-readable message for a caught value, which need not be an Error. */
  private static errorMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error)
  }
}Every workflow execution is durably persisted, enabling recovery from failures and long-running processes.
-- Database schema for workflow executions (PostgreSQL)
CREATE TABLE workflow_executions (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  workflow_id VARCHAR(255) NOT NULL,
  -- Restricted to the orchestrator's WorkflowState values
  state VARCHAR(50) NOT NULL DEFAULT 'pending'
    CHECK (state IN ('pending', 'running', 'paused', 'completed', 'failed', 'cancelled')),
  -- Execution context (JSON)
  trigger_data JSONB NOT NULL,
  context JSONB NOT NULL DEFAULT '{}',
  -- Step tracking
  current_step VARCHAR(255),
  completed_steps TEXT[] DEFAULT '{}',
  -- Timing
  started_at TIMESTAMPTZ,
  paused_at TIMESTAMPTZ,
  resumed_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ,
  failed_at TIMESTAMPTZ,
  -- Error handling
  error TEXT,
  retry_count INT DEFAULT 0,
  -- Metadata
  created_at TIMESTAMPTZ DEFAULT NOW(),
  -- DEFAULT applies only on INSERT; keeping updated_at current on UPDATE
  -- needs a trigger or the application to set it explicitly.
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Indexes for querying. PostgreSQL has no inline INDEX clause inside
-- CREATE TABLE (that is MySQL syntax), so they are created separately.
CREATE INDEX idx_workflow_state ON workflow_executions (workflow_id, state);
CREATE INDEX idx_created_at ON workflow_executions (created_at DESC);
-- Step execution history
CREATE TABLE step_executions (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  -- Every step row belongs to a workflow execution
  execution_id UUID NOT NULL REFERENCES workflow_executions(id),
  step_id VARCHAR(255) NOT NULL,
  state VARCHAR(50) NOT NULL,
  input JSONB,
  output JSONB,
  error TEXT,
  started_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ,
  duration_ms INT,
  -- Agent that executed this step
  agent_id UUID
);

-- Created separately: PostgreSQL does not accept INDEX inside CREATE TABLE.
-- The leading execution_id column also serves lookups by the foreign key,
-- which PostgreSQL does not index automatically.
CREATE INDEX idx_execution_step ON step_executions (execution_id, step_id);
Delays increase exponentially: 1s, 2s, 4s, 8s...
Best for rate limits and transient failures.
Fixed delay between retries: 5s, 5s, 5s...
Best for predictable recovery times.
Retry immediately without delay.
Best for race conditions.
// Retry configuration
step({
  id: "api-call",
  agent: "integration-agent",

  // Up to 5 attempts with exponential backoff: the first delay is 1s and
  // later delays grow, capped at 60s.
  retries: {
    maxAttempts: 5,
    strategy: "exponential",
    initialDelay: 1_000, // milliseconds (1 second)
    maxDelay: 60_000, // milliseconds (1 minute ceiling)

    // Error codes that trigger a retry
    retryOn: ["RATE_LIMITED", "TIMEOUT", "SERVICE_UNAVAILABLE"],

    // Error codes that are never retried
    noRetryOn: ["INVALID_INPUT", "UNAUTHORIZED"]
  },

  action: async (context) => {
    // API call that might fail
  }
})
// Real-time workflow monitoring
import { orchestrator } from "@/lib/workflows"
// Subscribe to execution events
orchestrator.on("execution:started", (execution) => {
  console.log(`Workflow started: ${execution.workflowId}`)
  metrics.increment("workflow.started", { workflow: execution.workflowId })
})
orchestrator.on("step:completed", (execution, step) => {
  console.log(`Step completed: ${step.id} in ${step.duration}ms`)
  metrics.timing("step.duration", step.duration, { step: step.id })
})
orchestrator.on("execution:failed", (execution, error) => {
  console.error(`Workflow failed: ${execution.workflowId}`, error)
  metrics.increment("workflow.failed", { workflow: execution.workflowId })
  // Alert on critical failures.
  // Optional chaining: an execution record identifies its workflow by
  // `workflowId`. If `workflow` is not populated on the payload,
  // `execution.workflow.critical` would throw a TypeError inside this
  // failure handler.
  // NOTE(review): confirm the event payload carries `workflow`; if it does
  // not, critical alerts are never sent. In that case, look the definition
  // up by `workflowId` instead.
  if (execution.workflow?.critical) {
    alerting.send({
      severity: "high",
      message: `Critical workflow failed: ${execution.workflowId}`,
      error
    })
  }
})
// Query execution history
const recentExecutions = await orchestrator.query({
  workflowId: "order-fulfillment",
  state: ["completed", "failed"],
  since: new Date(Date.now() - 24 * 60 * 60 * 1000), // Last 24h
  limit: 100
})
// Get execution metrics
const stats = await orchestrator.getStats("order-fulfillment", {
  period: "7d"
})
// { total: 1250, completed: 1180, failed: 45, avgDuration: 12500 }