Last active
August 7, 2025 19:51
-
-
Save ForbesLindesay/5b3d1206243b8282ac538fe16f5c49bb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Requires a stream marked as `type "bytes"`, this can support zero-copy mode, | |
| * which might make it slightly more efficient, but it's a little more complex | |
| * than the code for not using `{ mode: "byob" }` | |
| */ | |
| async function peekByob(input: ReadableStream<Uint8Array>, length: number) { | |
| const [peekable, output] = input.tee() | |
| const reader = peekable.getReader({ mode: "byob" }) | |
| const peeked = await readByobBytes(reader, length) | |
| reader.cancel() | |
| return { peeked, output } | |
| } | |
| async function readByobBytes(reader: ReadableStreamBYOBReader, length: number) { | |
| let buffer = new ArrayBuffer(length) | |
| let bytesReceived = 0 | |
| while (bytesReceived < length) { | |
| const { done, value } = await reader.read( | |
| new Uint8Array(buffer, bytesReceived, length - bytesReceived) | |
| ) | |
| if (done) throw new Error(`Stream shorter than ${length} bytes`) | |
| buffer = value.buffer | |
| bytesReceived += value.byteLength | |
| } | |
| return new Uint8Array(buffer, 0, bytesReceived) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Works with streams marked as `type "bytes"` or streams in the default object mode, | |
| * providing it's a stream of Uint8Array or Buffer objects. It might be a tiny bit less | |
| * efficient than the byob mode, but I suspect it would be too small a difference to measure, | |
| * it might even not make any difference at all. | |
| */ | |
| async function peekBytes(input: ReadableStream<Uint8Array>, length: number) { | |
| const [peekable, output] = input.tee() | |
| const reader = peekable.getReader() | |
| const peeked = await readBytes(reader, length) | |
| reader.cancel() | |
| return { peeked, output } | |
| } | |
| async function readBytes( | |
| reader: ReadableStreamDefaultReader<Uint8Array>, | |
| length: number | |
| ) { | |
| const output = new Uint8Array(length) | |
| let bytesReceived = 0 | |
| while (bytesReceived < length) { | |
| const { done, value } = await reader.read() | |
| if (done) throw new Error(`Stream shorter than ${length} bytes`) | |
| output.set(value.subarray(0, length - bytesReceived), bytesReceived) | |
| bytesReceived += value.byteLength | |
| } | |
| return output | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Peek the first n objects in a stream of objects. | |
| * returning the an array of the peeked objects as `peeked` | |
| * and a stream that's as if the objects have not yet been | |
| * read as `output` | |
| */ | |
| async function peekObjects<T>(input: ReadableStream<T>, length: number) { | |
| const [peekable, output] = input.tee() | |
| const reader = peekable.getReader() | |
| const peeked = await readObjects(reader, length) | |
| reader.cancel() | |
| return { peeked, output } | |
| } | |
| async function readObjects<T>(reader: ReadableStreamDefaultReader<T>, length: number) { | |
| const output: T[] = [] | |
| while (output.length < length) { | |
| const { done, value } = await reader.read() | |
| if (done) throw new Error(`Stream shorter than ${length} bytes`) | |
| output.push(value) | |
| } | |
| return output | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Testing getByobStream with peekByob | |
| peeked = Uint8Array(3) [ 0, 1, 2 ] | |
| chunk = Uint8Array(2) [ 0, 1 ] | |
| chunk = Uint8Array(1) [ 2 ] | |
| chunk = Uint8Array(2) [ 3, 4 ] | |
| chunk = Uint8Array(2) [ 5, 6 ] | |
| chunk = Uint8Array(2) [ 7, 8 ] | |
| Testing getByobStream with peekBytes | |
| peeked = Uint8Array(3) [ 0, 1, 2 ] | |
| chunk = Uint8Array(2) [ 0, 1 ] | |
| chunk = Uint8Array(2) [ 2, 3 ] | |
| chunk = Uint8Array(2) [ 4, 5 ] | |
| chunk = Uint8Array(2) [ 6, 7 ] | |
| chunk = Uint8Array(2) [ 8, 9 ] | |
| Testing getDataChunksStream with peekByob | |
| Error thrown: The argument 'stream' must be a byte stream. Received ReadableStream { locked: false, state: 'readable', supportsBYOB: false } | |
| Testing getDataChunksStream with peekBytes | |
| peeked = Uint8Array(3) [ 0, 1, 2 ] | |
| chunk = Uint8Array(2) [ 0, 1 ] | |
| chunk = Uint8Array(2) [ 2, 3 ] | |
| chunk = Uint8Array(2) [ 4, 5 ] | |
| chunk = Uint8Array(2) [ 6, 7 ] | |
| chunk = Uint8Array(2) [ 8, 9 ] | |
| Testing getObjectStream with peekObjects | |
| peeked = [ { id: 0 }, { id: 1 }, { id: 2 } ] | |
| chunk = { id: 0 } | |
| chunk = { id: 1 } | |
| chunk = { id: 2 } | |
| chunk = { id: 3 } | |
| chunk = { id: 4 } | |
| chunk = { id: 5 } | |
| chunk = { id: 6 } | |
| chunk = { id: 7 } | |
| chunk = { id: 8 } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Stream supporting zero-copy reading using | |
| * the BYOB (Bring Your Own Buffer) API. | |
| */ | |
| function getByobStream() { | |
| let nextByte = 0 | |
| return new ReadableStream({ | |
| type: "bytes", | |
| async pull(controller) { | |
| if (nextByte > 8) { | |
| controller.close() | |
| return | |
| } | |
| if (controller.byobRequest) { | |
| const v = controller.byobRequest!.view! | |
| const bytesToRead = Math.min(v.byteLength, 2) | |
| const result = new Uint8Array(v.buffer, v.byteOffset, v.byteLength) | |
| for (let i = 0; i < bytesToRead; i++) { | |
| result[i] = nextByte++ | |
| } | |
| controller.byobRequest!.respond(bytesToRead) | |
| } else { | |
| controller.enqueue(new Uint8Array([nextByte++, nextByte++])) | |
| } | |
| }, | |
| }) | |
| } | |
| /** | |
| * Stream of chunks that does not support BYOB reading. | |
| */ | |
| function getDataChunksStream() { | |
| let nextByte = 0 | |
| return new ReadableStream({ | |
| async pull(controller) { | |
| if (nextByte > 8) { | |
| controller.close() | |
| return | |
| } | |
| controller.enqueue(new Uint8Array([nextByte++, nextByte++])) | |
| }, | |
| }) | |
| } | |
| /** | |
| * Stream of objects that does not support BYOB reading. | |
| */ | |
| function getObjectStream() { | |
| let nextId = 0 | |
| return new ReadableStream({ | |
| async pull(controller) { | |
| if (nextId > 8) { | |
| controller.close() | |
| return | |
| } | |
| controller.enqueue({ id: nextId++ }) | |
| }, | |
| }) | |
| } | |
| for (const [getStreamFn, peekFn] of [ | |
| [getByobStream, peekByob], | |
| [getByobStream, peekBytes], | |
| [getDataChunksStream, peekByob], // <- will throw an error | |
| [getDataChunksStream, peekBytes], | |
| [getObjectStream, peekObjects], | |
| ] as const) { | |
| console.log(`Testing ${getStreamFn.name} with ${peekFn.name}`) | |
| try { | |
| const stream = getStreamFn() | |
| const { peeked, output } = await peekFn(stream, 3) | |
| console.log(`peeked = `, peeked) | |
| for await (const chunk of output) { | |
| console.log(`chunk = `, chunk) | |
| } | |
| } catch (error) { | |
| console.error(`Error thrown: ${error.message}`) | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment