Created
November 25, 2025 13:52
-
-
Save RajChowdhury240/99fd7bd26e0ff129b7b1687c7b024761 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Pre-commit hook to scan CloudFormation templates for insecure IAM configurations. | |
| Checks for specific IAM actions with Resource "*" without any conditions. | |
| """ | |
import fnmatch
import json
import os
import subprocess
import sys
from pathlib import Path

import yaml
# IAM actions the hook treats as dangerous when a statement grants them on
# every resource (Resource "*") without a Condition block. The scanner
# compares action names case-insensitively, so the casing used here is only
# for readability. Extend or trim this list to match your own requirements.
RISKY_ACTIONS = [
    # Role lifecycle
    "iam:CreateRole",
    "iam:DeleteRole",
    # Bucket lifecycle and bucket policies
    "s3:CreateBucket",
    "s3:DeleteBucket",
    "s3:PutBucketPolicy",
    # Attaching or writing policies on roles
    "iam:PutRolePolicy",
    "iam:AttachRolePolicy",
    # User lifecycle and user policies
    "iam:CreateUser",
    "iam:DeleteUser",
    "iam:PutUserPolicy",
    # Compute that can run arbitrary code
    "ec2:RunInstances",
    "lambda:CreateFunction",
    "lambda:UpdateFunctionCode",
]
def _cfn_yaml_loader():
    """Build a safe YAML loader that accepts CloudFormation short-form tags.

    The stock safe loader raises ``ConstructorError`` on application tags such
    as ``!Ref``, ``!Sub`` or ``!GetAtt``, which would make every template that
    uses them unparseable. The loader returned here stays a ``SafeLoader``
    (no arbitrary object construction) and simply turns ``!Name value`` into
    the long form ``{"Fn::Name": value}`` (``{"Ref": value}`` for ``!Ref`` and
    ``{"Condition": value}`` for ``!Condition``).
    """

    class CfnSafeLoader(yaml.SafeLoader):
        """SafeLoader subclass so the extra constructors stay local to it."""

    def construct_intrinsic(loader, tag_suffix, node):
        # The tag may decorate a scalar, a sequence or a mapping.
        if isinstance(node, yaml.ScalarNode):
            value = loader.construct_scalar(node)
        elif isinstance(node, yaml.SequenceNode):
            value = loader.construct_sequence(node, deep=True)
        else:
            value = loader.construct_mapping(node, deep=True)
        if tag_suffix in ('Ref', 'Condition'):
            key = tag_suffix
        else:
            key = f'Fn::{tag_suffix}'
        return {key: value}

    # Standard YAML tags resolve to "tag:yaml.org,2002:..." and therefore never
    # match the "!" prefix; only application tags reach this constructor.
    CfnSafeLoader.add_multi_constructor('!', construct_intrinsic)
    return CfnSafeLoader


def load_template(file_path):
    """Load a CloudFormation template from a JSON or YAML file.

    Args:
        file_path: Path of the template file to read.

    Returns:
        The parsed document (normally a dict), or ``None`` when the file
        cannot be read or is neither valid JSON nor valid YAML. A message is
        printed in both failure cases so a skipped file is never silent.
    """
    try:
        # Templates are UTF-8; do not depend on the locale's default encoding.
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
    except (OSError, UnicodeDecodeError) as e:
        print(f"Error reading {file_path}: {e}")
        return None

    # Try JSON first: it is the stricter of the two formats.
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        pass

    # Fall back to YAML, tolerating CloudFormation intrinsic short forms.
    try:
        return yaml.load(content, Loader=_cfn_yaml_loader())
    except yaml.YAMLError as e:
        print(f"Warning: could not parse {file_path} as JSON or YAML: {e}")
        return None
def _as_list(value):
    """Normalize a policy element that may be absent, a scalar, or a list."""
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def _risky_actions_matching(pattern):
    """Return the RISKY_ACTIONS entries that an IAM action pattern grants.

    IAM action names are case-insensitive and may contain the wildcards ``*``
    and ``?``, which have the same meaning in ``fnmatch`` patterns.
    """
    pattern_lower = pattern.lower()
    return [
        risky for risky in RISKY_ACTIONS
        if fnmatch.fnmatchcase(risky.lower(), pattern_lower)
    ]


def check_policy_statement(statement, resource_name):
    """Find risky grants in one IAM policy statement.

    A statement is reported when it is not a ``Deny``, has no ``Condition``,
    lists ``"*"`` as a ``Resource`` and either

    * names an ``Action`` (exactly or through a wildcard such as ``"*"``,
      ``"iam:*"`` or ``"iam:*Role"``) that covers an entry of RISKY_ACTIONS, or
    * uses ``NotAction`` without excluding every entry of RISKY_ACTIONS.

    Args:
        statement: One element of a policy document's ``Statement``.
            Anything that is not a dict is ignored.
        resource_name: Label of the owning resource, copied into each issue.

    Returns:
        A list of dicts with the keys ``resource``, ``action`` and ``issue``;
        empty when nothing risky was found.
    """
    issues = []

    if not isinstance(statement, dict):
        return issues

    # A Deny statement only removes permissions, so it cannot be a risky grant.
    if statement.get('Effect') == 'Deny':
        return issues

    # Any Condition block is treated as sufficient scoping.
    if statement.get('Condition'):
        return issues

    # Only an unrestricted Resource ("*") is of interest.
    if not any(r == '*' for r in _as_list(statement.get('Resource'))):
        return issues

    for action in _as_list(statement.get('Action')):
        # Intrinsic functions (dicts) cannot be resolved statically; skip them
        # instead of crashing on a missing str method.
        if not isinstance(action, str):
            continue
        if _risky_actions_matching(action):
            issues.append({
                'resource': resource_name,
                'action': action,
                'issue': f"Action '{action}' with Resource '*' and no Condition"
            })

    # NotAction grants everything except the listed actions, so every risky
    # action that is not explicitly excluded is still allowed.
    if 'NotAction' in statement:
        excluded = [
            a.lower() for a in _as_list(statement.get('NotAction'))
            if isinstance(a, str)
        ]
        still_allowed = [
            risky for risky in RISKY_ACTIONS
            if not any(fnmatch.fnmatchcase(risky.lower(), p) for p in excluded)
        ]
        if still_allowed:
            issues.append({
                'resource': resource_name,
                'action': 'NotAction',
                'issue': (
                    "NotAction with Resource '*' and no Condition still allows: "
                    + ", ".join(still_allowed)
                )
            })

    return issues
# Resource types that carry inline policies under "Policies"; the value is the
# label appended to the logical resource name in reported issues.
_INLINE_POLICY_TYPES = {
    'AWS::IAM::Role': 'Role',
    'AWS::IAM::User': 'User',
    'AWS::IAM::Group': 'Group',
}

# Resource types whose properties hold a single "PolicyDocument".
_STANDALONE_POLICY_TYPES = {
    'AWS::IAM::Policy': 'Policy',
    'AWS::IAM::ManagedPolicy': 'ManagedPolicy',
}


def _policy_statements(policy_doc):
    """Return a policy document's statements as a list.

    IAM allows ``Statement`` to be a single object as well as a list, and a
    hand-written template may leave the document or the key empty.
    """
    if not isinstance(policy_doc, dict):
        return []
    statements = policy_doc.get('Statement')
    if isinstance(statements, dict):
        return [statements]
    if isinstance(statements, list):
        return statements
    return []


def scan_template(template, file_path):
    """Scan a parsed CloudFormation template for insecure IAM statements.

    Inline policies of IAM roles, users and groups, and the policy documents
    of ``AWS::IAM::Policy`` / ``AWS::IAM::ManagedPolicy`` resources are passed
    statement by statement to ``check_policy_statement``.

    Args:
        template: Parsed template. Anything that is not a dict with a
            ``Resources`` mapping yields no issues.
        file_path: Path the template came from. Currently unused; kept so
            existing callers keep working.

    Returns:
        A list of issue dicts as produced by ``check_policy_statement``.
    """
    issues = []

    if not isinstance(template, dict):
        return issues
    resources = template.get('Resources')
    if not isinstance(resources, dict):
        return issues

    for resource_name, resource_data in resources.items():
        if not isinstance(resource_data, dict):
            continue

        resource_type = resource_data.get('Type', '')
        properties = resource_data.get('Properties')
        # An empty "Properties:" key parses to None; nothing to scan then.
        if not isinstance(resource_type, str) or not isinstance(properties, dict):
            continue

        if resource_type in _INLINE_POLICY_TYPES:
            label = _INLINE_POLICY_TYPES[resource_type]
            policies = properties.get('Policies')
            if not isinstance(policies, list):
                continue
            policy_docs = [
                policy.get('PolicyDocument')
                for policy in policies
                if isinstance(policy, dict)
            ]
        elif resource_type in _STANDALONE_POLICY_TYPES:
            label = _STANDALONE_POLICY_TYPES[resource_type]
            policy_docs = [properties.get('PolicyDocument')]
        else:
            continue

        for policy_doc in policy_docs:
            for stmt in _policy_statements(policy_doc):
                issues.extend(
                    check_policy_statement(stmt, f"{resource_name} ({label})")
                )

    return issues
def get_cloudformation_files():
    """List staged files that look like CloudFormation templates.

    Asks git for the added, copied and modified paths in the index and keeps
    those that end in a template extension and also contain one of the usual
    CloudFormation naming hints anywhere in the path.

    Returns:
        A list of paths as reported by git. The list is empty when git is
        unavailable, the command fails, or nothing matches.
    """
    cfn_extensions = ('.json', '.yaml', '.yml', '.template')
    cfn_patterns = ('cloudformation', 'cfn', 'template', 'stack')

    try:
        result = subprocess.run(
            # -z: NUL-separated output without path quoting, so names with
            # spaces, quotes or non-ASCII characters come through verbatim.
            ['git', 'diff', '--cached', '--name-only', '--diff-filter=ACM', '-z'],
            capture_output=True,
            text=True
        )
    except (OSError, UnicodeDecodeError) as e:
        # OSError covers a missing or non-executable git binary.
        print(f"Error getting git files: {e}")
        return []

    if result.returncode != 0:
        return []

    cfn_files = []
    for path in result.stdout.split('\0'):
        if not path:
            continue
        path_lower = path.lower()
        # A ".template" suffix already contains the "template" hint, so it
        # needs no special case here.
        if path_lower.endswith(cfn_extensions) and any(
            pattern in path_lower for pattern in cfn_patterns
        ):
            cfn_files.append(path)

    return cfn_files
def print_issues(file_issues):
    """Write a report of every finding to stdout and return the grand total.

    Args:
        file_issues: Mapping of file path to a list of issue dicts; each
            issue must provide the keys ``resource`` and ``issue``.

    Returns:
        The number of issues across all files.
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    # Report header.
    print("\n" + heavy_rule)
    print("⚠️ IAM SECURITY ISSUES DETECTED ⚠️")
    print(heavy_rule + "\n")

    grand_total = 0
    for path, found in file_issues.items():
        # Files without findings get no section of their own.
        if not found:
            continue

        print(f"📄 File: {path}")
        print(light_rule)

        for number, entry in enumerate(found, start=1):
            print(f" {number}. Resource: {entry['resource']}")
            print(f" Issue: {entry['issue']}")
            print()

        grand_total += len(found)

    # Report footer with the overall count.
    print(heavy_rule)
    print(f"Total issues found: {grand_total}")
    print(heavy_rule + "\n")

    return grand_total
def get_user_confirmation(issue_count):
    """Ask on the console whether the commit should go ahead anyway.

    Args:
        issue_count: Number of findings, shown in the warning text.

    Returns:
        True only when the reply, trimmed and lower-cased, is exactly
        ``yes``. Any other reply, end of input, or Ctrl-C returns False.
    """
    print(f"⚠️ {issue_count} IAM security issue(s) found.")
    print("These issues may expose your AWS resources to security risks.")
    print("\nPlease fix them before committing.")

    try:
        reply = input("\nDo you still want to continue? (yes/no) [no]: ")
        # Only an explicit "yes" overrides the findings; everything else,
        # including an empty reply, keeps the default of "no".
        if reply.strip().lower() != 'yes':
            print("\n✅ Commit aborted. Please fix the issues and try again.")
            return False
        print("\n⚠️ Proceeding with commit despite security issues...")
        return True
    except (KeyboardInterrupt, EOFError):
        # No reply could be read (stdin closed or the prompt interrupted):
        # refuse rather than let the commit through.
        print("\n✅ Commit aborted.")
        return False
def main():
    """Run the hook: scan staged templates and set the process exit status.

    Exits with 0 when there is nothing to scan, nothing was found, or the
    user confirms the commit despite findings; exits with 1 otherwise.
    """
    staged = get_cloudformation_files()
    if not staged:
        # No CloudFormation files among the staged changes.
        sys.exit(0)

    findings = {}
    for path in staged:
        # Skip paths that are not present on disk.
        if not os.path.exists(path):
            continue

        parsed = load_template(path)
        if not parsed:
            continue

        problems = scan_template(parsed, path)
        if problems:
            findings[path] = problems

    if not findings:
        # Clean scan: let the commit through.
        sys.exit(0)

    count = print_issues(findings)
    # 0 allows the commit, 1 blocks it.
    sys.exit(0 if get_user_confirmation(count) else 1)


if __name__ == '__main__':
    main()
Author
RajChowdhury240
commented
Nov 25, 2025
Author
```yaml
Statement:
  - Effect: Allow
    Action:
      - iam:CreateRole
      - s3:CreateBucket
    Resource: "*"
    # No Condition block = WARNING
```
```yaml
Statement:
  - Effect: Allow
    Action:
      - iam:CreateRole
    Resource: "*"
    Condition:
      StringEquals:
        aws:RequestedRegion: us-east-1
    # Has Condition block = SAFE
```
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment