Skip to content

Instantly share code, notes, and snippets.

@RajChowdhury240
Created November 25, 2025 13:52
Show Gist options
  • Select an option

  • Save RajChowdhury240/99fd7bd26e0ff129b7b1687c7b024761 to your computer and use it in GitHub Desktop.

Select an option

Save RajChowdhury240/99fd7bd26e0ff129b7b1687c7b024761 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Pre-commit hook to scan CloudFormation templates for insecure IAM configurations.
Checks for specific IAM actions with Resource "*" without any conditions.
"""
import json
import yaml
import sys
import os
from pathlib import Path
# Define risky IAM actions to check for
# Add or remove actions as needed for your security requirements
RISKY_ACTIONS = [
"iam:CreateRole",
"iam:DeleteRole",
"s3:CreateBucket",
"s3:DeleteBucket",
"s3:PutBucketPolicy",
"iam:PutRolePolicy",
"iam:AttachRolePolicy",
"iam:CreateUser",
"iam:DeleteUser",
"iam:PutUserPolicy",
"ec2:RunInstances",
"lambda:CreateFunction",
"lambda:UpdateFunctionCode",
]
def load_template(file_path):
"""Load CloudFormation template from JSON or YAML file."""
try:
with open(file_path, 'r') as f:
content = f.read()
# Try JSON first
try:
return json.loads(content)
except json.JSONDecodeError:
# Try YAML
try:
return yaml.safe_load(content)
except yaml.YAMLError:
return None
except Exception as e:
print(f"Error reading {file_path}: {e}")
return None
def check_policy_statement(statement, resource_name):
"""Check if a policy statement has risky actions with wildcard resource and no conditions."""
issues = []
# Ensure statement is a dict
if not isinstance(statement, dict):
return issues
# Get actions (handle both Action and NotAction)
actions = statement.get('Action', [])
if isinstance(actions, str):
actions = [actions]
# Get resources
resources = statement.get('Resource', [])
if isinstance(resources, str):
resources = [resources]
# Get conditions
conditions = statement.get('Condition', {})
# Check if Resource contains "*"
has_wildcard_resource = '*' in resources or any(r == '*' for r in resources)
if has_wildcard_resource and not conditions:
# Check for risky actions (exact match only)
for action in actions:
# Normalize action to lowercase for case-insensitive comparison
action_lower = action.lower()
# Check if this specific action is in our risky actions list
if any(action_lower == risky.lower() for risky in RISKY_ACTIONS):
issues.append({
'resource': resource_name,
'action': action,
'issue': f"Action '{action}' with Resource '*' and no Condition"
})
# Also catch wildcard actions like "*" or "s3:*"
elif action == '*' or (action.endswith('*') and any(action_lower.startswith(risky.split(':')[0].lower() + ':') for risky in RISKY_ACTIONS)):
issues.append({
'resource': resource_name,
'action': action,
'issue': f"Action '{action}' with Resource '*' and no Condition"
})
return issues
def scan_template(template, file_path):
"""Scan CloudFormation template for insecure IAM configurations."""
issues = []
if not template or 'Resources' not in template:
return issues
resources = template['Resources']
for resource_name, resource_data in resources.items():
if not isinstance(resource_data, dict):
continue
resource_type = resource_data.get('Type', '')
properties = resource_data.get('Properties', {})
# Check IAM Role policies
if resource_type == 'AWS::IAM::Role':
# Check inline policies
policies = properties.get('Policies', [])
for policy in policies:
if isinstance(policy, dict):
policy_doc = policy.get('PolicyDocument', {})
statements = policy_doc.get('Statement', [])
if isinstance(statements, list):
for stmt in statements:
issues.extend(check_policy_statement(stmt, f"{resource_name} (Role)"))
# Check IAM Policy
elif resource_type == 'AWS::IAM::Policy':
policy_doc = properties.get('PolicyDocument', {})
statements = policy_doc.get('Statement', [])
if isinstance(statements, list):
for stmt in statements:
issues.extend(check_policy_statement(stmt, f"{resource_name} (Policy)"))
# Check IAM ManagedPolicy
elif resource_type == 'AWS::IAM::ManagedPolicy':
policy_doc = properties.get('PolicyDocument', {})
statements = policy_doc.get('Statement', [])
if isinstance(statements, list):
for stmt in statements:
issues.extend(check_policy_statement(stmt, f"{resource_name} (ManagedPolicy)"))
return issues
def get_cloudformation_files():
"""Get all CloudFormation template files from git staged files."""
try:
import subprocess
result = subprocess.run(
['git', 'diff', '--cached', '--name-only', '--diff-filter=ACM'],
capture_output=True,
text=True
)
if result.returncode != 0:
return []
files = result.stdout.strip().split('\n')
# Filter for CloudFormation files (common patterns)
cfn_extensions = ['.json', '.yaml', '.yml', '.template']
cfn_patterns = ['cloudformation', 'cfn', 'template', 'stack']
cfn_files = []
for f in files:
if not f:
continue
f_lower = f.lower()
# Check extension and common naming patterns
if any(f_lower.endswith(ext) for ext in cfn_extensions):
if any(pattern in f_lower for pattern in cfn_patterns) or \
f_lower.endswith('.template'):
cfn_files.append(f)
return cfn_files
except Exception as e:
print(f"Error getting git files: {e}")
return []
def print_issues(file_issues):
"""Print all issues found in a formatted way."""
print("\n" + "="*80)
print("⚠️ IAM SECURITY ISSUES DETECTED ⚠️")
print("="*80 + "\n")
total_issues = 0
for file_path, issues in file_issues.items():
if issues:
print(f"📄 File: {file_path}")
print("-" * 80)
for idx, issue in enumerate(issues, 1):
print(f" {idx}. Resource: {issue['resource']}")
print(f" Issue: {issue['issue']}")
print()
total_issues += len(issues)
print("="*80)
print(f"Total issues found: {total_issues}")
print("="*80 + "\n")
return total_issues
def get_user_confirmation(issue_count):
"""Ask user if they want to continue with the commit."""
print(f"⚠️ {issue_count} IAM security issue(s) found.")
print("These issues may expose your AWS resources to security risks.")
print("\nPlease fix them before committing.")
try:
response = input("\nDo you still want to continue? (yes/no) [no]: ").strip().lower()
if response == 'yes':
print("\n⚠️ Proceeding with commit despite security issues...")
return True
else:
print("\n✅ Commit aborted. Please fix the issues and try again.")
return False
except (KeyboardInterrupt, EOFError):
print("\n✅ Commit aborted.")
return False
def main():
"""Main function to run the pre-commit hook."""
cfn_files = get_cloudformation_files()
if not cfn_files:
# No CloudFormation files to check
sys.exit(0)
file_issues = {}
for file_path in cfn_files:
if not os.path.exists(file_path):
continue
template = load_template(file_path)
if template:
issues = scan_template(template, file_path)
if issues:
file_issues[file_path] = issues
if file_issues:
total_issues = print_issues(file_issues)
if get_user_confirmation(total_issues):
sys.exit(0) # Allow commit
else:
sys.exit(1) # Block commit
else:
# No issues found
sys.exit(0)
if __name__ == '__main__':
main()
@RajChowdhury240
Copy link
Author

# Pre-commit configuration for CloudFormation IAM security scanning
repos:
  - repo: local
    hooks:
      - id: cfn-iam-security-check
        name: CloudFormation IAM Security Check
        entry: python .pre-commit-hooks/cfn-iam-security-check.py
        language: system
        files: \.(json|yaml|yml|template)$
        pass_filenames: false
        always_run: false
        verbose: true
        ```

@RajChowdhury240
Copy link
Author

Statement:
  - Effect: Allow
    Action:
      - iam:CreateRole
      - s3:CreateBucket
    Resource: "*"
    # No Condition block = WARNING
    ```
```yaml
Statement:
  - Effect: Allow
    Action:
      - iam:CreateRole
    Resource: "*"
    Condition:
      StringEquals:
        aws:RequestedRegion: us-east-1
    # Has Condition block = SAFE

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment