@saeta-eth
Created December 9, 2025 20:57

Workflow DevKit Integration Guide

Table of Contents

  1. Overview
  2. What is Workflow DevKit?
  3. Core Concepts
  4. Postgres World Setup
  5. Using Workflow with Bun
  6. Creating Workflows
  7. Creating Steps
  8. Control Flow Patterns
  9. Error Handling & Retries
  10. Observability & Debugging
  11. Best Practices
  12. References

Overview

This guide covers integrating Workflow DevKit to run durable workflows with automatic state persistence, built-in retries, and execution tracking.

Key Benefits:

  • Durability: Workflows survive application restarts and failures
  • Reliability: Automatic retry handling for transient errors
  • State Management: Persistent workflow execution state in PostgreSQL
  • Observability: Built-in inspection and monitoring tools
  • Concurrency: Controlled parallel execution with queue management

What is Workflow DevKit?

Workflow DevKit is a framework for building durable, long-running workflows that can survive application interruptions. Unlike traditional application code that loses state on restart, workflows maintain execution context and can resume exactly where they left off.

Core Features:

  • Durable execution with state persistence
  • Automatic retry mechanisms with configurable policies
  • Built-in concurrency control
  • Real-time event distribution
  • Observability through CLI and Web UI
  • Framework-agnostic design (works with Next.js, Express, Hono, Bun, etc.)

How it Differs from Regular Application Code:

  • Regular code: Lost state on crash, manual retry logic, no execution tracking
  • Workflow code: Preserved state, automatic retries, built-in observability
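
To make "resume exactly where they left off" concrete, here is a toy model of checkpointed execution. It is only an illustration under simplifying assumptions: ToyRun, StepLog, and toyWorkflow are hypothetical names, the log lives in memory rather than PostgreSQL, and nothing here reflects how Workflow DevKit is actually implemented.

```typescript
// Toy model of durable execution: step results are recorded in a log keyed by
// call order, so a re-run replays finished steps instead of executing them again.
type StepLog = Map<number, unknown>;

class ToyRun {
  private cursor = 0;
  private readonly log: StepLog;

  constructor(log: StepLog) {
    this.log = log;
  }

  // Execute `fn` once per position in the workflow; later runs reuse the logged value.
  async step<T>(fn: () => Promise<T>): Promise<T> {
    const index = this.cursor++;
    if (this.log.has(index)) {
      return this.log.get(index) as T; // replayed from the log
    }
    const value = await fn();
    this.log.set(index, value); // checkpoint
    return value;
  }
}

// Hypothetical workflow body: two steps, the second fails on its first attempt.
let fetchCalls = 0;
let saveCalls = 0;

async function toyWorkflow(run: ToyRun): Promise<string> {
  const items = await run.step(async () => {
    fetchCalls++;
    return [1, 2, 3];
  });
  return run.step(async () => {
    saveCalls++;
    if (saveCalls === 1) throw new Error('simulated crash');
    return `saved ${items.length} items`;
  });
}

// The log outlives the failed attempt, standing in for the database.
async function runWithResume(): Promise<string> {
  const log: StepLog = new Map();
  try {
    return await toyWorkflow(new ToyRun(log));
  } catch {
    return await toyWorkflow(new ToyRun(log)); // resume: the first step is replayed
  }
}
```

On the second attempt the fetch step is not executed again; only the step that failed runs a second time.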

Core Concepts

Workflows

A workflow is the top-level orchestration unit that coordinates multiple steps to achieve a goal. Workflows are marked with the 'use workflow' directive.

Key Characteristics:

  • Durable: State persists across restarts
  • Resumable: Can continue from last checkpoint
  • Tracked: Full execution history maintained
  • Isolated: Each run has independent state

Example:

export async function dataProcessingWorkflow(params: { workflowRunId: string }) {
  'use workflow';

  // Orchestrate multiple steps
  const items = await fetchItems({ limit: 100 });
  const processed = await processItems(items);
  const results = await saveResults(processed);

  return { items, processed, results };
}

Steps

A step is a unit of work within a workflow. Steps are marked with the 'use step' directive and represent atomic operations that can be retried independently.

Key Characteristics:

  • Atomic: Executes as a single unit
  • Retryable: Failures trigger automatic retries
  • Trackable: Individual execution status
  • Idempotent: Should be written so that a retry is safe to run

Example:

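import { RetryableError } from '@workflow/errors';

export async function fetchItems(config: FetchItemsConfig) {
  'use step';

  try {
    // fetchFromAPI is a placeholder for your own API client
    const items = await fetchFromAPI();
    return items;
  } catch (error) {
    // Throwing RetryableError asks the runtime to retry this step after the delay
    throw new RetryableError('API request failed', { retryAfter: '30s' });
  }
}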

The Directives

  • 'use workflow': Marks a function as a durable workflow
  • 'use step': Marks a function as a retryable step
  • Both directives are read by the Workflow compiler at build time

Postgres World Setup

The Postgres World implementation provides persistent storage for workflow state using PostgreSQL.

Installation

bun add @workflow/world-postgres workflow

Environment Configuration

Add to .env:

# Workflow Configuration
WORKFLOW_TARGET_WORLD="@workflow/world-postgres"
WORKFLOW_POSTGRES_URL="postgresql://user:password@localhost:5432/myapp"
WORKFLOW_POSTGRES_JOB_PREFIX="myapp"
WORKFLOW_POSTGRES_WORKER_CONCURRENCY=10

Variable Descriptions:

  • WORKFLOW_TARGET_WORLD: Specifies the world implementation to use
  • WORKFLOW_POSTGRES_URL: PostgreSQL connection string (can reuse DATABASE_URL)
  • WORKFLOW_POSTGRES_JOB_PREFIX: Prefix for queue jobs (helps identify jobs in shared DB)
  • WORKFLOW_POSTGRES_WORKER_CONCURRENCY: Number of concurrent workers (default: 10)
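
If your application reads these variables itself, validating them once at startup catches typos early. The sketch below is one possible shape, not part of Workflow DevKit: readWorkflowEnv and WorkflowEnv are hypothetical names, and the fallback to DATABASE_URL and the default of 10 simply mirror the descriptions above.

```typescript
// Sketch: read the variables described above into a typed object.
interface WorkflowEnv {
  targetWorld: string | undefined;
  postgresUrl: string;
  jobPrefix: string | undefined;
  workerConcurrency: number;
}

type EnvSource = Record<string, string | undefined>;

function readWorkflowEnv(env: EnvSource): WorkflowEnv {
  // The guide notes that DATABASE_URL can be reused for the connection string.
  const postgresUrl = env['WORKFLOW_POSTGRES_URL'] ?? env['DATABASE_URL'];
  if (!postgresUrl) {
    throw new Error('Set WORKFLOW_POSTGRES_URL (or DATABASE_URL)');
  }

  // Default of 10 taken from the variable description above.
  const rawConcurrency = env['WORKFLOW_POSTGRES_WORKER_CONCURRENCY'] ?? '10';
  const workerConcurrency = Number.parseInt(rawConcurrency, 10);
  if (!Number.isInteger(workerConcurrency) || workerConcurrency < 1) {
    throw new Error(`Invalid WORKFLOW_POSTGRES_WORKER_CONCURRENCY: ${rawConcurrency}`);
  }

  return {
    targetWorld: env['WORKFLOW_TARGET_WORLD'],
    postgresUrl,
    jobPrefix: env['WORKFLOW_POSTGRES_JOB_PREFIX'],
    workerConcurrency,
  };
}
```

In an application you would call it as readWorkflowEnv(process.env) and pass the result on to your own initialization code.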

Database Schema Setup

Initialize the required tables:

bun x workflow-postgres-setup

Tables Created:

  1. workflow_runs: Stores workflow execution state
  2. workflow_events: Event history for workflows
  3. workflow_steps: Individual step execution data
  4. workflow_hooks: Webhook configurations
  5. workflow_streams: Streaming data outputs

Important Notes:

  • These tables are separate from your application schema
  • Managed entirely by Workflow DevKit
  • Do not manually modify these tables

Runtime Initialization

The Postgres World must be initialized during application startup:

import { createWorld } from '@workflow/world-postgres';
import type { World } from '@workflow/world';

let workflowWorld: World | undefined;

export async function initializeWorkflow(): Promise<World> {
  if (workflowWorld) {
    return workflowWorld;
  }

  // Create the PostgreSQL world
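  workflowWorld = createWorld({
    // Prefer the dedicated variable from .env; fall back to the app's DATABASE_URL
    connectionString: process.env.WORKFLOW_POSTGRES_URL ?? process.env.DATABASE_URL,
    jobPrefix: 'myapp',
    queueConcurrency: 10,
  });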

  // Start the world (begins queue processing)
  if (workflowWorld.start) {
    await workflowWorld.start();
  }

  return workflowWorld;
}
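
One detail worth considering: if two callers invoke an initializer like this at the same moment, the second can receive the world before start() has finished. A common remedy is to cache the in-flight promise instead of the value. The sketch below shows that pattern in isolation; once is a hypothetical helper and the fake world stands in for createWorld() plus start().

```typescript
// Sketch: cache the in-flight promise so concurrent callers share one startup.
function once<T>(init: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;
  return () => {
    if (!pending) {
      pending = init().catch((error: unknown) => {
        pending = undefined; // allow a later retry after a failed startup
        throw error;
      });
    }
    return pending;
  };
}

let startCount = 0;

// Stand-in for "create the world, then await world.start()".
const initializeFakeWorld = once(async () => {
  startCount++;
  await new Promise<void>((resolve) => setTimeout(resolve, 10)); // simulated start()
  return { started: true };
});
```

Every caller awaits the same promise, so the startup work runs once and nobody observes a half-started world.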

Call this once at application startup:

// In your main entry point
import { initializeWorkflow } from './workflows/runtime.js';

async function main() {
  // Initialize workflow runtime
  await initializeWorkflow();

  // ... rest of your application
}

Using Workflow with Bun

Workflow DevKit runs on Bun, with a few Bun-specific considerations around the build step and the HTTP endpoints.

Build Process

Workflows must be compiled before execution:

# Compile workflows (generates handler files)
bun run build:workflows
# Or use the combined build script
bun run build

What happens during build:

  1. Workflow compiler scans for 'use workflow' and 'use step' directives
  2. Generates handler files in .well-known/workflow/v1/
  3. Creates a workflow registry with all discovered workflows

Build Scripts in package.json:

{
  "scripts": {
    "build": "tsc && bunx workflow build",
    "build:workflows": "bunx workflow build"
  }
}

HTTP Server for Workflow Endpoints

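Workflow DevKit requires three HTTP endpoints for internal communication. These can be implemented with Bun.serve. The first snippet assumes a createWorkflowServer helper defined in your own project (./workflows/server.js):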

import { createWorkflowServer } from './workflows/server.js';

// Start the workflow HTTP server
const server = await createWorkflowServer(3001);

// Server exposes these required endpoints:
// - POST /.well-known/workflow/v1/flow (orchestration)
// - POST /.well-known/workflow/v1/step (step execution)
// - POST/GET /.well-known/workflow/v1/webhook/:token (webhook delivery)

Bun-Specific Implementation:

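// flowHandler, stepHandler, and webhookHandler are assumed to be imported
// from the handler files that the build generates in .well-known/workflow/v1/
const server = Bun.serve({
  port: 3001,
  async fetch(req) {
    const url = new URL(req.url);
    const path = url.pathname;

    // Route to workflow endpoints
    if (path === '/.well-known/workflow/v1/flow' && req.method === 'POST') {
      return flowHandler.POST(req);
    }

    if (path === '/.well-known/workflow/v1/step' && req.method === 'POST') {
      return stepHandler.POST(req);
    }

    if (path.startsWith('/.well-known/workflow/v1/webhook/')) {
      // The endpoint list above allows POST and GET, but this branch sends every
      // method to the POST handler; dispatch on req.method if your generated
      // webhook handler exposes a separate GET
      return webhookHandler.POST(req);
    }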

    return new Response('Not Found', { status: 404 });
  },
});
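
The routing decision can also be pulled out into a pure function, which makes the method and path rules easy to unit test without starting a server. This is a sketch only: matchWorkflowRoute and the WorkflowRoute type are hypothetical names, and the paths and allowed methods are taken from the endpoint list above.

```typescript
// Sketch: decide which endpoint a request targets, separately from serving it.
type WorkflowRoute =
  | { kind: 'flow' }
  | { kind: 'step' }
  | { kind: 'webhook'; token: string }
  | { kind: 'not-found' }
  | { kind: 'method-not-allowed' };

const BASE = '/.well-known/workflow/v1';

function matchWorkflowRoute(method: string, pathname: string): WorkflowRoute {
  if (pathname === `${BASE}/flow`) {
    return method === 'POST' ? { kind: 'flow' } : { kind: 'method-not-allowed' };
  }

  if (pathname === `${BASE}/step`) {
    return method === 'POST' ? { kind: 'step' } : { kind: 'method-not-allowed' };
  }

  const webhookPrefix = `${BASE}/webhook/`;
  if (pathname.startsWith(webhookPrefix)) {
    const token = pathname.slice(webhookPrefix.length);
    // Reject an empty token or extra path segments.
    if (token === '' || token.includes('/')) return { kind: 'not-found' };
    if (method !== 'POST' && method !== 'GET') return { kind: 'method-not-allowed' };
    return { kind: 'webhook', token };
  }

  return { kind: 'not-found' };
}
```

Inside fetch(req) you would call matchWorkflowRoute(req.method, new URL(req.url).pathname) and switch on the returned kind.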

Starting Workflows in Bun

import { start } from 'workflow';
import { dataProcessingWorkflow } from './workflows/data-processing-workflow.js';

// Start a workflow run
const run = await start(dataProcessingWorkflow, {
  workflowRunId: 'run-123',
});

// Wait for completion
const result = await run.result();
console.log('Workflow completed:', result);

Creating Workflows

Workflows orchestrate multiple steps to achieve a complex goal.

Basic Workflow Structure

/**
 * Workflow: Data Processing
 *
 * Orchestrates data discovery, processing, and result creation.
 */
export async function dataProcessingWorkflow(params: {
  workflowRunId: string;
}): Promise<WorkflowResult> {
  'use workflow';

  const startTime = Date.now();

  // Step 1: Fetch items
  const items = await fetchItems({ limit: 100 });

  // Step 2: Process items
  const processed = await processItems(items);

  // Step 3: Save results
  const results = await saveResults(processed);

  return {
    duration: Date.now() - startTime,
    itemsFound: items.length,
    resultsCreated: results.length,
  };
}

Workflow Parameters

Workflows receive parameters as a single object:

export async function myWorkflow(params: {
  userId: string;
  options: MyOptions;
}): Promise<Result> {
  'use workflow';

  // Use params
  const data = await fetchUserData(params.userId);
  return processData(data, params.options);
}

Workflow Return Values

Workflows can return any serializable data:

interface WorkflowResult {
  status: 'success' | 'partial' | 'failed';
  processed: number;
  errors: string[];
}

export async function myWorkflow(): Promise<WorkflowResult> {
  'use workflow';
  // ... implementation
  return { status: 'success', processed: 100, errors: [] };
}

Dependency Injection Pattern

For dependencies like agents or database clients, use a module-level initialization pattern:

// Store dependencies at module level
let dependencies: WorkflowDependencies | undefined;

export interface WorkflowDependencies {
  agent: Agent;
  db: DatabaseClient;
}

// Initialize before starting workflows
export function initializeWorkflowDependencies(deps: WorkflowDependencies) {
  dependencies = deps;
}

// Access in workflow
function getDependencies(): WorkflowDependencies {
  if (!dependencies) {
    throw new Error('Dependencies not initialized');
  }
  return dependencies;
}

export async function myWorkflow() {
  'use workflow';

  const { agent, db } = getDependencies();
  // Use dependencies
}
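
The same module-level pattern can be wrapped in a small generic holder so that several workflow modules share one implementation and tests can reset it. This is a sketch; createDependencyHolder is a hypothetical helper, not something Workflow DevKit provides.

```typescript
// Sketch: a reusable holder for module-level dependencies.
function createDependencyHolder<T>(label: string) {
  let current: T | undefined;

  return {
    // Call once at startup, before any workflow runs.
    set(deps: T): void {
      current = deps;
    },
    // Fails loudly if initialization was skipped.
    get(): T {
      if (current === undefined) {
        throw new Error(`${label} dependencies not initialized`);
      }
      return current;
    },
    // Handy between test cases.
    reset(): void {
      current = undefined;
    },
  };
}
```

A module would create one holder, for example createDependencyHolder<WorkflowDependencies>('Workflow'), and export its set and get functions.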

Creating Steps

Steps are the building blocks of workflows - atomic units of work that can be retried independently.

Basic Step Structure

/**
 * Step: Fetch Items
 *
 * Fetches items from external API.
 */
export async function fetchItems(
  config: FetchItemsConfig = {}
): Promise<Item[]> {
  'use step';

  const limit = config.limit ?? 100;

  try {
    // Perform work
    const items = await fetchItemsFromAPI(limit);
    return items;
  } catch (error) {
    // Throw retryable error for automatic retry
    throw new RetryableError(
      'Failed to fetch items',
      { retryAfter: '30s' }
    );
  }
}

Step Best Practices

1. Keep Steps Atomic

Each step should do one logical thing:

// ❌ Bad: Too much in one step
export async function processEverything() {
  'use step';
  const items = await fetchItems();
  const processed = await processItems(items);
  const results = await saveResults(processed);
  return results;
}

// ✅ Good: Separate steps
export async function fetchItems() {
  'use step';
  return await fetchItemsFromAPI();
}

export async function processItems(items: Item[]) {
  'use step';
  return await performProcessing(items);
}

2. Make Steps Idempotent

Steps may be retried, so they should be safe to run multiple times:

export async function createRecord(recordId: string) {
  'use step';

  // ✅ Idempotent: Check if exists first
  const existing = await db.record.findUnique({ where: { id: recordId } });
  if (existing) return existing;

  // Create only if doesn't exist
  return await db.record.create({ data: { id: recordId } });
}

3. Use Appropriate Error Types

import { RetryableError, FatalError } from '@workflow/errors';

export async function myStep() {
  'use step';

  try {
    const result = await externalAPI.call();
    return result;
  } catch (error) {
    // Catch variables are `unknown` in strict TypeScript; read `code` defensively
    const code = (error as { code?: string } | null)?.code;

    if (code === 'RATE_LIMIT') {
      // Retry after delay
      throw new RetryableError('Rate limited', { retryAfter: '60s' });
    }

    if (code === 'INVALID_INPUT') {
      // Don't retry - permanent failure
      throw new FatalError('Invalid input - cannot proceed');
    }

    // Default: retry with backoff
    throw new RetryableError('Temporary failure', { retryAfter: '30s' });
  }
}
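
Because a caught value is typed unknown under strict TypeScript, it helps to keep the classification logic in a plain function that can be tested without the runtime. The sketch below does that with a type guard. hasCode, decide, and RetryDecision are hypothetical names, and the error codes are the illustrative ones from the example above.

```typescript
// Sketch: narrow an `unknown` catch variable, then map it to a retry decision.
type RetryDecision =
  | { action: 'retry'; retryAfter: string }
  | { action: 'fail'; reason: string };

// Type guard: true only for non-null objects carrying a string `code`.
function hasCode(error: unknown): error is { code: string } {
  return (
    typeof error === 'object' &&
    error !== null &&
    'code' in error &&
    typeof (error as { code: unknown }).code === 'string'
  );
}

function decide(error: unknown): RetryDecision {
  if (hasCode(error)) {
    if (error.code === 'RATE_LIMIT') {
      return { action: 'retry', retryAfter: '60s' };
    }
    if (error.code === 'INVALID_INPUT') {
      return { action: 'fail', reason: 'Invalid input - cannot proceed' };
    }
  }
  // Unknown failures default to a retry, matching the example above.
  return { action: 'retry', retryAfter: '30s' };
}
```

A step's catch block can then translate the decision into RetryableError or FatalError in one place.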

Step Parameters and Return Values

Steps accept parameters and return values just like regular functions:

export interface StepConfig {
  maxRetries: number;
  timeout: number;
}

export async function myStep(
  input: string,
  config: StepConfig
): Promise<ProcessedResult> {
  'use step';

  const result = await processWithConfig(input, config);
  return result;
}

Control Flow Patterns

Workflows support various control flow patterns for complex orchestration.

Sequential Execution

Steps run one after another:

export async function sequentialWorkflow() {
  'use workflow';

  const step1Result = await step1();
  const step2Result = await step2(step1Result);
  const step3Result = await step3(step2Result);

  return step3Result;
}

Parallel Execution

Execute multiple steps concurrently:

export async function parallelWorkflow() {
  'use workflow';

  // Run steps in parallel
  const [result1, result2, result3] = await Promise.all([
    step1(),
    step2(),
    step3(),
  ]);

  // Combine results
  return combineResults(result1, result2, result3);
}

Controlled Concurrency

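Process many items in parallel without letting one failure abort the rest. Promise.allSettled starts every item at once and does not cap concurrency itself; any limit on simultaneous step execution comes from the worker concurrency setting (WORKFLOW_POSTGRES_WORKER_CONCURRENCY):

export async function batchWorkflow(items: Item[]) {
  'use workflow';

  // Start all items at once; allSettled waits for every one to settle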
  const results = await Promise.allSettled(
    items.map(async (item) => {
      return await processItem(item);
    })
  );

  // Handle successes and failures
  const successes = results.filter(r => r.status === 'fulfilled');
  const failures = results.filter(r => r.status === 'rejected');

  return { successes, failures };
}
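
When the limit has to be enforced in code, for example to protect a rate-limited API, a small worker pool is one option. The sketch below is plain TypeScript with a hypothetical mapWithConcurrency helper. It has not been verified inside a 'use workflow' function, so treat it as an illustration of the technique rather than a DevKit pattern.

```typescript
// Sketch: run `fn` over `items` with at most `limit` calls in flight,
// collecting results in input order in the same shape as Promise.allSettled.
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  limit: number,
  fn: (item: T, index: number) => Promise<R>,
): Promise<PromiseSettledResult<R>[]> {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError('limit must be a positive integer');
  }

  const results: PromiseSettledResult<R>[] = new Array(items.length);
  let next = 0; // index of the next unclaimed item

  // Each worker claims items until none are left.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      try {
        const value = await fn(items[index] as T, index);
        results[index] = { status: 'fulfilled', value };
      } catch (reason) {
        results[index] = { status: 'rejected', reason };
      }
    }
  }

  const workerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

Unlike fixed-size batches, a pool starts the next item as soon as any slot frees up, so one slow item does not hold back the rest of its batch.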

Conditional Execution

Branch based on conditions:

import { FatalError } from '@workflow/errors';

export async function conditionalWorkflow(itemType: string) {
  'use workflow';

  if (itemType === 'typeA') {
    return await processTypeA();
  } else if (itemType === 'typeB') {
    return await processTypeB();
  } else {
    throw new FatalError(`Unsupported item type: ${itemType}`);
  }
}

Loops and Iterations

Iterate over data:

export async function iterativeWorkflow(items: Item[]) {
  'use workflow';

  const results = [];

  for (const item of items) {
    const result = await processItem(item);
    results.push(result);

    // Early exit condition
    if (results.length >= 10) break;
  }

  return results;
}

Error Handling & Retries

Workflow DevKit separates transient failures, which are retried, from permanent failures, which are not. You signal which is which by the error type you throw.

Error Types

1. RetryableError

Use for transient failures that should be retried:

import { RetryableError } from '@workflow/errors';

export async function fetchData() {
  'use step';

  try {
    return await api.fetch();
  } catch (error) {
    // Retry after 30 seconds
    throw new RetryableError('API temporarily unavailable', {
      retryAfter: '30s'
    });
  }
}

Retry Delay Options:

  • retryAfter: '30s' - Retry after 30 seconds
  • retryAfter: '5m' - Retry after 5 minutes
  • retryAfter: '1h' - Retry after 1 hour
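
For logging or tests it can be useful to convert these strings into milliseconds yourself. The sketch below handles only the three forms listed above (a whole number followed by s, m, or h). durationToMs is a hypothetical helper and is not the parser Workflow DevKit uses, which may accept other formats.

```typescript
// Sketch: convert '30s' / '5m' / '1h' style strings to milliseconds.
const UNIT_MS = { s: 1000, m: 60000, h: 3600000 } as const;

function durationToMs(duration: string): number {
  const match = /^(\d+)([smh])$/.exec(duration);
  if (!match) {
    throw new Error(`Unsupported duration: ${duration}`);
  }
  const amount = Number(match[1]);
  const unit = match[2] as keyof typeof UNIT_MS;
  return amount * UNIT_MS[unit];
}
```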

2. FatalError

Use for permanent failures that should not be retried:

import { FatalError } from '@workflow/errors';

export async function validateInput(data: unknown) {
  'use step';

  if (!isValid(data)) {
    // Don't retry - input is fundamentally wrong
    throw new FatalError('Invalid input data - cannot proceed');
  }

  return processValidData(data);
}

Retry Strategies

Automatic Retry with Backoff:

export async function resilientStep() {
  'use step';

  try {
    return await unstableOperation();
  } catch (error) {
    // Workflow DevKit handles exponential backoff automatically
    throw new RetryableError('Operation failed, retrying...');
  }
}
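
For readers unfamiliar with the term, exponential backoff means each retry waits roughly twice as long as the previous one, up to a ceiling. The sketch below shows the arithmetic only. backoffDelayMs, the 1 second base, and the 60 second cap are illustrative assumptions; this guide does not state the schedule Workflow DevKit actually uses.

```typescript
// Sketch: delay before retry number `attempt` (1-based), doubling each time.
function backoffDelayMs(attempt: number, baseMs = 1000, capMs = 60000): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError('attempt must be a positive integer');
  }
  // attempt 1 -> base, attempt 2 -> 2 * base, attempt 3 -> 4 * base, ...
  return Math.min(capMs, baseMs * 2 ** (attempt - 1));
}
```

With these defaults the first three retries wait 1, 2, and 4 seconds, and from the seventh retry onward the delay stays at the 60 second cap.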

Custom Retry Logic:

export async function stepWithCustomRetry() {
  'use step';

  try {
    return await operation();
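  } catch (error) {
    // Catch variables are `unknown` in strict TypeScript; read `statusCode` defensively
    const statusCode = (error as { statusCode?: number } | null)?.statusCode;
    if (statusCode === 429) {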
      // Rate limit - wait longer
      throw new RetryableError('Rate limited', { retryAfter: '5m' });
    } else {
      // Other errors - retry sooner
      throw new RetryableError('Temporary error', { retryAfter: '30s' });
    }
  }
}

Error Propagation in Workflows

export async function workflowWithErrorHandling() {
  'use workflow';

  try {
    const result = await riskyStep();
    return result;
  } catch (error) {
    // Handle workflow-level errors
    console.error('Workflow failed:', error);

    // Return partial result or re-throw
    const message = error instanceof Error ? error.message : String(error);
    return { status: 'failed', error: message };
  }
}

Handling Partial Failures

export async function batchProcessingWorkflow(items: Item[]) {
  'use workflow';

  const results = await Promise.allSettled(
    items.map(item => processItem(item))
  );

  // Separate successes and failures
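  type Processed = Awaited<ReturnType<typeof processItem>>;

  // Explicit type guards let TypeScript narrow each result before reading it
  const successes = results
    .filter((r): r is PromiseFulfilledResult<Processed> => r.status === 'fulfilled')
    .map(r => r.value);

  // Stringify reasons so the workflow's return value stays serializable
  const failures = results
    .filter((r): r is PromiseRejectedResult => r.status === 'rejected')
    .map(r => ({ error: String(r.reason) }));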

  return {
    processed: successes.length,
    failed: failures.length,
    successes,
    failures,
  };
}
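
Splitting settled results comes up in several places in this guide, so it may be worth a shared helper. The sketch below is one way to write it. partitionSettled is a hypothetical name, and rejection reasons are reduced to message strings on the assumption that the result needs to be serializable.

```typescript
// Sketch: split Promise.allSettled output into plain values and error messages.
function partitionSettled<T>(
  results: readonly PromiseSettledResult<T>[],
): { values: T[]; errors: string[] } {
  const values = results
    .filter((r): r is PromiseFulfilledResult<T> => r.status === 'fulfilled')
    .map((r) => r.value);

  const errors = results
    .filter((r): r is PromiseRejectedResult => r.status === 'rejected')
    // Error objects do not survive JSON serialization well, so keep the message.
    .map((r) => (r.reason instanceof Error ? r.reason.message : String(r.reason)));

  return { values, errors };
}
```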

Observability & Debugging

Workflow DevKit ships a CLI and a Web UI for inspecting runs, steps, and errors.

CLI Tools

List Recent Workflow Runs:

bunx workflow inspect runs

Inspect Specific Run:

bunx workflow inspect runs <run-id>

Launch Web UI:

bunx workflow inspect runs --web

Get Help:

bunx workflow inspect --help

Web UI

The Web UI provides a graphical interface for exploring workflows:

bunx workflow inspect runs --web

Features:

  • Visual workflow execution timeline
  • Step-by-step execution details
  • Error messages and stack traces
  • Retry history
  • Input/output data for each step

Backend Configuration

For different deployment environments:

# Local development (default)
bunx workflow inspect runs

# Production (e.g., Vercel)
bunx workflow inspect runs --backend vercel

Vercel Backend Setup:

  1. Authenticate with Vercel CLI: vercel login
  2. Link your project: vercel link
  3. Run inspection: bunx workflow inspect runs --backend vercel

Logging Best Practices

Add structured logging to workflows and steps:

export async function fetchItems() {
  'use step';

  console.log('[fetchItems] Starting item fetch');

  const items = await fetchItemsFromAPI();

  console.log('[fetchItems] Items fetched:', {
    count: items.length,
    timestamp: new Date().toISOString(),
  });

  return items;
}

Recommended Logging Format:

  • Prefix with step/workflow name: [stepName]
  • Include relevant metrics: counts, durations, IDs
  • Log at decision points: before/after external calls, conditional branches
  • Use structured data (objects) for rich context
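
A tiny formatter keeps that convention consistent across steps. This is a sketch with a hypothetical formatLog helper. It serializes the context as JSON so log lines stay on one line and are easy to search.

```typescript
// Sketch: build a "[scope] message {json}" log line.
function formatLog(
  scope: string,
  message: string,
  data: Record<string, unknown> = {},
): string {
  return `[${scope}] ${message} ${JSON.stringify(data)}`;
}

// Example: console.log(formatLog('fetchItems', 'Items fetched', { count: 3 }));
```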

Debugging Workflow Issues

1. Check Workflow Build:

# Rebuild workflows
bun run build:workflows

# Verify generated files exist
ls -la .well-known/workflow/v1/

2. Verify Runtime Initialization:

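// Check that the workflow world is started.
// Assumes runtime.js also exports a getWorkflowWorld() accessor that returns
// the module-level workflowWorld; the runtime snippet earlier does not define it.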
import { getWorkflowWorld } from './workflows/runtime.js';

const world = getWorkflowWorld();
console.log('Workflow world active:', !!world);

3. Inspect Database State:

-- Check recent workflow runs
SELECT * FROM workflow_runs ORDER BY started_at DESC LIMIT 10;

-- Check failed runs
SELECT * FROM workflow_runs WHERE status = 'failed';

-- Check step execution
SELECT * FROM workflow_steps WHERE workflow_run_id = 'run-123';

4. Enable Debug Logging:

LOG_LEVEL=debug

Best Practices

1. Workflow Design

Keep Workflows Focused:

// ✅ Good: Single responsibility
export async function dataProcessingWorkflow() {
  'use workflow';
  const items = await fetchItems();
  const processed = await processItems(items);
  return processed;
}

// ❌ Bad: Too many responsibilities
export async function doEverythingWorkflow() {
  'use workflow';
  await fetchItems();
  await processItems();
  await saveResults();
  await sendNotifications();
  await updateDashboard();
  // ... too much!
}

Use Descriptive Names:

// ✅ Good
export async function dataProcessingWorkflow() {}
export async function userOnboardingWorkflow() {}

// ❌ Bad
export async function workflow1() {}
export async function processStuff() {}

2. Step Design

Make Steps Atomic:

// ✅ Good: One logical operation
export async function fetchItemData(itemId: string) {
  'use step';
  return await api.getItem(itemId);
}

// ❌ Bad: Multiple operations
export async function fetchAndProcessAndSave(itemId: string) {
  'use step';
  const data = await api.getItem(itemId);
  const processed = processData(data);
  await db.save(processed);
  return processed;
}

Ensure Idempotency:

// ✅ Good: Idempotent
export async function createItem(itemId: string) {
  'use step';

  const existing = await db.item.findUnique({ where: { id: itemId } });
  if (existing) return existing;

  return await db.item.create({ data: { id: itemId } });
}

3. Error Handling

Choose Error Types Carefully:

export async function apiCall() {
  'use step';

  try {
    return await externalAPI.call();
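  } catch (error) {
    // Catch variables are `unknown` in strict TypeScript; read `code` defensively
    const code = (error as { code?: string } | null)?.code;

    // Transient errors → RetryableError
    if (code === 'NETWORK_ERROR') {
      throw new RetryableError('Network issue', { retryAfter: '30s' });
    }

    // Permanent errors → FatalError
    if (code === 'INVALID_API_KEY') {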
      throw new FatalError('Invalid API key - check configuration');
    }

    // Unknown errors → RetryableError (safe default)
    throw new RetryableError('Unexpected error', { retryAfter: '1m' });
  }
}

Handle Partial Failures Gracefully:

export async function processBatch(items: Item[]) {
  'use workflow';

  const results = await Promise.allSettled(
    items.map(item => processItem(item))
  );

  // Continue with successes, log failures
  const successes = results.filter(r => r.status === 'fulfilled');
  const failures = results.filter(r => r.status === 'rejected');

  if (failures.length > 0) {
    console.warn('Some items failed:', failures.length);
  }

  return successes.map(r => r.value);
}

4. Dependency Management

Use Dependency Injection:

// Module-level dependencies
interface Dependencies {
  agent: Agent;
  db: DB;
}

let deps: Dependencies | undefined;

export function initDependencies(dependencies: Dependencies) {
  deps = dependencies;
}

function getDeps() {
  if (!deps) throw new Error('Dependencies not initialized');
  return deps;
}

// Use in workflows
export async function myWorkflow() {
  'use workflow';
  const { agent, db } = getDeps();
  // Use dependencies
}

5. Testing Workflows

Test Steps Independently:

import { describe, it, expect } from 'bun:test';
import { fetchItems } from './steps/fetch-items.js';

describe('fetchItems', () => {
  it('should return items', async () => {
    const items = await fetchItems({ limit: 10 });
    expect(items.length).toBeGreaterThan(0);
    expect(items[0]).toHaveProperty('id');
  });
});

Test Workflow Logic:

describe('dataProcessingWorkflow', () => {
  it('should complete successfully', async () => {
    // Initialize dependencies with mocks/stubs
    initializeWorkflowDependencies({
      agent: mockAgent,
      db: mockDb,
    });

    const result = await dataProcessingWorkflow({
      workflowRunId: 'test-run',
    });

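    // dataProcessingWorkflow returns { duration, itemsFound, resultsCreated }
    expect(typeof result.duration).toBe('number');
    expect(result.itemsFound).toBeGreaterThanOrEqual(0);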
  });
});

6. Monitoring and Observability

Add Structured Logging:

export async function myStep(input: Input) {
  'use step';

  console.log('[myStep] Starting', { input });

  try {
    const result = await process(input);
    console.log('[myStep] Completed', { resultSize: result.length });
    return result;
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    console.error('[myStep] Failed', { error: message });
    throw error;
  }
}

Track Important Metrics:

export async function workflow() {
  'use workflow';

  const startTime = Date.now();

  const result = await performWork();

  const duration = Date.now() - startTime;
  console.log('[workflow] Metrics', {
    duration,
    itemsProcessed: result.length,
    timestamp: new Date().toISOString(),
  });

  return result;
}

7. Performance Optimization

Use Parallel Execution:

// ✅ Good: Parallel when independent
const [items, config, metadata] = await Promise.all([
  fetchItems(),
  fetchConfig(),
  fetchMetadata(),
]);

// ❌ Bad: Sequential when could be parallel
const items = await fetchItems();
const config = await fetchConfig();
const metadata = await fetchMetadata();

Control Concurrency:

// Process in batches to avoid overwhelming external APIs
async function processBatch(items: Item[], batchSize: number) {
  const results = [];

  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    const batchResults = await Promise.all(
      batch.map(item => processItem(item))
    );
    results.push(...batchResults);
  }

  return results;
}

References

Official Documentation

Examples

  • GitHub Repository: https://github.com/vercel/workflow-examples
    • Kitchen Sink: Comprehensive reference implementation
    • Custom Adapter: Bun integration example
    • AI SDK Integration: AI workflow patterns
    • Flight Booking: Real-world application

Package Documentation

  • @workflow/world-postgres: PostgreSQL storage backend
  • workflow: Core workflow runtime
  • @workflow/errors: Error types (RetryableError, FatalError)

Next Steps

To extend workflow functionality:

  1. Add New Workflows: Create new workflow files in workflows/
  2. Add New Steps: Create reusable steps in steps/
  3. Improve Error Handling: Add more granular retry strategies
  4. Add Webhooks: Integrate external event triggers
  5. Enhance Observability: Add custom metrics and dashboards
  6. Optimize Performance: Tune concurrency settings
  7. Add Tests: Create comprehensive workflow test suite

For questions or issues, refer to:

  • Workflow DevKit GitHub Issues
  • Project documentation
  • Team discussions and code reviews