Last active
December 6, 2025 13:47
-
-
Save dom96/90433b2f0609f54e87f08ae3515bee67 to your computer and use it in GitHub Desktop.
A Cloudflare Worker script which implements a DO that reads from Jetstream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| type JetStreamMessage = { | |
| did: string, | |
| time_us: number, | |
| type: string, | |
| kind: string, | |
| commit?: { | |
| rev: string, | |
| type: string, | |
| operation: string, | |
| collection: string, | |
| rkey: string, | |
| record: { | |
| "$type": string, | |
| createdAt: string, | |
| subject: string, | |
| list?: string, | |
| reply?: {}, | |
| } | |
| cid: string, | |
| } | |
| }; | |
| import { DurableObject } from "cloudflare:workers"; | |
| interface Env { | |
| LISTENER: DurableObjectNamespace<FirehoseListener>; | |
| } | |
| export class FirehoseListener extends DurableObject { | |
| lastEventTime: number; | |
| state: DurableObjectState; | |
| websocket: WebSocket | undefined; | |
| constructor(ctx: DurableObjectState, env: Env) { | |
| super(ctx, env); | |
| this.lastEventTime = 0; | |
| this.state = ctx; | |
| this.env = env; | |
| ctx.blockConcurrencyWhile(async () => { | |
| this.lastEventTime = (await ctx.storage.get("lastEventTime")) ?? 0; | |
| }); | |
| const onLoopError = (x: any) => { | |
| console.error("Loop failed with error: ", x); | |
| this.startLoop().catch(onLoopError); | |
| } | |
| this.startLoop().catch(onLoopError); | |
| } | |
| async startLoop() { | |
| let url = 'wss://jetstream1.us-west.bsky.network/subscribe'+ | |
| '?wantedCollections=app.bsky.graph.block&wantedCollections=app.bsky.graph.listitem'; | |
| url += "&cursor=" + this.lastEventTime; | |
| console.log("Connecting to ", url); | |
| this.websocket = new WebSocket(url); | |
| this.websocket.addEventListener('open', event => { | |
| console.log("Connected to Jetstream."); | |
| }); | |
| this.websocket.addEventListener('error', err => { | |
| console.log("Got error from WebSocket: ", err); | |
| }); | |
| this.websocket.addEventListener('close', event => { | |
| console.log("Disconnected from websocket connection to Jetstream. Resetting DO.", event); | |
| this.state.abort("Reset due to disconnect"); | |
| }); | |
| this.websocket.addEventListener('message', event => { | |
| const message: JetStreamMessage = JSON.parse(event.data as string); | |
| // Store this in the DOs storage in case the DO is restarted. | |
| this.state.storage.put("lastEventTime", message.time_us); | |
| if (message.commit?.collection == "app.bsky.graph.listitem") { | |
| // TODO: Do something with the message here... | |
| } | |
| }); | |
| await resetAlarm(); | |
| } | |
| async resetAlarm() { | |
| const alarm = await this.state.storage.getAlarm(); | |
| // If we have an alarm set that is not in the past then | |
| // don't set another one. | |
| if (alarm && (alarm - Date.now()) > 0) { | |
| return; | |
| } | |
| // Set an alarm 5 seconds in the future to ensure the DO stays alive. | |
| await this.state.storage.setAlarm(Date.now() + 5000); | |
| } | |
| async getLastEventTime() { | |
| return this.lastEventTime; | |
| } | |
| async alarm() { | |
| await this.resetAlarm(); | |
| } | |
| } | |
| async function getStub(env: Env) { | |
| const id = env.LISTENER.idFromName("main"); | |
| const stub = env.LISTENER.get(id); | |
| return stub; | |
| } | |
| export default { | |
| async fetch(request: Request, env: Env): Promise<Response> { | |
| const url = new URL(request.url); | |
| if (url.pathname == "/last") { | |
| // Requesting this endpoint will get the DO to start running | |
| // (if it isn't already) | |
| const stub = await getStub(env); | |
| return new Response((await stub.getLastEventTime()).toString()); | |
| } else { | |
| return new Response("404", { status: 404 }); | |
| } | |
| }, | |
| }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| name = "listifications" | |
| main = "src/index.ts" | |
| compatibility_date = "2025-09-25" | |
| [observability.logs] | |
| enabled = true | |
| [[durable_objects.bindings]] | |
| name = "LISTENER" | |
| class_name = "FirehoseListener" | |
| [[migrations]] | |
| tag = "v1" | |
| new_classes = ["FirehoseListener"] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment