Move all session state out of Absurd checkpoint tables into our own database,
enabling conversation replay and forking from conversation_records + a new
session table.
Currently messages are written to conversation_records after the step
returns. Move recordMessagesToDb inside the step body, right after tool
execution. This way, by the time ctx.step commits its checkpoint, the messages
are already in the DB — making the step result and DB consistent atomically.
This also means that on retry, if the step is replayed from cache, the DB records
already exist. Because message_id is generated fresh on each attempt it cannot
deduplicate writes, so use the message_number sequence as the idempotency key to
avoid double-writing.
Files: runner.ts — move the recordMessagesToDb call and
lastStepHadComposeEmail / internal marking into the step body.
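The move can be sketched with a minimal in-memory model. Everything here (the Map-backed store, the key shape, this recordMessagesToDb signature) is illustrative rather than the real runner.ts/conversation_records API; the point is why message_number works as the idempotency key where a fresh message_id would not:

```typescript
type Message = { role: string; content: string };

// In-memory stand-in for conversation_records. The (conversation_id,
// message_number) pair acts as the idempotency key: a replayed step
// re-inserts the same numbers and the duplicates are skipped.
const records = new Map<string, Message>();

function recordMessagesToDb(conversationId: string, startNumber: number, msgs: Message[]): number {
  msgs.forEach((m, i) => {
    const key = `${conversationId}:${startNumber + i}`;
    if (!records.has(key)) records.set(key, m); // idempotent: replay is a no-op
  });
  return startNumber + msgs.length; // new high-water mark
}

// First execution writes the row.
let hwm = recordMessagesToDb("conv_1", 1, [{ role: "assistant", content: "hi" }]);
// A replayed step hits the same key: no double-write.
hwm = recordMessagesToDb("conv_1", 1, [{ role: "assistant", content: "hi" }]);
console.log(records.size, hwm); // 1 2
```

In Postgres the same effect would come from a unique constraint on (conversation_id, message_number) plus an ON CONFLICT DO NOTHING insert.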
Add a messageNumberBefore / messageNumberAfter counter to the loop. Before
each step, snapshot the current number. After the step writes to DB, capture the
new high-water mark. Return toMessageNumber in the step result alongside the
existing fields.
This is additive — nothing reads it yet, but it establishes the pointer.
Files: runner.ts — add counter, include in AgentLoopStepResult.
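The counter mechanics in miniature (shapes are illustrative; in runner.ts the snapshot wraps the real ctx.step call and the DB write happens inside it):

```typescript
let messageNumber = 7; // high-water mark in conversation_records before the step

function runStep(appendedCount: number) {
  const messageNumberBefore = messageNumber; // snapshot before the step
  messageNumber += appendedCount;            // step writes messages to the DB
  const toMessageNumber = messageNumber;     // new high-water mark after the write
  return { messageNumberBefore, toMessageNumber };
}

const step = runStep(3);
console.log(step.messageNumberBefore, step.toMessageNumber); // 7 10
```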
The post-step code reads stepResult.appendedMessages for three things:
- Concat to messages (LLM context)
- Detect lastStepHadComposeEmail (sentEmail details)
- File recording
Extract a function like processStepMessages(stepResult, messages, recordingSession) that returns { updatedMessages, lastStepHadComposeEmail }.
This isolates the code that would later switch from "read from step result" to
"hydrate from DB."
Files: runner.ts — extract function, no behavior change.
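A sketch of the extraction, assuming the field names from the plan (appendedMessages, details.type === "sentEmail"); the real shapes in runner.ts may differ, and the recordingSession parameter is omitted here:

```typescript
type Message = { role: string; content: string; details?: { type: string } };
type StepResult = { appendedMessages: Message[] };

function processStepMessages(stepResult: StepResult, messages: Message[]) {
  // Concat the appended messages onto the LLM context.
  const updatedMessages = messages.concat(stepResult.appendedMessages);
  // Detect a ComposeEmail result via its sentEmail details.
  const lastStepHadComposeEmail = stepResult.appendedMessages.some(
    (m) => m.details?.type === "sentEmail",
  );
  return { updatedMessages, lastStepHadComposeEmail };
}

const out = processStepMessages(
  { appendedMessages: [{ role: "tool", content: "ok", details: { type: "sentEmail" } }] },
  [{ role: "user", content: "hi" }],
);
console.log(out.updatedMessages.length, out.lastStepHadComposeEmail); // 2 true
```

Later, only the body of this function changes: "read from step result" becomes "hydrate from DB", and callers are untouched.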
Currently conversationUpdates are drained from state.pendingUpdates
(side-effected by tool execution) and returned in the step result so they can be
re-applied on replay. Instead, tools could record their state mutations as
metadata on the tool result message itself (they already partially do —
ComposeEmail returns details: { type: "sentEmail", ... }).
Write a function deriveConversationUpdates(messages: Message[]): ConversationStateUpdate[] that scans tool results for known patterns. This
doesn't replace the current mechanism yet, but proves the derivation works (add
tests comparing derived vs drained).
Files: new conversation/derive.ts, tests.
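A sketch of the derivation; the ConversationStateUpdate shape is assumed, and only the sentEmail pattern is taken from the plan:

```typescript
type Message = { role: string; details?: { type: string; [k: string]: unknown } };
type ConversationStateUpdate = { kind: string; payload: unknown };

function deriveConversationUpdates(messages: Message[]): ConversationStateUpdate[] {
  const updates: ConversationStateUpdate[] = [];
  for (const m of messages) {
    if (m.role !== "tool" || !m.details) continue;
    // sentEmail is the pattern named in the plan; other known tool-result
    // patterns would be matched here as they are identified.
    if (m.details.type === "sentEmail") {
      updates.push({ kind: "emailSent", payload: m.details });
    }
  }
  return updates;
}

const derived = deriveConversationUpdates([
  { role: "tool", details: { type: "sentEmail", to: "a@example.com" } },
  { role: "assistant" },
]);
console.log(derived.length, derived[0].kind); // 1 emailSent
```

The comparison tests would run both paths over the same step and assert deriveConversationUpdates(messages) deep-equals the drained state.pendingUpdates.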
Once 1a–1d are done, change AgentLoopStepResult to:
type AgentLoopStepResult = {
toMessageNumber: number;
finishReason: string;
usage: AgentLoopUsage;
executedLocalToolCount: number;
conversationUpdates?: ConversationStateUpdate[];
};

The post-step code loads messages from an in-memory buffer populated inside the
step (not from the step result). On normal execution this is a no-op change. On
Absurd replay, the step result is now tiny — but you still need
appendedMessages for the messages array, so the replay path loads them from
conversation_records using toMessageNumber.
Files: runner.ts — shrink step result, add hydration fallback.
Two tables — the session row is queried frequently (status, usage, metadata), while seed messages are large and only needed for replay/hydration.
CREATE TABLE conversation_sessions (
session_id TEXT PRIMARY KEY,
conversation_id TEXT NOT NULL,
space_id TEXT NOT NULL,
mailbox_id TEXT NOT NULL,
trigger_type TEXT NOT NULL, -- email | chat | cron | message | herald | herald-chat
trigger_id TEXT NOT NULL,
model_id TEXT NOT NULL,
model_context_window INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'running', -- running | completed | failed
policy JSONB, -- policy extraction result (loop suspicion, custom recipients, web policy)
state JSONB, -- serialized ConversationState, updated each step
tool_names TEXT[], -- tool registry snapshot (names only, schemas are code-versioned)
seed_message_count INTEGER NOT NULL DEFAULT 0,
to_message_number INTEGER NOT NULL DEFAULT 0, -- high-water mark in conversation_records
total_turns INTEGER NOT NULL DEFAULT 0,
total_input_tokens INTEGER NOT NULL DEFAULT 0,
total_output_tokens INTEGER NOT NULL DEFAULT 0,
total_cached_tokens INTEGER NOT NULL DEFAULT 0,
total_cost_cents INTEGER NOT NULL DEFAULT 0,
started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
completed_at TIMESTAMPTZ,
error TEXT -- failure reason, if status = 'failed'
);
CREATE INDEX idx_sessions_conversation ON conversation_sessions (conversation_id);
CREATE INDEX idx_sessions_space ON conversation_sessions (space_id, mailbox_id, started_at DESC);
-- Seed messages are large (email threads can be 50KB+), stored separately.
-- One row per seed message, ordered. Only read during replay/hydration.
CREATE TABLE session_seed_messages (
session_id TEXT NOT NULL REFERENCES conversation_sessions(session_id),
ordinal INTEGER NOT NULL,
role TEXT NOT NULL, -- system | user | assistant
content TEXT NOT NULL,
is_new BOOLEAN NOT NULL DEFAULT false,
timestamp BIGINT,
details JSONB,
PRIMARY KEY (session_id, ordinal)
);

session_seed_messages captures exactly what the LLM saw at the start — the
rendered system prompt, annotated email thread, prior chat messages, etc. This
makes replay trigger-agnostic: every trigger type stores its fully-rendered
seeds.
Files: new migration, new session/store.ts with insert/update/load
functions.
After policy extraction and seed message construction (both already computed),
insert the session row and seed messages in a single
ctx.step("create-session", ...). This step is idempotent via the session_id.
// In runAgentLoop, after policy extraction and buildSeedMessages:
const sessionId = newTaggedId("sess");
await ctx.step("create-session", async () => {
await insertSession({
sessionId,
conversationId,
spaceId: state.domainId,
mailboxId: state.mailboxId,
triggerType: trigger.type,
triggerId: trigger.triggerId,
modelId: model.id,
modelContextWindow: model.contextWindow,
policy: policyResult,
initialState: serializeConversationState(state),
toolNames: Object.keys(tools),
seedMessageCount: seedMessages.length,
});
await insertSeedMessages(sessionId, seedMessages);
});

This is purely additive — nothing reads from it yet.
Files: runner.ts, session/store.ts.
After each agent-loop step, update the session row with running totals and the
high-water mark. This replaces the in-memory debugMeta accumulation as the
source of truth.
// After the existing post-step debugMeta accumulation:
await updateSessionStep(sessionId, {
toMessageNumber: currentMessageNumber,
turnsDelta: 1,
inputTokensDelta: stepResult.usage.inputTokens,
outputTokensDelta: stepResult.usage.outputTokens,
cachedTokensDelta: stepResult.usage.cachedInputTokens,
costCentsDelta: stepResult.usage.costCents,
});

Also update status and completed_at when the loop exits (success or
failure).
This is still additive — debugMeta accumulation stays for now, the session row
is a parallel write. Tests can assert they agree.
Files: runner.ts, session/store.ts.
After applying conversationUpdates, serialize the current state and write it to
the state column on the session row.
Written at session creation (initial snapshot) and updated after each step. On
resume, load the latest state from here instead of replaying
conversationUpdates from Absurd checkpoints.
Files: runner.ts, session/store.ts.
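The snapshot is a serialize/deserialize round trip plus an UPDATE on the session row's state column. A minimal sketch of the round trip (the ConversationState fields shown are placeholders):

```typescript
type ConversationState = { pendingUpdates: unknown[]; lastEmailId?: string };

// Snapshot the state after each step so resume reads one row instead of
// replaying conversationUpdates from Absurd checkpoints.
function serializeConversationState(state: ConversationState): string {
  return JSON.stringify(state);
}
function deserializeConversationState(json: string): ConversationState {
  return JSON.parse(json);
}

const state: ConversationState = { pendingUpdates: [], lastEmailId: "em_1" };
const roundTripped = deserializeConversationState(serializeConversationState(state));
console.log(roundTripped.lastEmailId); // em_1
```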
The core primitive: given a session_id, reconstruct everything needed to resume
or inspect a run.
interface HydratedSession {
session: SessionRow;
seedMessages: SeedMessage[];
messages: Message[]; // from conversation_records, after seed
state: ConversationState; // deserialized from session.state
debugMeta: DebugMeta; // derived from session totals
}
async function hydrateSession(sessionId: string): Promise<HydratedSession> {
const session = await loadSession(sessionId);
const seedMessages = await loadSeedMessages(sessionId);
const records = await loadConversationRecords(
session.spaceId,
session.mailboxId,
session.conversationId,
);
// Only messages produced by the agent loop (after seeds)
const messages = records
.filter(r => r.message_number > session.seedMessageCount)
.map(deserializeMessage);
const state = deserializeConversationState(session.state);
const debugMeta = deriveDebugMeta(session);
return { session, seedMessages, messages, state, debugMeta };
}

Test this against live data: hydrate a completed session and assert the
reconstructed messages array matches what Absurd's c_default step results
contain.
Files: new session/hydrate.ts, tests.
The chat path currently does:
- Load: getChatConversation(userId, conversationId) → Redis → { messages, state }
- Save: saveChatConversation(userId, conversationId, { messages, state }) → Redis

Replace with:
- Load: look up the latest conversation_sessions row by conversation_id, then hydrateSession → { messages, state } from Postgres.
- Save: already handled by 2c/2d (session row is updated each step, messages are in conversation_records).
saveChatConversation in Redis becomes a no-op or a thin cache layer over the
DB. getChatConversation becomes a hydrateSession call.
Run both paths in parallel initially (read from session, fall back to Redis for existing conversations), then remove Redis once all active conversations have migrated or expired (24h TTL handles this naturally).
Files: worker.ts (chat task), chat/store.ts, agent/chat.ts.
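The dual-read path during migration can be sketched as follows. All names here (findLatestSession, hydrateSession, loadFromRedis) are stand-ins, and the real functions are async DB/Redis calls; the sketch is synchronous for brevity:

```typescript
type Hydrated = { source: "postgres" | "redis" };

// Stand-in for the conversation_id → latest session lookup.
const sessionIndex = new Map<string, string>([["conv_new", "sess_1"]]);

const findLatestSession = (conversationId: string) =>
  sessionIndex.has(conversationId) ? { sessionId: sessionIndex.get(conversationId)! } : null;
const hydrateSession = (_sessionId: string): Hydrated => ({ source: "postgres" });
const loadFromRedis = (_conversationId: string): Hydrated => ({ source: "redis" });

// Prefer the session table; fall back to Redis for conversations that
// predate it (the 24h TTL retires the legacy path on its own).
function getChatConversation(conversationId: string): Hydrated {
  const session = findLatestSession(conversationId);
  return session ? hydrateSession(session.sessionId) : loadFromRedis(conversationId);
}

console.log(getChatConversation("conv_new").source); // postgres
console.log(getChatConversation("conv_old").source); // redis
```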
The messenger path already loads from conversation_records via
getMessengerMessages. Migrate it to load from the session instead:
- When a new message arrives for a messenger conversation, look up the latest session for that conversation_id.
- If one exists, use hydrateSession to get the state, then continue the conversation.
- If not (first message), create a fresh session as usual.
This is a small change since messenger already reads from conversation_records
— it just adds the session lookup for state and metadata.
Files: processMessage.ts, light refactor of how state is constructed.
Email runs don't need prior agent messages for the LLM prompt — they re-annotate from Mailhook. But the session table now lets you:
- Show prior agent runs in the UI (load all sessions for a conversation_id chain)
- Link parent → child sessions via conversation_counters.parent_conversation_id
- Debug what the agent did on previous emails in the thread
No change to the email agent loop itself — this is a read-side enhancement for the UI and debugging tools.
Files: new API routes or UI queries, no runner changes.
Now that session state lives in Postgres, change AgentLoopStepResult to only
store the pointer and loop-control metadata:
type AgentLoopStepResult = {
toMessageNumber: number;
finishReason: string;
usage: AgentLoopUsage;
executedLocalToolCount: number;
};

Drop appendedMessages and conversationUpdates from the step result. The
post-step code reads messages from the in-memory buffer (populated inside the
step during normal execution). On Absurd replay (retry), load from
conversation_records using toMessageNumber and derive state from the session
row.
const stepResult = await ctx.step("agent-loop", async () => {
// ... LLM call, tool execution, record to DB ...
return { toMessageNumber, finishReason, usage, executedLocalToolCount };
});
// On normal execution: messages are in a local buffer from inside the step
// On replay: buffer is empty, hydrate from DB
if (stepMessages.length === 0) {
stepMessages = await loadMessageRange(
conversationId, previousMessageNumber, stepResult.toMessageNumber
);
applyDerivedStateUpdates(state, stepMessages);
}

Files: runner.ts — change step return type, add hydration fallback.
Given a session_id and a message_number cutoff, create a new session that:
- Copies the seed messages from the original session
- Creates a new conversation_id with parent_conversation_id → original
- Copies conversation_records up to the cutoff into the new conversation
- Starts a new agent loop from there (with potentially different model, tools, or prompt modifications)
async function forkSession(
sourceSessionId: string,
atMessageNumber: number,
overrides?: { modelId?: string; systemPromptPatch?: string },
): Promise<string> // new session_id

Files: new session/fork.ts.
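The copy-up-to-cutoff semantics in miniature (in-memory stand-ins; the real fork writes new conversation_sessions, session_seed_messages, and conversation_records rows under a fresh conversation_id):

```typescript
type ConvRecord = { messageNumber: number; content: string };

// Fork copies conversation_records only up to the cutoff; seed messages
// (not shown) are copied whole into the new session.
function forkRecords(source: ConvRecord[], atMessageNumber: number): ConvRecord[] {
  return source.filter((r) => r.messageNumber <= atMessageNumber);
}

const source = [1, 2, 3, 4, 5].map((n) => ({ messageNumber: n, content: `m${n}` }));
const forked = forkRecords(source, 3);
console.log(forked.length); // 3
```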
A dev tool that loads a session via hydrateSession, reconstructs the full LLM
context, and re-runs the agent loop with a local model or modified prompt. Writes
to a forked conversation so the original is untouched.
make replay SESSION_ID=sess_xxx [MODEL=claude-sonnet-4-20250514] [FROM_MESSAGE=4]

Either way, the local replay script works the same:
# 1. Download from production
make export-session SESSION_ID=sess_xxx
# or: curl -H "X-Internal-Token: ..." https://api.../sessions/sess_xxx/export > export.json
# 2. Replay locally (fork)
make replay SESSION=sess_xxx [MODEL=claude-sonnet-4-20250514] [FROM_MESSAGE=4]

The replay script:
- Loads the exported bundle from disk
- Inserts the session + seed messages + conversation records into the local DB (with a new conversation_id to avoid collisions)
- Calls runAgentLoop with tools in dry-run mode (no actual email sending — ComposeEmail returns the composed body but doesn't call SES)
- The loop picks up from FROM_MESSAGE (or replays from scratch), writing new messages to the local conversation_records
- Diff the original vs replayed messages to see what changed
4c-i. Export endpoint / script — the production → local bridge. Add the internal API endpoint. Also add a CLI wrapper that calls it via curl + SSM tunnel.
4c-ii. Import-and-replay script — takes an exported bundle, imports it into the local DB, and runs the agent loop in dry-run mode. This reuses hydrateSession from 3a.
4c-iii. Dry-run tool mode — a flag on ToolExecutionContext (or ToolRuntime) that makes side-effecting tools (ComposeEmail, WriteFile) return their result without actually executing. Some tools are already safe (ReadFile, CodeExecution); only a few need gating.
Files: new scripts/replay.ts or packages/elwing/src/cli/replay.ts.
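The 4c-iii dry-run gate can be sketched as follows; the dryRun flag on ToolExecutionContext is the plan's proposal, and sendViaSes is a hypothetical stand-in for the real SES call that a dry run must never reach:

```typescript
type ToolExecutionContext = { dryRun: boolean };
type ComposeResult = { status: "sent" | "dry-run"; composedBody: string };

let sesCalls = 0;
const sendViaSes = (_body: string) => { sesCalls++; }; // stand-in for the real send

function composeEmail(ctx: ToolExecutionContext, body: string): ComposeResult {
  if (ctx.dryRun) {
    // Return the composed body so replay diffs still work, but skip the send.
    return { status: "dry-run", composedBody: body };
  }
  sendViaSes(body);
  return { status: "sent", composedBody: body };
}

console.log(composeEmail({ dryRun: true }, "hello").status, sesCalls); // dry-run 0
```

Already-safe tools (ReadFile, CodeExecution) ignore the flag; only the side-effecting few check it.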
Phase 1 (no DB changes):
1a ──┐
1b ──┼──→ 1e
1c ──┘
1d (independent)
Phase 2 (session table):
2a ──→ 2b ──→ 2c ──→ 2d
Phase 3 (hydration):
2d
↓
3a ──→ 3b (chat Redis → DB migration)
├──→ 3c (messenger, small change)
└──→ 3d (email, read-side only)
Phase 4 (slim steps + replay):
3a
↓
4a ──→ 4b ──→ 4c
Phase 1 can proceed in parallel with 2a. Steps 3b, 3c, 3d are independent of each other. Phase 4 requires 3a.