Skip to content

Instantly share code, notes, and snippets.

@dmost714
Created January 6, 2023 14:55
Show Gist options
  • Select an option

  • Save dmost714/054961e90d579a8c6961470416d7a3ea to your computer and use it in GitHub Desktop.

Select an option

Save dmost714/054961e90d579a8c6961470416d7a3ea to your computer and use it in GitHub Desktop.
Batch write to DynamoDB, with retry and exponential backoff
import { DynamoDBClient, PutItemCommand, PutRequest } from "@aws-sdk/client-dynamodb"
import { BatchWriteCommandInput, BatchWriteCommandOutput, DynamoDBDocumentClient, QueryCommand, QueryCommandInput, QueryCommandOutput } from "@aws-sdk/lib-dynamodb"
import { BatchWriteCommand } from "@aws-sdk/lib-dynamodb"
import internal from 'node:stream'
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ReservedWords.html
// Module-wide DynamoDB clients and tuning constants.
const ddbClient = new DynamoDBClient({})

// Document-client marshalling behaviour — every option is the default,
// spelled out explicitly so the choices are visible.
const marshallOptions = {
  convertEmptyValues: false,
  removeUndefinedValues: false,
  convertClassInstanceToMap: false,
}
const unmarshallOptions = {
  wrapNumbers: false,
}
const translateConfig = { marshallOptions, unmarshallOptions }
const ddbDocClient = DynamoDBDocumentClient.from(ddbClient, translateConfig)

// Give up on a batch after this many exponential-backoff retries.
const MAX_RETRY_COUNT = 12
const MAX_DDB_BATCH_SIZE = 25 // documented AWS limit (https://go.aws/3uNqj1l)

// Promise-based sleep used for the backoff delays.
const wait = (ms: number) => new Promise((res) => setTimeout(res, ms))
/**
 * Batch-writes request items (e.g. PutRequest entries) to `tablename` in
 * chunks of MAX_DDB_BATCH_SIZE, retrying both UnprocessedItems and thrown
 * errors with exponential backoff (10ms * 2^retryCount).
 *
 * @param tablename        target DynamoDB table
 * @param remainingItems   items not yet attempted
 * @param unprocessedItems items a previous attempt reported as unprocessed
 * @param retryCount       backoff attempt counter for the current batch
 * @throws Error when a batch still fails after MAX_RETRY_COUNT retries
 */
const ddbBatchWrite = async (tablename: string, remainingItems: any[], unprocessedItems: any = [], retryCount: number = 0) => {
  if (retryCount > 0) {
    console.log(`ddbBatchWrite: retryCount`, retryCount)
    console.log(`remainingItems`, JSON.stringify(remainingItems, null, 2))
    console.log(`unprocessedItems`, JSON.stringify(unprocessedItems, null, 2))
  }
  // Previously-unprocessed items go first, then fresh items.
  const allItems = [...unprocessedItems, ...remainingItems]
  const batch = allItems.splice(0, MAX_DDB_BATCH_SIZE) // REMOVES items from allItems
  const params: BatchWriteCommandInput = {
    RequestItems: {
      [tablename]: batch
    }
  }
  // Captured inside try, retried AFTER the try/catch, so the max-retry throw
  // below cannot be swallowed and replaced by this function's own catch.
  let unprocessed: any[] = []
  try {
    const res = await ddbDocClient.send(new BatchWriteCommand(params)) as BatchWriteCommandOutput
    // `?.` + `?? []` also fixes the strict-mode error in the original
    // `res.UnprocessedItems[tablename]?.length > 0` (possibly-undefined > number).
    unprocessed = res.UnprocessedItems?.[tablename] ?? []
    if (unprocessed.length === 0) {
      console.log('processed all items')
    }
  } catch (error) {
    console.log('ddbBatchWrite ERROR:', error)
    if (retryCount > MAX_RETRY_COUNT) {
      console.log('EXCEEDED MAX RETRY COUNT')
      throw new Error(`*** Exceeded max retry count, ${batch.length} records unprocessed in this batch`)
    }
    console.log('sleep, try again...')
    console.log(`wait ${2 ** retryCount * 10} milliseconds`)
    await wait(2 ** retryCount * 10)
    console.log('reprocess 25')
    // Re-attempt only the batch that threw; leftovers are handled below.
    await ddbBatchWrite(tablename, batch, [], retryCount + 1)
  }
  if (unprocessed.length > 0) {
    if (retryCount > MAX_RETRY_COUNT) {
      console.log('EXCEEDED MAX RETRY COUNT')
      console.log('unprocessed', JSON.stringify(unprocessed, null, 2))
      throw new Error(`Exceeded max retry count, ${unprocessed.length} records unprocessed in this batch`)
    }
    console.log(`wait ${2 ** retryCount * 10} milliseconds`)
    await wait(2 ** retryCount * 10)
    console.log(`retry with ${unprocessed.length} unprocessed items`)
    // BUG FIX: retry ONLY the unprocessed items here. The original passed
    // allItems into this recursive call AND processed allItems again after
    // the try/catch, double-writing every item beyond the first batch.
    await ddbBatchWrite(tablename, [], unprocessed, retryCount + 1)
  }
  // Process whatever is left, with a fresh retry counter for the new batch.
  if (allItems.length > 0) {
    console.log('process remaining items:', allItems.length)
    await ddbBatchWrite(tablename, allItems)
  }
}
// Takes an array of POJO and batch writes them to DynamoDB
/**
 * Reads batches of plain objects (POJOs) from `readable` and batch-writes
 * each batch to the given DynamoDB table, stamping createdAt/updatedAt.
 *
 * NOTE(review): each chunk yielded by `readable` is assumed to be an ARRAY
 * of row objects (a batch), not a single row — confirm against the producer.
 *
 * @param tablename DynamoDB table to write to
 * @param readable  object-mode stream yielding arrays of rows
 */
export async function writeObjectArrayToDatabase(
  tablename: string,
  readable: internal.Duplex
) {
  console.log('BEGIN writing data to', tablename)
  for await (const batchOfRows of readable) {
    const itemsToInsert = batchOfRows.map((row: object) => {
      // One timestamp per row so createdAt === updatedAt on insert (the
      // original called new Date() twice, which could produce differing
      // values). Spread order lets fields already on `row` win.
      const now = new Date().toISOString()
      return {
        PutRequest: {
          Item: {
            createdAt: now,
            updatedAt: now,
            ...row
          }
        }
      }
    })
    await ddbBatchWrite(tablename, itemsToInsert)
  }
  console.log('FINISHED writing data to', tablename)
}
/**
 * Queries one page of records with partition key `pk` and sort key >= `sk`,
 * post-filtered to a single campaignId, projecting only `sk` and `url`.
 *
 * @param tablename  DynamoDB table to query
 * @param pk         partition key value
 * @param sk         lower bound (inclusive) for the sort key
 * @param campaignId filter applied after the key condition
 * @param startKey   pagination cursor (LastEvaluatedKey of the prior page)
 * @returns the raw QueryCommand response (Items, Count, LastEvaluatedKey, …)
 */
export const fetchRecords = async (
  tablename: string,
  pk: string,
  sk: string,
  campaignId: string,
  startKey: QueryCommandInput["ExclusiveStartKey"]) => {
  console.log('FETCHING RECORDS')
  // "url" is a DynamoDB reserved word, hence the #url alias.
  const queryParams: QueryCommandInput = {
    TableName: tablename,
    ScanIndexForward: true, // ascending sort-key order
    ExclusiveStartKey: startKey,
    KeyConditionExpression: "pk = :pk and sk >= :sk",
    FilterExpression: "campaignId = :campaignId",
    ProjectionExpression: "sk, #url",
    ExpressionAttributeNames: { "#url": "url" },
    ExpressionAttributeValues: {
      ":pk": pk,
      ":sk": sk,
      ":campaignId": campaignId,
    },
  }
  console.log('params', JSON.stringify(queryParams, null, 2))
  const response = await ddbDocClient.send(new QueryCommand(queryParams))
  console.log('response metadata', response.$metadata)
  console.log('response count', response.Count)
  console.log('response scanned', response.ScannedCount)
  return response
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment