Skip to content

Instantly share code, notes, and snippets.

@williamzujkowski
Created November 18, 2025 01:30
Show Gist options
  • Select an option

  • Save williamzujkowski/4df107e30a0d2e5f5156df1d9203ca13 to your computer and use it in GitHub Desktop.

Select an option

Save williamzujkowski/4df107e30a0d2e5f5156df1d9203ca13 to your computer and use it in GitHub Desktop.
SIEM Python Automation - Wazuh and Graylog API Integration
#!/usr/bin/env python3
"""
SIEM Automation Scripts - Wazuh and Graylog Integration
"""
import ipaddress
import json

import requests
from requests.auth import HTTPBasicAuth
class WazuhAPI:
    """Wazuh API client for alert automation.

    NOTE(review): the endpoint paths, payload layout and HTTP basic auth are
    kept exactly as in the original script; confirm them against the API
    reference of the Wazuh version actually deployed.
    """

    def __init__(self, base_url, username, password, timeout=10):
        """Store the connection settings used by every request.

        Args:
            base_url: Root URL of the Wazuh API. A trailing slash is removed
                so endpoint paths can be appended without producing "//".
            username: User name for HTTP basic authentication.
            password: Password for HTTP basic authentication.
            timeout: Seconds passed to ``requests`` as the request timeout.
                Without it a stalled server would block the script forever.
        """
        self.base_url = base_url.rstrip("/")
        self.auth = HTTPBasicAuth(username, password)
        self.verify_ssl = False  # Homelab self-signed cert
        self.timeout = timeout

    def get_alerts(self, severity="high", limit=100):
        """Query recent alerts by severity.

        Args:
            severity: Value sent as the ``severity`` query parameter.
            limit: Value sent as the ``limit`` query parameter.

        Returns:
            The ``"data"`` member of the JSON response body.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        endpoint = f"{self.base_url}/security/alerts"
        params = {
            "limit": limit,
            "severity": severity,
            "sort": "-timestamp",
        }
        response = requests.get(
            endpoint,
            auth=self.auth,
            params=params,
            verify=self.verify_ssl,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()["data"]

    def block_ip(self, ip_address, duration=3600):
        """Add IP to active response block list.

        The address is validated locally first, so a malformed value taken
        from an alert is never forwarded to the firewall-drop command.

        Args:
            ip_address: IPv4 or IPv6 address, as a string.
            duration: Accepted for interface compatibility only; it is not
                part of the request payload. NOTE(review): presumably the
                block lifetime is governed by the manager-side
                active-response configuration - confirm.

        Returns:
            True if the server answered with HTTP 200. False if the address
            is not a valid IP literal (no request is sent in that case) or
            the server answered with any other status.
        """
        # ipaddress.ip_address() also accepts integers; only strings are
        # meaningful here, so reject everything else up front.
        if not isinstance(ip_address, str):
            return False
        try:
            ipaddress.ip_address(ip_address)
        except ValueError:
            return False

        endpoint = f"{self.base_url}/active-response"
        data = {
            "command": "firewall-drop",
            "custom": True,
            "alert": {
                "data": {
                    "srcip": ip_address,
                },
            },
        }
        response = requests.put(
            endpoint,
            auth=self.auth,
            json=data,
            verify=self.verify_ssl,
            timeout=self.timeout,
        )
        return response.status_code == 200
class GraylogAPI:
    """Graylog API client for log analysis.

    NOTE(review): the token is sent as a Bearer token, as in the original
    script. Graylog access tokens are commonly sent via HTTP basic auth
    instead (token as user name, the literal "token" as password) - confirm
    which scheme the target server accepts.
    """

    def __init__(self, base_url, token, timeout=10):
        """Store the connection settings used by every request.

        Args:
            base_url: Root URL of the Graylog REST API. A trailing slash is
                removed so endpoint paths can be appended without "//".
            token: API token placed in the ``Authorization`` header.
            timeout: Seconds passed to ``requests`` as the request timeout.
                Without it a stalled server would block the script forever.
        """
        self.base_url = base_url.rstrip("/")
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }
        self.timeout = timeout

    def _write_headers(self):
        """Return the headers for state-changing (POST) requests.

        Adds ``X-Requested-By``, which Graylog's CSRF protection expects on
        non-GET requests, without modifying ``self.headers``.
        """
        return {**self.headers, "X-Requested-By": "siem-automation"}

    def search_logs(self, query, time_range=3600, limit=100):
        """Search logs with Graylog query syntax.

        Args:
            query: Search expression sent as the ``query`` parameter.
            time_range: Value sent as the ``range`` parameter of the
                relative search endpoint (presumably seconds - confirm).
            limit: Value sent as the ``limit`` parameter.

        Returns:
            The ``"messages"`` member of the JSON response body.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        endpoint = f"{self.base_url}/search/universal/relative"
        params = {
            "query": query,
            "range": time_range,
            "limit": limit,
        }
        response = requests.get(
            endpoint,
            headers=self.headers,
            params=params,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()["messages"]

    def create_stream(self, title, description, rules):
        """Create new log stream with rules.

        Args:
            title: Stream title.
            description: Stream description.
            rules: List of stream rule dicts, sent unchanged.

        Returns:
            The ``"stream_id"`` member of the JSON response body.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        endpoint = f"{self.base_url}/streams"
        data = {
            "title": title,
            "description": description,
            "rules": rules,
            "matching_type": "AND",
        }
        response = requests.post(
            endpoint,
            headers=self._write_headers(),
            json=data,
            timeout=self.timeout,
        )
        # Report HTTP failures as such, consistent with search_logs(),
        # instead of failing later on a missing "stream_id" key.
        response.raise_for_status()
        return response.json()["stream_id"]

    def create_alert(self, stream_id, condition_type, threshold, time_window=5):
        """Create alert condition for stream.

        Args:
            stream_id: Identifier of the stream the condition is attached to.
            condition_type: Value sent as the condition ``type``.
            threshold: Value sent as the ``threshold`` parameter; the
                condition uses ``threshold_type`` "more".
            time_window: Value sent as the ``time`` parameter (presumably
                minutes - confirm against the Graylog version in use).

        Returns:
            True if the server answered with HTTP 201, otherwise False.
        """
        endpoint = f"{self.base_url}/streams/{stream_id}/alerts/conditions"
        data = {
            "type": condition_type,
            "parameters": {
                "threshold": threshold,
                "threshold_type": "more",
                "time": time_window,
            },
        }
        response = requests.post(
            endpoint,
            headers=self._write_headers(),
            json=data,
            timeout=self.timeout,
        )
        return response.status_code == 201
def setup_ssh_brute_force_detection():
    """Example: Setup SSH brute force detection in Graylog.

    Creates a stream that collects failed SSH password logins, attaches a
    message-count alert condition to it, and prints whether the alert
    condition was accepted by the server.
    """
    graylog = GraylogAPI(
        base_url="http://graylog-server:9000/api",
        token="your_api_token_here",
    )

    # Create stream for failed SSH logins
    rules = [
        {
            "field": "program",
            "value": "sshd",
            "type": 1,  # EXACT match
            "inverted": False,
        },
        {
            "field": "message",
            "value": "Failed password",
            # CONTAINS match: the phrase is only a fragment of the sshd log
            # line, so an exact match on the whole field could not hit.
            "type": 6,
            "inverted": False,
        },
    ]
    stream_id = graylog.create_stream(
        title="Failed SSH Logins",
        description="Tracks failed SSH authentication attempts",
        rules=rules,
    )

    # Create alert for >10 failures within the time window that
    # create_alert sends by default ("time": 5).
    alert_created = graylog.create_alert(
        stream_id=stream_id,
        condition_type="message_count",
        threshold=10,
    )

    # create_alert reports failure through its return value; do not claim
    # success when the condition was rejected.
    if alert_created:
        print(f"SSH brute force detection configured on stream {stream_id}")
    else:
        print(f"Stream {stream_id} created, but the alert condition was rejected")
def auto_remediate_brute_force():
    """Example: Automated IP blocking for brute force attacks.

    Fetches up to 50 high-severity alerts and, for every alert whose rule
    description contains "brute" (case-insensitive), asks Wazuh to block
    the alert's source IP. Each IP is blocked at most once per run; an IP
    whose block request failed is retried if it shows up in a later alert.
    """
    wazuh = WazuhAPI(
        base_url="https://wazuh-manager:55000",
        username="wazuh-admin",
        password="password",
    )

    # Get recent brute force alerts
    alerts = wazuh.get_alerts(severity="high", limit=50)

    blocked_ips = set()  # set: O(1) "already handled" checks
    for alert in alerts:
        # Tolerate alerts without a rule/description instead of raising
        # KeyError and aborting the whole run.
        description = (alert.get("rule") or {}).get("description") or ""
        if "brute" not in description.lower():
            continue

        src_ip = (alert.get("data") or {}).get("srcip")
        if not src_ip or src_ip in blocked_ips:
            continue

        # Block IP via active response
        if wazuh.block_ip(src_ip, duration=7200):  # 2 hours
            print(f"Blocked {src_ip} for SSH brute force")
            blocked_ips.add(src_ip)
if __name__ == "__main__":
    # Run the two example workflows in order: first configure the Graylog
    # SSH brute force detection, then run the Wazuh auto-remediation.
    for workflow in (setup_ssh_brute_force_detection, auto_remediate_brute_force):
        workflow()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment