Last active
September 5, 2022 05:44
-
-
Save af-inet/48ec34faa6c2deb9b8fbece5c5599e78 to your computer and use it in GitHub Desktop.
A working example of DynamoDB Stream usage.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| # | |
| # A working example of DynamoDB Stream usage. | |
| # | |
| # Given a DynamoDB table name, this script will go find the first shard in the first stream and | |
| # perform 1 iteration every 10 seconds. Resulting records will be dumped to stdout. | |
| # | |
| import argparse | |
| import time | |
| import boto3 | |
| import sys | |
| def parse_args(): | |
| parser = argparse.ArgumentParser(description="Watch records flow through a DynamoDB Stream (polls stream every 10 seconds).") | |
| parser.add_argument('table', | |
| action='store', | |
| type=str, | |
| help='DynamoDB table you want to watch.') | |
| return parser.parse_args() | |
| def main(): | |
| # https://boto3.readthedocs.io/en/latest/reference/services/dynamodb.html | |
| # https://boto3.readthedocs.io/en/latest/reference/services/dynamodbstreams.html | |
| # TODO: what if there are more than 100 streams? | |
| args = parse_args() | |
| client = boto3.client('dynamodbstreams') | |
| response = client.list_streams(TableName=args.table, Limit=100) | |
| if not response['Streams']: | |
| print("Fatal: no stream found for DynamoDB Table '{table}'.\n" | |
| "Ensure streams are enabled for your table:\n" | |
| "\thttps://console.aws.amazon.com/dynamodb/home\n" | |
| .format(table=args.table)) | |
| sys.exit(1) | |
| # take the first stream we find... not sure if there are any caveats in doing this. | |
| arn = response['Streams'][0]['StreamArn'] | |
| print("Using DynamoDB Stream ARN: {arn}".format(arn=arn)) | |
| response = client.describe_stream(StreamArn=arn, Limit=100) | |
| if not response['StreamDescription']['Shards']: | |
| print("Fatal: no shards found for DynamoDB stream '{arn}'.".format(arn=arn)) | |
| sys.exit(1) | |
| shard_id = response['StreamDescription']['Shards'][0]['ShardId'] | |
| print("Using DynamoDB Stream shard id: {shard_id}".format(shard_id=shard_id)) | |
| response = client.get_shard_iterator(StreamArn=arn, ShardId=shard_id, ShardIteratorType='LATEST') | |
| shard_iterator = response['ShardIterator'] | |
| response = client.get_records(ShardIterator=shard_iterator, Limit=100) | |
| print(response) | |
| time.sleep(10) | |
| while True: | |
| shard_iterator = response['NextShardIterator'] | |
| response = client.get_records( | |
| ShardIterator=shard_iterator, | |
| Limit=100 | |
| ) | |
| print(response) | |
| time.sleep(10) | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment