Created
August 5, 2025 15:16
-
-
Save samwho/92fd9bac81fc8b6751c928b47e4d44bd to your computer and use it in GitHub Desktop.
Peek a stream in TypeScript
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Peek at the next `length` bytes of a stream without consuming them.
 *
 * The bytes are taken with `input.read(length)` and immediately put back with
 * `input.unshift()`, so a later consumer sees the stream from the same
 * position. Nothing is drained piecemeal: `read(length)` yields `null` until
 * `length` bytes are buffered, and only hands back a shorter buffer once the
 * stream has ended. This means data is restored even when the peek fails.
 *
 * The returned buffer is the same object that is pushed back onto the stream,
 * so callers should treat it as read-only.
 *
 * NOTE(review): assumes a byte stream (no object mode, no `setEncoding`) —
 * confirm against callers.
 *
 * @param input The input stream to read from.
 * @param length The number of bytes to peek. `0` resolves to an empty buffer.
 * @returns A promise that resolves to exactly `length` peeked bytes.
 * @throws {RangeError} If `length` is negative or not an integer.
 * @throws {Error} If the stream ends with no data, ends with fewer than
 *   `length` bytes (the bytes stay readable), is closed before enough data
 *   arrives, or emits an error.
 */
async function peek(input: Readable, length: number): Promise<Buffer> {
  if (!Number.isInteger(length) || length < 0) {
    throw new RangeError(`peek length must be a non-negative integer, got ${length}`)
  }
  if (length === 0) {
    // Nothing to look at; do not wait for data we do not need.
    return Buffer.alloc(0)
  }
  // No further events will arrive on a finished stream, so waiting on it
  // would leave the promise pending forever.
  if (input.readableEnded) {
    throw new Error("Stream ended with no data")
  }
  if (input.destroyed) {
    throw new Error("Stream closed before enough data was available")
  }

  // Takes up to `length` bytes and puts them straight back. A result shorter
  // than `length` can only happen when the stream has ended.
  const readAndRestore = (): Buffer | null => {
    const data: Buffer | null = input.read(length)
    if (data !== null) {
      input.unshift(data)
    }
    return data
  }

  // Try synchronously first so no listeners are attached when the data is
  // already buffered.
  const immediate = readAndRestore()
  if (immediate !== null) {
    if (immediate.length < length) {
      throw new Error("Stream ended with insufficient data")
    }
    return immediate
  }

  // Not enough buffered yet: wait for more data, the end, or a failure.
  return new Promise<Buffer>((resolve, reject) => {
    const cleanup = () => {
      input.removeListener("readable", onReadable)
      input.removeListener("end", onEnd)
      input.removeListener("error", onError)
      input.removeListener("close", onClose)
    }

    const onReadable = () => {
      const data = readAndRestore()
      if (data === null) {
        // Still short; "readable" fires again when more data is buffered.
        return
      }
      cleanup()
      if (data.length < length) {
        reject(new Error("Stream ended with insufficient data"))
        return
      }
      resolve(data)
    }

    const onEnd = () => {
      // "end" is only emitted once the buffer is empty, and every byte read
      // above was put back, so reaching this point means there was no data.
      cleanup()
      reject(new Error("Stream ended with no data"))
    }

    const onError = (err: Error) => {
      cleanup()
      reject(err)
    }

    const onClose = () => {
      // Destroyed without "end" or "error": nothing more will ever arrive.
      cleanup()
      reject(new Error("Stream closed before enough data was available"))
    }

    input.on("readable", onReadable)
    input.on("end", onEnd)
    input.on("error", onError)
    input.on("close", onClose)
  })
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment