Created
November 18, 2025 01:30
-
-
Save williamzujkowski/4df107e30a0d2e5f5156df1d9203ca13 to your computer and use it in GitHub Desktop.
SIEM Python Automation - Wazuh and Graylog API Integration
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| SIEM Automation Scripts - Wazuh and Graylog Integration | |
| """ | |
| import requests | |
| from requests.auth import HTTPBasicAuth | |
| import json | |
class WazuhAPI:
    """Wazuh API client for alert automation.

    Every request is sent with HTTP basic auth, the configured TLS
    verification setting, and a timeout so a stalled server cannot hang
    the caller indefinitely.
    """

    def __init__(self, base_url, username, password, verify_ssl=False,
                 timeout=10):
        """Store connection settings; no request is made here.

        Args:
            base_url: Root URL of the Wazuh API. Trailing slashes are
                stripped so endpoint paths can be appended safely.
            username: Basic-auth user name.
            password: Basic-auth password.
            verify_ssl: Passed to ``requests`` as ``verify``. Defaults to
                False (homelab self-signed cert).
            timeout: Passed to ``requests`` as ``timeout`` (seconds).
        """
        self.base_url = base_url.rstrip("/")
        self.auth = HTTPBasicAuth(username, password)
        self.verify_ssl = verify_ssl
        self.timeout = timeout

    def get_alerts(self, severity="high", limit=100):
        """Query recent alerts by severity.

        Args:
            severity: Value sent as the ``severity`` query parameter.
            limit: Value sent as the ``limit`` query parameter.

        Returns:
            The ``"data"`` member of the JSON response.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        endpoint = f"{self.base_url}/security/alerts"
        params = {
            "limit": limit,
            "severity": severity,
            "sort": "-timestamp",
        }
        response = requests.get(
            endpoint,
            auth=self.auth,
            params=params,
            verify=self.verify_ssl,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()["data"]

    def block_ip(self, ip_address, duration=3600):
        """Add IP to active response block list.

        Args:
            ip_address: IPv4 or IPv6 address to hand to ``firewall-drop``.
            duration: Accepted for interface compatibility but not sent in
                the request body.
                NOTE(review): the block duration is presumably governed by
                the manager's active-response configuration -- confirm.

        Returns:
            True if the server answered with HTTP 200. False otherwise,
            including when ``ip_address`` is not a valid IP address (in
            that case no request is sent at all).
        """
        # Local import keeps this block self-contained (stdlib only).
        import ipaddress

        # The address typically originates from alert data; never forward
        # a malformed value to a firewall command.
        try:
            ipaddress.ip_address(ip_address)
        except ValueError:
            return False

        endpoint = f"{self.base_url}/active-response"
        data = {
            "command": "firewall-drop",
            "custom": True,
            "alert": {
                "data": {
                    "srcip": ip_address,
                },
            },
        }
        response = requests.put(
            endpoint,
            auth=self.auth,
            json=data,
            verify=self.verify_ssl,
            timeout=self.timeout,
        )
        return response.status_code == 200
class GraylogAPI:
    """Graylog API client for log analysis.

    Requests carry a bearer-token ``Authorization`` header and a timeout
    so a stalled server cannot hang the caller indefinitely.
    """

    def __init__(self, base_url, token, timeout=10):
        """Store connection settings; no request is made here.

        Args:
            base_url: Root URL of the Graylog API. Trailing slashes are
                stripped so endpoint paths can be appended safely.
            token: API token placed in the ``Authorization`` header.
            timeout: Passed to ``requests`` as ``timeout`` (seconds).
        """
        self.base_url = base_url.rstrip("/")
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }
        self.timeout = timeout
        # NOTE(review): some Graylog versions reject write requests that
        # lack an "X-Requested-By" header -- confirm against the target
        # server before relying on create_stream/create_alert.

    def search_logs(self, query, time_range=3600, limit=100):
        """Search logs with Graylog query syntax.

        Args:
            query: Search expression sent as the ``query`` parameter.
            time_range: Value sent as the ``range`` parameter.
            limit: Value sent as the ``limit`` parameter.

        Returns:
            The ``"messages"`` member of the JSON response.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        endpoint = f"{self.base_url}/search/universal/relative"
        params = {
            "query": query,
            "range": time_range,
            "limit": limit,
        }
        response = requests.get(
            endpoint,
            headers=self.headers,
            params=params,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()["messages"]

    def create_stream(self, title, description, rules):
        """Create new log stream with rules.

        Args:
            title: Stream title.
            description: Stream description.
            rules: List of rule dicts sent as ``rules``; the stream is
                created with ``matching_type`` ``"AND"``.

        Returns:
            The ``"stream_id"`` member of the JSON response.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        endpoint = f"{self.base_url}/streams"
        data = {
            "title": title,
            "description": description,
            "rules": rules,
            "matching_type": "AND",
        }
        response = requests.post(
            endpoint,
            headers=self.headers,
            json=data,
            timeout=self.timeout,
        )
        # Fail with a clear HTTP error instead of a KeyError / JSON decode
        # error when the server rejects the request.
        response.raise_for_status()
        return response.json()["stream_id"]

    def create_alert(self, stream_id, condition_type, threshold, time=5):
        """Create alert condition for stream.

        Args:
            stream_id: Identifier of the stream the condition belongs to.
            condition_type: Value sent as the condition ``type``.
            threshold: Value sent as ``parameters.threshold``; the
                ``threshold_type`` is always ``"more"``.
            time: Value sent as ``parameters.time`` (defaults to 5).

        Returns:
            True if the server answered with HTTP 201, otherwise False.
        """
        endpoint = f"{self.base_url}/streams/{stream_id}/alerts/conditions"
        data = {
            "type": condition_type,
            "parameters": {
                "threshold": threshold,
                "threshold_type": "more",
                "time": time,
            },
        }
        response = requests.post(
            endpoint,
            headers=self.headers,
            json=data,
            timeout=self.timeout,
        )
        return response.status_code == 201
def setup_ssh_brute_force_detection():
    """Example: Setup SSH brute force detection in Graylog.

    Creates a "Failed SSH Logins" stream and attaches a ``message_count``
    alert condition with a threshold of 10 to it, then prints the id of
    the stream that was created.
    """
    client = GraylogAPI(
        base_url="http://graylog-server:9000/api",
        token="your_api_token_here",
    )

    # Both rules use type 1 (EXACT match) and are not inverted; the stream
    # collects sshd messages that report a failed password.
    exact_matches = (
        ("program", "sshd"),
        ("message", "Failed password"),
    )
    stream_rules = [
        {"field": field, "value": value, "type": 1, "inverted": False}
        for field, value in exact_matches
    ]

    stream_id = client.create_stream(
        title="Failed SSH Logins",
        description="Tracks failed SSH authentication attempts",
        rules=stream_rules,
    )

    # Alert once more than 10 messages match; the time window is whatever
    # create_alert applies.
    client.create_alert(
        stream_id=stream_id,
        condition_type="message_count",
        threshold=10,
    )

    print(f"SSH brute force detection configured on stream {stream_id}")
def auto_remediate_brute_force():
    """Example: Automated IP blocking for brute force attacks.

    Fetches up to 50 recent high-severity alerts and, for every alert
    whose rule description contains "brute", asks Wazuh to block the
    alert's source IP. Each IP is blocked at most once per run; an IP
    whose block request fails is retried if it shows up in a later alert.
    Alerts without a rule description or a source IP are skipped.
    """
    wazuh = WazuhAPI(
        base_url="https://wazuh-manager:55000",
        username="wazuh-admin",
        password="password",
    )

    # Get recent brute force alerts
    alerts = wazuh.get_alerts(severity="high", limit=50)

    blocked_ips = set()
    for alert in alerts:
        # Tolerate alerts without a rule/description instead of raising
        # KeyError, consistent with how "data"/"srcip" are read below.
        description = (alert.get("rule") or {}).get("description") or ""
        if "brute" not in description.lower():
            continue

        src_ip = (alert.get("data") or {}).get("srcip")
        if not src_ip or src_ip in blocked_ips:
            continue

        # Block IP via active response (requested duration: 2 hours)
        if wazuh.block_ip(src_ip, duration=7200):
            print(f"Blocked {src_ip} for SSH brute force")
            blocked_ips.add(src_ip)
if __name__ == "__main__":
    # Run the examples in order: first configure SSH brute force
    # detection, then perform the automated remediation pass.
    for example in (
        setup_ssh_brute_force_detection,
        auto_remediate_brute_force,
    ):
        example()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment