Skip to content

Instantly share code, notes, and snippets.

@asvinours
Last active September 30, 2025 21:46
Show Gist options
  • Select an option

  • Save asvinours/0a3f6d93a14e7f012f0c7e8cb6ec4cb8 to your computer and use it in GitHub Desktop.

Select an option

Save asvinours/0a3f6d93a14e7f012f0c7e8cb6ec4cb8 to your computer and use it in GitHub Desktop.
aws-scripts for daily operations
# /// script
# dependencies = [
# "boto3",
# ]
# ///
from __future__ import annotations
import argparse
import datetime
import json
import boto3
from botocore.exceptions import ClientError
def get_bucket_summary(client, bucket_name):
    """Collect the configuration of one S3 bucket into a plain dict.

    Every section is queried independently, so a failure in one section does
    not prevent the others from being reported.

    Args:
        client: boto3 S3 client.
        bucket_name: Name of the bucket to inspect.

    Returns:
        dict with the keys Versioning, Encryption, Lifecycle, Replication,
        OwnershipControls, ACL and Policy (in that order). A value is the API
        data on success, the string "None" when the bucket has no such
        configuration, or an "Error: ..." string for any other ClientError.
    """

    def probe(fetch, missing_code=None):
        # Run one lookup and map API errors onto the report's sentinel strings.
        try:
            return fetch()
        except ClientError as err:
            if missing_code is not None and err.response["Error"]["Code"] == missing_code:
                return "None"
            return f"Error: {err}"

    def acl():
        reply = client.get_bucket_acl(Bucket=bucket_name)
        return {
            "Owner": reply.get("Owner"),
            "Grants": reply.get("Grants"),
        }

    # (section label, lookup, error code meaning "not configured")
    sections = (
        (
            "Versioning",
            lambda: client.get_bucket_versioning(Bucket=bucket_name).get("Status", "Disabled"),
            None,
        ),
        (
            "Encryption",
            lambda: client.get_bucket_encryption(Bucket=bucket_name)["ServerSideEncryptionConfiguration"]["Rules"],
            "ServerSideEncryptionConfigurationNotFoundError",
        ),
        (
            "Lifecycle",
            lambda: client.get_bucket_lifecycle_configuration(Bucket=bucket_name)["Rules"],
            "NoSuchLifecycleConfiguration",
        ),
        (
            "Replication",
            lambda: client.get_bucket_replication(Bucket=bucket_name)["ReplicationConfiguration"],
            "ReplicationConfigurationNotFoundError",
        ),
        (
            "OwnershipControls",
            lambda: client.get_bucket_ownership_controls(Bucket=bucket_name)["OwnershipControls"]["Rules"],
            "OwnershipControlsNotFoundError",
        ),
        ("ACL", acl, None),
        (
            "Policy",
            lambda: json.loads(client.get_bucket_policy(Bucket=bucket_name)["Policy"]),
            "NoSuchBucketPolicy",
        ),
    )
    return {label: probe(fetch, code) for label, fetch, code in sections}
def get_bucket_metrics(client, bucket_name):
    """Fetch the latest daily object count and size of a bucket from CloudWatch.

    Args:
        client: boto3 CloudWatch client.
        bucket_name: Name of the S3 bucket.

    Returns:
        dict with "ObjectCount" and "BucketSizeBytes". Each value is an int,
        the string "Not available" when CloudWatch returned no datapoints for
        the last two days, or an "Error: ..." string on a ClientError.
        BucketSizeBytes is queried for the StandardStorage storage type only.
    """

    def latest_daily_maximum(metric_name, storage_type):
        now = datetime.datetime.now(datetime.timezone.utc)
        try:
            reply = client.get_metric_statistics(
                Namespace="AWS/S3",
                MetricName=metric_name,
                Dimensions=[
                    {"Name": "BucketName", "Value": bucket_name},
                    {"Name": "StorageType", "Value": storage_type},
                ],
                StartTime=now - datetime.timedelta(days=2),
                EndTime=now,
                Period=86400,
                Statistics=["Maximum"],
            )
        except ClientError as err:
            return f"Error: {err}"

        datapoints = reply.get("Datapoints", [])
        if not datapoints:
            return "Not available"
        # CloudWatch does not return datapoints in chronological order, so the
        # last list element is not necessarily the newest one.
        newest = max(datapoints, key=lambda point: point["Timestamp"])
        return int(newest["Maximum"])

    return {
        "ObjectCount": latest_daily_maximum("NumberOfObjects", "AllStorageTypes"),
        "BucketSizeBytes": latest_daily_maximum("BucketSizeBytes", "StandardStorage"),
    }
if __name__ == "__main__":
    # CLI entry point: print one JSON document with a report per bucket.
    parser = argparse.ArgumentParser(
        description="Summarize S3 bucket configuration (lifecycle, replication, encryption, versioning, ACL, etc.)"
    )
    parser.add_argument("--profile", required=True)
    parser.add_argument("--region", default="us-west-2", help="AWS region (default: us-west-2)")
    parser.add_argument("--metrics-only", action='store_true', default=False)
    parser.add_argument("--buckets", nargs='+', required=True, help="Name(s) of the S3 bucket")
    args = parser.parse_args()

    session = boto3.Session(profile_name=args.profile, region_name=args.region)
    s3 = session.client("s3")
    clw = session.client("cloudwatch")

    output = {}
    for bucket in args.buckets:
        # With --metrics-only the configuration lookups are skipped entirely.
        summary = {} if args.metrics_only else get_bucket_summary(s3, bucket)
        metrics_summary = get_bucket_metrics(clw, bucket)
        output[bucket] = {**summary, **metrics_summary}

    # default=str: lifecycle rules may carry datetime values (e.g. an
    # expiration date) which the json module cannot serialise by itself.
    print(json.dumps(output, indent=2, default=str))
# /// script
# dependencies = [
# "boto3",
# ]
# ///
from __future__ import annotations
import argparse
import time
import boto3
def is_instance_id(value: str) -> bool:
    """Return True if *value* is shaped like an EC2 instance ID.

    An instance ID is "i-" followed by exactly 8 or 17 lowercase hexadecimal
    digits. Checking the digits (not just the length) keeps Name patterns
    such as "i-frontend" from being mistaken for an instance ID.
    """
    if not value.startswith("i-"):
        return False
    suffix = value[2:]
    return len(suffix) in (8, 17) and all(ch in "0123456789abcdef" for ch in suffix)
def describe_instances(ec2, filters):
    """Return the running EC2 instances that match *filters*.

    All result pages are read through the paginator so that a response
    truncated with a NextToken does not silently drop instances.

    Args:
        ec2: boto3 EC2 client.
        filters: List of EC2 filter dicts ({"Name": ..., "Values": [...]}).

    Returns:
        list of dicts with the keys InstanceId, Name (value of the "Name"
        tag, "" when absent), PrivateIp and PublicIp (None when absent).
    """
    instances = []
    paginator = ec2.get_paginator("describe_instances")
    for page in paginator.paginate(Filters=filters):
        for reservation in page["Reservations"]:
            for inst in reservation["Instances"]:
                if inst["State"]["Name"] != "running":
                    continue
                name = next((t["Value"] for t in inst.get("Tags", []) if t["Key"] == "Name"), "")
                instances.append({
                    "InstanceId": inst["InstanceId"],
                    "Name": name,
                    "PrivateIp": inst.get("PrivateIpAddress"),
                    "PublicIp": inst.get("PublicIpAddress"),
                })
    return instances
def resolve_instances(ec2, target: str):
    """Resolve a user-supplied target to the running instances it denotes.

    The target is interpreted, in this order, as an instance ID, an IPv4
    address (private address first, then public) or a substring of the
    "Name" tag.

    Args:
        ec2: boto3 EC2 client.
        target: Instance ID, IPv4 address or Name substring.

    Returns:
        list of instance dicts as produced by describe_instances().
    """
    if is_instance_id(target):
        return describe_instances(ec2, [{"Name": "instance-id", "Values": [target]}])

    # Require a real dotted quad; a per-character test would also classify
    # "", "2024" or "..." as IP addresses.
    octets = target.split(".")
    looks_like_ipv4 = len(octets) == 4 and all(
        octet.isascii() and octet.isdigit() and int(octet) <= 255 for octet in octets
    )
    if looks_like_ipv4:
        by_private_ip = describe_instances(ec2, [{"Name": "private-ip-address", "Values": [target]}])
        if by_private_ip:
            # Reuse the result instead of issuing the same query twice.
            return by_private_ip
        return describe_instances(ec2, [{"Name": "ip-address", "Values": [target]}])

    return describe_instances(ec2, [{"Name": "tag:Name", "Values": [f"*{target}*"]}])
def run_commands(ssm, instance_ids: list[str], command: str):
    """Run a shell command on instances through SSM and wait for the results.

    Args:
        ssm: boto3 SSM client.
        instance_ids: Instances to run the command on.
        command: Shell command line passed to the AWS-RunShellScript document.

    Returns:
        dict mapping instance ID to {"Status", "StdOut", "StdErr"}.

    Note:
        Polls every two seconds and has no upper time limit.
    """
    sent = ssm.send_command(
        InstanceIds=instance_ids,
        DocumentName="AWS-RunShellScript",
        Parameters={"commands": [command]},
    )
    command_id = sent["Command"]["CommandId"]
    finished_states = ("Success", "Failed", "Cancelled", "TimedOut")

    # Wait until an invocation has been registered for every instance.
    listing = ssm.list_command_invocations(CommandId=command_id)
    while len(listing.get("CommandInvocations", [])) < len(instance_ids):
        time.sleep(2)
        listing = ssm.list_command_invocations(CommandId=command_id)

    results = {}
    for entry in listing.get("CommandInvocations", []):
        target_id = entry["InstanceId"]
        detail = ssm.get_command_invocation(CommandId=entry["CommandId"], InstanceId=target_id)
        # Poll this invocation until it reaches a final state.
        while detail["Status"] not in finished_states:
            time.sleep(2)
            detail = ssm.get_command_invocation(CommandId=entry["CommandId"], InstanceId=target_id)
        results[target_id] = {
            "Status": detail["Status"],
            "StdOut": detail.get("StandardOutputContent", ""),
            "StdErr": detail.get("StandardErrorContent", ""),
        }
    return results
