Last active
September 30, 2025 21:46
-
-
Save asvinours/0a3f6d93a14e7f012f0c7e8cb6ec4cb8 to your computer and use it in GitHub Desktop.
aws-scripts for daily operations
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # dependencies = [ | |
| # "boto3", | |
| # ] | |
| # /// | |
| from __future__ import annotations | |
| import argparse | |
| import datetime | |
| import json | |
| import boto3 | |
| from botocore.exceptions import ClientError | |
def get_bucket_summary(client, bucket_name):
    """Collect the configuration of one S3 bucket into a plain dict.

    Args:
        client: boto3 S3 client used for every ``get_bucket_*`` call.
        bucket_name: Name of the bucket to inspect.

    Returns:
        A dict with the keys ``Versioning``, ``Encryption``, ``Lifecycle``,
        ``Replication``, ``OwnershipControls``, ``ACL`` and ``Policy`` (in that
        order). Each value is the relevant part of the API response, the string
        ``"None"`` when the API reports that the configuration does not exist,
        or an ``"Error: ..."`` string for any other ``ClientError``.
    """
    report = {}

    def record(field, fetch, absent_code=None):
        # Store fetch() under `field`. A ClientError whose code equals
        # `absent_code` means "not configured"; anything else is reported as text.
        try:
            report[field] = fetch()
        except ClientError as err:
            if absent_code is not None and err.response["Error"]["Code"] == absent_code:
                report[field] = "None"
            else:
                report[field] = f"Error: {err}"

    def fetch_acl():
        acl = client.get_bucket_acl(Bucket=bucket_name)
        return {
            "Owner": acl.get("Owner"),
            "Grants": acl.get("Grants"),
        }

    record(
        "Versioning",
        lambda: client.get_bucket_versioning(Bucket=bucket_name).get("Status", "Disabled"),
    )
    record(
        "Encryption",
        lambda: client.get_bucket_encryption(Bucket=bucket_name)[
            "ServerSideEncryptionConfiguration"
        ]["Rules"],
        "ServerSideEncryptionConfigurationNotFoundError",
    )
    record(
        "Lifecycle",
        lambda: client.get_bucket_lifecycle_configuration(Bucket=bucket_name)["Rules"],
        "NoSuchLifecycleConfiguration",
    )
    record(
        "Replication",
        lambda: client.get_bucket_replication(Bucket=bucket_name)["ReplicationConfiguration"],
        "ReplicationConfigurationNotFoundError",
    )
    record(
        "OwnershipControls",
        lambda: client.get_bucket_ownership_controls(Bucket=bucket_name)[
            "OwnershipControls"
        ]["Rules"],
        "OwnershipControlsNotFoundError",
    )
    record("ACL", fetch_acl)
    record(
        "Policy",
        # The policy arrives as a JSON document in a string; decode it so it
        # nests cleanly in the final report.
        lambda: json.loads(client.get_bucket_policy(Bucket=bucket_name)["Policy"]),
        "NoSuchBucketPolicy",
    )
    return report
def _latest_daily_maximum(client, metric_name, bucket_name, storage_type):
    """Return the most recent daily ``Maximum`` of one AWS/S3 CloudWatch metric.

    Returns an ``int`` when a datapoint exists, ``"Not available"`` when the
    query yields no datapoints, or an ``"Error: ..."`` string on ``ClientError``.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    try:
        response = client.get_metric_statistics(
            Namespace="AWS/S3",
            MetricName=metric_name,
            Dimensions=[
                {"Name": "BucketName", "Value": bucket_name},
                {"Name": "StorageType", "Value": storage_type},
            ],
            StartTime=now - datetime.timedelta(days=2),
            EndTime=now,
            Period=86400,
            Statistics=["Maximum"],
        )
    except ClientError as e:
        return f"Error: {e}"

    datapoints = response.get("Datapoints", [])
    if not datapoints:
        return "Not available"
    # CloudWatch does not return datapoints in chronological order, so the
    # last list element is not necessarily the newest one.
    newest = max(datapoints, key=lambda point: point["Timestamp"])
    return int(newest["Maximum"])


def get_bucket_metrics(client, bucket_name):
    """Fetch object count and size of a bucket from CloudWatch.

    Args:
        client: boto3 CloudWatch client.
        bucket_name: Name of the S3 bucket.

    Returns:
        A dict with ``ObjectCount`` (metric ``NumberOfObjects``, storage type
        ``AllStorageTypes``) and ``BucketSizeBytes`` (metric ``BucketSizeBytes``,
        storage type ``StandardStorage`` only). Each value is an int, the string
        ``"Not available"``, or an ``"Error: ..."`` string.
    """
    return {
        "ObjectCount": _latest_daily_maximum(
            client, "NumberOfObjects", bucket_name, "AllStorageTypes"
        ),
        "BucketSizeBytes": _latest_daily_maximum(
            client, "BucketSizeBytes", bucket_name, "StandardStorage"
        ),
    }
if __name__ == "__main__":
    # CLI entry point: print one JSON object keyed by bucket name.
    parser = argparse.ArgumentParser(
        description="Summarize S3 bucket configuration (lifecycle, replication, encryption, versioning, ACL, etc.)"
    )
    parser.add_argument("--profile", required=True)
    parser.add_argument("--region", default="us-west-2", help="AWS region (default: us-west-2)")
    parser.add_argument("--metrics-only", action='store_true', default=False)
    parser.add_argument("--buckets", nargs='+', required=True, help="Name(s) of the S3 bucket")
    args = parser.parse_args()

    session = boto3.Session(profile_name=args.profile, region_name=args.region)
    s3 = session.client("s3")
    clw = session.client("cloudwatch")

    output = {}
    for bucket in args.buckets:
        # With --metrics-only the configuration calls are skipped entirely.
        summary = {} if args.metrics_only else get_bucket_summary(s3, bucket)
        metrics_summary = get_bucket_metrics(clw, bucket)
        output[bucket] = {**summary, **metrics_summary}

    # default=str: API responses can carry datetime values (for example a
    # lifecycle rule's expiration date) that json cannot serialize natively.
    print(json.dumps(output, indent=2, default=str))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # dependencies = [ | |
| # "boto3", | |
| # ] | |
| # /// | |
| from __future__ import annotations | |
| import argparse | |
| import time | |
| import boto3 | |
def is_instance_id(value: str) -> bool:
    """Return True if *value* looks like an EC2 instance ID.

    Accepts ``i-`` followed by exactly 8 or 17 lowercase hexadecimal digits.
    Checking the digits (not just the length) keeps Name substrings such as
    ``i-love-you`` from being mistaken for an instance ID.
    """
    suffix = value[2:]
    return (
        value.startswith("i-")
        and len(suffix) in (8, 17)
        and all(ch in "0123456789abcdef" for ch in suffix)
    )
def describe_instances(ec2, filters):
    """Return the running EC2 instances that match *filters*.

    Args:
        ec2: boto3 EC2 client.
        filters: List of ``{"Name": ..., "Values": [...]}`` filter dicts passed
            to ``DescribeInstances``.

    Returns:
        A list of dicts with ``InstanceId``, ``Name`` (value of the ``Name``
        tag, ``""`` if absent), ``PrivateIp`` and ``PublicIp`` (``None`` if the
        instance has no such address). Only instances in state ``running`` are
        included.
    """
    instances = []
    # Paginate: a single DescribeInstances response may not hold every match.
    paginator = ec2.get_paginator("describe_instances")
    for page in paginator.paginate(Filters=filters):
        for res in page["Reservations"]:
            for inst in res["Instances"]:
                if inst["State"]["Name"] != "running":
                    continue
                name = next(
                    (t["Value"] for t in inst.get("Tags", []) if t["Key"] == "Name"),
                    "",
                )
                instances.append({
                    "InstanceId": inst["InstanceId"],
                    "Name": name,
                    "PrivateIp": inst.get("PrivateIpAddress"),
                    "PublicIp": inst.get("PublicIpAddress"),
                })
    return instances
def resolve_instances(ec2, target: str):
    """Resolve *target* to the running instances it refers to.

    *target* is interpreted, in order, as:
      1. an instance ID (per ``is_instance_id``);
      2. an IP address (only digits and dots): matched against private IPs
         first, then against public IPs if nothing was found;
      3. otherwise a substring of the ``Name`` tag.

    Returns:
        The list produced by ``describe_instances`` (possibly empty).
    """
    if is_instance_id(target):
        filters = [{"Name": "instance-id", "Values": [target]}]
    elif all(p.isdigit() or p == "." for p in target):
        by_private_ip = describe_instances(
            ec2, [{"Name": "private-ip-address", "Values": [target]}]
        )
        if by_private_ip:
            # Reuse the result instead of issuing the same query a second time.
            return by_private_ip
        filters = [{"Name": "ip-address", "Values": [target]}]
    else:
        filters = [{"Name": "tag:Name", "Values": [f"*{target}*"]}]
    return describe_instances(ec2, filters)
def run_commands(
    ssm,
    instance_ids: list[str],
    command: str,
    poll_interval: float = 2.0,
    timeout: float | None = None,
):
    """Run a shell command on instances through SSM and wait for the results.

    Args:
        ssm: boto3 SSM client.
        instance_ids: Target instance IDs.
        command: Shell command executed via the ``AWS-RunShellScript`` document.
        poll_interval: Seconds to sleep between status polls.
        timeout: Maximum number of seconds to wait overall; ``None`` (the
            default) waits indefinitely.

    Returns:
        Dict mapping instance ID to ``{"Status", "StdOut", "StdErr"}``.

    Raises:
        TimeoutError: If *timeout* elapses before every invocation has reached
            a final status.
    """
    final_statuses = ("Success", "Failed", "Cancelled", "TimedOut")
    resp = ssm.send_command(
        InstanceIds=instance_ids,
        DocumentName="AWS-RunShellScript",
        Parameters={"commands": [command]},
    )
    command_id = resp["Command"]["CommandId"]
    deadline = None if timeout is None else time.monotonic() + timeout

    def pause():
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"Timed out waiting for SSM command {command_id}")
        time.sleep(poll_interval)

    # Invocations appear shortly after send_command returns; wait until every
    # target has one before asking for individual results.
    while True:
        invocations = []
        params = {"CommandId": command_id}
        while True:
            page = ssm.list_command_invocations(**params)
            invocations.extend(page.get("CommandInvocations", []))
            token = page.get("NextToken")
            if not token:
                break
            params["NextToken"] = token
        if len(invocations) >= len(instance_ids):
            break
        pause()

    results = {}
    for invocation in invocations:
        while True:
            inv = ssm.get_command_invocation(
                CommandId=invocation["CommandId"],
                InstanceId=invocation["InstanceId"],
            )
            if inv["Status"] in final_statuses:
                results[invocation["InstanceId"]] = {
                    "Status": inv["Status"],
                    "StdOut": inv.get("StandardOutputContent", ""),
                    "StdErr": inv.get("StandardErrorContent", ""),
                }
                break
            pause()
    return results
if __name__ == "__main__":
    # CLI entry point: resolve the target, run the command, print per-instance output.
    parser = argparse.ArgumentParser(
        description="Run shell commands on EC2 instances via SSM."
    )
    parser.add_argument("--profile", required=True, help="AWS profile")
    parser.add_argument("--region", required=True, help="AWS region")
    parser.add_argument("target", help="Instance ID, Name substring, or IP")
    parser.add_argument("command", help="Shell command to run remotely")
    args = parser.parse_args()

    session = boto3.Session(profile_name=args.profile, region_name=args.region)
    ec2 = session.client("ec2")
    ssm = session.client("ssm")

    instances = resolve_instances(ec2, args.target)
    if not instances:
        print(f"No running instances matched {args.target}")
        # SystemExit instead of exit(): exit() is injected by the site module
        # and is not guaranteed to exist in every interpreter setup.
        raise SystemExit(1)

    ids = [i["InstanceId"] for i in instances]
    names_by_id = {i["InstanceId"]: i["Name"] for i in instances}
    print(f"Running {repr(args.command)} on {len(ids)} instance(s): {ids}")

    results = run_commands(ssm, ids, args.command)
    for iid, out in results.items():
        print("=" * 60)
        print(f"Instance: {iid} ({names_by_id.get(iid, '')})")
        print(f"Status: {out['Status']}")
        if out["StdOut"]:
            print("--- STDOUT ---")
            print(out["StdOut"])
        if out["StdErr"]:
            print("--- STDERR ---")
            print(out["StdErr"])
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env bash | |
| # ec2-ssm.sh — Start an AWS SSM Session Manager shell by name pattern, instance ID, or IP. | |
| # Usage: | |
| # ec2-ssm.sh [-p AWS_PROFILE] [-r AWS_REGION] [-u RUN_AS_USER] <pattern|instance-id|ip> | |
| # Examples: | |
| # ec2-ssm.sh web-prod # match tag:Name contains 'web-prod' | |
| # ec2-ssm.sh i-0abc123def4567890 # direct instance id | |
| # ec2-ssm.sh 10.0.2.15 # private IP | |
| # ec2-ssm.sh 3.98.123.45 # public IP | |
| # | |
| # Requirements: AWS CLI v2 (configured), SSM Session Manager plugin. | |
set -euo pipefail

# Print "Error: <message>" on stderr and terminate with status 1.
die() {
  echo "Error: $*" >&2
  exit 1
}

# Print every line of this script that starts with "# " (prefix removed) as
# help text, then terminate with status 1.
usage() {
  sed -n 's/^# //p' "$0"
  exit 1
}
PROFILE=""
REGION=""
RUN_AS=""

# --- Parse flags ---
# Leading ':' puts getopts in silent mode so the error text is produced here.
while getopts ":p:r:u:h" opt; do
  case "$opt" in
    p) PROFILE=$OPTARG ;;
    r) REGION=$OPTARG ;;
    u) RUN_AS=$OPTARG ;;
    h) usage ;;
    \?) die "Unknown option -$OPTARG (use -h for help)" ;;
    :) die "Option -$OPTARG requires an argument" ;;
  esac
done
shift $((OPTIND-1))

if (( $# < 1 )); then
  usage
fi

# All remaining words form the lookup target (joined with single spaces).
TARGET_INPUT="$*"

# Base AWS CLI invocation; profile/region are appended only when provided.
aws_cli=(aws)
if [[ -n "$PROFILE" ]]; then
  aws_cli+=(--profile "$PROFILE")
fi
if [[ -n "$REGION" ]]; then
  aws_cli+=(--region "$REGION")
fi
| # --- Helpers --- | |
# Succeed when $1 is "i-" followed by exactly 8 or 17 lowercase hex digits.
is_instance_id() {
  local candidate="$1"
  local pattern='^i-([0-9a-f]{8}|[0-9a-f]{17})$'
  [[ "$candidate" =~ $pattern ]]
}
# Succeed when $1 is a dotted-quad IPv4 address whose four octets are each
# in the range 0..255.
is_ipv4() {
  # Keep the scratch variables local so they do not leak into the caller.
  local ip="$1" a b c d n
  [[ "$ip" =~ ^([0-9]{1,3}\.){3}[0-9]{1,3}$ ]] || return 1
  IFS='.' read -r a b c d <<< "$ip"
  for n in "$a" "$b" "$c" "$d"; do
    # 10# forces base 10; without it an octet such as "099" is parsed as
    # octal and aborts the arithmetic expression with an error.
    (( 10#$n <= 255 )) || return 1
  done
  return 0
}
# Run `aws ec2 describe-instances` with the caller's arguments ("$@") and
# print one tab-delimited row per instance:
#   InstanceId<TAB>Name<TAB>PrivateIp<TAB>PublicIp
# A missing Name tag or IP address is emitted as an empty column.
describe_instances() {
  local query='Reservations[].Instances[].[
    InstanceId,
    (Tags[?Key==`Name`].Value | [0]) || ``,
    PrivateIpAddress || ``,
    PublicIpAddress || ``]'
  # shellcheck disable=SC2145
  "${aws_cli[@]}" ec2 describe-instances "$@" --query "$query" --output text
}
# Show a numbered menu of the given tab-delimited instance rows on stderr,
# read a selection from stdin, and print the chosen row on stdout.
# Aborts via die on a non-numeric or out-of-range selection.
choose_interactively() {
  local lines=("$@")
  local row iid name priv pub sel
  local i=1
  # Tab is IFS *whitespace*: `read` would collapse consecutive tabs and shift
  # the columns whenever one is empty (e.g. no Name tag). Splitting on a
  # non-whitespace separator keeps empty columns in place.
  local sep=$'\x1f'

  echo "Multiple instances matched:" >&2
  for row in "${lines[@]}"; do
    IFS="$sep" read -r iid name priv pub <<< "${row//$'\t'/$sep}"
    printf ' [%d] %s name="%s" private=%s public=%s\n' "$i" "$iid" "${name:--}" "${priv:--}" "${pub:--}" >&2
    i=$((i + 1))
  done

  echo -n "Enter number: " >&2
  read -r sel
  [[ "$sel" =~ ^[0-9]+$ ]] || die "Invalid selection."
  # 10# keeps input such as "08" from being parsed as (invalid) octal.
  sel=$((10#$sel))
  (( sel >= 1 && sel < i )) || die "Selection out of range."
  echo "${lines[$((sel-1))]}"
}
# Resolve $1 (instance ID, IPv4 address, or Name-tag substring) to the ID of
# one running instance and print it on stdout.
#   - instance ID: looked up directly
#   - IPv4:        private IP first, public IP if nothing matched
#   - otherwise:   tag:Name contains the input
# Several matches trigger an interactive choice; no match aborts via die.
resolve_instance_id() {
  local input="$1"
  local running="Name=instance-state-name,Values=running"
  local rows=()
  local candidates filter line chosen

  # Filters to try in order; the first one that yields rows wins.
  if is_instance_id "$input"; then
    candidates=("Name=instance-id,Values=$input")
  elif is_ipv4 "$input"; then
    candidates=("Name=private-ip-address,Values=$input" "Name=ip-address,Values=$input")
  else
    candidates=("Name=tag:Name,Values=*${input}*")
  fi

  for filter in "${candidates[@]}"; do
    while IFS= read -r line; do
      # Skip blank lines so empty CLI output is never counted as a match.
      if [[ -n "$line" ]]; then
        rows+=("$line")
      fi
    done < <(describe_instances --filters "$filter" "$running")
    if (( ${#rows[@]} > 0 )); then
      break
    fi
  done

  (( ${#rows[@]} > 0 )) || die "No running instances matched '$input'."

  if (( ${#rows[@]} == 1 )); then
    chosen="${rows[0]}"
  else
    chosen="$(choose_interactively "${rows[@]}")"
  fi

  # The instance ID is the first tab-delimited column of the row.
  echo "${chosen%%$'\t'*}"
}
# --- Main ---
instance_id="$(resolve_instance_id "$TARGET_INPUT")"

# Extra arguments for start-session. Start empty: an array seeded with ""
# would hand the AWS CLI a stray empty argument.
ssm_args=()
if [[ -n "$RUN_AS" ]]; then
  # Quote the whole JSON so a RUN_AS value is never word-split or globbed.
  ssm_args+=(--parameters "{\"runAs\":[\"${RUN_AS}\"]}")
fi

echo "Starting SSM session to ${instance_id} ..."
# Optional: you can pass --document-name and --parameters for port forwarding, etc.
# The ${arr[@]+...} form expands to nothing for an empty array without
# tripping `set -u` on older bash versions.
"${aws_cli[@]}" ssm start-session --target "$instance_id" ${ssm_args[@]+"${ssm_args[@]}"}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env bash | |
set -eEo pipefail

# Options:
#   -a ACCOUNT   check only this account (profile "<ACCOUNT>:AWSReadOnlyAccess")
#   -p PROFILE   check only this AWS profile
#   -b VALUE     accepted but not used by this script
while getopts 'a:p:b:' opt; do
  case "$opt" in
    a) AWS_ACCOUNT="${OPTARG}" ;;
    p) AWS_PROFILE="${OPTARG}" ;;
    b) ;;
    # getopts reports invalid options as '?'; stop instead of silently
    # continuing with an unintended account selection.
    *)
      echo "Usage: $0 [-a ACCOUNT] [-p PROFILE]" >&2
      exit 2
      ;;
  esac
done
shift "$((OPTIND - 1))"
# Decode base64 text $1 and apply jq filter $2 to it (compact, raw output).
# --decode is the long option understood by both GNU and BSD/macOS base64,
# whereas the short form differs between them (-d vs -D).
_jq64() { echo "$1" | base64 --decode | jq -cr "$2"; }

# Apply jq filter $2 to JSON text $1 (compact, raw output).
_jq() { echo "$1" | jq -cr "$2"; }
# Emit a CSV report of IAM users (console access, MFA, access keys) for one
# or more profiles. The header goes to stdout, progress messages to stderr.
echo "AccountId,UserName,CreateDate,ConsoleAccess,PasswordLastUsed,MFADevices,AccessKey1,AccessKey2"
export AWS_REGION="us-east-1"

# Profile selection: explicit profile > explicit account > every
# AWSReadOnlyAccess profile listed by aws-sso.
if [[ -n "$AWS_PROFILE" ]]; then
  PROFILES="${AWS_PROFILE}"
elif [[ -n "$AWS_ACCOUNT" ]]; then
  PROFILES="${AWS_ACCOUNT}:AWSReadOnlyAccess"
else
  PROFILES=$(aws-sso list | grep -F AWSReadOnlyAccess | awk '{ print $7 }' | tail -n +2 | sort)
fi

# PROFILES is intentionally unquoted: it is a whitespace-separated list.
for PROFILE in $PROFILES; do
  export AWS_PROFILE="$PROFILE"
  # >&2 duplicates the existing descriptor; >/dev/stderr would reopen it and
  # can truncate the target when stderr is redirected to a file.
  echo "=> Checking account '${PROFILE}'" >&2

  # Each user record is base64-encoded so it survives word splitting.
  for U in $(aws iam list-users | jq -r '.Users[] | @base64'); do
    USERNAME=$(_jq64 "$U" ".UserName")
    CREATEDATE=$(_jq64 "$U" ".CreateDate")
    PASSWORDLASTUSED=$(_jq64 "$U" ".PasswordLastUsed")

    # Console access is inferred from whether get-login-profile succeeds.
    if aws iam get-login-profile --user-name "$USERNAME" > /dev/null 2>&1; then
      CONSOLE="Yes"
    else
      CONSOLE="No"
    fi

    MFA_DEVICES=$(aws iam list-mfa-devices --user-name "$USERNAME" | jq -r '.MFADevices | length')

    # One comma-terminated entry per access key: id, status, last-used date, service.
    KEYS_STR=""
    for KEY in $(aws iam list-access-keys --user-name "$USERNAME" | jq -r '.AccessKeyMetadata[] | @base64'); do
      ACCESSKEY_ID=$(_jq64 "$KEY" ".AccessKeyId")
      KEY_STATUS=$(_jq64 "$KEY" ".Status")
      KEYS_STR="${KEYS_STR}$(aws iam get-access-key-last-used --access-key-id "$ACCESSKEY_ID" | jq --arg "KID" "$ACCESSKEY_ID" --arg "KSTS" "$KEY_STATUS" -cr '.AccessKeyLastUsed | [$KID, $KSTS, .LastUsedDate, .ServiceName] | @sh'),"
    done

    # The account id is the part of the profile name before the first ':'.
    echo "${PROFILE%%:*},${USERNAME},${CREATEDATE},${CONSOLE},${PASSWORDLASTUSED},${MFA_DEVICES},${KEYS_STR}"
  done
done
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # dependencies = [ | |
| # "boto3", | |
| # ] | |
| # /// | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import re | |
| import boto3 | |
def _tags_match(tags, wanted):
    """Return True if any tag satisfies *wanted* (lower-cased key -> value set or None)."""
    for tag in tags:
        key = tag["Key"].lower()
        if key not in wanted:
            continue
        accepted = wanted[key]
        # None means "the key merely has to be present".
        if accepted is None or tag["Value"].lower() in accepted:
            return True
    return False


def get_load_balancers_with_tag(client, tag_filters: dict[str, list[str] | None], **kwargs):
    """List load balancers, optionally filtered by attributes and tags.

    Args:
        client: boto3 ``elbv2`` client.
        tag_filters: Mapping of tag key to accepted values; a value of ``None``
            matches any value for that key. Keys and values are compared
            case-insensitively. A load balancer is kept when at least one of
            its tags satisfies one of the filters.
        **kwargs: Optional attribute filters: ``name`` (exact name, passed to
            the API), ``name_pattern`` (case-insensitive regex searched in the
            name), ``scheme``, ``type`` and ``ip_address_type`` (exact match).

    Returns:
        The list of matching load balancer descriptions.
    """
    describe_params = {}
    if kwargs.get("name"):
        describe_params["Names"] = [kwargs["name"]]

    lbs = []
    paginator = client.get_paginator("describe_load_balancers")
    for page in paginator.paginate(**describe_params):
        lbs.extend(page["LoadBalancers"])

    if not kwargs and not tag_filters:
        return lbs

    # Normalize the filters once. Tag keys coming from the API are lower-cased
    # for comparison, so the lookup table must be keyed the same way.
    wanted = {}
    for key, values in (tag_filters or {}).items():
        norm_key = key.lower()
        if values is None:
            wanted[norm_key] = None
        elif wanted.get(norm_key, set()) is not None:
            wanted.setdefault(norm_key, set()).update(v.lower() for v in values)

    name_pattern = None
    if kwargs.get("name_pattern"):
        name_pattern = re.compile(kwargs["name_pattern"], re.IGNORECASE)

    filtered = []
    for lb in lbs:
        if kwargs.get("ip_address_type") and kwargs["ip_address_type"] != lb["IpAddressType"]:
            continue
        if kwargs.get("type") and kwargs["type"] != lb["Type"]:
            continue
        if kwargs.get("scheme") and kwargs["scheme"] != lb["Scheme"]:
            continue
        if name_pattern and not name_pattern.search(lb["LoadBalancerName"]):
            continue
        if wanted:
            # Tags are fetched only for load balancers that passed the cheap checks.
            tag_desc = client.describe_tags(ResourceArns=[lb["LoadBalancerArn"]])
            tags = tag_desc["TagDescriptions"][0].get("Tags", [])
            if not _tags_match(tags, wanted):
                continue
        filtered.append(lb)
    return filtered
def find_lbs_by_instance_or_ip(client, instance_id: str | None, ip: str | None):
    """Find load balancers that route to a given instance ID or IP target.

    Every target group attached to a load balancer is inspected; a group
    matches when one of its registered targets has an ``Id`` equal to
    *instance_id* or *ip*.

    Args:
        client: boto3 ``elbv2`` client.
        instance_id: EC2 instance ID to look for, or ``None``.
        ip: Target IP address to look for, or ``None``.

    Returns:
        Descriptions of the load balancers attached to matching target groups
        (empty list if there are none).
    """
    wanted_ids = {value for value in (instance_id, ip) if value}
    if not wanted_ids:
        return []

    matches = set()
    tg_paginator = client.get_paginator("describe_target_groups")
    for page in tg_paginator.paginate():
        for tg in page["TargetGroups"]:
            lb_arns = tg.get("LoadBalancerArns", [])
            if not lb_arns:
                # Not attached to any load balancer: nothing to report.
                continue
            th = client.describe_target_health(TargetGroupArn=tg["TargetGroupArn"])
            targets = th.get("TargetHealthDescriptions", [])
            if any(desc["Target"].get("Id") in wanted_ids for desc in targets):
                matches.update(lb_arns)

    if not matches:
        return []

    # DescribeLoadBalancers accepts at most 20 ARNs per call, so query in chunks.
    arns = sorted(matches)
    lbs = []
    for start in range(0, len(arns), 20):
        resp = client.describe_load_balancers(LoadBalancerArns=arns[start:start + 20])
        lbs.extend(resp["LoadBalancers"])
    return lbs
def get_listener_rules(client, listener_arn: str):
    """Return every rule of a listener, following ``NextMarker`` pagination.

    Args:
        client: boto3 ``elbv2`` client.
        listener_arn: ARN of the listener whose rules are wanted.

    Returns:
        A list of rule descriptions (empty if the listener has none).
    """
    rules = []
    params = {"ListenerArn": listener_arn}
    while True:
        resp = client.describe_rules(**params)
        rules.extend(resp.get("Rules", []))
        marker = resp.get("NextMarker")
        if not marker:
            return rules
        params["Marker"] = marker
def get_listeners(client, lb_arn: str):
    """Return the listeners of a load balancer, each annotated with its rules.

    Every listener dict gets a ``Rules`` key (from ``get_listener_rules``) and
    has its ``DefaultActions`` key removed.

    Args:
        client: boto3 ``elbv2`` client.
        lb_arn: ARN of the load balancer.

    Returns:
        A list of listener descriptions.
    """
    listeners = []
    # Paginate so listeners beyond the first response page are not lost.
    paginator = client.get_paginator("describe_listeners")
    for page in paginator.paginate(LoadBalancerArn=lb_arn):
        for listener in page.get("Listeners", []):
            listener["Rules"] = get_listener_rules(client, listener["ListenerArn"])
            listener.pop("DefaultActions", None)
            listeners.append(listener)
    return listeners
def get_security_groups(client, sg_ids: list[str]):
    """Describe the given security groups.

    Args:
        client: boto3 EC2 client.
        sg_ids: Security group IDs to look up.

    Returns:
        A list of security group descriptions; empty when *sg_ids* is empty.
    """
    # An empty GroupIds list is not a filter: the API would then return every
    # security group visible to the caller.
    if not sg_ids:
        return []
    resp = client.describe_security_groups(GroupIds=sg_ids)
    return resp.get("SecurityGroups", [])
def parse_tags_filters(tags_arg: list[str] | None) -> dict[str, list[str] | None]:
    """Parse ``KEY`` / ``KEY=VALUE`` command-line items into a filter mapping.

    Args:
        tags_arg: Raw ``--tag`` values, or ``None`` when the option is absent.

    Returns:
        Mapping of tag key to the list of accepted values, or to ``None`` for a
        bare ``KEY`` (any value accepted). A bare ``KEY`` takes precedence over
        ``KEY=VALUE`` items for the same key, whatever their order. Keys and
        values are stripped of surrounding whitespace.
    """
    filters: dict[str, list[str] | None] = {}
    for tag_arg in tags_arg or []:
        key, sep, value = tag_arg.partition("=")
        key = key.strip()
        if not sep:
            filters[key] = None
            continue
        values = filters.setdefault(key, [])
        # values is None when a bare KEY was seen earlier; the key-only filter
        # already matches everything, so the specific value adds nothing.
        if values is not None:
            values.append(value.strip())
    return filters
if __name__ == "__main__":
    # CLI entry point: find load balancers, attach listeners, rules and
    # security group details, and print the result as JSON.
    parser = argparse.ArgumentParser(
        description="Find ALBs/NLBs by tag, name, or instance/IP association."
    )
    parser.add_argument("--profile", required=True, help="AWS profile")
    parser.add_argument("--region", required=True, help="AWS region")
    parser.add_argument("--name", required=False, help="Name filter")
    parser.add_argument("--name-pattern", required=False, help="Name regex filter")
    parser.add_argument("--scheme", required=False, choices=["internal", "internet-facing"], help="Scheme filter")
    parser.add_argument("--type", required=False, choices=["application", "network"], help="Type filter")
    parser.add_argument("--ip-address-type", required=False, choices=["ipv4", "dualstack"], help="IpAddressType filter")
    parser.add_argument("--tag", nargs='+', required=False, help="Tag filter: KEY or KEY=VALUE")
    parser.add_argument("--target-instance-id", required=False, help="EC2 instance ID to search")
    parser.add_argument("--target-ip", required=False, help="Target IP address to search")
    args = parser.parse_args()

    session = boto3.Session(profile_name=args.profile, region_name=args.region)
    elb_client = session.client("elbv2")
    ec2_client = session.client("ec2")
    tag_filters = parse_tags_filters(args.tag)

    # A target lookup takes precedence; the attribute/tag filters are ignored then.
    if args.target_instance_id or args.target_ip:
        matches = find_lbs_by_instance_or_ip(elb_client, args.target_instance_id, args.target_ip)
    else:
        matches = get_load_balancers_with_tag(
            elb_client, tag_filters,
            name=args.name,
            scheme=args.scheme,
            type=args.type,
            name_pattern=args.name_pattern,
            ip_address_type=args.ip_address_type,
        )

    for lb in matches:
        lb["Listeners"] = get_listeners(elb_client, lb["LoadBalancerArn"])
        # Not every load balancer description carries security groups; never
        # query EC2 with an empty ID list, which would return all groups.
        sg_ids = lb.get("SecurityGroups") or []
        lb["SecurityGroups"] = get_security_groups(ec2_client, sg_ids) if sg_ids else []

    print(json.dumps(matches, indent=2, default=str))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env bash | |
set -eEo pipefail

# Options:
#   -a ACCOUNT   check only this account (profile "<ACCOUNT>:AWSReadOnlyAccess")
#   -p PROFILE   check only this AWS profile
#   -b NAME      only list buckets whose name contains NAME
while getopts 'a:p:b:' opt; do
  case "$opt" in
    a) AWS_ACCOUNT="${OPTARG}" ;;
    p) AWS_PROFILE="${OPTARG}" ;;
    b) BUCKET_NAME="${OPTARG}" ;;
    # getopts reports invalid options as '?'; stop instead of silently
    # continuing with an unintended selection.
    *)
      echo "Usage: $0 [-a ACCOUNT] [-p PROFILE] [-b BUCKET_NAME_PART]" >&2
      exit 2
      ;;
  esac
done
shift "$((OPTIND - 1))"
# Print "account","bucket" CSV rows for every bucket (optionally only those
# whose name contains BUCKET_NAME) across the selected profiles.

# Profile selection: explicit profile > explicit account > every
# AWSReadOnlyAccess profile listed by aws-sso.
if [[ -n "$AWS_PROFILE" ]]; then
  PROFILES="${AWS_PROFILE}"
elif [[ -n "$AWS_ACCOUNT" ]]; then
  PROFILES="${AWS_ACCOUNT}:AWSReadOnlyAccess"
else
  PROFILES=$(aws-sso list | grep -F AWSReadOnlyAccess | awk '{ print $7 }' | tail -n +2 | sort)
fi

# PROFILES is intentionally unquoted: it is a whitespace-separated list.
for PROFILE in $PROFILES; do
  export AWS_PROFILE="$PROFILE"
  # The account id is the part of the profile name before the first ':'.
  AWS_ACCOUNT="${PROFILE%%:*}"
  # >&2 duplicates the existing descriptor; >/dev/stderr would reopen it and
  # can truncate the target when stderr is redirected to a file.
  echo "=> Checking account '${AWS_ACCOUNT}'" >&2
  aws s3api list-buckets | jq -r --arg "BNP" "$BUCKET_NAME" --arg "AID" "$AWS_ACCOUNT" '.Buckets[] | select(($BNP == "") or (.Name | contains($BNP))) | [$AID, .Name] | @csv'
done
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # dependencies = [ | |
| # "boto3", | |
| # ] | |
| # /// | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import boto3 | |
| from botocore.exceptions import ClientError | |
def enrich_ip_sets(client, details, scope):
    """Resolve set references in every rule statement of a WebACL, in place.

    Walks ``details["Rules"]`` (if the key exists) and passes each rule's
    ``Statement`` to ``resolve_ip_sets_in_statement``. Returns ``None``.
    """
    for rule in details.get("Rules", ()):
        statement = rule.get("Statement", {})
        resolve_ip_sets_in_statement(client, statement, scope)
def resolve_ip_sets_in_statement(client, stmt, scope):
    """Recursively inline referenced IP sets and regex pattern sets.

    For each ``IPSetReferenceStatement`` the set's addresses are fetched and
    stored (sorted) under ``Addresses``; for each
    ``RegexPatternSetReferenceStatement`` the patterns are stored under
    ``RegexList``. Fetch failures are recorded under ``Error`` instead of being
    raised. Nested ``AndStatement`` / ``OrStatement`` / ``NotStatement`` bodies
    and the ``ScopeDownStatement`` of rate-based and managed-rule-group
    statements are visited as well.

    Args:
        client: boto3 ``wafv2`` client.
        stmt: Statement dict; modified in place.
        scope: WAF scope passed to the lookups (``REGIONAL`` or ``CLOUDFRONT``).
    """
    if not stmt:
        return

    def split_arn(arn):
        # The code relies on the ARN ending in ".../<name>/<id>".
        parts = arn.split("/")
        return parts[-2], parts[-1]

    if "IPSetReferenceStatement" in stmt:
        ref = stmt["IPSetReferenceStatement"]
        arn = ref.get("ARN")
        if not arn:
            ref["Error"] = "Error fetching IPSet: missing ARN"
        else:
            name, set_id = split_arn(arn)
            try:
                ipset = client.get_ip_set(Scope=scope, Id=set_id, Name=name)
                # attach addresses to the statement
                ref["Addresses"] = sorted(ipset["IPSet"].get("Addresses", []))
            except ClientError as e:
                ref["Error"] = f"Error fetching IPSet: {e}"

    if "RegexPatternSetReferenceStatement" in stmt:
        ref = stmt["RegexPatternSetReferenceStatement"]
        arn = ref.get("ARN")
        if not arn:
            ref["Error"] = "Error fetching RegexPatternSet: missing ARN"
        else:
            name, set_id = split_arn(arn)
            try:
                regexset = client.get_regex_pattern_set(Scope=scope, Id=set_id, Name=name)
                ref["RegexList"] = regexset["RegexPatternSet"].get("RegularExpressionList", [])
            except ClientError as e:
                ref["Error"] = f"Error fetching RegexPatternSet: {e}"

    # Compound statements: recurse into every nested statement.
    for key in ("AndStatement", "OrStatement"):
        if key in stmt:
            for nested in stmt[key]["Statements"]:
                resolve_ip_sets_in_statement(client, nested, scope)
    if "NotStatement" in stmt:
        # scope must be forwarded here too, otherwise the recursive call fails.
        resolve_ip_sets_in_statement(client, stmt["NotStatement"]["Statement"], scope)
    # Statements that narrow their match with a nested scope-down statement.
    for key in ("RateBasedStatement", "ManagedRuleGroupStatement"):
        nested = (stmt.get(key) or {}).get("ScopeDownStatement")
        if nested:
            resolve_ip_sets_in_statement(client, nested, scope)
def list_web_acls(client, scope, web_acl_names):
    """Yield WebACL summaries for *scope*, optionally restricted by name.

    Args:
        client: boto3 ``wafv2`` client.
        scope: ``REGIONAL`` or ``CLOUDFRONT``.
        web_acl_names: Collection of names to keep; a falsy value keeps all.

    Yields:
        WebACL summary dicts as returned by ``ListWebACLs``. If the API raises
        ``ClientError`` the error is printed and iteration ends.
    """
    params = {"Scope": scope}
    try:
        while True:
            page = client.list_web_acls(**params)
            acls = page.get("WebACLs", [])
            for acl in acls:
                if not web_acl_names or acl["Name"] in web_acl_names:
                    yield acl
            marker = page.get("NextMarker")
            # Stop on the last page; an empty page also ends the walk so a
            # lingering marker can never cause an endless loop.
            if not marker or not acls:
                return
            params["NextMarker"] = marker
    except ClientError as e:
        print(f"Error listing WebACLs: {e}")
        return
def get_web_acl_details(client, acl, scope):
    """Return the full definition of one WebACL plus its logging configuration.

    Args:
        client: boto3 ``wafv2`` client.
        acl: WebACL summary providing ``Name``, ``Id`` and ``ARN``.
        scope: ``REGIONAL`` or ``CLOUDFRONT``.

    Returns:
        The ``WebACL`` dict from ``get_web_acl`` after ``enrich_ip_sets`` has
        processed it, extended with ``LoggingConfiguration``. That key holds
        the configuration, the string ``"None"`` when WAF reports
        ``WAFNonexistentItemException``, or an ``"Error: ..."`` string. If the
        definition cannot be fetched, the dict carries an ``Error`` key.
    """
    info = {}

    # WebACL core definition
    try:
        response = client.get_web_acl(Scope=scope, Name=acl["Name"], Id=acl["Id"])
        info = response.get("WebACL", {})
        enrich_ip_sets(client, info, scope)
    except ClientError as err:
        info["Error"] = f"Error: {err}"

    # Logging configuration
    try:
        response = client.get_logging_configuration(ResourceArn=acl["ARN"])
        logging_conf = response.get("LoggingConfiguration")
    except ClientError as err:
        not_configured = err.response["Error"]["Code"] == "WAFNonexistentItemException"
        logging_conf = "None" if not_configured else f"Error: {err}"
    info["LoggingConfiguration"] = logging_conf

    return info
if __name__ == "__main__":
    # CLI entry point: print one JSON object keyed by WebACL name.
    parser = argparse.ArgumentParser(
        description="Summarize AWS WAF WebACLs (config + logging)."
    )
    parser.add_argument("--profile", required=True, help="AWS profile")
    parser.add_argument("--region", required=True, help="AWS region")
    parser.add_argument("--scope", default="REGIONAL", choices=["REGIONAL", "CLOUDFRONT"], help="WAF scope (REGIONAL or CLOUDFRONT)")
    parser.add_argument("--web-acl", nargs='+', required=False, help="Name(s) of the WebACL")
    args = parser.parse_args()

    # CLOUDFRONT-scoped WAFv2 resources must be queried through us-east-1, so
    # pick the effective region before the session and client are created.
    if args.scope == "CLOUDFRONT":
        region = "us-east-1"
    else:
        region = args.region

    session = boto3.Session(profile_name=args.profile, region_name=region)
    waf_client = session.client("wafv2")

    output = {}
    for acl in list_web_acls(waf_client, args.scope, args.web_acl):
        details = get_web_acl_details(waf_client, acl, args.scope)
        output[acl["Name"]] = details

    print(json.dumps(output, indent=2, default=str))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment