Batch write to DynamoDB, with retry and exponential backoff
import { DynamoDBClient } from "@aws-sdk/client-dynamodb"
import {
  BatchWriteCommand,
  BatchWriteCommandInput,
  BatchWriteCommandOutput,
  DynamoDBDocumentClient,
  QueryCommand,
  QueryCommandInput,
} from "@aws-sdk/lib-dynamodb"
import internal from 'node:stream'

// Reserved attribute names (e.g. the #url alias in fetchRecords below) must be
// aliased via ExpressionAttributeNames:
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ReservedWords.html

const ddbClient = new DynamoDBClient({})

const marshallOptions = {
  convertEmptyValues: false, // false, by default.
  removeUndefinedValues: false, // false, by default.
  convertClassInstanceToMap: false, // false, by default.
}
const unmarshallOptions = {
  wrapNumbers: false, // false, by default.
}
const translateConfig = { marshallOptions, unmarshallOptions }
const ddbDocClient = DynamoDBDocumentClient.from(ddbClient, translateConfig)

const MAX_RETRY_COUNT = 12
const MAX_DDB_BATCH_SIZE = 25 // documented AWS limit (https://go.aws/3uNqj1l)

const wait = (ms: number) => new Promise((res) => setTimeout(res, ms))
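// Backoff schedule: each retry waits 2 ** retryCount * 10 ms, i.e. 10 ms,
// 20 ms, 40 ms, ... up to 2 ** 12 * 10 = 40,960 ms (~41 s) at MAX_RETRY_COUNT.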

const ddbBatchWrite = async (tablename: string, remainingItems: any[], unprocessedItems: any[] = [], retryCount: number = 0): Promise<void> => {
  if (retryCount > 0) {
    console.log(`ddbBatchWrite: retryCount`, retryCount)
    console.log(`remainingItems`, JSON.stringify(remainingItems, null, 2))
    console.log(`unprocessedItems`, JSON.stringify(unprocessedItems, null, 2))
  }
  const allItems = [...unprocessedItems, ...remainingItems]
  const twentyFiveItems = allItems.splice(0, MAX_DDB_BATCH_SIZE) // splice REMOVES items from allItems
  const params: BatchWriteCommandInput = {
    RequestItems: {
      [tablename]: twentyFiveItems
    }
  }
  // console.log('params', JSON.stringify(params, null, 2))
  try {
    const res: BatchWriteCommandOutput = await ddbDocClient.send(new BatchWriteCommand(params))
    // console.log(`res`, JSON.stringify(res, null, 2))
    const unprocessed = res.UnprocessedItems?.[tablename] ?? []
    if (unprocessed.length > 0) {
      if (retryCount > MAX_RETRY_COUNT) {
        console.log('EXCEEDED MAX RETRY COUNT')
        console.log('res.UnprocessedItems', JSON.stringify(res.UnprocessedItems, null, 2))
        throw new Error(`Exceeded max retry count, ${unprocessed.length} records unprocessed in this batch`)
      }
      console.log(`wait ${2 ** retryCount * 10} milliseconds`)
      await wait(2 ** retryCount * 10)
      console.log(`retry with ${unprocessed.length} unprocessed items`)
      // The recursive call carries allItems forward, so return here; falling
      // through would write the remaining items a second time below.
      return ddbBatchWrite(tablename, allItems, unprocessed, retryCount + 1)
    } else {
      console.log('processed all items')
    }
  } catch (error) {
    console.log('ddbBatchWrite ERROR:', error)
    if (retryCount > MAX_RETRY_COUNT) {
      console.log('EXCEEDED MAX RETRY COUNT')
      throw new Error(`*** Exceeded max retry count, ${twentyFiveItems.length} records unprocessed in this batch`)
    }
    console.log('sleep, try again...')
    console.log(`wait ${2 ** retryCount * 10} milliseconds`)
    await wait(2 ** retryCount * 10)
    console.log('reprocess 25')
    // Retry only the failed batch of 25; the remaining items are handled below.
    await ddbBatchWrite(tablename, twentyFiveItems, [], retryCount + 1)
  }
  if (allItems.length > 0) {
    console.log('process remaining items:', allItems.length)
    await ddbBatchWrite(tablename, allItems)
  }
}
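
// Direct use (hypothetical table name and items, not part of this gist): each
// entry follows the BatchWriteItem request shape, i.e. a PutRequest (or
// DeleteRequest) wrapper.
//
//   await ddbBatchWrite('my-table', [
//     { PutRequest: { Item: { pk: 'user#1', sk: 'profile', name: 'Ada' } } },
//     { PutRequest: { Item: { pk: 'user#2', sk: 'profile', name: 'Lin' } } },
//   ])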

// Reads batches of plain objects (POJOs) from an object-mode stream and batch
// writes them to DynamoDB; each chunk from the stream must be an array of rows.
export async function writeObjectArrayToDatabase(
  tablename: string,
  readable: internal.Duplex
) {
  console.log('BEGIN writing data to', tablename)
  for await (const batchOfRows of readable) {
    const itemsToInsert = batchOfRows.map((row: object) => ({
      PutRequest: {
        Item: {
          createdAt: new Date().toISOString(),
          updatedAt: new Date().toISOString(),
          ...row, // row spreads last, so timestamps already on the row win
        },
      },
    }))
    await ddbBatchWrite(tablename, itemsToInsert)
  }
  console.log('FINISHED writing data to', tablename)
}

export const fetchRecords = async (
  tablename: string,
  pk: string,
  sk: string,
  campaignId: string,
  startKey: QueryCommandInput["ExclusiveStartKey"]) => {
  console.log('FETCHING RECORDS')
  const params: QueryCommandInput = {
    TableName: tablename,
    KeyConditionExpression: "pk = :pk and sk >= :sk",
    FilterExpression: "campaignId = :campaignId",
    ProjectionExpression: "sk, #url",
    ExpressionAttributeNames: { "#url": "url" }, // aliased; see reserved words link above
    ExpressionAttributeValues: {
      ":pk": pk,
      ":sk": sk,
      ":campaignId": campaignId,
    },
    ExclusiveStartKey: startKey,
    ScanIndexForward: true
  }
  console.log('params', JSON.stringify(params, null, 2))
  const res = await ddbDocClient.send(new QueryCommand(params))
  // console.log('QueryCommandOutput', res)
  console.log('response metadata', res.$metadata)
  console.log('response count', res.Count)
  console.log('response scanned', res.ScannedCount)
  return res
}
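
A minimal usage sketch for writeObjectArrayToDatabase, assuming a hypothetical table name and rows (neither appears in the gist). A PassThrough in object mode satisfies the internal.Duplex parameter, and each chunk written to it must be an array of rows:

import { PassThrough } from 'node:stream'

async function loadSampleData() {
  const stream = new PassThrough({ objectMode: true })
  // Each chunk is an ARRAY of rows, matching batchOfRows.map(...) above.
  stream.write([
    { pk: 'user#1', sk: '2023-01-06', url: 'https://example.com/1' },
    { pk: 'user#2', sk: '2023-01-06', url: 'https://example.com/2' },
  ])
  stream.end()
  await writeObjectArrayToDatabase('my-table', stream)
}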
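fetchRecords returns a single page of results: when a Query response exceeds 1 MB, DynamoDB sets LastEvaluatedKey, which the caller passes back as ExclusiveStartKey. A sketch of draining every page with the function above (fetchAllRecords is a hypothetical helper, not part of the gist):

async function fetchAllRecords(tablename: string, pk: string, sk: string, campaignId: string) {
  const items: Record<string, any>[] = []
  let startKey: QueryCommandInput["ExclusiveStartKey"] = undefined
  do {
    const res = await fetchRecords(tablename, pk, sk, campaignId, startKey)
    items.push(...(res.Items ?? []))
    startKey = res.LastEvaluatedKey // undefined once the last page is read
  } while (startKey)
  return items
}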