@vegarsti
Last active March 13, 2026 13:40
Elwing: Session state migration plan

Session State Migration Plan

Move all session state out of Absurd checkpoint tables into our own database, enabling conversation replay and forking from conversation_records + a new session table.

Phase 1 — Refactors (no DB changes)

1a. Move DB recording inside ctx.step

Currently messages are written to conversation_records after the step returns. Move recordMessagesToDb inside the step body, right after tool execution. This way, by the time ctx.step commits its checkpoint, the messages are already in the DB — making the step result and DB consistent atomically.

This also means that on retry, if the step is replayed from cache, the DB records already exist. Because message_id is generated fresh on each write, it can't serve as a deduplication key — use the message_number sequence as the idempotency key instead, so a re-run step body doesn't double-write.

Files: runner.ts — move the recordMessagesToDb call and lastStepHadComposeEmail / internal marking into the step body.
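The move in 1a can be sketched as below, with stand-ins for `ctx.step` and `recordMessagesToDb` (their real signatures live in runner.ts; the shapes here are illustrative). The point is the ordering: the DB write happens inside the step body, so it precedes the checkpoint commit.

```typescript
// Sketch of 1a with in-memory stand-ins; names mirror the plan, not the real code.
type Message = { role: string; content: string };

const db: Message[] = []; // stand-in for conversation_records
let checkpointCommitted = false;

async function recordMessagesToDb(messages: Message[]): Promise<void> {
  db.push(...messages); // real version: INSERT into conversation_records
}

// Minimal ctx.step: run the body, then commit the checkpoint.
async function step<T>(_name: string, body: () => Promise<T>): Promise<T> {
  const result = await body();
  checkpointCommitted = true; // checkpoint commit happens after the body returns
  return result;
}

async function agentLoopStep(): Promise<{ appended: number }> {
  return step("agent-loop", async () => {
    const appended: Message[] = [{ role: "assistant", content: "hi" }];
    // Record inside the step body, right after tool execution, so the
    // DB write is durable before the checkpoint commits.
    await recordMessagesToDb(appended);
    return { appended: appended.length };
  });
}
```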

1b. Track messageNumber through the loop

Add a messageNumberBefore / messageNumberAfter counter to the loop. Before each step, snapshot the current number. After the step writes to DB, capture the new high-water mark. Return toMessageNumber in the step result alongside the existing fields.

This is additive — nothing reads it yet, but it establishes the pointer.

Files: runner.ts — add counter, include in AgentLoopStepResult.
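A minimal sketch of the 1b counter, assuming each DB write advances a per-conversation message number (field names are illustrative):

```typescript
// Sketch of 1b: snapshot / high-water-mark tracking around each step.
let messageNumber = 0; // current high-water mark in conversation_records

function recordMessage(): number {
  return ++messageNumber; // each DB write advances the counter
}

function runStep(writes: number) {
  const messageNumberBefore = messageNumber; // snapshot before the step
  for (let i = 0; i < writes; i++) recordMessage();
  const toMessageNumber = messageNumber; // new high-water mark after DB writes
  // Returned alongside finishReason, usage, etc. in AgentLoopStepResult.
  return { messageNumberBefore, toMessageNumber };
}
```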

1c. Extract appendedMessages consumers into a pure function

The post-step code reads stepResult.appendedMessages for three things:

  1. Concat to messages (LLM context)
  2. Detect lastStepHadComposeEmail (sentEmail details)
  3. File recording

Extract a function like processStepMessages(stepResult, messages, recordingSession) that returns { updatedMessages, lastStepHadComposeEmail }. This isolates the code that would later switch from "read from step result" to "hydrate from DB."

Files: runner.ts — extract function, no behavior change.
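The extraction in 1c might look like the following sketch. The `Message` and step-result shapes are simplified stand-ins; the `details.type === "sentEmail"` pattern is the one the plan names for ComposeEmail.

```typescript
// Sketch of 1c: pure function over the three appendedMessages consumers.
type Message = { role: string; content: string; details?: { type: string } };
type StepResult = { appendedMessages: Message[] };
type RecordingSession = { record: (m: Message[]) => void };

function processStepMessages(
  stepResult: StepResult,
  messages: Message[],
  recordingSession?: RecordingSession,
) {
  const appended = stepResult.appendedMessages;
  // 1. Concat to the LLM context
  const updatedMessages = [...messages, ...appended];
  // 2. Detect a ComposeEmail result among the appended tool messages
  const lastStepHadComposeEmail = appended.some(
    (m) => m.details?.type === "sentEmail",
  );
  // 3. File recording
  recordingSession?.record(appended);
  return { updatedMessages, lastStepHadComposeEmail };
}
```

Later, the same function can take DB-hydrated messages instead of `stepResult.appendedMessages` without its callers changing.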

1d. Make conversationUpdates derivable from tool results

Currently conversationUpdates are drained from state.pendingUpdates (side-effected by tool execution) and returned in the step result so they can be re-applied on replay. Instead, tools could record their state mutations as metadata on the tool result message itself (they already partially do — ComposeEmail returns details: { type: "sentEmail", ... }).

Write a function deriveConversationUpdates(messages: Message[]): ConversationStateUpdate[] that scans tool results for known patterns. This doesn't replace the current mechanism yet, but proves the derivation works (add tests comparing derived vs drained).

Files: new conversation/derive.ts, tests.
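A sketch of the derivation, assuming tool results carry their mutation in a `details` field. Only the `sentEmail` pattern comes from the plan; any other patterns would be added per tool as they gain metadata.

```typescript
// Sketch of 1d: derive state updates by scanning tool-result metadata.
type Message = { role: string; details?: { type: string; [k: string]: unknown } };
type ConversationStateUpdate = { kind: string; payload: unknown };

function deriveConversationUpdates(messages: Message[]): ConversationStateUpdate[] {
  const updates: ConversationStateUpdate[] = [];
  for (const m of messages) {
    if (m.details?.type === "sentEmail") {
      // ComposeEmail already records its mutation on the tool result message
      updates.push({ kind: "sentEmail", payload: m.details });
    }
    // ...further known patterns, one per side-effecting tool
  }
  return updates;
}
```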

1e. Stop returning full appendedMessages from the step

Once 1a–1d are done, change AgentLoopStepResult to:

type AgentLoopStepResult = {
  toMessageNumber: number;
  finishReason: string;
  usage: AgentLoopUsage;
  executedLocalToolCount: number;
  conversationUpdates?: ConversationStateUpdate[];
};

The post-step code loads messages from an in-memory buffer populated inside the step (not from the step result). On normal execution this is a no-op change. On Absurd replay, the step result is now tiny — but you still need appendedMessages for the messages array, so the replay path loads them from conversation_records using toMessageNumber.

Files: runner.ts — shrink step result, add hydration fallback.


Phase 2 — Session table

2a. Add conversation_sessions and session_seed_messages tables

Two tables — the session row is queried frequently (status, usage, metadata), while seed messages are large and only needed for replay/hydration.

CREATE TABLE conversation_sessions (
  session_id            TEXT PRIMARY KEY,
  conversation_id       TEXT NOT NULL,
  space_id              TEXT NOT NULL,
  mailbox_id            TEXT NOT NULL,
  trigger_type          TEXT NOT NULL,    -- email | chat | cron | message | herald | herald-chat
  trigger_id            TEXT NOT NULL,
  model_id              TEXT NOT NULL,
  model_context_window  INTEGER NOT NULL,
  status                TEXT NOT NULL DEFAULT 'running',  -- running | completed | failed
  policy                JSONB,            -- policy extraction result (loop suspicion, custom recipients, web policy)
  state                 JSONB,            -- serialized ConversationState, updated each step
  tool_names            TEXT[],           -- tool registry snapshot (names only, schemas are code-versioned)
  seed_message_count    INTEGER NOT NULL DEFAULT 0,
  to_message_number     INTEGER NOT NULL DEFAULT 0, -- high-water mark in conversation_records
  total_turns           INTEGER NOT NULL DEFAULT 0,
  total_input_tokens    INTEGER NOT NULL DEFAULT 0,
  total_output_tokens   INTEGER NOT NULL DEFAULT 0,
  total_cached_tokens   INTEGER NOT NULL DEFAULT 0,
  total_cost_cents      INTEGER NOT NULL DEFAULT 0,
  started_at            TIMESTAMPTZ NOT NULL DEFAULT now(),
  completed_at          TIMESTAMPTZ,
  error                 TEXT              -- failure reason, if status = 'failed'
);

CREATE INDEX idx_sessions_conversation ON conversation_sessions (conversation_id);
CREATE INDEX idx_sessions_space ON conversation_sessions (space_id, mailbox_id, started_at DESC);

-- Seed messages are large (email threads can be 50KB+), stored separately.
-- One row per seed message, ordered. Only read during replay/hydration.
CREATE TABLE session_seed_messages (
  session_id      TEXT NOT NULL REFERENCES conversation_sessions(session_id),
  ordinal         INTEGER NOT NULL,
  role            TEXT NOT NULL,       -- system | user | assistant
  content         TEXT NOT NULL,
  is_new          BOOLEAN NOT NULL DEFAULT false,
  timestamp       BIGINT,
  details         JSONB,
  PRIMARY KEY (session_id, ordinal)
);

session_seed_messages captures exactly what the LLM saw at the start — the rendered system prompt, annotated email thread, prior chat messages, etc. This makes replay trigger-agnostic: every trigger type stores its fully-rendered seeds.

Files: new migration, new session/store.ts with insert/update/load functions.

2b. Write session row at agent loop start

After policy extraction and seed message construction (both already computed), insert the session row and seed messages in a single ctx.step("create-session", ...). This step is idempotent via the session_id.

// In runAgentLoop, after policy extraction and buildSeedMessages:
const sessionId = newTaggedId("sess");

await ctx.step("create-session", async () => {
  await insertSession({
    sessionId,
    conversationId,
    spaceId: state.domainId,
    mailboxId: state.mailboxId,
    triggerType: trigger.type,
    triggerId: trigger.triggerId,
    modelId: model.id,
    modelContextWindow: model.contextWindow,
    policy: policyResult,
    initialState: serializeConversationState(state),
    toolNames: Object.keys(tools),
    seedMessageCount: seedMessages.length,
  });
  await insertSeedMessages(sessionId, seedMessages);
});

This is purely additive — nothing reads from it yet.

Files: runner.ts, session/store.ts.

2c. Update session after each step

After each agent-loop step, update the session row with running totals and the high-water mark. This replaces the in-memory debugMeta accumulation as the source of truth.

// After the existing post-step debugMeta accumulation:
await updateSessionStep(sessionId, {
  toMessageNumber: currentMessageNumber,
  turnsDelta: 1,
  inputTokensDelta: stepResult.usage.inputTokens,
  outputTokensDelta: stepResult.usage.outputTokens,
  cachedTokensDelta: stepResult.usage.cachedInputTokens,
  costCentsDelta: stepResult.usage.costCents,
});

Also update status and completed_at when the loop exits (success or failure).

This is still additive — debugMeta accumulation stays for now, the session row is a parallel write. Tests can assert they agree.

Files: runner.ts, session/store.ts.
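Sketched over an in-memory row below; the real `updateSessionStep` would issue a single UPDATE with `total_turns = total_turns + $1`-style increments so retried or concurrent writes accumulate correctly, and `GREATEST(...)` for the high-water mark. Field names are illustrative.

```typescript
// Sketch of 2c's delta-based session update against an in-memory row.
type SessionRow = {
  toMessageNumber: number;
  totalTurns: number;
  totalInputTokens: number;
  totalOutputTokens: number;
};

const sessions = new Map<string, SessionRow>();

function updateSessionStep(
  sessionId: string,
  d: {
    toMessageNumber: number;
    turnsDelta: number;
    inputTokensDelta: number;
    outputTokensDelta: number;
  },
) {
  const row = sessions.get(sessionId)!;
  // High-water mark only moves forward (GREATEST in SQL)
  row.toMessageNumber = Math.max(row.toMessageNumber, d.toMessageNumber);
  // Deltas, not absolute values, so the write is safe to apply per step
  row.totalTurns += d.turnsDelta;
  row.totalInputTokens += d.inputTokensDelta;
  row.totalOutputTokens += d.outputTokensDelta;
}
```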

2d. Store conversation state on session after each step

After applying conversationUpdates, serialize the current state and write it to the state column on the session row.

Written at session creation (initial snapshot) and updated after each step. On resume, load the latest state from here instead of replaying conversationUpdates from Absurd checkpoints.

Files: runner.ts, session/store.ts.
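The serialize/deserialize pair in 2d only needs to round-trip cleanly; a JSON sketch with an illustrative `ConversationState` shape (the real one lives in the agent code):

```typescript
// Sketch of 2d: snapshot conversation state into the JSONB `state` column.
type ConversationState = { pendingUpdates: unknown[]; sentEmails: string[] };

function serializeConversationState(s: ConversationState): string {
  return JSON.stringify(s); // written to conversation_sessions.state each step
}

function deserializeConversationState(raw: string): ConversationState {
  return JSON.parse(raw) as ConversationState; // read on resume instead of replaying checkpoints
}
```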


Phase 3 — Hydration

3a. Build hydrateSession function

The core primitive: given a session_id, reconstruct everything needed to resume or inspect a run.

interface HydratedSession {
  session: SessionRow;
  seedMessages: SeedMessage[];
  messages: Message[];           // from conversation_records, after seed
  state: ConversationState;      // deserialized from session.state
  debugMeta: DebugMeta;          // derived from session totals
}

async function hydrateSession(sessionId: string): Promise<HydratedSession> {
  const session = await loadSession(sessionId);
  const seedMessages = await loadSeedMessages(sessionId);
  const records = await loadConversationRecords(
    session.spaceId,
    session.mailboxId,
    session.conversationId,
  );
  // Only messages produced by the agent loop (after seeds)
  const messages = records
    .filter(r => r.message_number > session.seedMessageCount)
    .map(deserializeMessage);
  const state = deserializeConversationState(session.state);
  const debugMeta = deriveDebugMeta(session);
  return { session, seedMessages, messages, state, debugMeta };
}

Test this against live data: hydrate a completed session and assert the reconstructed messages array matches what Absurd's c_default step results contain.

Files: new session/hydrate.ts, tests.

3b. Replace Redis chat cache with session hydration

The chat path currently does:

  • Load: getChatConversation(userId, conversationId) → Redis → { messages, state }
  • Save: saveChatConversation(userId, conversationId, { messages, state }) → Redis

Replace with:

  • Load: look up the latest conversation_sessions row by conversation_id, then call hydrateSession to get { messages, state } from Postgres.
  • Save: already handled by 2c/2d (session row is updated each step, messages are in conversation_records).

saveChatConversation in Redis becomes a no-op or a thin cache layer over the DB. getChatConversation becomes a hydrateSession call.

Run both paths in parallel initially (read from session, fall back to Redis for existing conversations), then remove Redis once all active conversations have migrated or expired (24h TTL handles this naturally).

Files: worker.ts (chat task), chat/store.ts, agent/chat.ts.
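The dual-read period can be sketched as below: prefer the session table, fall back to Redis for conversations created before the migration. The loader signatures are illustrative, injected here so the fallback logic stands alone.

```typescript
// Sketch of 3b's dual-read; all names are illustrative.
type ChatConversation = { messages: string[]; state: unknown };

async function getChatConversation(
  conversationId: string,
  loadLatestSession: (id: string) => Promise<ChatConversation | null>,
  redisFallback: (id: string) => Promise<ChatConversation | null>,
): Promise<ChatConversation | null> {
  const fromSession = await loadLatestSession(conversationId);
  if (fromSession) return fromSession; // hydrated from Postgres
  return redisFallback(conversationId); // pre-migration conversations (24h TTL)
}
```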

3c. Use hydration for messenger (Discord/Telegram) prior messages

The messenger path already loads from conversation_records via getMessengerMessages. Migrate it to load from the session instead:

  1. When a new message arrives for a messenger conversation, look up the latest session for that conversation_id.
  2. If one exists, use hydrateSession to get the state, then continue the conversation.
  3. If not (first message), create a fresh session as usual.

This is a small change since messenger already reads from conversation_records — it just adds the session lookup for state and metadata.

Files: processMessage.ts, light refactor of how state is constructed.

3d. Use hydration for email prior-session context (optional enrichment)

Email runs don't need prior agent messages for the LLM prompt — they re-annotate from Mailhook. But the session table now lets you:

  • Show prior agent runs in the UI (load all sessions for a conversation_id chain)
  • Link parent → child sessions via conversation_counters.parent_conversation_id
  • Debug what the agent did on previous emails in the thread

No change to the email agent loop itself — this is a read-side enhancement for the UI and debugging tools.

Files: new API routes or UI queries, no runner changes.


Phase 4 — Slim step results and replay

4a. Shrink the Absurd step result

Now that session state lives in Postgres, change AgentLoopStepResult to only store the pointer and loop-control metadata:

type AgentLoopStepResult = {
  toMessageNumber: number;
  finishReason: string;
  usage: AgentLoopUsage;
  executedLocalToolCount: number;
};

Drop appendedMessages and conversationUpdates from the step result. The post-step code reads messages from the in-memory buffer (populated inside the step during normal execution). On Absurd replay (retry), load from conversation_records using toMessageNumber and derive state from the session row.

const stepResult = await ctx.step("agent-loop", async () => {
  // ... LLM call, tool execution, record to DB ...
  return { toMessageNumber, finishReason, usage, executedLocalToolCount };
});

// On normal execution: messages are in a local buffer from inside the step
// On replay: buffer is empty, hydrate from DB
if (stepMessages.length === 0) {
  stepMessages = await loadMessageRange(
    conversationId, previousMessageNumber, stepResult.toMessageNumber
  );
  applyDerivedStateUpdates(state, stepMessages);
}

Files: runner.ts — change step return type, add hydration fallback.

4b. Implement fork-from-session

Given a session_id and a message_number cutoff, create a new session that:

  1. Copies the seed messages from the original session
  2. Creates a new conversation_id with parent_conversation_id → original
  3. Copies conversation_records up to the cutoff into the new conversation
  4. Starts a new agent loop from there (with potentially different model, tools, or prompt modifications)

async function forkSession(
  sourceSessionId: string,
  atMessageNumber: number,
  overrides?: { modelId?: string; systemPromptPatch?: string },
): Promise<string> // new session_id

Files: new session/fork.ts.
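The four fork steps can be sketched over in-memory stores (the real version copies DB rows and uses newTaggedId; the id scheme and store shapes here are illustrative):

```typescript
// Sketch of 4b: copy seeds, link parent, copy records up to the cutoff.
type ConvRecord = { messageNumber: number; content: string };
type Session = { sessionId: string; conversationId: string; seeds: string[] };

const sessions = new Map<string, Session>();
const records = new Map<string, ConvRecord[]>(); // keyed by conversationId
const parentOf = new Map<string, string>(); // stand-in for parent_conversation_id

function forkSession(sourceSessionId: string, atMessageNumber: number): string {
  const src = sessions.get(sourceSessionId)!;
  const newConversationId = `conv_fork_${src.conversationId}`; // illustrative ids
  const newSessionId = `sess_fork_${sourceSessionId}`;
  // 1. Copy the seed messages; 2. link parent -> child conversation
  sessions.set(newSessionId, {
    sessionId: newSessionId,
    conversationId: newConversationId,
    seeds: [...src.seeds],
  });
  parentOf.set(newConversationId, src.conversationId);
  // 3. Copy conversation records up to (and including) the cutoff
  records.set(
    newConversationId,
    (records.get(src.conversationId) ?? []).filter(
      (r) => r.messageNumber <= atMessageNumber,
    ),
  );
  // 4. Caller starts a new agent loop against newSessionId
  return newSessionId;
}
```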

4c. Local replay CLI

A dev tool that loads a session via hydrateSession, reconstructs the full LLM context, and re-runs the agent loop with a local model or modified prompt. Writes to a forked conversation so the original is untouched.

make replay SESSION_ID=sess_xxx [MODEL=claude-sonnet-4-20250514] [FROM_MESSAGE=4]

The replay flow

The local replay script works the same regardless of how the session was exported:

  # 1. Download from production
  make export-session SESSION_ID=sess_xxx
  # or: curl -H "X-Internal-Token: ..." https://api.../sessions/sess_xxx/export > export.json

  # 2. Replay locally (fork)
  make replay SESSION_ID=sess_xxx [MODEL=claude-sonnet-4-20250514] [FROM_MESSAGE=4]

The replay script:

  1. Loads the exported bundle from disk
  2. Inserts the session + seed messages + conversation records into the local DB (with a new conversation_id to avoid collisions)
  3. Calls runAgentLoop with tools in dry-run mode (no actual email sending — ComposeEmail returns the composed body but doesn't call SES)
  4. The loop picks up from FROM_MESSAGE (or replays from scratch), writing new messages to the local conversation_records
  5. Diff the original vs replayed messages to see what changed

4c-i. Export endpoint / script — the production → local bridge. Add the internal API endpoint. Also add a CLI wrapper that calls it via curl + SSM tunnel.

4c-ii. Import-and-replay script — takes an exported bundle, imports it into the local DB, and runs the agent loop in dry-run mode. This reuses hydrateSession from 3a.

4c-iii. Dry-run tool mode — a flag on ToolExecutionContext (or ToolRuntime) that makes side-effecting tools (ComposeEmail, WriteFile) return their result without actually executing. Some tools are already safe (ReadFile, CodeExecution); only a few need gating.

Files: new scripts/replay.ts or packages/elwing/src/cli/replay.ts.


Dependency graph

Phase 1 (no DB changes):

  1a ──┐
  1b ──┼──→ 1e
  1c ──┘
  1d (independent)

Phase 2 (session table):

  2a ──→ 2b ──→ 2c ──→ 2d

Phase 3 (hydration):

              2d
               ↓
              3a ──→ 3b (chat Redis → DB migration)
                ├──→ 3c (messenger, small change)
                └──→ 3d (email, read-side only)

Phase 4 (slim steps + replay):

              3a
               ↓
              4a ──→ 4b ──→ 4c

Phase 1 can proceed in parallel with 2a. Steps 3b, 3c, 3d are independent of each other. Phase 4 requires 3a.
