Skip to content

Instantly share code, notes, and snippets.

@dom96
Last active December 6, 2025 13:47
Show Gist options
  • Select an option

  • Save dom96/90433b2f0609f54e87f08ae3515bee67 to your computer and use it in GitHub Desktop.

Select an option

Save dom96/90433b2f0609f54e87f08ae3515bee67 to your computer and use it in GitHub Desktop.
A Cloudflare Worker script that implements a Durable Object (DO) which reads from Jetstream
/**
 * Shape of a JSON message received over the Jetstream WebSocket, as far as
 * this script reads it.
 */
type JetStreamMessage = {
  did: string;
  // Event timestamp; persisted by the listener and sent back as the `cursor`
  // query parameter on reconnect (presumably microseconds, given the name).
  time_us: number;
  type: string;
  kind: string;
  // Optional: the listener accesses it with `?.`, so it may be absent.
  commit?: {
    rev: string;
    type: string;
    operation: string;
    // Compared against "app.bsky.graph.listitem" by the listener.
    collection: string;
    rkey: string;
    record: {
      "$type": string;
      createdAt: string;
      subject: string;
      list?: string;
      reply?: {};
    };
    cid: string;
  };
};
import { DurableObject } from "cloudflare:workers";
/** Bindings this Worker receives as its `env` argument. */
interface Env {
// Durable Object namespace for FirehoseListener; getStub() uses it to look up
// the instance named "main".
LISTENER: DurableObjectNamespace<FirehoseListener>;
}
/**
 * Durable Object that keeps a WebSocket subscription to Jetstream open and
 * records the `time_us` of the most recent event, both in memory and in the
 * object's storage, so that a restarted instance can resume from that cursor.
 *
 * An alarm is re-armed every 5 seconds to keep the object alive. When the
 * socket closes the object aborts itself; presumably the pending alarm then
 * re-instantiates it and the constructor reconnects (NOTE(review): confirm).
 */
export class FirehoseListener extends DurableObject {
  /** `time_us` of the latest event seen; 0 when nothing has been recorded yet. */
  lastEventTime: number;
  state: DurableObjectState;
  /** The most recently opened Jetstream socket, if any. */
  websocket: WebSocket | undefined;

  /**
   * Loads the persisted cursor and then opens the Jetstream connection.
   *
   * @param ctx Durable Object state handle (storage, alarms, abort).
   * @param env Worker bindings.
   */
  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env);
    this.lastEventTime = 0;
    this.state = ctx;
    this.env = env;

    // Retry indefinitely: any rejection from startLoop() is logged and the
    // loop is started again.
    const onLoopError = (x: unknown) => {
      console.error("Loop failed with error: ", x);
      this.startLoop().catch(onLoopError);
    };

    ctx.blockConcurrencyWhile(async () => {
      this.lastEventTime = (await ctx.storage.get<number>("lastEventTime")) ?? 0;
      // Start the loop only after the cursor has been loaded. Starting it
      // directly from the constructor body would build the URL while
      // lastEventTime is still 0, ignoring the persisted value.
      this.startLoop().catch(onLoopError);
    });
  }

  /**
   * Arms the keep-alive alarm, then connects to Jetstream starting at
   * `lastEventTime` and registers the socket event handlers.
   *
   * @returns A promise that resolves once the socket has been created and its
   *   handlers attached (not when the connection is established).
   */
  async startLoop(): Promise<void> {
    // Arm the alarm before creating the socket: if this rejects, the retry in
    // the constructor must not leave an already-open socket behind.
    await this.resetAlarm();

    let url = 'wss://jetstream1.us-west.bsky.network/subscribe'+
      '?wantedCollections=app.bsky.graph.block&wantedCollections=app.bsky.graph.listitem';
    url += "&cursor=" + this.lastEventTime;
    console.log("Connecting to ", url);

    const socket = new WebSocket(url);
    this.websocket = socket;

    socket.addEventListener('open', () => {
      console.log("Connected to Jetstream.");
    });
    socket.addEventListener('error', err => {
      console.log("Got error from WebSocket: ", err);
    });
    socket.addEventListener('close', event => {
      console.log("Disconnected from websocket connection to Jetstream. Resetting DO.", event);
      this.state.abort("Reset due to disconnect");
    });
    socket.addEventListener('message', event => {
      let message: JetStreamMessage;
      try {
        message = JSON.parse(event.data as string);
      } catch (err) {
        // A malformed frame must not throw out of the event listener.
        console.error("Ignoring unparseable Jetstream message: ", err);
        return;
      }
      if (typeof message?.time_us === "number") {
        // Keep the in-memory value current so getLastEventTime() is not stale.
        this.lastEventTime = message.time_us;
        // Store this in the DOs storage in case the DO is restarted.
        this.state.storage.put("lastEventTime", message.time_us);
      }
      if (message?.commit?.collection === "app.bsky.graph.listitem") {
        // TODO: Do something with the message here...
      }
    });
  }

  /**
   * Ensures an alarm is pending: if none is set, or the stored one is not in
   * the future, schedules one 5 seconds from now.
   */
  async resetAlarm(): Promise<void> {
    const alarm = await this.state.storage.getAlarm();
    // If we have an alarm set that is not in the past then
    // don't set another one.
    if (alarm && alarm > Date.now()) {
      return;
    }
    // Set an alarm 5 seconds in the future to ensure the DO stays alive.
    await this.state.storage.setAlarm(Date.now() + 5000);
  }

  /** Returns the `time_us` of the most recent event seen (0 if none). */
  async getLastEventTime(): Promise<number> {
    return this.lastEventTime;
  }

  /** Alarm handler: re-arms the alarm so it keeps firing. */
  async alarm(): Promise<void> {
    await this.resetAlarm();
  }
}
/**
 * Looks up the Durable Object instance named "main" in the LISTENER
 * namespace and returns its stub.
 *
 * @param env Worker bindings containing the LISTENER namespace.
 */
async function getStub(env: Env) {
  const namespace = env.LISTENER;
  return namespace.get(namespace.idFromName("main"));
}
export default {
  /**
   * HTTP entry point. `/last` responds with the listener's last event time as
   * text; every other path responds with a 404.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const { pathname } = new URL(request.url);
    if (pathname !== "/last") {
      return new Response("404", { status: 404 });
    }

    // Calling into the stub gets the DO running if it isn't already.
    const listener = await getStub(env);
    const lastEventTime = await listener.getLastEventTime();
    return new Response(lastEventTime.toString());
  },
};
# Deployment configuration for the Worker script above
# (presumably a Wrangler config file — confirm the filename).
name = "listifications"
# Entry module of the Worker.
main = "src/index.ts"
compatibility_date = "2025-09-25"
[observability.logs]
enabled = true
# Exposes the FirehoseListener class under the binding name LISTENER,
# which the script reads as env.LISTENER.
[[durable_objects.bindings]]
name = "LISTENER"
class_name = "FirehoseListener"
# Migration declaring FirehoseListener as a new Durable Object class.
[[migrations]]
tag = "v1"
new_classes = ["FirehoseListener"]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment