@terrisgit
Last active June 6, 2023 15:22
A custom Readable stream that uses backpressure and Promises to push data to Node.js streams
const stream = require('node:stream');

/**
 * Custom Readable stream for piping data in a backpressure-friendly way using Promises
 *
 * @see {@link https://terrislinenbach.medium.com/obeying-back-pressure-in-javascript-streams-read-stream-pipe-edition-ccd6203596bf|How ReadStream responds to back pressure}
 *
 * Derives from PassThrough instead of Readable; otherwise, piping to files etc. won't
 * work -- for example, push(null) won't fire 'finish' events.
 *
 * Usage:
 * - Call push(obj) to write obj to the stream
 * - When push() returns false, await wait() before pushing again to avoid exceeding the buffer
 * - Call push(null) to indicate EOF
 */
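class ResumePush extends stream.PassThrough {
  /**
   * Constructor
   * @param {object} options Specify at least {Number} highWaterMark (number of objects)
   */
  constructor(options) {
    super({ ...options, objectMode: true });
    // Named resolveWait rather than resume: a property called resume would
    // collide with the inherited Readable.prototype.resume(), which pipe() calls.
    this.resolveWait = undefined;
    this.ready = false;
  }

  // Called by the stream machinery when the consumer can accept more data
  // eslint-disable-next-line no-underscore-dangle
  _read() {
    const { resolveWait } = this;
    if (!resolveWait) {
      // Nobody is waiting yet, so let the next wait() return immediately
      this.ready = true;
      return;
    }
    this.resolveWait = undefined;
    resolveWait();
  }

  /**
   * Returns a Promise that is settled when data has been flushed and push() can be called again
   * @returns {Promise<undefined>}
   */
  async wait() {
    if (this.resolveWait) {
      // A previous wait() is still pending, which is a bug in the caller. If it can
      // happen in the wild (e.g. lost network connection), multiplexing needs to be added.
      throw new Error('not ready');
    }
    if (this.ready) this.ready = false;
    else return new Promise((resolve) => { this.resolveWait = resolve; });
  }
}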
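
The three usage steps (push, await wait() when push() returns false, push(null) for EOF) might look like the sketch below. It is only an illustration under stated assumptions: `produceAndCollect`, the `{ id }` payload, the slow Writable sink, and the 1 ms delay are hypothetical and not part of the gist. The class is repeated in compact form so the snippet runs on its own, and its pending-resolver field is named `resolveWait` so that it cannot shadow the inherited `resume()` method.

```javascript
const stream = require('node:stream');

// Compact copy of ResumePush so this sketch is self-contained.
class ResumePush extends stream.PassThrough {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.resolveWait = undefined; // not "resume": that name belongs to Readable
    this.ready = false;
  }

  _read() {
    const { resolveWait } = this;
    if (!resolveWait) {
      this.ready = true;
      return;
    }
    this.resolveWait = undefined;
    resolveWait();
  }

  async wait() {
    if (this.resolveWait) throw new Error('not ready');
    if (this.ready) this.ready = false;
    else return new Promise((resolve) => { this.resolveWait = resolve; });
  }
}

// Hypothetical helper: pushes `count` objects through a ResumePush into a
// deliberately slow sink and resolves with the ids the sink received.
function produceAndCollect(count) {
  return new Promise((resolve, reject) => {
    const received = [];
    const source = new ResumePush({ highWaterMark: 2 });
    const sink = new stream.Writable({
      objectMode: true,
      highWaterMark: 1,
      write(chunk, _encoding, callback) {
        received.push(chunk.id);
        setTimeout(callback, 1); // simulate a slow consumer
      },
    });
    sink.on('finish', () => resolve(received));
    sink.on('error', reject);
    source.pipe(sink);

    (async () => {
      for (let id = 0; id < count; id += 1) {
        // push() returns false once the buffer has reached highWaterMark
        if (!source.push({ id })) await source.wait();
      }
      source.push(null); // EOF
    })().catch(reject);
  });
}

// Logs the ids in the order the sink received them
produceAndCollect(5).then((ids) => console.log(ids));
```

The producer loop never buffers far beyond highWaterMark, because each false return from push() parks it on wait() until the stream calls _read() again.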