if __name__ == "__main__":
    # CLI entry point: resolve the target, run the command, print the output
    # of every instance.
    parser = argparse.ArgumentParser(
        description="Run shell commands on EC2 instances via SSM."
    )
    parser.add_argument("--profile", required=True, help="AWS profile")
    parser.add_argument("--region", required=True, help="AWS region")
    parser.add_argument("target", help="Instance ID, Name substring, or IP")
    parser.add_argument("command", help="Shell command to run remotely")
    args = parser.parse_args()

    session = boto3.Session(profile_name=args.profile, region_name=args.region)
    ec2 = session.client("ec2")
    ssm = session.client("ssm")

    instances = resolve_instances(ec2, args.target)
    if not instances:
        print(f"No running instances matched {args.target}")
        # SystemExit instead of exit(): the exit() helper is installed by the
        # site module for interactive use and is not meant for programs.
        raise SystemExit(1)

    ids = [i["InstanceId"] for i in instances]
    names_by_id = {i["InstanceId"]: i["Name"] for i in instances}
    print(f"Running {repr(args.command)} on {len(ids)} instance(s): {ids}")

    results = run_commands(ssm, ids, args.command)
    for iid, out in results.items():
        print("=" * 60)
        print(f"Instance: {iid} ({names_by_id.get(iid, '')})")
        print(f"Status: {out['Status']}")
        if out["StdOut"]:
            print("--- STDOUT ---")
            print(out["StdOut"])
        if out["StdErr"]:
            print("--- STDERR ---")
            print(out["StdErr"])
#!/usr/bin/env bash
# ec2-ssm.sh — Start an AWS SSM Session Manager shell by name pattern, instance ID, or IP.
# Usage:
# ec2-ssm.sh [-p AWS_PROFILE] [-r AWS_REGION] [-u RUN_AS_USER] <pattern|instance-id|ip>
# Examples:
# ec2-ssm.sh web-prod # match tag:Name contains 'web-prod'
# ec2-ssm.sh i-0abc123def4567890 # direct instance id
# ec2-ssm.sh 10.0.2.15 # private IP
# ec2-ssm.sh 3.98.123.45 # public IP
#
# Requirements: AWS CLI v2 (configured), SSM Session Manager plugin.
set -euo pipefail
die() {
  # Print "Error: <message>" on stderr and terminate with status 1.
  printf 'Error: %s\n' "$*" >&2
  exit 1
}
usage() {
  # Print the script's header comment block as help text, then exit 1.
  # Reading stops at the first non-comment line, so comments further down
  # the file (section markers, notes) do not end up in the help output.
  awk 'NR == 1 && /^#!/ { next }
       /^#/ { sub(/^# ?/, ""); print; next }
       { exit }' "$0"
  exit 1
}
PROFILE=""
REGION=""
RUN_AS=""
# --- Parse flags ---
while getopts ":p:r:u:h" opt; do
  case "$opt" in
    p) PROFILE=$OPTARG ;;
    r) REGION=$OPTARG ;;
    u) RUN_AS=$OPTARG ;;
    h) usage ;;
    \?) die "Unknown option -$OPTARG (use -h for help)" ;;
    :) die "Option -$OPTARG requires an argument" ;;
  esac
done
shift $((OPTIND-1))  # drop the flags that were just parsed

if [[ $# -lt 1 ]]; then
  usage
fi
TARGET_INPUT="$*"  # remaining words form the target

aws_cli=(aws)  # base command; profile/region are appended only when given
if [[ -n "$PROFILE" ]]; then
  aws_cli+=(--profile "$PROFILE")
fi
if [[ -n "$REGION" ]]; then
  aws_cli+=(--region "$REGION")
fi
# --- Helpers ---
is_instance_id() {
  # Succeeds when $1 is "i-" followed by exactly 8 or 17 lowercase hex digits.
  local candidate="$1"
  local pattern='^i-([0-9a-f]{8}|[0-9a-f]{17})$'
  [[ "$candidate" =~ $pattern ]]
}
is_ipv4() {
  # Succeeds when $1 is a dotted-quad IPv4 address with every octet in 0..255.
  local ip="$1" octet
  local -a octets
  [[ "$ip" =~ ^([0-9]{1,3}\.){3}[0-9]{1,3}$ ]] || return 1
  IFS='.' read -r -a octets <<< "$ip"
  for octet in "${octets[@]}"; do
    # 10# forces base 10: a leading zero ("08") would otherwise be parsed as
    # octal and abort the arithmetic evaluation with an error.
    (( 10#$octet <= 255 )) || return 1
  done
}
# Describe instances and return tab-delimited rows:
# InstanceId<TAB>Name<TAB>PrivateIp<TAB>PublicIp
describe_instances() {
  # Extra arguments ("$@") are passed to `aws ec2 describe-instances`.
  # Missing values are emitted as "-" rather than as empty strings: tab is an
  # IFS whitespace character, so `read` collapses consecutive tabs and an
  # empty column would shift every following column to the left.
  # shellcheck disable=SC2145
  "${aws_cli[@]}" ec2 describe-instances "$@" \
    --query 'Reservations[].Instances[].[
      InstanceId,
      (Tags[?Key==`Name`].Value | [0]) || `"-"`,
      PrivateIpAddress || `"-"`,
      PublicIpAddress || `"-"`]' \
    --output text
}
choose_interactively() {
  # Show a numbered menu of the given rows on stderr, read the choice from
  # stdin and print the chosen row on stdout.
  # Arguments: one tab-delimited row per candidate instance.
  local lines=("$@")
  # Keep helper variables local so they do not leak into the caller's scope.
  local i=1 row iid name priv pub sel
  echo "Multiple instances matched:" >&2
  for row in "${lines[@]}"; do
    IFS=$'\t' read -r iid name priv pub <<< "$row"
    printf ' [%d] %s name="%s" private=%s public=%s\n' "$i" "$iid" "${name:--}" "${priv:--}" "${pub:--}" >&2
    i=$((i + 1))
  done
  printf 'Enter number: ' >&2
  # On EOF treat the selection as empty so it is rejected below with a message.
  read -r sel || sel=""
  [[ "$sel" =~ ^[0-9]+$ ]] || die "Invalid selection."
  (( sel >= 1 && sel < i )) || die "Selection out of range."
  echo "${lines[$((sel-1))]}"
}
resolve_instance_id() {
  # Resolve $1 (instance ID, IPv4 address or Name pattern) to the ID of one
  # running instance and print it. Asks the user when several match.
  local input="$1"
  local running="Name=instance-state-name,Values=running"
  local rows=() candidates=()
  local filter line chosen iid

  # Filters to try, in order; the first one that yields rows wins.
  if is_instance_id "$input"; then
    candidates=("Name=instance-id,Values=$input")
  elif is_ipv4 "$input"; then
    candidates=("Name=private-ip-address,Values=$input" "Name=ip-address,Values=$input")
  else
    candidates=("Name=tag:Name,Values=*${input}*")
  fi

  for filter in "${candidates[@]}"; do
    while IFS= read -r line; do
      rows+=("$line")
    done < <(describe_instances --filters "$filter" "$running")
    if (( ${#rows[@]} > 0 )); then
      break
    fi
  done

  (( ${#rows[@]} > 0 )) || die "No running instances matched '$input'."

  if (( ${#rows[@]} == 1 )); then
    chosen="${rows[0]}"
  else
    # errexit is not active inside command substitutions, so a failed
    # selection must be propagated explicitly; otherwise an empty instance
    # ID would be returned.
    chosen="$(choose_interactively "${rows[@]}")" || exit 1
  fi

  IFS=$'\t' read -r iid _ <<< "$chosen"
  echo "$iid"
}
# --- Main ---
instance_id="$(resolve_instance_id "$TARGET_INPUT")"
[[ -n "$instance_id" ]] || die "Could not resolve an instance for '$TARGET_INPUT'."

ssm_args=()  # starts empty so that no stray "" argument is handed to the CLI
if [[ -n "$RUN_AS" ]]; then
  ssm_args+=(--parameters "{\"runAs\":[\"${RUN_AS}\"]}")
fi

echo "Starting SSM session to ${instance_id} ..."
# Optional: you can pass --document-name and --parameters for port forwarding, etc.
"${aws_cli[@]}" ssm start-session --target "$instance_id" ${ssm_args[@]+"${ssm_args[@]}"}  # expansion is safe for an empty array under `set -u`
#!/usr/bin/env bash
# Print a CSV report of IAM users (console access, MFA devices, access keys).
# Options:
#   -p PROFILE   check only this AWS profile
#   -a ACCOUNT   check only the profile "ACCOUNT:AWSReadOnlyAccess"
# Without -p/-a every AWSReadOnlyAccess profile listed by `aws-sso list` is checked.
set -eEo pipefail

while getopts 'a:p:b:' opt; do
  case "$opt" in
    a) AWS_ACCOUNT="${OPTARG}" ;;
    p) AWS_PROFILE="${OPTARG}" ;;
    --)
      shift
      break
      ;;
  esac
done
shift "$((OPTIND - 1))"

# Decode the base64-encoded JSON document $1 and apply the jq filter $2.
# --decode is understood by both GNU and BSD/macOS base64 (-D is BSD-only).
_jq64() { echo "$1" | base64 --decode | jq -cr "$2"; }
# Apply the jq filter $2 to the JSON document $1.
_jq() { echo "$1" | jq -cr "$2"; }

echo "AccountId,UserName,CreateDate,ConsoleAccess,PasswordLastUsed,MFADevices,AccessKey1,AccessKey2"
export AWS_REGION="us-east-1"

if [[ -n "$AWS_PROFILE" ]]; then
  PROFILES="${AWS_PROFILE}"
elif [[ -n "$AWS_ACCOUNT" ]]; then
  PROFILES="${AWS_ACCOUNT}:AWSReadOnlyAccess"
else
  PROFILES=$(aws-sso list | grep -F AWSReadOnlyAccess | awk '{ print $7 }' | tail -n +2 | sort)
fi

# $PROFILES is intentionally unquoted: one profile per whitespace-separated word.
for PROFILE in $PROFILES; do
  export AWS_PROFILE="$PROFILE"
  ACCOUNT_ID="${PROFILE%%:*}"  # part of the profile name before the first ':'
  echo "=> Checking account '${PROFILE}'" >&2

  # Users are passed around base64-encoded so each one stays a single word.
  for U in $(aws iam list-users | jq -r '.Users[] | @base64'); do
    USERNAME=$(_jq64 "$U" ".UserName")
    CREATEDATE=$(_jq64 "$U" ".CreateDate")
    PASSWORDLASTUSED=$(_jq64 "$U" ".PasswordLastUsed")
    # A login profile exists only for users with console access.
    CONSOLE=$(aws iam get-login-profile --user-name "$USERNAME" > /dev/null 2>&1 && echo "Yes" || echo "No")
    MFA_DEVICES=$(aws iam list-mfa-devices --user-name "$USERNAME" | jq -r '.MFADevices | length')

    KEYS_STR=""
    for KEY in $(aws iam list-access-keys --user-name "$USERNAME" | jq -r '.AccessKeyMetadata[] | @base64'); do
      ACCESSKEY_ID=$(_jq64 "$KEY" ".AccessKeyId")
      KEY_STATUS=$(_jq64 "$KEY" ".Status")
      KEY_INFO=$(aws iam get-access-key-last-used --access-key-id "$ACCESSKEY_ID" \
        | jq --arg "KID" "$ACCESSKEY_ID" --arg "KSTS" "$KEY_STATUS" -cr \
            '.AccessKeyLastUsed | [$KID, $KSTS, .LastUsedDate, .ServiceName] | @sh')
      KEYS_STR="${KEYS_STR}${KEY_INFO},"
    done

    echo "${ACCOUNT_ID},${USERNAME},${CREATEDATE},${CONSOLE},${PASSWORDLASTUSED},${MFA_DEVICES},${KEYS_STR}"
  done
done
# /// script
# dependencies = [
# "boto3",
# ]
# ///
from __future__ import annotations
import argparse
import json
import re
import boto3
def get_load_balancers_with_tag(client, tag_filters: dict[str, list[str] | None], **kwargs):
    """List load balancers, optionally filtered by attributes and tags.

    Args:
        client: boto3 ELBv2 client.
        tag_filters: Maps a tag key to the accepted values, or to None to
            accept any value for that key. Keys and values are compared
            case-insensitively. A load balancer matches when at least one of
            its tags satisfies one of the filters. Empty means no tag filter.
        **kwargs: Optional filters; an entry that is None/empty is ignored:
            name (exact name, applied server-side), name_pattern
            (case-insensitive regex searched in the name), scheme, type and
            ip_address_type (exact matches).

    Returns:
        list of load balancer description dicts.
    """
    describe_params = {}
    if kwargs.get("name"):
        describe_params["Names"] = [kwargs["name"]]

    lbs = []
    for page in client.get_paginator("describe_load_balancers").paginate(**describe_params):
        lbs.extend(page["LoadBalancers"])

    if not kwargs and not tag_filters:
        return lbs

    # Normalise the filters once. Lookups must use the same lower-cased keys
    # as the membership test, and None marks "any value for this key".
    wanted = {}
    for key, values in (tag_filters or {}).items():
        norm_key = key.lower()
        if values is None or wanted.get(norm_key, ()) is None:
            wanted[norm_key] = None
        else:
            wanted.setdefault(norm_key, set()).update(v.lower() for v in values)

    def tag_matches(tag):
        key = tag["Key"].lower()
        if key not in wanted:
            return False
        accepted = wanted[key]
        return accepted is None or tag.get("Value", "").lower() in accepted

    name_pattern = None
    if kwargs.get("name_pattern"):
        name_pattern = re.compile(kwargs["name_pattern"], re.IGNORECASE)

    # (keyword argument, field of the load balancer description)
    attribute_filters = (
        ("ip_address_type", "IpAddressType"),
        ("type", "Type"),
        ("scheme", "Scheme"),
    )

    filtered = []
    for lb in lbs:
        if any(kwargs.get(option) and kwargs[option] != lb[field] for option, field in attribute_filters):
            continue
        if name_pattern and not name_pattern.search(lb["LoadBalancerName"]):
            continue
        if wanted:
            # Tags are only fetched for load balancers that passed the
            # cheaper attribute filters.
            tag_desc = client.describe_tags(ResourceArns=[lb["LoadBalancerArn"]])
            tags = tag_desc["TagDescriptions"][0].get("Tags", [])
            if not any(tag_matches(tag) for tag in tags):
                continue
        filtered.append(lb)
    return filtered
def find_lbs_by_instance_or_ip(client, instance_id: str | None, ip: str | None):
    """Find the load balancers that have the given instance or IP as a target.

    Every target group is inspected; the load balancers attached to groups
    containing a matching target are then described.

    Args:
        client: boto3 ELBv2 client.
        instance_id: EC2 instance ID to look for, or None.
        ip: Target IP address to look for, or None.

    Returns:
        list of load balancer description dicts, ordered by ARN; empty when
        nothing matches or when neither argument is given.
    """
    wanted_ids = {value for value in (instance_id, ip) if value}
    if not wanted_ids:
        return []

    matches = set()
    for page in client.get_paginator("describe_target_groups").paginate():
        for tg in page["TargetGroups"]:
            health = client.describe_target_health(TargetGroupArn=tg["TargetGroupArn"])
            descriptions = health.get("TargetHealthDescriptions", [])
            if any(desc["Target"].get("Id") in wanted_ids for desc in descriptions):
                matches.update(tg.get("LoadBalancerArns", []))

    if not matches:
        return []

    # DescribeLoadBalancers accepts at most 20 ARNs per call, so query in
    # chunks; sorting makes the output order deterministic.
    ordered_arns = sorted(matches)
    load_balancers = []
    for start in range(0, len(ordered_arns), 20):
        resp = client.describe_load_balancers(LoadBalancerArns=ordered_arns[start:start + 20])
        load_balancers.extend(resp["LoadBalancers"])
    return load_balancers
def get_listener_rules(client, listener_arn: str):
    """Return the rules of one listener, or an empty list when the response has none."""
    return client.describe_rules(ListenerArn=listener_arn).get("Rules", [])
def get_listeners(client, lb_arn: str):
    """Return the listeners of a load balancer, each with its rules attached.

    Every listener dict gets a "Rules" entry from get_listener_rules() and
    has its "DefaultActions" entry removed.
    """
    described = client.describe_listeners(LoadBalancerArn=lb_arn).get("Listeners", [])
    for entry in described:
        entry["Rules"] = get_listener_rules(client, entry["ListenerArn"])
        entry.pop("DefaultActions", None)
    return list(described)
def get_security_groups(client, sg_ids: list[str]):
    """Describe the given security groups.

    Args:
        client: boto3 EC2 client.
        sg_ids: Security group IDs to describe.

    Returns:
        list of security group description dicts; empty when *sg_ids* is
        empty or None.
    """
    # An empty GroupIds list is not a filter: EC2 would then return every
    # security group it can see, not "none".
    if not sg_ids:
        return []
    resp = client.describe_security_groups(GroupIds=sg_ids)
    return resp.get("SecurityGroups", [])
def parse_tags_filters(tags_arg: list[str] | None) -> dict[str, list[str] | None]:
    """Parse "KEY" / "KEY=VALUE" command-line arguments into tag filters.

    Args:
        tags_arg: Raw --tag arguments, or None when the option was not given.

    Returns:
        dict mapping each key to the list of accepted values, or to None when
        the key was given without a value (meaning any value). A bare "KEY"
        takes precedence over "KEY=VALUE" entries for the same key, whatever
        their order. Keys and values are stripped of surrounding whitespace.
    """
    filters: dict[str, list[str] | None] = {}
    for tag_arg in tags_arg or []:
        key, has_value, value = tag_arg.partition("=")
        key = key.strip()
        if not has_value:
            filters[key] = None
            continue
        accepted = filters.setdefault(key, [])
        # None means the key already accepts any value; keep it that way.
        if accepted is not None:
            accepted.append(value.strip())
    return filters
if __name__ == "__main__":
    # CLI entry point: find load balancers, attach listeners, rules and
    # security group details, and print the result as JSON.
    parser = argparse.ArgumentParser(
        description="Find ALBs/NLBs by tag, name, or instance/IP association."
    )
    parser.add_argument("--profile", required=True, help="AWS profile")
    parser.add_argument("--region", required=True, help="AWS region")
    parser.add_argument("--name", required=False, help="Name filter")
    parser.add_argument("--name-pattern", required=False, help="Name regex filter")
    parser.add_argument("--scheme", required=False, choices=["internal", "internet-facing"], help="Scheme filter")
    parser.add_argument("--type", required=False, choices=["application", "network"], help="Type filter")
    parser.add_argument("--ip-address-type", required=False, choices=["ipv4", "dualstack"], help="IpAddressType filter")
    parser.add_argument("--tag", nargs='+', required=False, help="Tag filter: KEY or KEY=VALUE")
    parser.add_argument("--target-instance-id", required=False, help="EC2 instance ID to search")
    parser.add_argument("--target-ip", required=False, help="Target IP address to search")
    args = parser.parse_args()

    session = boto3.Session(profile_name=args.profile, region_name=args.region)
    elb_client = session.client("elbv2")
    ec2_client = session.client("ec2")
    tag_filters = parse_tags_filters(args.tag)

    # A target search takes precedence over the attribute/tag filters.
    if args.target_instance_id or args.target_ip:
        matches = find_lbs_by_instance_or_ip(elb_client, args.target_instance_id, args.target_ip)
    else:
        matches = get_load_balancers_with_tag(
            elb_client, tag_filters,
            name=args.name,
            scheme=args.scheme,
            type=args.type,
            name_pattern=args.name_pattern,
            ip_address_type=args.ip_address_type,
        )

    for lb in matches:
        lb["Listeners"] = get_listeners(elb_client, lb["LoadBalancerArn"])
        # Not every load balancer has security groups (e.g. network load
        # balancers may have none); skip the lookup then, because an empty
        # ID list would make EC2 return all security groups.
        sg_ids = lb.get("SecurityGroups") or []
        lb["SecurityGroups"] = get_security_groups(ec2_client, sg_ids) if sg_ids else []

    print(json.dumps(matches, indent=2, default=str))
#!/usr/bin/env bash
# List S3 buckets as CSV rows ("account","bucket") for one or more AWS profiles.
# Options:
#   -p PROFILE   check only this AWS profile
#   -a ACCOUNT   check only the profile "ACCOUNT:AWSReadOnlyAccess"
#   -b TEXT      only list buckets whose name contains TEXT
set -eEo pipefail

while getopts 'a:p:b:' opt; do
  case "$opt" in
    a) AWS_ACCOUNT="${OPTARG}" ;;
    p) AWS_PROFILE="${OPTARG}" ;;
    b) BUCKET_NAME="${OPTARG}" ;;
    --)
      shift
      break
      ;;
  esac
done
shift "$((OPTIND - 1))"

if [[ -n "$AWS_PROFILE" ]]; then
  PROFILES="${AWS_PROFILE}"
elif [[ -n "$AWS_ACCOUNT" ]]; then
  PROFILES="${AWS_ACCOUNT}:AWSReadOnlyAccess"
else
  PROFILES=$(aws-sso list | grep -F AWSReadOnlyAccess | awk '{ print $7 }' | tail -n +2 | sort)
fi

# $PROFILES is intentionally unquoted: one profile per whitespace-separated word.
for PROFILE in $PROFILES; do
  export AWS_PROFILE="$PROFILE"
  # The account is the part of the profile name before the first ':'.
  AWS_ACCOUNT="$(cut -d':' -f1 <<< "$PROFILE")"
  echo "=> Checking account '${AWS_ACCOUNT}'" >/dev/stderr
  # An empty name filter selects every bucket.
  aws s3api list-buckets \
    | jq -r --arg "BNP" "$BUCKET_NAME" --arg "AID" "$AWS_ACCOUNT" \
        '.Buckets[] | select(($BNP == "") or (.Name | contains($BNP))) | [$AID, .Name] | @csv'
done
# /// script
# dependencies = [
# "boto3",
# ]
# ///
from __future__ import annotations
import argparse
import json
import boto3
from botocore.exceptions import ClientError
def enrich_ip_sets(client, details, scope):
    """Resolve set references in every rule statement of a WebACL, in place.

    Does nothing when *details* has no "Rules" entry. Each rule's "Statement"
    (an empty dict when missing) is handed to resolve_ip_sets_in_statement().
    """
    rules = details["Rules"] if "Rules" in details else ()
    for rule in rules:
        resolve_ip_sets_in_statement(client, rule.get("Statement", {}), scope)
def resolve_ip_sets_in_statement(client, stmt, scope):
    """Recursively expand IP set and regex pattern set references in a statement.

    The statement is modified in place: an IPSetReferenceStatement gains a
    sorted "Addresses" list, a RegexPatternSetReferenceStatement gains a
    "RegexList"; when the lookup fails with a ClientError an "Error" entry is
    stored instead. Nested statements (AndStatement, OrStatement,
    NotStatement and the ScopeDownStatement of rate-based and managed rule
    group statements) are processed recursively.

    Args:
        client: boto3 WAFv2 client.
        stmt: Statement dict from a WebACL rule.
        scope: WAF scope, "REGIONAL" or "CLOUDFRONT".
    """
    if "IPSetReferenceStatement" in stmt:
        ref = stmt["IPSetReferenceStatement"]
        # The last two ARN path components are taken as <name>/<id>.
        arn_parts = ref.get("ARN").split("/")
        try:
            ipset = client.get_ip_set(Scope=scope, Id=arn_parts[-1], Name=arn_parts[-2])
            # attach addresses to the statement
            ref["Addresses"] = sorted(ipset["IPSet"].get("Addresses", []))
        except ClientError as e:
            ref["Error"] = f"Error fetching IPSet: {e}"

    if "RegexPatternSetReferenceStatement" in stmt:
        ref = stmt["RegexPatternSetReferenceStatement"]
        arn_parts = ref.get("ARN").split("/")
        try:
            regexset = client.get_regex_pattern_set(Scope=scope, Id=arn_parts[-1], Name=arn_parts[-2])
            ref["RegexList"] = regexset["RegexPatternSet"].get("RegularExpressionList", [])
        except ClientError as e:
            ref["Error"] = f"Error fetching RegexPatternSet: {e}"

    # Compound statements holding a list of sub-statements.
    for key in ("AndStatement", "OrStatement"):
        if key in stmt:
            for nested in stmt[key]["Statements"]:
                resolve_ip_sets_in_statement(client, nested, scope)

    if "NotStatement" in stmt:
        # scope must be passed on here as well.
        resolve_ip_sets_in_statement(client, stmt["NotStatement"]["Statement"], scope)

    # Statements that can narrow their match with a nested scope-down statement.
    for key in ("RateBasedStatement", "ManagedRuleGroupStatement"):
        scope_down = (stmt.get(key) or {}).get("ScopeDownStatement")
        if scope_down:
            resolve_ip_sets_in_statement(client, scope_down, scope)
def list_web_acls(client, scope, web_acl_names):
    """Yield the WebACL summaries of a scope, optionally restricted by name.

    All result pages are followed through NextMarker, so ACLs beyond the
    first page are included.

    Args:
        client: boto3 WAFv2 client.
        scope: WAF scope, "REGIONAL" or "CLOUDFRONT".
        web_acl_names: Names to keep; None or empty keeps every WebACL.

    Yields:
        WebACL summary dicts as returned by the ListWebACLs API.

    A ClientError while listing is reported on stderr and ends the iteration.
    """
    import sys  # local import: only needed for the error message

    wanted = set(web_acl_names) if web_acl_names else None
    params = {"Scope": scope}
    try:
        while True:
            page = client.list_web_acls(**params)
            acls = page.get("WebACLs", [])
            for acl in acls:
                if wanted is None or acl["Name"] in wanted:
                    yield acl
            marker = page.get("NextMarker")
            # Stop on an empty page as well, in case a marker is still
            # returned together with the last results.
            if not marker or not acls:
                break
            params["NextMarker"] = marker
    except ClientError as e:
        # stderr keeps the JSON document on stdout parseable.
        print(f"Error listing WebACLs: {e}", file=sys.stderr)
        return
def get_web_acl_details(client, acl, scope):
    """Return the full definition of a WebACL plus its logging configuration.

    Args:
        client: boto3 WAFv2 client.
        acl: WebACL summary dict; its "Name", "Id" and "ARN" entries are used.
        scope: WAF scope, "REGIONAL" or "CLOUDFRONT".

    Returns:
        dict with the WebACL definition (set references expanded by
        enrich_ip_sets) and a "LoggingConfiguration" entry. A ClientError in
        the definition lookup is recorded under "Error". The logging entry is
        "None" when no logging configuration exists, or an "Error: ..."
        string for any other ClientError.
    """
    details = {}

    # WebACL core definition
    try:
        details = client.get_web_acl(Scope=scope, Name=acl["Name"], Id=acl["Id"]).get("WebACL", {})
        enrich_ip_sets(client, details, scope)
    except ClientError as err:
        details["Error"] = f"Error: {err}"

    # Logging configuration
    try:
        logging_cfg = client.get_logging_configuration(ResourceArn=acl["ARN"]).get("LoggingConfiguration")
    except ClientError as err:
        not_configured = err.response["Error"]["Code"] == "WAFNonexistentItemException"
        logging_cfg = "None" if not_configured else f"Error: {err}"
    details["LoggingConfiguration"] = logging_cfg

    return details
if __name__ == "__main__":
    # CLI entry point: print the definition and logging configuration of the
    # selected WebACLs as one JSON document keyed by WebACL name.
    parser = argparse.ArgumentParser(
        description="Summarize AWS WAF WebACLs (config + logging)."
    )
    parser.add_argument("--profile", required=True, help="AWS profile")
    parser.add_argument("--region", required=True, help="AWS region")
    parser.add_argument("--scope", default="REGIONAL", choices=["REGIONAL", "CLOUDFRONT"], help="WAF scope (REGIONAL or CLOUDFRONT)")
    parser.add_argument("--web-acl", nargs='+', required=False, help="Name(s) of the WebACL")
    args = parser.parse_args()

    # CLOUDFRONT-scoped resources are served from us-east-1, so the region
    # must be decided before the client is created.
    region = "us-east-1" if args.scope == "CLOUDFRONT" else args.region
    session = boto3.Session(profile_name=args.profile, region_name=region)
    waf_client = session.client("wafv2")

    output = {}
    for acl in list_web_acls(waf_client, args.scope, args.web_acl):
        output[acl["Name"]] = get_web_acl_details(waf_client, acl, args.scope)

    print(json.dumps(output, indent=2, default=str))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment