@terrisgit
Last active June 2, 2023 20:19
AWS-SDK v3 Streaming Example
const { S3Client } = require('@aws-sdk/client-s3');
const { Upload } = require('@aws-sdk/lib-storage');
const { PassThrough } = require('stream');
/**
* A custom PassThrough stream for piping any sort of data in a
* backpressure-friendly way using Promises.
*
* Derives from PassThrough instead of Readable; otherwise, piping to
* files etc. won't work; for example, push(null) won't cause 'finish'
* events to fire.
*
* Usage:
* - Call push(obj) to write obj to the stream
* - When push() returns false, call wait() to avoid exceeding the buffer
* - Call push(null) to indicate EOF
*/
class ResumePush extends PassThrough {
constructor(options) { // Specify at least {highWaterMark} (number of objects)
super({ ...options, objectMode: true });
}
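// Called by the stream machinery when the consumer wants more data.
// eslint-disable-next-line no-underscore-dangle
_read() {
const {resume} = this;
if (!resume) {
// Nobody is waiting yet; remember that the next wait() may return immediately
this.ready = true;
return;
}
// Release the producer that is blocked in wait()
this.resume = undefined;
resume();
}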
/**
* Returns a Promise that is settled when data has been flushed and push() can be called again
* @returns {Promise<undefined>}
*/
async wait() {
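if (this.resume) {
// Calling wait() while a previous wait() is still pending is a caller bug.
// If it can happen in the wild (e.g. lost network connection), multiplexing needs to be added.
throw new Error('not ready');
}
// _read() already ran since the last wait(): consume the flag and continue immediately
if (this.ready) this.ready = false;
// Otherwise park the producer until the next _read() call
else return new Promise((resume) => { this.resume = resume; });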
}
}
// ======
// Set up
const stream = new ResumePush();
const body = new PassThrough(); // Why?? Possibly because ResumePush is in object mode, while Upload expects a byte stream
stream.pipe(body);
const upload = new Upload({
client: new S3Client({ region: ... }),
params: {
Bucket: ...,
Key: ...,
Body: body,
},
});
// ===================
// Write to the stream
// (the awaits below must run inside an async function; CommonJS modules have no top-level await)
for (const line of ['one', 'two']) {
if (!stream.push(line)) await stream.wait();
}
// =========================
// Wait for upload to finish
stream.push(null);
await upload.done();