Last active
June 6, 2023 15:22
-
-
Save terrisgit/7a72c4cbbd8b0ca1d240e59c66ce4572 to your computer and use it in GitHub Desktop.
A custom ReadableStream using back pressure and Promises for pushing data to NodeJS streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Custom ReadableStream stream for piping data in a backpressure-friendly way using Promises | |
| * | |
| * @see {@link https://terrislinenbach.medium.com/obeying-back-pressure-in-javascript-streams-read-stream-pipe-edition-ccd6203596bf|How ReadStream responds to back pressure} | |
| * | |
| * Derives from PassThrough instead of ReadbleStream; otherwise, piping to files etc. won't | |
| * work -- for example, push(null) won't fire 'finished' events. | |
| * | |
| * Usage: | |
| * - Call push(obj) to write obj to the stream | |
| * - When push() returns false, await wait() to avoid exceeding the buffer | |
| * - Call push(null) to indicate EOF | |
| */ | |
| class ResumePush extends stream.PassThrough { | |
| /** | |
| * Constructor | |
| * @param {object} options Specify at least {Number} highWaterMark (number of objects) | |
| */ | |
| constructor(options) { | |
| super({ ...options, objectMode: true }); | |
| } | |
| // eslint-disable-next-line no-underscore-dangle | |
| _read() { | |
| const {resume} = this; | |
| if (!resume) { | |
| this.ready = true; | |
| return; | |
| } | |
| this.resume = undefined; | |
| resume(); | |
| } | |
| /** | |
| * Returns a Promise that is settled when data has been flushed and push() can be called again | |
| * @returns {Promise<undefined>} | |
| */ | |
| async wait() { | |
| if (this.resume) { | |
| // This is a bug. If it can happen in the wild (eg lost network connection), multiplexing needs to be added. | |
| throw new Error('not ready'); | |
| } | |
| if (this.ready) this.ready = false; | |
| else return new Promise((resume) => { this.resume = resume; }); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment