Skip to content

Instantly share code, notes, and snippets.

@af-inet
Last active September 5, 2022 05:44
Show Gist options
  • Select an option

  • Save af-inet/48ec34faa6c2deb9b8fbece5c5599e78 to your computer and use it in GitHub Desktop.

Select an option

Save af-inet/48ec34faa6c2deb9b8fbece5c5599e78 to your computer and use it in GitHub Desktop.
A working example of DynamoDB Stream usage.
#!/usr/bin/env python
#
# A working example of DynamoDB Stream usage.
#
# Given a DynamoDB table name, this script picks a shard of the table's first stream and polls it
# every 10 seconds, dumping each get_records response to stdout.
#
import argparse
import time
import boto3
import sys
def parse_args(argv=None):
    """Parse the command line for the DynamoDB Stream watcher.

    Args:
        argv: Argument strings to parse, excluding the program name. When
            ``None`` (the default) argparse reads ``sys.argv[1:]``, exactly as
            before; passing a list lets other code and tests drive the parser.

    Returns:
        argparse.Namespace with one attribute, ``table`` (str): the DynamoDB
        table whose stream should be watched.

    Raises:
        SystemExit: argparse's standard exit when ``table`` is missing or
            ``-h``/``--help`` is given.
    """
    parser = argparse.ArgumentParser(
        description="Watch records flow through a DynamoDB Stream (polls stream every 10 seconds).")
    parser.add_argument('table',
                        action='store',
                        type=str,
                        help='DynamoDB table you want to watch.')
    return parser.parse_args(argv)
def _list_shards(client, arn):
    """Return every shard of stream ``arn``, following describe_stream pagination."""
    shards = []
    request = {'StreamArn': arn, 'Limit': 100}
    while True:
        description = client.describe_stream(**request)['StreamDescription']
        shards.extend(description['Shards'])
        last_shard_id = description.get('LastEvaluatedShardId')
        if not last_shard_id:
            return shards
        # More shards remain; resume after the last one returned.
        request['ExclusiveStartShardId'] = last_shard_id


def _pick_shard(shards):
    """Return the first open shard in ``shards``, falling back to ``shards[0]``.

    Per the DynamoDB Streams DescribeStream API, a shard whose
    SequenceNumberRange has no EndingSequenceNumber is still open. A LATEST
    iterator on a closed shard never yields records, so an open shard is
    preferred. ``shards`` must be non-empty.
    """
    for shard in shards:
        if 'EndingSequenceNumber' not in shard.get('SequenceNumberRange', {}):
            return shard
    return shards[0]


def _find_child_shard(shards, parent_id):
    """Return the first shard in ``shards`` whose parent is ``parent_id``, or None."""
    return next((shard for shard in shards if shard.get('ParentShardId') == parent_id), None)


def main():
    """Poll one shard of a DynamoDB table's stream forever, printing raw responses.

    The table name comes from the command line (see ``parse_args``). The
    function uses the first stream returned by ``list_streams`` and picks an
    open shard of that stream (the first shard if none is open). It then calls
    ``get_records`` (Limit=100) and prints each response, sleeping 10 seconds
    between calls. When the watched shard is closed (no NextShardIterator),
    polling continues from the start of its first child shard. Only one shard
    lineage is followed; other shards of the stream are not read.

    Exits with status 1 if the table has no stream, the stream has no shards,
    or a closed shard has no child shard. Otherwise it runs until interrupted.
    """
    # https://boto3.readthedocs.io/en/latest/reference/services/dynamodb.html
    # https://boto3.readthedocs.io/en/latest/reference/services/dynamodbstreams.html
    args = parse_args()
    client = boto3.client('dynamodbstreams')
    # Only the first stream returned is used, so a single page is enough.
    response = client.list_streams(TableName=args.table, Limit=100)
    if not response['Streams']:
        print("Fatal: no stream found for DynamoDB Table '{table}'.\n"
              "Ensure streams are enabled for your table:\n"
              "\thttps://console.aws.amazon.com/dynamodb/home\n"
              .format(table=args.table))
        sys.exit(1)
    # NOTE(review): take the first stream we find; if several streams are
    # listed for the table, confirm the first is the one you want.
    arn = response['Streams'][0]['StreamArn']
    print("Using DynamoDB Stream ARN: {arn}".format(arn=arn))

    shards = _list_shards(client, arn)
    if not shards:
        print("Fatal: no shards found for DynamoDB stream '{arn}'.".format(arn=arn))
        sys.exit(1)
    shard_id = _pick_shard(shards)['ShardId']
    print("Using DynamoDB Stream shard id: {shard_id}".format(shard_id=shard_id))

    shard_iterator = client.get_shard_iterator(
        StreamArn=arn, ShardId=shard_id, ShardIteratorType='LATEST')['ShardIterator']
    while True:
        response = client.get_records(ShardIterator=shard_iterator, Limit=100)
        print(response)
        time.sleep(10)
        shard_iterator = response.get('NextShardIterator')
        if shard_iterator is not None:
            continue
        # No NextShardIterator: the shard is closed and fully read. Continue
        # from the beginning of its child so no records are skipped.
        child = _find_child_shard(_list_shards(client, arn), shard_id)
        if child is None:
            print("Fatal: shard '{shard_id}' of DynamoDB stream '{arn}' is closed "
                  "and has no child shard.".format(shard_id=shard_id, arn=arn))
            sys.exit(1)
        shard_id = child['ShardId']
        print("Using DynamoDB Stream shard id: {shard_id}".format(shard_id=shard_id))
        shard_iterator = client.get_shard_iterator(
            StreamArn=arn, ShardId=shard_id,
            ShardIteratorType='TRIM_HORIZON')['ShardIterator']
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # main() polls forever, so Ctrl-C is the normal way to stop the
        # watcher: exit quietly with the conventional 128 + SIGINT status
        # instead of dumping a traceback.
        sys.exit(130)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment