Skip to content

Instantly share code, notes, and snippets.

@ForbesLindesay
Last active August 7, 2025 19:51
Show Gist options
  • Select an option

  • Save ForbesLindesay/5b3d1206243b8282ac538fe16f5c49bb to your computer and use it in GitHub Desktop.

Select an option

Save ForbesLindesay/5b3d1206243b8282ac538fe16f5c49bb to your computer and use it in GitHub Desktop.
/**
 * Peek at the first `length` bytes of a byte stream using a BYOB
 * ("bring your own buffer") reader.
 *
 * Requires a stream created with `type: "bytes"`; requesting a BYOB reader on
 * any other stream throws. BYOB mode can support zero-copy reads, which might
 * make it slightly more efficient, at the cost of being a little more complex
 * than reading without `{ mode: "byob" }`.
 *
 * @param input  Byte stream to peek into. It is tee'd, so it ends up locked
 *               and must not be used by the caller afterwards.
 * @param length Number of bytes to peek.
 * @returns `peeked`, the first `length` bytes, and `output`, a stream that
 *          still yields everything from the very first byte.
 * @throws If `input` is not a byte stream, or if it ends before `length`
 *         bytes have been read.
 */
async function peekByob(input: ReadableStream<Uint8Array>, length: number) {
  const [peekable, output] = input.tee()
  const reader = peekable.getReader({ mode: "byob" })
  const peeked = await readByobBytes(reader, length)
  // Deliberately not awaited: the promise returned when cancelling one tee
  // branch only settles once the other branch has been cancelled or the
  // source has finished, so awaiting it here would stall before `output` is
  // handed to the caller. A handler is still attached so that a failing
  // source `cancel()` cannot surface as an unhandled rejection; whoever
  // cancels `output` observes that same failure on their own call.
  void reader.cancel().catch(() => {})
  return { peeked, output }
}
/**
 * Read exactly `length` bytes from a BYOB reader into a single buffer.
 *
 * Every read is given a view over the still-empty tail of the buffer. The
 * view that comes back is backed by the buffer to continue with, so the
 * local buffer reference is refreshed after each read.
 *
 * @param reader BYOB reader to pull bytes from.
 * @param length Exact number of bytes to read.
 * @returns A `Uint8Array` containing the bytes that were read.
 * @throws If the stream reports `done` before `length` bytes were received.
 */
async function readByobBytes(reader: ReadableStreamBYOBReader, length: number) {
  let backing = new ArrayBuffer(length)
  let filled = 0
  while (filled < length) {
    const tail = new Uint8Array(backing, filled, length - filled)
    const result = await reader.read(tail)
    if (result.done) {
      throw new Error(`Stream shorter than ${length} bytes`)
    }
    const chunk = result.value
    backing = chunk.buffer
    filled += chunk.byteLength
  }
  return new Uint8Array(backing, 0, filled)
}
/**
 * Peek at the first `length` bytes of a stream using a default reader.
 *
 * Works with streams created with `type: "bytes"` as well as streams in the
 * default object mode, provided the chunks are `Uint8Array` (or `Buffer`)
 * objects. It might be a tiny bit less efficient than the BYOB variant, but
 * the difference is likely too small to measure.
 *
 * @param input  Stream of byte chunks to peek into. It is tee'd, so it ends
 *               up locked and must not be used by the caller afterwards.
 * @param length Number of bytes to peek.
 * @returns `peeked`, the first `length` bytes, and `output`, a stream that
 *          still yields everything from the very first chunk.
 * @throws If the stream ends before `length` bytes have been read.
 */
async function peekBytes(input: ReadableStream<Uint8Array>, length: number) {
  const [peekable, output] = input.tee()
  const reader = peekable.getReader()
  const peeked = await readBytes(reader, length)
  // Deliberately not awaited: the promise returned when cancelling one tee
  // branch only settles once the other branch has been cancelled or the
  // source has finished, so awaiting it here would stall before `output` is
  // handed to the caller. A handler is still attached so that a failing
  // source `cancel()` cannot surface as an unhandled rejection; whoever
  // cancels `output` observes that same failure on their own call.
  void reader.cancel().catch(() => {})
  return { peeked, output }
}
/**
 * Read exactly `length` bytes from a default reader of `Uint8Array` chunks.
 *
 * Chunks are copied into one result array; a chunk that would run past
 * `length` is truncated, and any bytes beyond `length` are discarded.
 *
 * @param reader Default reader yielding `Uint8Array` chunks.
 * @param length Exact number of bytes to collect.
 * @returns A new `Uint8Array` of `length` bytes.
 * @throws If the stream reports `done` before `length` bytes were received.
 */
async function readBytes(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  length: number
) {
  const collected = new Uint8Array(length)
  let offset = 0
  while (offset < length) {
    const result = await reader.read()
    if (result.done) {
      throw new Error(`Stream shorter than ${length} bytes`)
    }
    const chunk = result.value
    // Copy at most the room that is left; the offset may overshoot `length`
    // on the final chunk, which simply ends the loop.
    const room = length - offset
    collected.set(chunk.subarray(0, room), offset)
    offset += chunk.byteLength
  }
  return collected
}
/**
 * Peek at the first `length` objects in a stream of objects.
 *
 * @param input  Object stream to peek into. It is tee'd, so it ends up locked
 *               and must not be used by the caller afterwards.
 * @param length Number of objects to peek.
 * @returns `peeked`, an array of the first `length` objects, and `output`, a
 *          stream that behaves as if none of those objects had been read yet.
 * @throws If the stream ends before `length` objects have been read.
 */
async function peekObjects<T>(input: ReadableStream<T>, length: number) {
  const [peekable, output] = input.tee()
  const reader = peekable.getReader()
  const peeked = await readObjects(reader, length)
  // Deliberately not awaited: the promise returned when cancelling one tee
  // branch only settles once the other branch has been cancelled or the
  // source has finished, so awaiting it here would stall before `output` is
  // handed to the caller. A handler is still attached so that a failing
  // source `cancel()` cannot surface as an unhandled rejection; whoever
  // cancels `output` observes that same failure on their own call.
  void reader.cancel().catch(() => {})
  return { peeked, output }
}
/**
 * Read exactly `length` chunks from a default reader and return them in order.
 *
 * @param reader Default reader to pull chunks from.
 * @param length Exact number of chunks to collect.
 * @returns An array holding the first `length` chunks.
 * @throws If the stream reports `done` before `length` chunks were received.
 */
async function readObjects<T>(reader: ReadableStreamDefaultReader<T>, length: number) {
  const output: T[] = []
  while (output.length < length) {
    const { done, value } = await reader.read()
    // This reader counts chunks (objects), not bytes, so the message says so.
    if (done) throw new Error(`Stream shorter than ${length} objects`)
    output.push(value)
  }
  return output
}
Testing getByobStream with peekByob
peeked = Uint8Array(3) [ 0, 1, 2 ]
chunk = Uint8Array(2) [ 0, 1 ]
chunk = Uint8Array(1) [ 2 ]
chunk = Uint8Array(2) [ 3, 4 ]
chunk = Uint8Array(2) [ 5, 6 ]
chunk = Uint8Array(2) [ 7, 8 ]
Testing getByobStream with peekBytes
peeked = Uint8Array(3) [ 0, 1, 2 ]
chunk = Uint8Array(2) [ 0, 1 ]
chunk = Uint8Array(2) [ 2, 3 ]
chunk = Uint8Array(2) [ 4, 5 ]
chunk = Uint8Array(2) [ 6, 7 ]
chunk = Uint8Array(2) [ 8, 9 ]
Testing getDataChunksStream with peekByob
Error thrown: The argument 'stream' must be a byte stream. Received ReadableStream { locked: false, state: 'readable', supportsBYOB: false }
Testing getDataChunksStream with peekBytes
peeked = Uint8Array(3) [ 0, 1, 2 ]
chunk = Uint8Array(2) [ 0, 1 ]
chunk = Uint8Array(2) [ 2, 3 ]
chunk = Uint8Array(2) [ 4, 5 ]
chunk = Uint8Array(2) [ 6, 7 ]
chunk = Uint8Array(2) [ 8, 9 ]
Testing getObjectStream with peekObjects
peeked = [ { id: 0 }, { id: 1 }, { id: 2 } ]
chunk = { id: 0 }
chunk = { id: 1 }
chunk = { id: 2 }
chunk = { id: 3 }
chunk = { id: 4 }
chunk = { id: 5 }
chunk = { id: 6 }
chunk = { id: 7 }
chunk = { id: 8 }
/**
 * Demo byte stream supporting zero-copy reading using
 * the BYOB (Bring Your Own Buffer) API.
 *
 * Each pull produces consecutive byte values starting at 0, at most two per
 * pull, and the stream closes on the first pull after the counter has passed
 * 8. Because bytes are produced in pairs, the last chunk may include 9.
 *
 * @returns A new `type: "bytes"` readable stream.
 */
function getByobStream() {
  let nextByte = 0
  return new ReadableStream({
    type: "bytes",
    async pull(controller) {
      if (nextByte > 8) {
        controller.close()
        // A BYOB read that is still waiting is only completed (with
        // `done: true`) once its request is answered with zero bytes;
        // without this the pending read would never settle.
        controller.byobRequest?.respond(0)
        return
      }
      const request = controller.byobRequest
      const view = request?.view
      if (request && view) {
        // Write straight into the consumer-supplied buffer.
        const bytesToRead = Math.min(view.byteLength, 2)
        const target = new Uint8Array(view.buffer, view.byteOffset, view.byteLength)
        for (let i = 0; i < bytesToRead; i++) {
          target[i] = nextByte++
        }
        request.respond(bytesToRead)
      } else {
        // No BYOB request pending (default reader): enqueue a fresh chunk.
        controller.enqueue(new Uint8Array([nextByte++, nextByte++]))
      }
    },
  })
}
/**
 * Demo stream of `Uint8Array` chunks that does not support BYOB reading.
 *
 * Every pull enqueues a two-byte chunk of consecutive values starting at 0;
 * the stream closes on the first pull after the counter has passed 8.
 *
 * @returns A new default-mode readable stream.
 */
function getDataChunksStream() {
  let counter = 0
  return new ReadableStream({
    pull: async (controller) => {
      if (counter <= 8) {
        const first = counter
        const second = counter + 1
        counter += 2
        controller.enqueue(new Uint8Array([first, second]))
        return
      }
      controller.close()
    },
  })
}
/**
 * Demo stream of plain objects that does not support BYOB reading.
 *
 * Every pull enqueues one `{ id }` object with ids counting up from 0; the
 * stream closes on the first pull after the id has passed 8.
 *
 * @returns A new default-mode readable stream.
 */
function getObjectStream() {
  let counter = 0
  return new ReadableStream({
    pull: async (controller) => {
      if (counter <= 8) {
        const id = counter
        counter += 1
        controller.enqueue({ id })
        return
      }
      controller.close()
    },
  })
}
// Demo driver: run every stream factory / peek function pairing, print the
// peeked prefix, then drain `output` to show that nothing was lost by peeking.
for (const [getStreamFn, peekFn] of [
  [getByobStream, peekByob],
  [getByobStream, peekBytes],
  [getDataChunksStream, peekByob], // <- will throw an error
  [getDataChunksStream, peekBytes],
  [getObjectStream, peekObjects],
] as const) {
  console.log(`Testing ${getStreamFn.name} with ${peekFn.name}`)
  try {
    const stream = getStreamFn()
    const { peeked, output } = await peekFn(stream, 3)
    console.log(`peeked = `, peeked)
    for await (const chunk of output) {
      console.log(`chunk = `, chunk)
    }
  } catch (error) {
    // A caught value is `unknown` under strict settings and need not be an
    // Error at all, so narrow before reading `.message`.
    const message = error instanceof Error ? error.message : String(error)
    console.error(`Error thrown: ${message}`)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment