Skip to content

Instantly share code, notes, and snippets.

@rebolyte
Last active February 28, 2026 07:50
Show Gist options
  • Select an option

  • Save rebolyte/e4c5f9e6a34fa82d1ac3ae6289c5ca5d to your computer and use it in GitHub Desktop.

Select an option

Save rebolyte/e4c5f9e6a34fa82d1ac3ae6289c5ca5d to your computer and use it in GitHub Desktop.
Actor
/**
 * Sentinel an actor's `onMessage` returns instead of a next state to signal
 * that it has finished. `createSystem` reacts by calling `onDone` (if any) and
 * then stopping the actor.
 */
export const Done = Symbol("Done");
/**
 * What to do with an actor after its `onMessage` threw (as applied by
 * `createSystem`):
 * - `"resume"`   — keep the current state and carry on with the next message.
 * - `"reset"`    — replace the state with `initialState`; children and queued
 *                  messages are kept.
 * - `"stop"`     — stop the actor.
 * - `"escalate"` — hand the error to the parent's `onCrash`; a top-level actor
 *                  has no parent, so the error is logged and the actor stops.
 */
export type CrashDirective = "resume" | "reset" | "stop" | "escalate";
/** Handle used to talk to an actor that accepts messages of type `M`. */
export type ActorRef<M> = {
/** Enqueues a message; silently dropped once the actor has been stopped. */
send(message: M): void;
/** Stops the actor together with all of its children; safe to call repeatedly. */
stop(): void;
/** Name the actor was spawned under (its key for `lookup`). */
readonly name: string;
};
/** Capabilities passed to every actor callback. */
export type Ctx<M> = {
/** Reference to the actor the callback belongs to. */
self: ActorRef<M>;
/** Spawns a child; the child is stopped when this actor stops. */
spawn: <S2, M2>(config: SpawnConfig<S2, M2>) => ActorRef<M2>;
/** Finds any live actor in the system by name; throws when none is registered. */
lookup: <M2>(name: string) => ActorRef<M2>;
};
/** Description of an actor with state type `S` handling messages of type `M`. */
export type SpawnConfig<S, M> = {
/** Must be unique among live actors of the system; spawning a duplicate throws. */
name: string;
/** State handed to the first `onMessage` call, and restored by the `"reset"` directive. */
initialState: S;
/** Handles one message at a time; resolves to the next state, or `Done` to finish. */
onMessage: (state: S, message: M, ctx: Ctx<M>) => Promise<S | typeof Done>;
/**
 * Chooses a directive after `onMessage` threw; when omitted the actor stops.
 * NOTE: it is also invoked when a child escalates, in which case `message` is
 * `undefined` at runtime despite the declared type.
 */
onCrash?: (error: unknown, message: M, ctx: Ctx<M>) => CrashDirective;
/** Invoked just before the actor stops because `onMessage` returned `Done`. */
onDone?: (ctx: Ctx<M>) => void;
};
/** Public surface returned by `createSystem`. */
export type ActorSystem = {
/** Spawns a top-level actor; throws once the system has been stopped. */
spawn: <S, M>(config: SpawnConfig<S, M>) => ActorRef<M>;
/** Finds a live actor (at any depth) by name; throws when none is registered. */
lookup: <M>(name: string) => ActorRef<M>;
/** Stops every top-level actor (and thereby their children) and rejects further spawns. */
stop(): void;
};
/**
 * Creates an in-process actor system.
 *
 * Each actor owns a FIFO mailbox and handles one message at a time: the next
 * message is only dequeued after the promise returned by `onMessage` settles.
 * All live actors, regardless of depth, share one flat name registry.
 *
 * Supervision: when `onMessage` throws, `onCrash` picks a {@link CrashDirective}
 * (default `"stop"`). A hook that itself throws is logged and treated as
 * `"stop"`, so a faulty supervisor can neither wedge the mailbox nor surface
 * as an unhandled promise rejection.
 *
 * @returns the system facade (`spawn`, `lookup`, `stop`).
 */
export function createSystem(): ActorSystem {
  // Name → ref for every live actor (top-level and children alike).
  const registry = new Map<string, ActorRef<any>>();
  const topLevel = new Set<ActorRef<any>>();
  let stopped = false;

  /** Resolves a live actor by name; throws `Actor not found: <name>` otherwise. */
  function lookup<M>(name: string): ActorRef<M> {
    const ref = registry.get(name);
    if (!ref) throw new Error(`Actor not found: ${name}`);
    return ref;
  }

  /**
   * Creates an actor, registers it by name and adds it to `parent`.
   *
   * @param config     actor description
   * @param parent     sibling set the actor belongs to (removed from it on stop)
   * @param onEscalate parent's escalation handler; absent for top-level actors
   * @throws if an actor with the same name is already registered
   */
  function spawnActor<S, M>(
    config: SpawnConfig<S, M>,
    parent: Set<ActorRef<any>>,
    onEscalate?: (error: unknown) => void,
  ): ActorRef<M> {
    if (registry.has(config.name)) {
      throw new Error(`Actor already exists: ${config.name}`);
    }

    let state = config.initialState;
    let processing = false; // true while a drain loop is running
    let alive = true;
    const queue: M[] = [];
    const children = new Set<ActorRef<any>>();

    /**
     * Asks `onCrash` for a directive, defaulting to "stop". A hook that throws
     * must not escape: it would reject the floating `drain()` promise and leave
     * the actor stuck, so it is logged and answered with "stop" as well.
     */
    function decide(error: unknown, message: M): CrashDirective {
      try {
        return config.onCrash?.(error, message, ctx) ?? "stop";
      } catch (hookError) {
        console.error(`onCrash threw in actor "${config.name}":`, hookError);
        return "stop";
      }
    }

    function handleChildEscalation(error: unknown) {
      // child escalated to us — run our own onCrash as if we crashed.
      // There is no message of our own to report, hence `undefined`.
      applyDirective(decide(error, undefined as any), error);
    }

    function applyDirective(directive: CrashDirective, error: unknown) {
      switch (directive) {
        case "resume":
          // keep state, keep going — drop the bad message
          break;
        case "reset":
          // restart with initial state, keep children, keep mailbox
          state = config.initialState;
          break;
        case "stop":
          self.stop();
          break;
        case "escalate":
          if (onEscalate) {
            onEscalate(error);
          } else {
            // top-level with no handler — stop
            console.error(`Unhandled escalation in actor "${config.name}":`, error);
            self.stop();
          }
          break;
      }
    }

    const self: ActorRef<M> = {
      name: config.name,
      send(message: M) {
        if (!alive) return; // dead letter
        queue.push(message);
        void drain();
      },
      stop() {
        if (!alive) return;
        alive = false;
        // Each child removes itself from `children` while stopping; deleting
        // from a Set during iteration is well-defined.
        for (const child of children) child.stop();
        children.clear();
        queue.length = 0;
        registry.delete(config.name);
        parent.delete(self);
      },
    };

    const ctx: Ctx<M> = {
      self,
      spawn: <S2, M2>(childConfig: SpawnConfig<S2, M2>) => {
        // A handler may resume after an `await` once its actor was stopped.
        // A child spawned then would be registered but never stopped by anyone.
        if (!alive) {
          throw new Error(`Actor is stopped: ${config.name}`);
        }
        return spawnActor(childConfig, children, handleChildEscalation);
      },
      lookup,
    };

    /** Processes queued messages one by one until the mailbox is empty or the actor dies. */
    async function drain(): Promise<void> {
      if (processing || !alive) return;
      processing = true;
      try {
        while (queue.length > 0 && alive) {
          const msg = queue.shift() as M;
          try {
            const result = await config.onMessage(state, msg, ctx);
            if (result === Done) {
              config.onDone?.(ctx);
              self.stop();
              return;
            }
            state = result as S;
          } catch (error) {
            applyDirective(decide(error, msg), error);
          }
        }
      } finally {
        // Always release the flag so a later `send` can start a new drain.
        processing = false;
      }
    }

    registry.set(config.name, self);
    parent.add(self);
    return self;
  }

  return {
    spawn: <S, M>(config: SpawnConfig<S, M>) => {
      if (stopped) throw new Error("System is stopped");
      return spawnActor(config, topLevel);
    },
    lookup,
    stop() {
      stopped = true;
      for (const ref of topLevel) ref.stop();
      topLevel.clear();
    },
  };
}
/**
 * ask: send a message expecting a reply, using a temporary actor.
 *
 * A throw-away actor is spawned on `system` to act as the reply address. The
 * first message it receives resolves the promise and ends the actor.
 *
 * @param system    system on which the temporary reply actor is spawned
 * @param target    actor that receives the request
 * @param factory   builds the request message, given the reply address
 * @param timeoutMs how long to wait for a reply before rejecting (default 5000)
 * @returns the first reply sent to the temporary actor
 * @throws (as rejection) `Error("Ask timed out")` when no reply arrives in
 *   time, or whatever `system.spawn`, `factory` or `target.send` threw
 */
export function ask<M, R>(
  system: ActorSystem,
  target: ActorRef<M>,
  factory: (replyTo: ActorRef<R>) => M,
  timeoutMs = 5000,
): Promise<R> {
  return new Promise<R>((resolve, reject) => {
    let timer: ReturnType<typeof setTimeout> | undefined;

    // Spawn BEFORE arming the timeout: if spawning throws (e.g. the system is
    // stopped) there must be no pending timer that would later touch `ref`
    // while it is still uninitialised.
    const ref = system.spawn<undefined, R>({
      name: `_ask_${crypto.randomUUID()}`,
      initialState: undefined,
      onMessage: async (_state, reply) => {
        clearTimeout(timer);
        resolve(reply);
        return Done;
      },
    });

    // Nobody holds `ref` until `target.send` below, so no reply can arrive
    // before the timer exists.
    timer = setTimeout(() => {
      ref.stop();
      reject(new Error("Ask timed out"));
    }, timeoutMs);

    try {
      target.send(factory(ref));
    } catch (error) {
      // The request never left: release the reply actor and the timer now
      // instead of letting them linger until the timeout.
      clearTimeout(timer);
      ref.stop();
      reject(error);
    }
  });
}
import { createSystem, ask, Done, type ActorRef } from "./actors";
// One shared system hosts every example actor in this script.
const system = createSystem();
// -- Example: ping-pong with a counter that stops after 3 --
// `ponger` answers every ping with a pong; `pinger` counts the pongs it
// receives in its own state and finishes (returns Done) on the third one.
type PingMsg = { type: "ping"; replyTo: ActorRef<PongMsg> };
type PongMsg = { type: "pong"; count: number };
const ponger = system.spawn<null, PingMsg>({
name: "ponger",
initialState: null,
onMessage: async (_state, msg) => {
// just reply — stateless (the `count` field is always 1; pinger ignores it)
msg.replyTo.send({ type: "pong", count: 1 });
return null;
},
});
const pinger = system.spawn<number, PongMsg>({
name: "pinger",
initialState: 0,
onMessage: async (count, _msg, ctx) => {
// `count` is the number of pongs seen so far; this pong is number `next`
const next = count + 1;
console.log(`pinger: received pong #${next}`);
// third pong: finish — onDone runs, then the actor is stopped
if (next >= 3) return Done;
// otherwise request another pong, addressed back to ourselves
ponger.send({ type: "ping", replyTo: ctx.self });
return next;
},
onDone: () => console.log("pinger: done after 3 pongs"),
});
// kick it off
ponger.send({ type: "ping", replyTo: pinger });
// -- Example: ask pattern --
// The request carries its own reply address, which `ask` supplies.
type MathMsg = { op: "add"; a: number; b: number; replyTo: ActorRef<number> };
const calculator = system.spawn<null, MathMsg>({
name: "calculator",
initialState: null,
onMessage: async (_state, msg) => {
msg.replyTo.send(msg.a + msg.b);
return null;
},
});
// Top-level await: this script has to be loaded as an ES module.
const result = await ask<MathMsg, number>(system, calculator, (replyTo) => ({
op: "add",
a: 17,
b: 25,
replyTo,
}));
console.log(`ask result: 17 + 25 = ${result}`);
// -- Example: parent-child hierarchy --
// The child is created through `ctx.spawn`, so it belongs to "parent".
// Nothing in this script sends the child a message; it only demonstrates
// that the child's lifetime is tied to its parent.
const parent = system.spawn<null, string>({
name: "parent",
initialState: null,
onMessage: async (_state, msg, ctx) => {
if (msg === "spawn-child") {
ctx.spawn({
name: "child",
initialState: 0,
onMessage: async (n, _msg) => {
console.log(`child: message #${n + 1}`);
return n + 1;
},
});
}
return null;
},
});
parent.send("spawn-child");
// stopping parent also stops child
// After 100 ms the whole system is shut down as well: every remaining
// top-level actor is stopped and later `system.spawn` calls throw.
setTimeout(() => {
parent.stop();
console.log("parent stopped (child stopped with it)");
system.stop();
}, 100);
// NOTE(review): `fromCallback`, `fromPromise` and `fromObservable` are neither
// defined nor imported in the visible source — presumably they come from a
// companion adapters module; confirm and add the import. The descriptions
// below are inferred from how the helpers are called here.
// -- fromCallback: wrap setInterval --
// Emits 1, 2, 3, … once per second through `send`.
const ticker = fromCallback(system, "ticker", ({ send }) => {
let i = 0;
const id = setInterval(() => send(++i), 1000);
return () => clearInterval(id); // cleanup
});
// Stop after 3.5 s, i.e. after the interval has fired three times.
setTimeout(() => ticker.stop(), 3500);
// -- fromCallback: bidirectional (WebSocket-style) --
// WsOut: messages other actors send TO the socket actor.
// WsIn:  messages the socket actor emits when the socket receives data.
type WsOut = { type: "send"; data: string };
type WsIn = { type: "message"; data: string };
const ws = fromCallback<WsIn, WsOut>(system, "ws", ({ send, receive }) => {
// simulate a socket
const fakeSocket = {
onmessage: null as ((data: string) => void) | null,
send(data: string) { console.log("socket sent:", data); },
close() { console.log("socket closed"); },
};
// inbound: socket → actor system
// (nothing in this example ever invokes `fakeSocket.onmessage`)
fakeSocket.onmessage = (data) => send({ type: "message", data });
// outbound: actor system → socket
receive((msg) => {
if (msg.type === "send") fakeSocket.send(msg.data);
});
// returned function closes the socket — presumably run when the actor stops
return () => fakeSocket.close();
});
// other actors can now send TO the websocket actor
ws.send({ type: "send", data: "hello server" });
// -- fromPromise: one-shot async work --
// `logger` prints every message it receives and keeps no state.
const logger = system.spawn<null, any>({
name: "logger",
initialState: null,
onMessage: async (_s, msg) => {
console.log("logger received:", msg);
return null;
},
});
// Runs the async function once; `logger` is passed as the recipient of the outcome.
fromPromise(
system,
"fetch-user",
async () => {
// simulate API call
return { id: 1, name: "James" };
},
logger,
);
// logger receives: { type: "resolved", value: { id: 1, name: "James" } }
// -- fromObservable: any Subscribable shape --
// minimal observable (works with RxJS, wonka, zen-observable, etc)
// Emits "hello" and "world" synchronously during `subscribe`, then
// "async one" followed by completion 100 ms later.
const fakeObservable = {
subscribe(
next: (v: string) => void,
_error?: (e: unknown) => void,
complete?: () => void,
) {
next("hello");
next("world");
setTimeout(() => {
next("async one");
complete?.();
}, 100);
// no-op unsubscribe: the pending timeout above is not cancelled
return { unsubscribe() {} };
},
};
fromObservable(system, "words", fakeObservable, logger);
// logger receives: "hello", "world", "async one"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment