Skip to content

Instantly share code, notes, and snippets.

@khai-nguyen-dinh
Created November 29, 2021 09:45
Show Gist options
  • Select an option

  • Save khai-nguyen-dinh/6aa1d8f28b862770b0c0d305a50aa327 to your computer and use it in GitHub Desktop.

Select an option

Save khai-nguyen-dinh/6aa1d8f28b862770b0c0d305a50aa327 to your computer and use it in GitHub Desktop.
Monitor AWS events using Lambda and CloudTrail.
import json
import boto3
import gzip
# Event-name fragments (Describe*/List*/Get* style calls) that lambda_handler
# uses to filter events out of the notifications.
ignore_event = ['Describe', 'List', 'Get']
# Placeholder value: must be replaced with the ARN of the SNS topic that
# final_status publishes to.
topic_arn = "<sns arn>"
# boto3 clients are created once at import time and shared by the functions below.
sns = boto3.client("sns")
s3_client = boto3.client('s3')
def _read_trail_records(bucket, key):
    """Fetch a gzipped JSON log object from S3 and return its 'Records' list.

    The object is read and decompressed in memory, so nothing is left behind
    in /tmp between warm invocations and same-named objects cannot collide.
    """
    body = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
    logs = json.loads(gzip.decompress(body))
    # Objects that carry no 'Records' list (presumably e.g. CloudTrail digest
    # files delivered to the same bucket) yield nothing instead of KeyError.
    return logs.get('Records', [])


def lambda_handler(event, context):
    """Handle an S3 "object created" notification for CloudTrail log files.

    For every S3 record in the notification, the referenced log object is
    loaded and each CloudTrail record that was performed by a named user and
    whose event name does not start with one of the ``ignore_event`` prefixes
    is forwarded to ``final_status``.

    Args:
        event: S3 notification payload; must contain a 'Records' list whose
            items hold ``s3.bucket.name`` and ``s3.object.key``.
        context: Lambda context object (unused).

    Returns:
        A dict with 'statusCode' 200 and a JSON-encoded body string.

    Raises:
        KeyError: if the notification itself is malformed.
    """
    from urllib.parse import unquote_plus

    # Prefix match instead of substring match: a substring test would also
    # suppress mutating calls such as CreateListener/DeleteListener ("List").
    ignored_prefixes = tuple(ignore_event)

    # One notification may carry several S3 records; process all of them.
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # Object keys are URL-encoded in S3 event notifications.
        key = unquote_plus(record['s3']['object']['key'])

        for element in _read_trail_records(bucket, key):
            user_name = (element.get('userIdentity') or {}).get('userName')
            if not user_name:
                # Only actions attributed to a named user are reported.
                continue

            event_name = element.get('eventName', '')
            if event_name.startswith(ignored_prefixes):
                continue

            # Optional fields are read with .get() so one sparse record does
            # not abort processing of the remaining records.
            final_status(event_name, element.get('eventSource'), element.get('eventTime'),
                         user_name, element.get('sourceIPAddress'),
                         element.get('userAgent'))

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
def final_status(eventName, eventSource, eventTime, userName, sourceIPAddress, userAgent):
    """Publish a plain-text summary of one CloudTrail action to the SNS topic.

    Args:
        eventName: Name of the API action.
        eventSource: Service that the request was made to.
        eventTime: Timestamp of the action as found in the log record.
        userName: Name of the user who performed the action.
        sourceIPAddress: Address the request originated from.
        userAgent: Agent string of the caller.

    Returns:
        The response returned by ``sns.publish``.
    """
    rule = "------------------------------------------------------------------------------------"
    # Leading and trailing empty items reproduce the newline that opens and
    # closes the message body.
    lines = [
        "",
        "AWS Activity Logs.",
        rule,
        "Summary of the action:",
        rule,
        f"EventName : {eventName}",
        f"EventSource : {eventSource}",
        f"EventTime : {eventTime}",
        f"User : {userName}",
        f"SourceIPAddress : {sourceIPAddress}",
        f"UserAgent : {userAgent}",
        rule,
        "",
    ]
    body = "\n".join(lines)
    subject = "AWS Notification Actions."
    return sns.publish(TopicArn=topic_arn, Message=body, Subject=subject)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment