Execute independent steps simultaneously to dramatically reduce workflow completion time.
// Using the parallel() helper
import { createWorkflow, step, parallel } from "@/lib/workflows"
export const orderProcessingWorkflow = createWorkflow({
id: "order-processing",
steps: [
// These 3 steps run simultaneously
parallel({
id: "initial-validations",
steps: [
step({
id: "validate-inventory",
agent: "inventory-agent",
action: async (ctx) => {
const { items } = ctx.trigger.data
return inventory.checkAvailability(items)
}
}),
step({
id: "validate-payment",
agent: "payment-agent",
action: async (ctx) => {
const { paymentMethod } = ctx.trigger.data
return payments.validate(paymentMethod)
}
}),
step({
id: "validate-address",
agent: "shipping-agent",
action: async (ctx) => {
const { shippingAddress } = ctx.trigger.data
return shipping.validateAddress(shippingAddress)
}
})
],
// How to handle results
onComplete: "all", // Wait for all to complete
// Options: "all" | "any" | "race" | { min: 2 }
}),
// This step waits for all parallel steps
step({
id: "process-order",
dependsOn: ["initial-validations"],
action: async (ctx) => {
// Access results from parallel steps
const inventory = ctx.steps["validate-inventory"].output
const payment = ctx.steps["validate-payment"].output
const address = ctx.steps["validate-address"].output
// All validations passed, process order
return processOrder({ inventory, payment, address })
}
})
]
})Process multiple items in parallel, then aggregate results.
// Fan-out: Process multiple properties in parallel
import { fanOut, fanIn } from "@/lib/workflows"
export const bulkPropertyAnalysisWorkflow = createWorkflow({
id: "bulk-property-analysis",
steps: [
// Fan-out: Create parallel tasks for each property
fanOut({
id: "analyze-properties",
// Source array to fan out over
source: (ctx) => ctx.trigger.data.propertyIds,
// Step to run for each item
step: step({
id: "analyze-single",
agent: "property-analyst",
action: async (ctx) => {
const propertyId = ctx.fanOut.item
const index = ctx.fanOut.index
// Analyze single property
const property = await db.properties.findById(propertyId)
const marketData = await marketApi.getComps(property.address)
const valuation = await valuationService.estimate(property, marketData)
return {
propertyId,
valuation,
marketTrend: marketData.trend,
confidence: valuation.confidence
}
}
}),
// Concurrency control
concurrency: 10, // Max 10 parallel executions
// Error handling
onError: "continue", // Continue processing other items
// Options: "continue" | "stop" | "retry"
}),
// Fan-in: Aggregate all results
fanIn({
id: "aggregate-results",
dependsOn: ["analyze-properties"],
action: async (ctx) => {
const results = ctx.steps["analyze-properties"].outputs
const successful = results.filter(r => !r.error)
const failed = results.filter(r => r.error)
// Calculate portfolio metrics
const totalValue = successful.reduce((sum, r) => sum + r.valuation.amount, 0)
const avgConfidence = successful.reduce((sum, r) => sum + r.confidence, 0) / successful.length
return {
summary: {
totalProperties: results.length,
successful: successful.length,
failed: failed.length,
totalValue,
avgConfidence
},
results: successful,
errors: failed
}
}
})
]
})Get the fastest response from multiple sources.
// Race: First successful response wins
parallel({
id: "get-property-data",
steps: [
step({
id: "source-mls",
agent: "mls-agent",
action: async (ctx) => {
return await mlsApi.getProperty(ctx.trigger.data.address)
}
}),
step({
id: "source-zillow",
agent: "zillow-agent",
action: async (ctx) => {
return await zillowApi.getProperty(ctx.trigger.data.address)
}
}),
step({
id: "source-redfin",
agent: "redfin-agent",
action: async (ctx) => {
return await redfinApi.getProperty(ctx.trigger.data.address)
}
})
],
// First successful response wins
onComplete: "race",
// Cancel other requests when one succeeds
cancelOnComplete: true
})// Conditional: Only run steps that apply
parallel({
id: "property-enrichment",
steps: [
step({
id: "get-school-data",
// Only run for residential properties
condition: (ctx) => ctx.trigger.data.propertyType === "residential",
agent: "school-agent",
action: async (ctx) => {
return await schoolApi.getNearbySchools(ctx.trigger.data.location)
}
}),
step({
id: "get-zoning-data",
// Only run for commercial properties
condition: (ctx) => ctx.trigger.data.propertyType === "commercial",
agent: "zoning-agent",
action: async (ctx) => {
return await zoningApi.getZoning(ctx.trigger.data.parcelId)
}
}),
step({
id: "get-crime-data",
// Always run
agent: "crime-agent",
action: async (ctx) => {
return await crimeApi.getStats(ctx.trigger.data.location)
}
}),
step({
id: "get-walkability",
// Always run
agent: "walkability-agent",
action: async (ctx) => {
return await walkscoreApi.getScore(ctx.trigger.data.address)
}
})
],
// Wait for all applicable steps
onComplete: "all"
})// Error handling strategies
parallel({
id: "multi-source-enrichment",
steps: [...],
// Strategy 1: Fail fast (default)
onError: "stop",
// Strategy 2: Continue and collect errors
onError: "continue",
onComplete: async (results) => {
const errors = results.filter(r => r.error)
if (errors.length > results.length / 2) {
throw new Error("Too many failures")
}
return results.filter(r => !r.error)
},
// Strategy 3: Minimum success threshold
onComplete: { min: 2 }, // At least 2 must succeed
// Strategy 4: Custom error handler
onStepError: async (step, error, context) => {
// Log error but continue
await logging.warn(`Step ${step.id} failed: ${error.message}`)
// Return fallback value
return { fallback: true, error: error.message }
}
})