---
title: Event Listener Runtime Migration Plan
description: >-
Detailed migration blueprint for moving Kimaki Discord session handling
to one long-lived OpenCode event listener per thread runtime with
centralized Zustand state updates. Hardened with feasibility review,
concrete code snippets, file-by-file refactor map, and staged
acceptance criteria.
prompt: |
[38m since last message | UTC: 2026-03-01 15:39:12 UTC | Local
(Europe/Rome): 03/01/2026, 16:39]create a detailed plan markdown file
of how the new architecture should work. reference opencode files to
take inspiration from. make it detailed enough so that new agents will
be able to do this migration. then ask oracle for a review of the plan.
---
Hardened by feasibility review: read session-handler.ts (2668 lines),
state.ts (232 lines), discord-bot.ts (1228 lines), and all 9 command
modules. Identified blockers, added concrete TypeScript snippets,
file-by-file refactor map, and staged acceptance criteria.
references:
- @discord/src/session-handler.ts
- @discord/src/session-handler/state.ts
- @discord/src/discord-bot.ts
- @discord/src/commands/queue.ts
- @discord/src/commands/abort.ts
- @discord/src/commands/action-buttons.ts
- @discord/src/commands/merge-worktree.ts
- @discord/src/commands/restart-opencode-server.ts
- @discord/src/commands/model.ts
- @discord/src/commands/unset-model.ts
- @discord/src/commands/permissions.ts
- @discord/src/commands/ask-question.ts
- @discord/src/commands/file-upload.ts
- @discord/src/thread-message-queue.e2e.test.ts
- @opensrc/repos/github.com/sst/opencode/packages/opencode/src/cli/cmd/
tui/context/sdk.tsx
- @opensrc/repos/github.com/sst/opencode/packages/opencode/src/cli/cmd/
tui/context/sync.tsx
- @opensrc/repos/github.com/sst/opencode/packages/opencode/src/cli/cmd/
tui/component/prompt/index.tsx
- @opensrc/repos/github.com/sst/opencode/packages/opencode/src/cli/cmd/
tui/routes/session/index.tsx
- @opensrc/repos/github.com/sst/opencode/packages/opencode/src/cli/cmd/
tui/worker.ts
---
Move Kimaki session orchestration from a per-message listener model to a per-thread runtime model:
- exactly one long-lived OpenCode
event.subscribestream per thread runtime - one centralized Zustand state atom per runtime
- Discord handlers become ingress only (fire-and-forget)
- all state transitions come from OpenCode events, not Discord-side guesses
- keep current behavior: abort active run on normal new user message
- keep queue behavior for
/queueand queue-command flows
Today handleOpencodeSession in @discord/src/session-handler.ts owns:
- prompt dispatch
- event stream lifecycle
- queue drain recursion
- typing timers
- permission/question/action side effects
- run completion and footer emission
Because each message can create/abort/replace an event handler, the code has to coordinate overlapping lifecycles with global mutable maps:
abortControllersmessageQueueactiveEventHandlerspendingPermissions- plus
threadMessageQueuein@discord/src/discord-bot.ts
This makes ordering and race behavior hard to reason about and hard to evolve.
OpenCode TUI keeps a long-lived event pipeline and derives UI state from events:
@.../tui/context/sdk.tsx- starts a persistent event subscription
- batches event emission
- reconnects in loop
@.../tui/context/sync.tsx- single centralized store
- event handlers update normalized state maps
@.../tui/component/prompt/index.tsx- prompt submit only dispatches calls
- stream ownership does not belong to submit call
@.../tui/routes/session/index.tsx- reads derived session state (
session_status, messages, parts)
- reads derived session state (
Kimaki should adopt the same principle:
Dispatch requests from ingress, but derive lifecycle truth from one always-on event stream per runtime.
After reading the full source (session-handler.ts 2668 lines, state.ts
232 lines, discord-bot.ts 1228 lines, and 9 command modules), the migration
is feasible. No architectural show-stopper exists. The blockers below are
all solvable within the staged plan.
B1: Monolithic handleOpencodeSession (2668 lines)
The function owns both event listener AND prompt dispatch in a single scope. All local variables (partBuffer, sentPartIds, typingInterval, subtaskSessions, usedModel, etc.) are closures over the function scope.
Resolution: Extract into a ThreadSessionRuntime class where:
- Event listener loop is a long-lived method (
startEventListener) - Prompt dispatch is a separate method (
dispatchNextQueueItem) - Closure state becomes instance fields and runtime-owned refs
B2: Queue drain via recursive handleOpencodeSession calls
Lines 2330-2374 (after run completion) and 2018-2067 (after question shown
with queued messages) call handleOpencodeSession recursively via
setImmediate. Each recursive call creates a NEW event listener.
Resolution: Queue drain becomes a state transition. When run ends or blocker
resolves, the runtime checks canDispatchNext and calls dispatchNextQueueItem
which sends the prompt through the existing listener — no recursion, no new
event subscription.
B3: Single AbortController for both listener and run
Current code (line 1186-1189) passes the run abort controller to
event.subscribe(). Aborting the run kills the listener.
Resolution: Two separate AbortControllers:
listenerController: only aborted on runtime dispose or reconnectrunController: aborted on run interrupt/finish, does NOT affect listener
The prompt/command call passes runController.signal. The event.subscribe
call passes listenerController.signal.
B4: activeEventHandlers serialization map
activeEventHandlers (line 274) ensures overlapping per-message handlers
don't collide. In the new model, there is exactly one listener per thread
runtime — this map becomes unnecessary.
Resolution: Delete the map. The runtime IS the single handler.
B5: pendingPermissions is a module-level global
pendingPermissions (line 213-226) is used inside the event handler AND
by the ingress code that auto-rejects permissions on new messages (lines
971-1011). It needs to be per-runtime, not global.
Resolution: Move to runtime refs (not Zustand — it's operational context,
not domain state). The runtime exposes getPendingPermissions() and
clearPendingPermissions() methods.
B6: threadMessageQueue vs messageQueue dual-queue confusion
threadMessageQueue in discord-bot.ts (line 123) serializes Discord message
arrival order. messageQueue in session-handler.ts (line 272)
is the /queue backlog. Both exist because the current model needs arrival-order
serialization OUTSIDE of the session handler.
Resolution: Keep threadMessageQueue as the ingress serialization layer in
discord-bot.ts during the migration. After Phase 3, it can optionally be folded
into the runtime's ingress. The messageQueue global becomes queueItems in
the runtime store.
state.tsMainRunStore is already Zustand-based — reuse directlyevent.subscribereturns an async iterable, can be kept alive indefinitely- SDK types support
{ signal: AbortSignal }option separately for subscribe and for prompt/command calls - permissions.ts, ask-question.ts, file-upload.ts have their own context maps and do NOT import session-handler globals — no migration needed for them
- model.ts only calls
abortAndRetrySession— easily adapted to runtime API
Discord Message / Slash Command
-> runtime-registry.getOrCreate(threadId)
-> runtime.enqueueIncoming(input, policy)
-> return immediately
ThreadSessionRuntime (one per thread)
-> ensureEventListenerStarted() [once]
-> dispatchLoop() [run-level abort + prompt/command dispatch]
-> onEvent(event) => setState(transition)
-> subscribe(effect reactor) => Discord side effects
These invariants are required for correctness during migration:
-
Session demux invariant
- run-scoped events must be ignored unless
event.sessionID === state.identity.sessionId - only explicitly global events bypass this guard
- run-scoped events must be ignored unless
-
Serialized mutation invariant
- all ingress actions and OpenCode events must flow through one internal
runtime action queue (
dispatchAction) to prevent interleaving writes
- all ingress actions and OpenCode events must flow through one internal
runtime action queue (
-
Idempotent output invariant
- Discord output dedupe keys are namespaced by session/run
- reconnect or stale events must not re-emit already-sent parts
-
Listener continuity invariant
- run abort never destroys listener
- listener reconnect is independent from run lifecycle
discord-bot.tsand command handlers do not inspect run internals- command handlers call runtime APIs only (
isBusy,abortActiveRun,enqueue) - runtime store is single source of truth for run and queue state
- side effects happen after transitions, not inside transition functions
Extend the existing global store at discord/src/store.ts and create runtime
modules under discord/src/session-handler/:
-
discord/src/store.ts(EXISTING — extend, do not create a new store)- Add
threads: Map<string, ThreadRunState>toKimakiState - This is the single Zustand store for the entire bot. Its header comment already says: "Future phases will move session Maps, server registry, and command pending contexts into this store."
- Add
-
discord/src/session-handler/thread-runtime-state.ts(NEW)ThreadRunStatetype definition- Pure transition functions (
updateThread,ensureThread,removeThread,enqueueItem,dequeueItem, blocker transitions, run state transitions) - Derived helpers (
isRunActive,canDispatchNext,isBusy, etc.) - All transitions operate on the global
storefrom../store.js - Read thread state inline:
store.getState().threads.get(threadId)
-
discord/src/session-handler/thread-session-runtime.ts(NEW)- Runtime class (thin): listener loop, dispatch, event handlers, resource handles
- Registry functions (
getRuntime,getOrCreateRuntime,disposeRuntime)
-
discord/src/session-handler/runtime-types.ts(NEW)- Shared types:
QueuedMessage,IngressOptions,RunFinishInfo
- Shared types:
session-handler.ts remains public adapter for backward compatibility, but most
logic moves into runtime modules.
Why extend the existing store, not create a new one. The codebase already has
a centralized Zustand store at discord/src/store.ts that holds global bot config.
Per the zustand-centralized-state pattern: one store is the single source of truth.
Creating a second store would split state, make cross-domain queries non-atomic,
and scatter subscribes. Adding threads to the existing store keeps everything
in one place. Read thread state inline with store.getState().threads.get(threadId).
The codebase already has a centralized Zustand store at discord/src/store.ts.
Extend it — do not create a second store.
// ── discord/src/store.ts (MODIFY — add threads Map) ──
import { createStore } from 'zustand/vanilla'
import type { VerbosityLevel } from './database.js'
import type { ThreadRunState } from './session-handler/thread-runtime-state.js'
export type RegisteredUserCommand = {
name: string
discordName: string
description: string
}
export type KimakiState = {
// ── Existing config state (unchanged) ──
dataDir: string | null
defaultVerbosity: VerbosityLevel
defaultMentionMode: boolean
critiqueEnabled: boolean
verboseOpencodeServer: boolean
discordBaseUrl: string
registeredUserCommands: RegisteredUserCommand[]
// ── NEW: per-thread runtime state ──
threads: Map<string, ThreadRunState>
}
export const store = createStore<KimakiState>(() => ({
dataDir: null,
defaultVerbosity: 'text-and-essential-tools' as VerbosityLevel,
defaultMentionMode: false,
critiqueEnabled: true,
verboseOpencodeServer: false,
discordBaseUrl: 'https://discord.com',
registeredUserCommands: [],
threads: new Map(),
}))// ── discord/src/session-handler/thread-runtime-state.ts (NEW) ──
// Per-thread state type, transition functions, and selectors.
// All transitions operate on the global store from ../store.js.
import { store } from '../store.js'
import type { MainRunState } from './state.js'
import type { QueuedMessage } from './runtime-types.js'
// ── Per-thread state (value inside the Map) ──────────────────────
export type ThreadRunState = {
sessionId: string | undefined
queueItems: QueuedMessage[]
blockers: {
permissionCount: number
questionCount: number
actionButtonsPending: boolean
fileUploadPending: boolean
}
// Run lifecycle state (previously a separate MainRunStore).
// Embedded here so one store is the single source of truth.
runState: MainRunState
// Co-located resource (mutable lifecycle — belongs in store per
// zustand skill rule: "mutable resources are state too").
runController: AbortController | undefined
}
// ── Initial state factory ────────────────────────────────────────
export function initialThreadState(): ThreadRunState {
return {
sessionId: undefined,
queueItems: [],
blockers: {
permissionCount: 0,
questionCount: 0,
actionButtonsPending: false,
fileUploadPending: false,
},
runState: {
phase: 'waiting-dispatch',
idleState: 'none',
baselineAssistantIds: new Set<string>(),
currentAssistantMessageId: undefined,
eventSeq: 0,
evidenceSeq: undefined,
deferredIdleSeq: undefined,
},
runController: undefined,
}
}
// ── Derived helpers (compute, never store) ───────────────────────
export function isRunActive(t: ThreadRunState): boolean {
const phase = t.runState.phase
return (
phase === 'collecting-baseline' ||
phase === 'dispatching' ||
phase === 'prompt-resolved'
)
}
export function hasQueue(t: ThreadRunState): boolean {
return t.queueItems.length > 0
}
export function hasBlockers(t: ThreadRunState): boolean {
const b = t.blockers
return (
b.permissionCount > 0 ||
b.questionCount > 0 ||
b.actionButtonsPending ||
b.fileUploadPending
)
}
export function canDispatchNext(t: ThreadRunState): boolean {
return (
t.sessionId !== undefined &&
hasQueue(t) &&
!isRunActive(t) &&
!hasBlockers(t)
)
}
export function isBusy(t: ThreadRunState): boolean {
return isRunActive(t) || hasQueue(t)
}
// ── Pure transition helpers ──────────────────────────────────────
// Immutable: produces new Map + new ThreadRunState object each time.
function updateThread(
threadId: string,
updater: (t: ThreadRunState) => ThreadRunState,
): void {
store.setState((s) => {
const existing = s.threads.get(threadId)
if (!existing) {
return s
}
const newThreads = new Map(s.threads)
newThreads.set(threadId, updater(existing))
return { threads: newThreads }
})
}
export function ensureThread(threadId: string): void {
if (store.getState().threads.has(threadId)) {
return
}
store.setState((s) => {
const newThreads = new Map(s.threads)
newThreads.set(threadId, initialThreadState())
return { threads: newThreads }
})
}
export function removeThread(threadId: string): void {
store.setState((s) => {
if (!s.threads.has(threadId)) {
return s
}
const newThreads = new Map(s.threads)
newThreads.delete(threadId)
return { threads: newThreads }
})
}
export function setSessionId(threadId: string, sessionId: string): void {
updateThread(threadId, (t) => ({ ...t, sessionId }))
}
export function enqueueItem(threadId: string, item: QueuedMessage): void {
updateThread(threadId, (t) => ({
...t,
queueItems: [...t.queueItems, item],
}))
}
export function dequeueItem(threadId: string): QueuedMessage | undefined {
const thread = store.getState().threads.get(threadId)
if (!thread || thread.queueItems.length === 0) {
return undefined
}
const [next, ...rest] = thread.queueItems
updateThread(threadId, (t) => ({ ...t, queueItems: rest }))
return next
}
export function clearQueueItems(threadId: string): void {
updateThread(threadId, (t) => ({ ...t, queueItems: [] }))
}
export function setRunController(
threadId: string,
controller: AbortController | undefined,
): void {
updateThread(threadId, (t) => ({ ...t, runController: controller }))
}
// ── Blocker transitions ──────────────────────────────────────────
export function incrementBlocker(
threadId: string,
blocker: 'permissionCount' | 'questionCount',
): void {
updateThread(threadId, (t) => ({
...t,
blockers: { ...t.blockers, [blocker]: t.blockers[blocker] + 1 },
}))
}
export function decrementBlocker(
threadId: string,
blocker: 'permissionCount' | 'questionCount',
): void {
updateThread(threadId, (t) => ({
...t,
blockers: { ...t.blockers, [blocker]: Math.max(0, t.blockers[blocker] - 1) },
}))
}
export function setBlockerFlag(
threadId: string,
flag: 'actionButtonsPending' | 'fileUploadPending',
value: boolean,
): void {
updateThread(threadId, (t) => ({
...t,
blockers: { ...t.blockers, [flag]: value },
}))
}
// ── Run state transitions ────────────────────────────────────────
export function updateRunState(
threadId: string,
updater: (rs: MainRunState) => MainRunState,
): void {
updateThread(threadId, (t) => ({
...t,
runState: updater(t.runState),
}))
}
// ── Queries ──────────────────────────────────────────────────────
export function getThreadState(threadId: string): ThreadRunState | undefined {
return store.getState().threads.get(threadId)
}
export function getThreadIds(): string[] {
return [...store.getState().threads.keys()]
}The runtime class does not own any Zustand stores. It holds resource handles
(listener controller, typing timers, part buffer) and calls transition functions
that operate on the global store from ../store.js.
// discord/src/session-handler/thread-session-runtime.ts (sketch)
import {
ensureThread,
removeThread,
getThreadState,
setRunController,
type ThreadRunState,
} from './thread-runtime-state.js'
// Runtime instances are kept in a plain Map (not Zustand — the Map
// is not reactive state, just a lookup for resource handles).
const runtimes = new Map<string, ThreadSessionRuntime>()
export function getRuntime(threadId: string): ThreadSessionRuntime | undefined {
return runtimes.get(threadId)
}
export function getOrCreateRuntime(opts: {
threadId: string
thread: ThreadChannel
projectDirectory: string
sdkDirectory: string
channelId?: string
appId?: string
}): ThreadSessionRuntime {
const existing = runtimes.get(opts.threadId)
if (existing) {
return existing
}
ensureThread(opts.threadId) // add to global store
const runtime = new ThreadSessionRuntime(opts)
runtimes.set(opts.threadId, runtime)
return runtime
}
export function disposeRuntime(threadId: string): void {
const runtime = runtimes.get(threadId)
if (!runtime) {
return
}
runtime.dispose()
runtimes.delete(threadId)
removeThread(threadId) // remove from global store
}
export function disposeRuntimesForDirectory({
directory,
channelId,
}: {
directory: string
channelId?: string
}): void {
for (const [threadId, runtime] of runtimes) {
if (runtime.projectDirectory !== directory) {
continue
}
if (channelId && runtime.channelId !== channelId) {
continue
}
runtime.dispose()
runtimes.delete(threadId)
removeThread(threadId)
}
}
class ThreadSessionRuntime {
readonly threadId: string
readonly projectDirectory: string
readonly sdkDirectory: string
readonly channelId?: string
readonly appId?: string
readonly thread: ThreadChannel
// Resource handles (not in Zustand — operational, not domain state)
private listenerController = new AbortController()
private typingInterval: NodeJS.Timeout | null = null
private typingRestartTimeout: NodeJS.Timeout | null = null
private sentPartIds = new Set<string>()
private partBuffer = new Map<string, Map<string, Part>>()
private subtaskSessions = new Map<string, { label: string }>()
// Read own state from global store
private get state(): ThreadRunState | undefined {
return getThreadState(this.threadId)
}
dispose(): void {
this.listenerController.abort()
this.state?.runController?.abort()
setRunController(this.threadId, undefined)
this.stopTyping()
}
// ... event listener loop, dispatch, ingress methods
// all call transition functions from thread-runtime-state.ts
// like enqueueItem(this.threadId, ...)
}// Inside ThreadSessionRuntime class
// discord/src/session-handler/thread-session-runtime.ts
private listenerController = new AbortController()
private runController: AbortController | null = null
async startEventListener(): Promise<void> {
const client = getOpencodeClient(this.projectDirectory)
if (!client) {
throw new Error(
`No OpenCode client for directory: ${this.projectDirectory}`,
)
}
// Reconnect loop with backoff
let backoffMs = 500
const maxBackoffMs = 30_000
while (!this.listenerController.signal.aborted) {
const subscribeResult = await errore.tryAsync(() => {
return client.event.subscribe(
{ directory: this.sdkDirectory },
{ signal: this.listenerController.signal },
)
})
if (subscribeResult instanceof Error) {
if (isAbortError(subscribeResult)) {
return // disposed
}
logger.warn(
`[LISTENER] Subscribe failed, retrying in ${backoffMs}ms:`,
subscribeResult.message,
)
await delay(backoffMs)
backoffMs = Math.min(backoffMs * 2, maxBackoffMs)
continue
}
backoffMs = 500 // reset on success
const events = subscribeResult.stream
const iterResult = await errore.tryAsync(async () => {
for await (const event of events) {
await this.handleEvent(event)
}
})
if (iterResult instanceof Error) {
if (isAbortError(iterResult)) {
return // disposed
}
logger.warn(
`[LISTENER] Stream broke, reconnecting in ${backoffMs}ms:`,
iterResult.message,
)
await delay(backoffMs)
backoffMs = Math.min(backoffMs * 2, maxBackoffMs)
}
}
}
private async handleEvent(event: OpenCodeEvent): Promise<void> {
const sessionId = this.store.getState().sessionId
// ── Session demux guard ──────────────────────────────────────
// Events scoped to a session must match the current session.
// Global events (tui.toast.show) bypass the guard.
// IMPORTANT: sessionID lives at different paths per event type:
// message.updated → event.properties.info.sessionID
// message.part.updated → event.properties.part.sessionID
// session.* → event.properties.sessionID
// permission.* → event.properties.sessionID
// question.* → event.properties.sessionID
const eventSessionId = (() => {
switch (event.type) {
case 'message.updated':
return event.properties.info?.sessionID as string | undefined
case 'message.part.updated':
return event.properties.part?.sessionID as string | undefined
default:
return event.properties?.sessionID as string | undefined
}
})()
const isGlobalEvent = event.type === 'tui.toast.show'
if (!isGlobalEvent && eventSessionId && eventSessionId !== sessionId) {
// Check subtask sessions before discarding
if (!this.subtaskSessions.has(eventSessionId)) {
return // stale event from previous session
}
}
switch (event.type) {
case 'message.updated':
await this.handleMessageUpdated(event.properties.info)
break
case 'message.part.updated':
await this.handlePartUpdated(event.properties.part)
break
case 'session.idle':
this.handleSessionIdle(event.properties.sessionID)
break
case 'session.error':
await this.handleSessionError(event.properties)
break
case 'permission.asked':
await this.handlePermissionAsked(event.properties)
break
case 'permission.replied':
this.handlePermissionReplied(event.properties)
break
case 'question.asked':
await this.handleQuestionAsked(event.properties)
break
case 'session.status':
await this.handleSessionStatus(event.properties)
break
case 'tui.toast.show':
await this.handleTuiToast(event.properties)
break
default:
break
}
}// Inside ThreadSessionRuntime class
// Ensures all mutations (ingress + events) are serialized.
private actionQueue: Array<() => Promise<void>> = []
private processingAction = false
async dispatchAction(action: () => Promise<void>): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.actionQueue.push(async () => {
const result = await errore.tryAsync(action)
if (result instanceof Error) {
reject(result)
return
}
resolve()
})
void this.processActionQueue()
})
}
private async processActionQueue(): Promise<void> {
if (this.processingAction) {
return
}
this.processingAction = true
while (this.actionQueue.length > 0) {
const next = this.actionQueue.shift()!
await next()
}
this.processingAction = false
}// discord/src/discord-bot.ts — thread message handler
// BEFORE (current):
// signalThreadInterrupt({ threadId, serverDirectory, sdkDirectory })
// handleOpencodeSession({ prompt, thread, ... })
//
// AFTER:
import { getOrCreateRuntime } from './session-handler/thread-session-runtime.js'
async function processThreadMessage() {
const resolved = await resolveWorkingDirectory({ channel: thread })
if (!resolved) {
return
}
const runtime = getOrCreateRuntime({
threadId: thread.id,
thread,
projectDirectory: resolved.projectDirectory,
sdkDirectory: resolved.workingDirectory,
channelId: parent?.id,
appId: currentAppId,
})
// Normal message: interrupt active run, then enqueue
await runtime.enqueueIncoming({
prompt: messageContent,
userId: message.author.id,
username: message.member?.displayName || message.author.displayName,
images: fileAttachments,
appId: currentAppId,
interruptActive: true,
})
}
// discord/src/commands/queue.ts — /queue command
// BEFORE: abortControllers.get(sessionId), addToQueue(...)
// AFTER:
import { getRuntime } from '../session-handler/thread-session-runtime.js'
const runtime = getRuntime(thread.id)
if (!runtime) {
// No runtime = no active session, start one
// ...existing fallback to getOrCreateRuntime + enqueue
}
runtime.enqueueIncoming({
prompt,
userId,
username,
interruptActive: false, // /queue does NOT interrupt
})
// discord/src/commands/abort.ts — /abort command
// BEFORE: abortControllers.get(sessionId)?.abort(...)
// AFTER:
const runtime = getRuntime(thread.id)
if (!runtime) {
await interaction.followUp({ content: 'No active session' })
return
}
runtime.abortActiveRun('user-abort')
// discord/src/commands/model.ts — model change mid-run
// BEFORE: abortAndRetrySession({ sessionId, thread, ... })
// AFTER:
const runtime = getRuntime(thread.id)
if (runtime) {
await runtime.retryLastUserPrompt()
}
// discord/src/commands/restart-opencode-server.ts
// BEFORE: iterate abortControllers, find matching sessions
// AFTER:
import { disposeRuntimesForDirectory } from
'../session-handler/thread-session-runtime.js'
// Pass channelId to scope disposal — don't kill runtimes in other channels
disposeRuntimesForDirectory({ directory: projectDirectory, channelId })
await restartOpencodeServer(projectDirectory)// Inside ThreadSessionRuntime class
/** Called after run finishes OR after a blocker resolves. */
private async tryDrainQueue(): Promise<void> {
const thread = getThreadState(this.threadId)
if (!thread || !canDispatchNext(thread)) {
return
}
const next = dequeueItem(this.threadId)
if (!next) {
return
}
logger.log(
`[QUEUE DRAIN] Processing queued message from ${next.username}`,
)
// Show queued message indicator
const displayText = next.command
? `/${next.command.name}`
: `${next.prompt.slice(0, 150)}${next.prompt.length > 150 ? '...' : ''}`
await sendThreadMessage(
this.thread,
`» **${next.username}:** ${displayText}`,
)
// Dispatch through the existing listener — NO new event.subscribe
await this.dispatchPrompt({
prompt: next.prompt,
images: next.images,
username: next.username,
userId: next.userId,
appId: next.appId,
command: next.command,
})
}
/** Called from event handler when session.idle arrives and run finishes. */
private onRunFinished(): void {
// Emit footer first, then try to drain queue
void this.emitFooter().then(() => {
return this.tryDrainQueue()
})
}
/** Called when a permission/question blocker resolves. */
private onBlockerResolved(): void {
void this.tryDrainQueue()
}| New file | What goes in it | Extracted from |
|---|---|---|
discord/src/session-handler/thread-runtime-state.ts |
ThreadRunState type, transition functions, derived helpers. Operates on global store from ../store.js. |
new + state.ts concepts |
discord/src/session-handler/thread-session-runtime.ts |
Runtime class (thin): listener loop, dispatch, event handlers, resource handles. Registry functions (getRuntime, getOrCreateRuntime, disposeRuntime). | session-handler.ts lines 1186-2382 |
discord/src/session-handler/runtime-types.ts |
QueuedMessage, IngressOptions, RunFinishInfo types | session-handler.ts lines 259-268 |
| File | What changes | Lines affected |
|---|---|---|
store.ts |
Add threads: Map<string, ThreadRunState> to KimakiState. Import ThreadRunState from ./session-handler/thread-runtime-state.js. |
Lines 12, 24-33, 35-43 |
session-handler.ts |
Remove: abortControllers, messageQueue, activeEventHandlers, pendingPermissions globals. handleOpencodeSession becomes thin adapter calling runtime. Keep exported API signatures (queueOrSendMessage, abortAndRetrySession, signalThreadInterrupt) as wrappers over runtime-registry calls. |
Lines 86, 213-226, 272-274 (globals), lines 783-2668 (main function) |
session-handler/state.ts |
MainRunState type reused. Transition functions (beginPromptCycle, markDispatching, etc.) adapted to work with updateRunState(threadId, updater) instead of requiring a StoreApi<MainRunState>. Keep as pure functions taking MainRunState and returning MainRunState. |
Function signatures change from (store: MainRunStore) to (state: MainRunState) => MainRunState |
discord-bot.ts |
Replace handleOpencodeSession calls with getOrCreateRuntime + enqueueIncoming. Remove signalThreadInterrupt calls (runtime handles interrupt internally). Keep threadMessageQueue as ingress serializer through Phase 3.5. |
Lines 468-496 (thread queue), 551-565 (first session), 653-669 (existing session), 833-843 (channel message) |
commands/abort.ts |
Replace abortControllers.get(sessionId) with getRuntime(threadId)?.abortActiveRun(). |
~5 lines |
commands/queue.ts |
Replace abortControllers, addToQueue, getQueueLength, clearQueue, queueOrSendMessage with runtime API calls. |
~40 lines |
commands/action-buttons.ts |
Replace abortControllers.get(sessionId) + addToQueue + handleOpencodeSession with getRuntime + enqueueIncoming. |
~15 lines |
commands/merge-worktree.ts |
Replace abortControllers.get(sessionId) + addToQueue + handleOpencodeSession with getRuntime + enqueueIncoming. |
~15 lines |
commands/restart-opencode-server.ts |
Replace abortControllers iteration with disposeRuntimesForDirectory(). |
~20 lines |
commands/model.ts |
Replace abortAndRetrySession with getRuntime(threadId)?.retryLastUserPrompt(). |
~5 lines |
commands/unset-model.ts |
Replace abortAndRetrySession with getRuntime(threadId)?.retryLastUserPrompt(). |
~5 lines |
commands/permissions.ts |
No changes needed (self-contained with own context maps). | None |
commands/ask-question.ts |
No changes needed (self-contained). | None |
commands/file-upload.ts |
No changes needed (self-contained). | None |
No files are deleted. session-handler.ts shrinks dramatically but keeps its
exported API surface as thin wrappers.
Extend the existing global store at discord/src/store.ts. Add a
threads: Map<string, ThreadRunState> field alongside the existing config
fields. One store, one source of truth for everything.
// discord/src/store.ts — KimakiState (extended)
type KimakiState = {
// ... existing config fields (dataDir, defaultVerbosity, etc.) ...
threads: Map<string, ThreadRunState> // NEW
}
// Per-thread value inside the Map
type ThreadRunState = {
sessionId?: string
queueItems: QueuedMessage[]
blockers: {
permissionCount: number
questionCount: number
actionButtonsPending: boolean
fileUploadPending: boolean
}
runState: MainRunState // embedded, not separate store
runController: AbortController | undefined // mutable resource = state
}Operational resources that don't drive reactive side effects:
- immutable thread metadata (
threadId,projectDirectory,sdkDirectory) - listener abort controller and reconnect backoff counters
- typing interval/restart timeout handles
- part buffer and dedupe sets (
sentPartIds,partBuffer) - transient subtask label/session maps
usedModel,usedProviderID,usedAgent,tokensUsedInSession(per-run caches, reset on each new prompt dispatch)lastDisplayedContextPercentage,lastRateLimitDisplayTime(per-run)- early-resolved agent/model snapshots (per-dispatch)
isRunActive(t): derived fromt.runState.phasehasQueue(t):t.queueItems.length > 0hasBlockers(t): derived fromt.blockerscanDispatchNext(t):t.sessionId && hasQueue && !isRunActive && !hasBlockersisBusy(t):isRunActive || hasQueue
Read thread state inline wherever needed:
const thread = store.getState().threads.get(threadId)Do not persist these in Zustand:
processingbooleans for queue/dispatch (derived)- run start timestamps used only for footer formatting (derive from message/event)
usedModel,usedProviderID,usedAgent,tokensUsedcounters (derive from last assistant message and provider metadata when needed)- listener status/retry counters unless required for user-visible behavior (prefer logs/metrics)
- typing status flags (derive from run + blocker state; keep only timer handles as refs)
abortControllersmapmessageQueuemapactiveEventHandlersmappendingPermissionsmapthreadMessageQueuemap indiscord-bot.ts
Expose these methods from runtime instance:
enqueueIncoming(input, options)options.interruptActive: boolean- normal messages use
true /queueusesfalse
isBusy()abortActiveRun(reason)retryLastUserPrompt(options)for model-change flowgetQueueLength()clearQueue()dispose()
This replaces direct map usage in all command modules.
- listener starts once when runtime is created or first ingress arrives
- listener remains alive across multiple runs
- listener reconnects on transient disconnects with backoff
- listener never restarts because a new user message arrived
- on ingress with
interruptActive=true:- abort run controller
- call OpenCode
session.abortbest-effort - enqueue message
- dispatch loop sends next queued message when run is idle
- completion comes from event timeline and deferred-idle state machine
Use one explicit policy and keep it stable across migration:
- default policy: block dispatch while question/permission is pending
- interrupting ingress can still abort active run and enqueue
- queue drains only when blocker is resolved or cancelled by policy action
This policy must be implemented via transition guards, not ad-hoc checks.
Behavior change note: Current code in handleQuestionAsked (session-handler.ts
line 2018) immediately drains the next queued message when a question is shown
(aborting the question). The plan changes this to block dispatch during blockers.
This is an intentional simplification — the current behavior is surprising (user
sees question, then it gets auto-dismissed by queue drain). Freeze this decision
before Phase 3 to avoid ambiguous regressions.
After listener reconnect, runtime must reconcile with authoritative APIs:
- fetch session status/messages snapshot
- repair run state if stream events were missed
- if recovery cannot prove progress, move run to terminal error path and continue queue processing
permission.asked/permission.repliedquestion.askedaction-buttonsrequest from IPC queue- these update runtime interaction state first, then effect layer renders UI
Use pure transitions per event type:
message.updated- update message evidence for current run
- update model/provider/agent/tokens
message.part.updated- buffer part
- mark run evidence
session.status- store retry status metadata (for throttled notices)
session.idle- pass through deferred-idle decision flow
- mark run finished only when evidence constraints are satisfied
session.error- mark run error, preserve payload for side-effect reporting
All side effects (Discord sends, button rendering, footer) happen in the subscribe reactor after transition commits.
Files:
@discord/src/thread-message-queue.e2e.test.ts
Tasks:
- run existing queue/interrupt e2e tests to confirm green baseline
- document current behavior as reference for parity checks
Acceptance criteria:
-
pnpm tscpasses insidediscord/ - existing e2e tests green:
pnpm vitest --run src/thread-message-queue.e2e.test.ts
Files:
@discord/src/store.ts(MODIFY — addthreadsfield)- new
@discord/src/session-handler/thread-runtime-state.ts - new
@discord/src/session-handler/thread-session-runtime.ts - new
@discord/src/session-handler/runtime-types.ts @discord/src/session-handler/state.ts@discord/src/session-handler.ts
Tasks:
- add
runtime-types.tsfirst — moveQueuedMessagetype there to avoid coupling back to session-handler.ts - extend existing
store.ts:- add
threads: Map<string, ThreadRunState>toKimakiState - import
ThreadRunStatefrom./session-handler/thread-runtime-state.js
- add
- add
thread-runtime-state.tswith transition functions and derived helpers — all operating on the globalstore- embed
MainRunStateinsideThreadRunState(one store, not two) - co-locate
runController: AbortController | undefinedper thread
- embed
- adapt
state.tstransition functions to pure form:(state: MainRunState) => MainRunStateinstead of(store: MainRunStore) => voidso they work withupdateRunState(threadId, updater) - add skeleton ThreadSessionRuntime class with empty method stubs
- add registry functions (
getRuntime,getOrCreateRuntime,disposeRuntime) inthread-session-runtime.ts - add thin adapter in
handleOpencodeSessionthat can route to runtime - keep old flow behind compatibility switch while wiring APIs
- enforce state budget rule: every store field must document why it cannot be derived; reject fields that are only cache/telemetry
Acceptance criteria:
-
pnpm tscpasses insidediscord/ -
thread-runtime-state.test.tscovers: enqueue/dequeue, derived helpers, blocker increment/decrement, canDispatchNext edge cases - no behavior change — old path still used
Files:
@discord/src/session-handler/thread-session-runtime.ts@discord/src/session-handler/thread-runtime-state.ts@discord/src/session-handler/state.ts(reuse transition logic)
Tasks:
- start one persistent
event.subscribeloop per runtime (code from §7.3) - add strict session demux guards for all run-scoped events
- implement internal serialized action queue (
dispatchAction, code from §7.4) - separate run abort controller from listener controller (blocker B3)
- move event handler logic from
session-handler.tseventHandler closure into runtime methods (handleMessageUpdated, handlePartUpdated, etc.) - add reconnect reconciliation: after listener reconnect, fetch session status/messages snapshot to repair run state if events were missed
Key implementation detail — two abort controllers:
// listenerController: on runtime class instance, lives for runtime lifetime
// runController: in global store (ThreadRunState.runController), per-prompt
setRunController(this.threadId, new AbortController())
const rc = getThreadState(this.threadId)?.runController
// prompt call passes runController.signal
await client.session.prompt({...}, { signal: rc!.signal })
// event.subscribe passes listenerController.signal (on class instance)
await client.event.subscribe({...}, { signal: this.listenerController.signal })Acceptance criteria:
-
pnpm tscpasses - runtime unit test: start listener, send 2 prompts, listener stays alive
- runtime unit test: event with wrong sessionID is dropped (demux guard)
- no overlapping per-message handlers
- new message no longer restarts listener
Files:
@discord/src/discord-bot.ts@discord/src/commands/queue.ts@discord/src/commands/action-buttons.ts@discord/src/commands/merge-worktree.ts@discord/src/session-handler.ts
Tasks:
- route thread messages and queue-command through
runtime.enqueueIncoming(code from §7.5) - route interrupt path through
runtime.abortActiveRun - implement queue drain logic in runtime (code from §7.6)
- keep
threadMessageQueueas temporary guard until parity gate passes
Acceptance criteria:
-
pnpm tscpasses - ingress paths use runtime APIs without behavior regressions
- e2e test: rapid B/C/D messages preserve order guarantees
- e2e test:
/queueadds without interrupt
Files:
@discord/src/session-handler/thread-session-runtime.ts@discord/src/thread-message-queue.e2e.test.ts
Tasks:
- add temporary runtime counters/logs:
- listener start/restart count
- stale-event drop count
- queued/dequeued counts
- interrupt counts
- run full queue/interrupt parity suite
- only after parity: remove
threadMessageQueue
Acceptance criteria:
- parity suite green with runtime-only ingress
- counters show no duplicate listener starts per run
-
threadMessageQueueremoved from discord-bot.ts
Files:
@discord/src/commands/abort.ts@discord/src/commands/restart-opencode-server.ts@discord/src/commands/queue.ts@discord/src/commands/model.ts@discord/src/commands/unset-model.ts@discord/src/commands/action-buttons.ts@discord/src/commands/merge-worktree.ts
Tasks:
- replace global map reads with runtime calls (code from §7.5)
- migrate
abortAndRetrySessionto runtime retry API
Command-to-runtime API mapping:
| Command | Current global | Runtime API call |
|---|---|---|
/abort |
abortControllers.get(sessionId) |
getRuntime(threadId)?.abortActiveRun('user-abort') |
/queue |
abortControllers, addToQueue, getQueueLength, clearQueue |
runtime.enqueueIncoming(...), runtime.getQueueLength(), runtime.clearQueue() |
/queue-command |
addToQueue, handleOpencodeSession |
runtime.enqueueIncoming({ command: {...} }) |
| action-buttons click | abortControllers.get(sessionId), addToQueue |
runtime.enqueueIncoming({ interruptActive: false }) |
| merge-worktree conflict | abortControllers.get(sessionId), addToQueue, handleOpencodeSession |
runtime.enqueueIncoming({ interruptActive: false }) |
| model change | abortAndRetrySession(...) |
runtime.retryLastUserPrompt() |
| unset-model | abortAndRetrySession(...) |
runtime.retryLastUserPrompt() |
| restart-opencode-server | iterate abortControllers |
disposeRuntimesForDirectory({ directory, channelId }) |
Acceptance criteria:
-
pnpm tscpasses - commands no longer import global mutable maps
- grep for
abortControllersin commands/ returns 0 results - grep for
messageQueuein commands/ returns 0 results
Files:
@discord/src/session-handler.ts@discord/src/discord-bot.ts
Tasks:
- delete legacy maps (
abortControllers,messageQueue,activeEventHandlers,pendingPermissions) - delete recursive queue drain calls to
handleOpencodeSession - keep exported API signatures stable where possible (wrappers over registry)
Acceptance criteria:
-
pnpm tscpasses - runtime is sole owner of queue/run/listener state
- grep for
new Mapin session-handler.ts returns 0 module-level Maps - all e2e tests green
Files:
-
@discord/src/session-handler/thread-session-runtime.tsTasks: -
ensure typing interval + restart timeout cleanup in all terminal paths
-
add reconnect backoff and stale runtime diagnostics
-
implement mandatory runtime disposal on:
- thread archive/delete
- restart-opencode-server channel scope
- bot shutdown (iterate registry, dispose all)
-
remove temporary parity counters unless they're useful as permanent metrics
Acceptance criteria:
- no stuck typing
- listener survives multiple runs in same thread
- no leaked listeners/timers/controllers after dispose paths
-
handleShutdownin discord-bot.ts disposes all runtimes
Side effects can be triggered directly from event handlers after state
transitions (simplest approach). A separate store.subscribe() reactor is
optional — only introduce it if testing or debugging requires decoupling
state transitions from effects. Either way, these responsibilities exist:
- Discord chunk sends for parts
- context usage notices and retry notices
- permission/question/action UI rendering
- final footer emission
- queue dequeue trigger when run transitions to terminal state
- typing indicator lifecycle
Avoid side effects in ingress handlers except initial ack messages.
Run during each phase:
pnpm tsc(insidediscord/)pnpm vitest --run src/thread-message-queue.e2e.test.ts- runtime unit tests once added:
pnpm vitest --run src/session-handler/thread-runtime-state.test.tspnpm vitest --run src/session-handler/thread-session-runtime.test.ts
Key scenarios:
- rapid B/C/D messages preserve order guarantees
- interrupt during tool run (
sleep 500) still lets next message complete - deferred idle cannot prematurely finish a new run
/queueadds without interrupt/abortcancels active run without killing listener- model change mid-run aborts and retries using same runtime
- reconnect during active run does not duplicate outputs and does not deadlock
- stale old-session events are ignored after new run starts
- question/permission pending + queued message follows explicit blocker policy
- restart-opencode-server aborts all matching runtimes in channel scope
- action button click while busy queues correctly
- thread archive/delete disposes runtime cleanly
| Risk | Likelihood | Impact | Mitigation |
|---|---|---|---|
| Listener reconnect storm under provider outage | Medium | High — floods provider with subscribe calls | Bounded exponential backoff (500ms → 30s), max 3 reconnects per minute, log counter per runtime |
| Stale runtimes accumulating over long bot uptime | Low | Medium — memory/connection leak | Add optional TTL-based cleanup; dispose on thread archive event; log runtime count periodically |
| Duplicate Discord output from replayed parts after reconnect | Medium | High — confusing UX | Preserve sentPartIds across reconnects (it's a runtime ref, not reset); idempotent part flush |
| Interaction deadlocks (question pending + queue grows) | Low | High — session stuck forever | Blocker guard in canDispatchNext; ingress with interruptActive: true always aborts+enqueues regardless of blockers |
| Regression during Phase 3 when ingress switches to runtime | High | High — broken queue ordering | Keep threadMessageQueue as safety net until Phase 3.5 parity gate passes with counters proving correctness |
| handleOpencodeSession callers outside session-handler.ts | Medium | Medium — compilation errors | Phase 1 keeps handleOpencodeSession as thin wrapper; callers migrated file-by-file in Phases 3-4 |
| Two-controller abort race (run controller fires, listener still open, events arrive for aborted run) | Medium | Medium — unexpected events processed | Session demux guard (§7.3) drops events for non-current sessionID. Mark runState as aborted/finished before aborting run controller. |
| Session creation race (two ingress arrive before session exists) | Low | Medium — double session creation | dispatchAction serialization (§7.4) ensures session create+set is atomic within the runtime |
| Footer emission race with queue drain | Low | Low — footer appears after next prompt starts | Footer emission is awaited in onRunFinished before calling tryDrainQueue |
| Connection budget exhaustion from long-lived SSE listeners | Medium | High — undici pool blocks regular HTTP calls | Monitor active listener count; undici pool is set to 500 connections (discord-bot.ts:113). Add runtime count metric; dispose stale runtimes proactively |
| Channel-scope restart regression | High | High — kills runtimes in wrong channels | disposeRuntimesForDirectory takes optional channelId to scope disposal (fixed in §7.1) |
| Worktree/directory drift — runtime retains stale sdkDirectory | Low | Medium — commands run in wrong directory | On each dispatch, re-resolve sdkDirectory from DB worktree metadata instead of caching at creation time |
| Unbounded queue growth under long blockers/outages | Low | Medium — memory + latency degradation | Add configurable max queue size (e.g. 50); reject with user message when exceeded |
- listener reconnect storm
- use bounded backoff and log counters per runtime
- stale runtimes over long uptime
- keep alive by design now; add optional stale cleanup later
- duplicate Discord output from replayed parts
- preserve
sentPartIdsand idempotent part flush checks
- preserve
- interaction deadlocks (question/permission + queued message)
- queue decisions come from runtime transitions only
-
Should
threadMessageQueuebe absorbed into the runtime? The plan keeps it as an external serialization layer through Phase 3.5. After parity, decide: keep it permanently (simpler, proven for arrival ordering) or move all pre-processing into runtime so serialization has one owner. Recommendation: keep external — it's simple and proven. -
Runtime lifecycle for notify-only threads. Should a runtime be created for threads that only receive notification messages (no session)? No — only create runtimes lazily on first actionable session ingress.
-
Queue policy on question.asked — block or drain? Current code immediately drains the queue when question is shown, dismissing the question. The plan changes to blocking dispatch during pending blockers. This is an intentional behavior change that should be confirmed before Phase 3.
- one long-lived event listener per thread runtime
- no per-message event subscribe in
handleOpencodeSession - no global queue/abort/handler maps
- commands use runtime APIs only
- e2e queue + interrupt behaviors unchanged
- code reads as: ingress -> transition -> effects