Last active
June 2, 2023 20:19
-
-
Save terrisgit/1e4594ab14cc7c911f654b05d0b32916 to your computer and use it in GitHub Desktop.
AWS-SDK v3 Streaming Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const { S3Client } = require('@aws-sdk/client-s3'); | |
| const { Upload } = require('@aws-sdk/lib-storage'); | |
| const { PassThrough } = require('stream'); | |
/**
 * A custom PassThrough stream for piping any sort of data in a
 * backpressure-friendly way using Promises.
 *
 * Derives from PassThrough instead of Readable; otherwise, piping to
 * files etc. won't work; for example, push(null) won't cause finished
 * events to fire.
 *
 * Usage:
 * - Call push(obj) to write obj to the stream
 * - When push() returns false, call wait() to avoid exceeding the buffer
 * - Call push(null) to indicate EOF
 */
class ResumePush extends PassThrough {
  // Bookkeeping lives in private fields on purpose: a public property named
  // `resume` would shadow the inherited Readable#resume() method.

  /** True when _read() fired while nobody was waiting; consumed by the next wait(). */
  #readRequested = false;

  /** Resolver of the Promise handed out by wait(), or undefined when nobody is waiting. */
  #resolveWaiter = undefined;

  /**
   * @param {object} [options] Stream options; specify at least {highWaterMark}
   *   (number of objects). objectMode is always forced to true.
   */
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  /**
   * Invoked by the stream machinery when the consumer wants more data.
   * Releases a pending wait(), or records the request for the next wait().
   * @param {number} [size] Advisory amount requested by the consumer.
   */
  // eslint-disable-next-line no-underscore-dangle
  _read(size) {
    // Keep the inherited Transform behaviour so data arriving through write()
    // is not stalled by this override.
    super._read(size);

    const resolveWaiter = this.#resolveWaiter;
    if (!resolveWaiter) {
      this.#readRequested = true;
      return;
    }
    this.#resolveWaiter = undefined;
    resolveWaiter();
  }

  /**
   * Returns a Promise that is settled when data has been flushed and push() can be called again
   * @returns {Promise<undefined>} Rejects with Error('not ready') if an earlier
   *   wait() is still pending (only one waiter is supported at a time).
   */
  async wait() {
    if (this.#resolveWaiter) {
      // This is a bug. If it can happen in the wild (eg lost network connection), multiplexing needs to be added.
      throw new Error('not ready');
    }
    if (this.#readRequested) {
      // The consumer already asked for more data; no need to suspend.
      this.#readRequested = false;
      return undefined;
    }
    return new Promise((resolve) => {
      this.#resolveWaiter = resolve;
    });
  }
}
| // ====== | |
| // Set up | |
| const stream = new ResumePush(); | |
| const body = new PassThrough(); // Why?? | |
| stream.pipe(body); | |
| const upload = new Upload({ | |
| client: new S3Client({ region: ... }), | |
| params: { | |
| Bucket: ..., | |
| Key: ..., | |
| Body: body, | |
| }, | |
| }); | |
| // =================== | |
| // Write to the stream | |
| for (const line of ['one', 'two']) { | |
| if (!stream.push(line)) await stream.wait(); | |
| } | |
| // ========================= | |
| // Wait for upload to finish | |
| stream.push(null); | |
| await upload.done(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment