- Overview
- What is Workflow DevKit?
- Core Concepts
- Postgres World Setup
- Using Workflow with Bun
- Creating Workflows
- Creating Steps
- Control Flow Patterns
- Error Handling & Retries
- Observability & Debugging
- Best Practices
- References
This guide documents Workflow DevKit integration for building durable, reliable workflow execution with automatic state persistence, built-in retry mechanisms, and execution tracking.
Key Benefits:
- Durability: Workflows survive application restarts and failures
- Reliability: Automatic retry handling for transient errors
- State Management: Persistent workflow execution state in PostgreSQL
- Observability: Built-in inspection and monitoring tools
- Concurrency: Controlled parallel execution with queue management
Workflow DevKit is a framework for building durable, long-running workflows that can survive application interruptions. Unlike traditional application code that loses state on restart, workflows maintain execution context and can resume exactly where they left off.
Core Features:
- Durable execution with state persistence
- Automatic retry mechanisms with configurable policies
- Built-in concurrency control
- Real-time event distribution
- Observability through CLI and Web UI
- Framework-agnostic design (works with Next.js, Express, Hono, Bun, etc.)
How it Differs from Regular Application Code:
- Regular code: loses state on crash, requires manual retry logic, offers no execution tracking
- Workflow code: preserves state, retries automatically, provides built-in observability
A workflow is the top-level orchestration unit that coordinates multiple steps to achieve a goal. Workflows are marked with the 'use workflow' directive.
Key Characteristics:
- Durable: State persists across restarts
- Resumable: Can continue from last checkpoint
- Tracked: Full execution history maintained
- Isolated: Each run has independent state
Example:
export async function dataProcessingWorkflow(params: { workflowRunId: string }) {
'use workflow';
// Orchestrate multiple steps
const items = await fetchItems({ limit: 100 });
const processed = await processItems(items);
const results = await saveResults(processed);
return { items, processed, results };
}

A step is a unit of work within a workflow. Steps are marked with the 'use step' directive and represent atomic operations that can be retried independently.
Key Characteristics:
- Atomic: Executes as a single unit
- Retryable: Failures trigger automatic retries
- Trackable: Individual execution status
- Idempotent: Safe to retry multiple times
Example:
import { RetryableError } from '@workflow/errors';

export async function fetchItems(config: FetchItemsConfig) {
'use step';
try {
const items = await fetchFromAPI();
return items;
} catch (error) {
// Workflow will automatically retry this step
throw new RetryableError('API request failed', { retryAfter: '30s' });
}
}

- 'use workflow': Marks a function as a durable workflow
- 'use step': Marks a function as a retryable step
- These directives are used by the Workflow compiler during build
The Postgres World implementation provides persistent storage for workflow state using PostgreSQL.
bun add @workflow/world-postgres workflow

Add to .env:
# Workflow Configuration
WORKFLOW_TARGET_WORLD="@workflow/world-postgres"
WORKFLOW_POSTGRES_URL="postgresql://user:password@localhost:5432/myapp"
WORKFLOW_POSTGRES_JOB_PREFIX="myapp"
WORKFLOW_POSTGRES_WORKER_CONCURRENCY=10

Variable Descriptions:
- WORKFLOW_TARGET_WORLD: Specifies the world implementation to use
- WORKFLOW_POSTGRES_URL: PostgreSQL connection string (can reuse DATABASE_URL)
- WORKFLOW_POSTGRES_JOB_PREFIX: Prefix for queue jobs (helps identify jobs in a shared DB)
- WORKFLOW_POSTGRES_WORKER_CONCURRENCY: Number of concurrent workers (default: 10)
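If you prefer to derive the world configuration from these variables rather than hardcoding values, here is a minimal sketch, assuming the createWorld options (connectionString, jobPrefix, queueConcurrency) used in the initialization example later in this guide:

// Sketch: mapping the environment variables above onto world options.
// The fallback to DATABASE_URL mirrors the note that the connection string can be reused.
import { createWorld } from '@workflow/world-postgres';

const world = createWorld({
  connectionString:
    process.env.WORKFLOW_POSTGRES_URL ?? process.env.DATABASE_URL ?? '',
  jobPrefix: process.env.WORKFLOW_POSTGRES_JOB_PREFIX ?? 'myapp',
  queueConcurrency: Number(process.env.WORKFLOW_POSTGRES_WORKER_CONCURRENCY ?? 10),
});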
Initialize the required tables:
bun x workflow-postgres-setup

Tables Created:
- workflow_runs: Stores workflow execution state
- workflow_events: Event history for workflows
- workflow_steps: Individual step execution data
- workflow_hooks: Webhook configurations
- workflow_streams: Streaming data outputs
Important Notes:
- These tables are separate from your application schema
- Managed entirely by Workflow DevKit
- Do not manually modify these tables
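Read-only queries against these tables are safe for monitoring (the debugging section below runs similar SELECTs). A minimal sketch, assuming the node-postgres (pg) client, which is not part of Workflow DevKit, and the workflow_runs columns referenced in that section:

import { Pool } from 'pg';

// Read-only status summary; never INSERT/UPDATE/DELETE against these tables.
const pool = new Pool({ connectionString: process.env.WORKFLOW_POSTGRES_URL });

export async function workflowRunSummary(): Promise<Array<{ status: string; count: string }>> {
  const { rows } = await pool.query(
    'SELECT status, COUNT(*) AS count FROM workflow_runs GROUP BY status'
  );
  return rows;
}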
The Postgres World must be initialized during application startup:
import { createWorld } from '@workflow/world-postgres';
import type { World } from '@workflow/world';
let workflowWorld: World | undefined;
export async function initializeWorkflow(): Promise<World> {
if (workflowWorld) {
return workflowWorld;
}
// Create the PostgreSQL world
workflowWorld = createWorld({
connectionString: process.env.DATABASE_URL,
jobPrefix: 'myapp',
queueConcurrency: 10,
});
// Start the world (begins queue processing)
if (workflowWorld.start) {
await workflowWorld.start();
}
return workflowWorld;
}

// Accessor used for diagnostics (see the debugging section below)
export function getWorkflowWorld(): World | undefined {
  return workflowWorld;
}

Call this once at application startup:
// In your main entry point
import { initializeWorkflow } from './workflows/runtime.js';
async function main() {
// Initialize workflow runtime
await initializeWorkflow();
// ... rest of your application
}

Workflow DevKit supports the Bun runtime with some specific considerations.
Workflows must be compiled before execution:
# Compile workflows (generates handler files)
bun run build:workflows
# Or use the combined build script
bun run build

What happens during build:
- Workflow compiler scans for 'use workflow' and 'use step' directives
- Generates handler files in .well-known/workflow/v1/
- Creates a workflow registry with all discovered workflows
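As a quick sanity check (the debugging section below does the same with ls), you can verify at startup that the build produced the handler directory. A minimal sketch; the path matches the build output listed above:

import { existsSync } from 'node:fs';

// The workflow build should have generated handler files in this directory.
const generatedDir = '.well-known/workflow/v1';
if (!existsSync(generatedDir)) {
  console.warn(
    `[startup] ${generatedDir} not found - run "bun run build:workflows" first`
  );
}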
Build Scripts in package.json:
{
"scripts": {
"build": "tsc && npx workflow build",
"build:workflows": "npx workflow build"
}
}

Workflow DevKit requires three HTTP endpoints for internal communication. These can be implemented with Bun.serve:
import { createWorkflowServer } from './workflows/server.js';
// Start the workflow HTTP server
const server = await createWorkflowServer(3001);
// Server exposes these required endpoints:
// - POST /.well-known/workflow/v1/flow (orchestration)
// - POST /.well-known/workflow/v1/step (step execution)
// - POST/GET /.well-known/workflow/v1/webhook/:token (webhook delivery)

Bun-Specific Implementation:
const server = Bun.serve({
port: 3001,
async fetch(req) {
const url = new URL(req.url);
const path = url.pathname;
// Route to workflow endpoints
if (path === '/.well-known/workflow/v1/flow' && req.method === 'POST') {
return flowHandler.POST(req);
}
if (path === '/.well-known/workflow/v1/step' && req.method === 'POST') {
return stepHandler.POST(req);
}
if (path.startsWith('/.well-known/workflow/v1/webhook/')) {
return webhookHandler.POST(req);
}
return new Response('Not Found', { status: 404 });
},
});

To start a workflow run from your application code:

import { start } from 'workflow';
import { dataProcessingWorkflow } from './workflows/data-processing-workflow.js';
// Start a workflow run
const run = await start(dataProcessingWorkflow, {
workflowRunId: 'run-123',
});
// Wait for completion
const result = await run.result();
console.log('Workflow completed:', result);

Workflows orchestrate multiple steps to achieve a complex goal.
/**
* Workflow: Data Processing
*
* Orchestrates data discovery, processing, and result creation.
*/
export async function dataProcessingWorkflow(params: {
workflowRunId: string;
}): Promise<WorkflowResult> {
'use workflow';
const startTime = Date.now();
// Step 1: Fetch items
const items = await fetchItems({ limit: 100 });
// Step 2: Process items
const processed = await processItems(items);
// Step 3: Save results
const results = await saveResults(processed);
return {
duration: Date.now() - startTime,
itemsFound: items.length,
resultsCreated: results.length,
};
}

Workflows receive parameters as a single object:
export async function myWorkflow(params: {
userId: string;
options: MyOptions;
}): Promise<Result> {
'use workflow';
// Use params
const data = await fetchUserData(params.userId);
return processData(data, params.options);
}

Workflows can return any serializable data:
interface WorkflowResult {
status: 'success' | 'partial' | 'failed';
processed: number;
errors: string[];
}
export async function myWorkflow(): Promise<WorkflowResult> {
'use workflow';
// ... implementation
return { status: 'success', processed: 100, errors: [] };
}

For dependencies like agents or database clients, use a module-level initialization pattern:
// Store dependencies at module level
let dependencies: WorkflowDependencies | undefined;
export interface WorkflowDependencies {
agent: Agent;
db: DatabaseClient;
}
// Initialize before starting workflows
export function initializeWorkflowDependencies(deps: WorkflowDependencies) {
dependencies = deps;
}
// Access in workflow
function getDependencies(): WorkflowDependencies {
if (!dependencies) {
throw new Error('Dependencies not initialized');
}
return dependencies;
}
export async function myWorkflow() {
'use workflow';
const { agent, db } = getDependencies();
// Use dependencies
}

Steps are the building blocks of workflows: atomic units of work that can be retried independently.
/**
* Step: Fetch Items
*
* Fetches items from external API.
*/
export async function fetchItems(
config: FetchItemsConfig = {}
): Promise<Item[]> {
'use step';
const limit = config.limit || 100;
try {
// Perform work
const items = await fetchItemsFromAPI(limit);
return items;
} catch (error) {
// Throw retryable error for automatic retry
throw new RetryableError(
'Failed to fetch items',
{ retryAfter: '30s' }
);
}
}

1. Keep Steps Atomic
Each step should do one logical thing:
// ❌ Bad: Too much in one step
export async function processEverything() {
'use step';
const items = await fetchItems();
const processed = await processItems(items);
const results = await saveResults(processed);
return results;
}
// ✅ Good: Separate steps
export async function fetchItems() {
'use step';
return await fetchItemsFromAPI();
}
export async function processItems(items: Item[]) {
'use step';
return await performProcessing(items);
}

2. Make Steps Idempotent
Steps may be retried, so they should be safe to run multiple times:
export async function createRecord(recordId: string) {
'use step';
// ✅ Idempotent: Check if exists first
const existing = await db.record.findUnique({ where: { id: recordId } });
if (existing) return existing;
// Create only if doesn't exist
return await db.record.create({ data: { id: recordId } });
}

3. Use Appropriate Error Types
import { RetryableError, FatalError } from '@workflow/errors';
export async function myStep() {
'use step';
try {
const result = await externalAPI.call();
return result;
} catch (error) {
if (error.code === 'RATE_LIMIT') {
// Retry after delay
throw new RetryableError('Rate limited', { retryAfter: '60s' });
}
if (error.code === 'INVALID_INPUT') {
// Don't retry - permanent failure
throw new FatalError('Invalid input - cannot proceed');
}
// Default: retry with backoff
throw new RetryableError('Temporary failure', { retryAfter: '30s' });
}
}

Steps accept parameters and return values just like regular functions:
export interface StepConfig {
maxRetries: number;
timeout: number;
}
export async function myStep(
input: string,
config: StepConfig
): Promise<ProcessedResult> {
'use step';
const result = await processWithConfig(input, config);
return result;
}

Workflows support various control flow patterns for complex orchestration.
Steps run one after another:
export async function sequentialWorkflow() {
'use workflow';
const step1Result = await step1();
const step2Result = await step2(step1Result);
const step3Result = await step3(step2Result);
return step3Result;
}

Execute multiple steps concurrently:
export async function parallelWorkflow() {
'use workflow';
// Run steps in parallel
const [result1, result2, result3] = await Promise.all([
step1(),
step2(),
step3(),
]);
// Combine results
return combineResults(result1, result2, result3);
}

Fan out over a collection and collect successes and failures (to cap how many items run at once, see Control Concurrency under Best Practices):
export async function batchWorkflow(items: Item[]) {
'use workflow';
// Process items with controlled concurrency
const results = await Promise.allSettled(
items.map(async (item) => {
return await processItem(item);
})
);
// Handle successes and failures
const successes = results.filter(r => r.status === 'fulfilled');
const failures = results.filter(r => r.status === 'rejected');
return { successes, failures };
}

Branch based on conditions:
export async function conditionalWorkflow(itemType: string) {
'use workflow';
if (itemType === 'typeA') {
return await processTypeA();
} else if (itemType === 'typeB') {
return await processTypeB();
} else {
throw new FatalError(`Unsupported item type: ${itemType}`);
}
}

Iterate over data:
export async function iterativeWorkflow(items: Item[]) {
'use workflow';
const results = [];
for (const item of items) {
const result = await processItem(item);
results.push(result);
// Early exit condition
if (results.length >= 10) break;
}
return results;
}

Workflow DevKit provides sophisticated error handling and retry mechanisms.
1. RetryableError
Use for transient failures that should be retried:
import { RetryableError } from '@workflow/errors';
export async function fetchData() {
'use step';
try {
return await api.fetch();
} catch (error) {
// Retry after 30 seconds
throw new RetryableError('API temporarily unavailable', {
retryAfter: '30s'
});
}
}

Retry Delay Options:
- retryAfter: '30s' - Retry after 30 seconds
- retryAfter: '5m' - Retry after 5 minutes
- retryAfter: '1h' - Retry after 1 hour
2. FatalError
Use for permanent failures that should not be retried:
import { FatalError } from '@workflow/errors';
export async function validateInput(data: unknown) {
'use step';
if (!isValid(data)) {
// Don't retry - input is fundamentally wrong
throw new FatalError('Invalid input data - cannot proceed');
}
return processValidData(data);
}

Automatic Retry with Backoff:
export async function resilientStep() {
'use step';
try {
return await unstableOperation();
} catch (error) {
// Workflow DevKit handles exponential backoff automatically
throw new RetryableError('Operation failed, retrying...');
}
}

Custom Retry Logic:
export async function stepWithCustomRetry() {
'use step';
try {
return await operation();
} catch (error) {
if (error.statusCode === 429) {
// Rate limit - wait longer
throw new RetryableError('Rate limited', { retryAfter: '5m' });
} else {
// Other errors - retry sooner
throw new RetryableError('Temporary error', { retryAfter: '30s' });
}
}
}

Workflow-Level Error Handling:

export async function workflowWithErrorHandling() {
'use workflow';
try {
const result = await riskyStep();
return result;
} catch (error) {
// Handle workflow-level errors
console.error('Workflow failed:', error);
// Return partial result or re-throw
return { status: 'failed', error: error.message };
}
}

Handling Partial Failures:

export async function batchProcessingWorkflow(items: Item[]) {
'use workflow';
const results = await Promise.allSettled(
items.map(item => processItem(item))
);
// Separate successes and failures
const successes = results
.filter(r => r.status === 'fulfilled')
.map(r => r.value);
const failures = results
.filter(r => r.status === 'rejected')
.map(r => ({ error: r.reason }));
return {
processed: successes.length,
failed: failures.length,
successes,
failures,
};
}

Workflow DevKit provides powerful tools for inspecting and debugging workflows.
List Recent Workflow Runs:
bunx workflow inspect runs

Inspect Specific Run:
bunx workflow inspect runs <run-id>

Launch Web UI:
bunx workflow inspect runs --web

Get Help:
bunx workflow inspect --help

The Web UI provides a graphical interface for exploring workflows:
bunx workflow inspect runs --web

Features:
- Visual workflow execution timeline
- Step-by-step execution details
- Error messages and stack traces
- Retry history
- Input/output data for each step
For different deployment environments:
# Local development (default)
bunx workflow inspect runs
# Production (e.g., Vercel)
bunx workflow inspect runs --backend vercel

Vercel Backend Setup:
- Authenticate with Vercel CLI: vercel login
- Link your project: vercel link
- Run inspection: bunx workflow inspect runs --backend vercel
Add structured logging to workflows and steps:
export async function fetchItems() {
'use step';
console.log('[fetchItems] Starting item fetch');
const items = await fetchItemsFromAPI();
console.log('[fetchItems] Items fetched:', {
count: items.length,
timestamp: new Date().toISOString(),
});
return items;
}

Recommended Logging Format:
- Prefix with step/workflow name: [stepName]
- Include relevant metrics: counts, durations, IDs
- Log at decision points: before/after external calls, conditional branches
- Use structured data (objects) for rich context
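A small helper keeps this format consistent across steps. A minimal sketch; the logStep name and shape are illustrative, not part of Workflow DevKit:

// Hypothetical helper enforcing the "[stepName] message" prefix plus structured context.
type LogContext = Record<string, unknown>;

export function logStep(stepName: string, message: string, context: LogContext = {}) {
  console.log(`[${stepName}] ${message}`, {
    ...context,
    timestamp: new Date().toISOString(),
  });
}

// Usage inside a step:
// logStep('fetchItems', 'Items fetched', { count: items.length });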
1. Check Workflow Build:
# Rebuild workflows
bun run build:workflows
# Verify generated files exist
ls -la .well-known/workflow/v1/

2. Verify Runtime Initialization:
// Check that workflow world is started
import { getWorkflowWorld } from './workflows/runtime.js';
const world = getWorkflowWorld();
console.log('Workflow world active:', !!world);

3. Inspect Database State:
-- Check recent workflow runs
SELECT * FROM workflow_runs ORDER BY started_at DESC LIMIT 10;
-- Check failed runs
SELECT * FROM workflow_runs WHERE status = 'failed';
-- Check step execution
SELECT * FROM workflow_steps WHERE workflow_run_id = 'run-123';

4. Enable Debug Logging:
LOG_LEVEL=debug

Keep Workflows Focused:
// ✅ Good: Single responsibility
export async function dataProcessingWorkflow() {
'use workflow';
const items = await fetchItems();
const processed = await processItems(items);
return processed;
}
// ❌ Bad: Too many responsibilities
export async function doEverythingWorkflow() {
'use workflow';
await fetchItems();
await processItems();
await saveResults();
await sendNotifications();
await updateDashboard();
// ... too much!
}

Use Descriptive Names:
// ✅ Good
export async function dataProcessingWorkflow() {}
export async function userOnboardingWorkflow() {}
// ❌ Bad
export async function workflow1() {}
export async function processStuff() {}

Make Steps Atomic:
// ✅ Good: One logical operation
export async function fetchItemData(itemId: string) {
'use step';
return await api.getItem(itemId);
}
// ❌ Bad: Multiple operations
export async function fetchAndProcessAndSave(itemId: string) {
'use step';
const data = await api.getItem(itemId);
const processed = processData(data);
await db.save(processed);
return processed;
}

Ensure Idempotency:
// ✅ Good: Idempotent
export async function createItem(itemId: string) {
'use step';
const existing = await db.item.findUnique({ where: { id: itemId } });
if (existing) return existing;
return await db.item.create({ data: { id: itemId } });
}

Choose Error Types Carefully:
export async function apiCall() {
'use step';
try {
return await externalAPI.call();
} catch (error) {
// Transient errors → RetryableError
if (error.code === 'NETWORK_ERROR') {
throw new RetryableError('Network issue', { retryAfter: '30s' });
}
// Permanent errors → FatalError
if (error.code === 'INVALID_API_KEY') {
throw new FatalError('Invalid API key - check configuration');
}
// Unknown errors → RetryableError (safe default)
throw new RetryableError('Unexpected error', { retryAfter: '1m' });
}
}

Handle Partial Failures Gracefully:
export async function processBatch(items: Item[]) {
'use workflow';
const results = await Promise.allSettled(
items.map(item => processItem(item))
);
// Continue with successes, log failures
const successes = results.filter(r => r.status === 'fulfilled');
const failures = results.filter(r => r.status === 'rejected');
if (failures.length > 0) {
console.warn('Some items failed:', failures.length);
}
return successes.map(r => r.value);
}

Use Dependency Injection:
// Module-level dependencies
let deps: { agent: Agent; db: DB } | undefined;
export function initDependencies(dependencies: { agent: Agent; db: DB }) {
deps = dependencies;
}
function getDeps() {
if (!deps) throw new Error('Dependencies not initialized');
return deps;
}
// Use in workflows
export async function myWorkflow() {
'use workflow';
const { agent, db } = getDeps();
// Use dependencies
}

Test Steps Independently:
import { describe, it, expect } from 'bun:test';
import { fetchItems } from './steps/fetch-items.js';
describe('fetchItems', () => {
it('should return items', async () => {
const items = await fetchItems({ limit: 10 });
expect(items.length).toBeGreaterThan(0);
expect(items[0]).toHaveProperty('id');
});
});

Test Workflow Logic:
describe('dataProcessingWorkflow', () => {
it('should complete successfully', async () => {
// Initialize dependencies with mocks/stubs
initializeWorkflowDependencies({
agent: mockAgent,
db: mockDb,
});
const result = await dataProcessingWorkflow({
workflowRunId: 'test-run',
});
expect(result.status).toBe('success');
});
});

Add Structured Logging:
export async function myStep(input: Input) {
'use step';
console.log('[myStep] Starting', { input });
try {
const result = await process(input);
console.log('[myStep] Completed', { resultSize: result.length });
return result;
} catch (error) {
console.error('[myStep] Failed', { error: error.message });
throw error;
}
}

Track Important Metrics:
export async function workflow() {
'use workflow';
const startTime = Date.now();
const result = await performWork();
const duration = Date.now() - startTime;
console.log('[workflow] Metrics', {
duration,
itemsProcessed: result.length,
timestamp: new Date().toISOString(),
});
return result;
}

Use Parallel Execution:
// ✅ Good: Parallel when independent
const [items, config, metadata] = await Promise.all([
fetchItems(),
fetchConfig(),
fetchMetadata(),
]);
// ❌ Bad: Sequential when could be parallel
const items = await fetchItems();
const config = await fetchConfig();
const metadata = await fetchMetadata();

Control Concurrency:
// Process in batches to avoid overwhelming external APIs
async function processBatch(items: Item[], batchSize: number) {
const results = [];
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const batchResults = await Promise.all(
batch.map(item => processItem(item))
);
results.push(...batchResults);
}
return results;
}

- Workflow DevKit Documentation: https://useworkflow.dev/docs
- Getting Started: https://useworkflow.dev/docs/getting-started
- Foundations: https://useworkflow.dev/docs/foundations
- Observability: https://useworkflow.dev/docs/observability
- Postgres World: https://useworkflow.dev/docs/deploying/world/postgres-world
- GitHub Repository: https://github.com/vercel/workflow-examples
- Kitchen Sink: Comprehensive reference implementation
- Custom Adapter: Bun integration example
- AI SDK Integration: AI workflow patterns
- Flight Booking: Real-world application
- @workflow/world-postgres: PostgreSQL storage backend
- workflow: Core workflow runtime
- @workflow/errors: Error types (RetryableError, FatalError)
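For reference, the imports used by the examples in this guide map onto these packages (plus the World type from @workflow/world used during initialization):

// Import surface used throughout the examples in this guide.
import { start } from 'workflow';
import { createWorld } from '@workflow/world-postgres';
import type { World } from '@workflow/world';
import { RetryableError, FatalError } from '@workflow/errors';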
To extend workflow functionality:
- Add New Workflows: Create new workflow files in workflows/
- Add New Steps: Create reusable steps in steps/
- Improve Error Handling: Add more granular retry strategies
- Add Webhooks: Integrate external event triggers
- Enhance Observability: Add custom metrics and dashboards
- Optimize Performance: Tune concurrency settings
- Add Tests: Create comprehensive workflow test suite
For questions or issues, refer to:
- Workflow DevKit GitHub Issues
- Project documentation
- Team discussions and code reviews