Skip to content

Instantly share code, notes, and snippets.

@williamzujkowski
Created November 18, 2025 01:44
Show Gist options
  • Select an option

  • Save williamzujkowski/c501bf21bf504d84fa21edb3688da814 to your computer and use it in GitHub Desktop.

Select an option

Save williamzujkowski/c501bf21bf504d84fa21edb3688da814 to your computer and use it in GitHub Desktop.
LLM Alert Triage - Automated Security Alert Classification with Ollama
#!/usr/bin/env python3
"""
LLM-Powered Security Alert Triage
Automated alert classification using local Ollama LLM
"""
import ollama
import requests
import json
from datetime import datetime, timedelta
WAZUH_API = "https://wazuh-manager:55000"
WAZUH_TOKEN = "your_wazuh_token"
OLLAMA_MODEL = "llama3.1:8b"
def get_wazuh_alerts(hours=1):
"""Fetch recent alerts from Wazuh SIEM."""
headers = {"Authorization": f"Bearer {WAZUH_TOKEN}"}
params = {
"limit": 100,
"time_range": f"{hours}h",
"sort": "-timestamp"
}
response = requests.get(
f"{WAZUH_API}/security/alerts",
headers=headers,
params=params,
verify=False
)
return response.json().get("data", {}).get("alerts", [])
def create_triage_prompt(alert):
"""Generate LLM prompt from alert JSON."""
prompt = f"""You are a security analyst triaging alerts. Classify this alert:
**Alert Details:**
- Rule ID: {alert['rule']['id']}
- Rule Description: {alert['rule']['description']}
- Severity Level: {alert['rule']['level']}
- Source IP: {alert.get('data', {}).get('srcip', 'N/A')}
- Destination IP: {alert.get('data', {}).get('dstip', 'N/A')}
- Timestamp: {alert['timestamp']}
- Agent: {alert.get('agent', {}).get('name', 'N/A')}
- Full Log: {alert.get('full_log', '')[:500]}
**Your task:**
1. Classify severity: Critical, High, Medium, or Low
2. Explain reasoning (2-3 sentences)
3. Suggest next investigation step
4. Indicate if false positive likely (yes/no)
**Format response as JSON:**
{{
"severity": "High",
"reasoning": "SSH brute force from external IP with 50 failed attempts",
"next_step": "Block source IP, review SSH logs for successful logins",
"false_positive": "no"
}}
Respond ONLY with valid JSON, no other text.
"""
return prompt
def classify_alert(prompt):
"""Send prompt to local Ollama LLM."""
response = ollama.chat(
model=OLLAMA_MODEL,
messages=[{
'role': 'system',
'content': 'You are a security analyst. Always respond with valid JSON.'
}, {
'role': 'user',
'content': prompt
}],
options={
'temperature': 0.1, # Low temperature for consistent classification
'top_p': 0.9
}
)
return response['message']['content']
def send_slack_notification(alert, classification):
"""Send high-severity alerts to Slack."""
webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
message = {
"text": f"🚨 {classification['severity']}: {alert['rule']['description']}",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Severity:* {classification['severity']}\n*Reasoning:* {classification['reasoning']}\n*Next Step:* {classification['next_step']}"
}
}
]
}
requests.post(webhook_url, json=message)
def log_to_database(alert, classification):
"""Log alert and classification to SQLite database."""
import sqlite3
conn = sqlite3.connect('/var/lib/alert-triage/alerts.db')
cursor = conn.cursor()
cursor.execute('''
INSERT INTO alerts (timestamp, rule_id, rule_description, severity,
llm_classification, reasoning, false_positive)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
alert['timestamp'],
alert['rule']['id'],
alert['rule']['description'],
alert['rule']['level'],
classification['severity'],
classification['reasoning'],
classification['false_positive']
))
conn.commit()
conn.close()
def process_alerts():
"""Main triage loop."""
alerts = get_wazuh_alerts(hours=1)
print(f"Processing {len(alerts)} alerts...")
for alert in alerts:
# Generate prompt
prompt = create_triage_prompt(alert)
# Get LLM classification
llm_response = classify_alert(prompt)
try:
classification = json.loads(llm_response)
except json.JSONDecodeError:
print(f"Failed to parse LLM response for alert {alert['rule']['id']}")
continue
# Log to database
log_to_database(alert, classification)
# High-severity alerts trigger notifications
if classification['severity'] in ['Critical', 'High']:
if classification['false_positive'] == 'no':
send_slack_notification(alert, classification)
print(f"[{classification['severity']}] {alert['rule']['description']}")
print(f"Triage complete. {len(alerts)} alerts processed.")
if __name__ == "__main__":
# Setup database
import sqlite3
conn = sqlite3.connect('/var/lib/alert-triage/alerts.db')
conn.execute('''
CREATE TABLE IF NOT EXISTS alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
rule_id TEXT,
rule_description TEXT,
severity INTEGER,
llm_classification TEXT,
reasoning TEXT,
false_positive TEXT
)
''')
conn.close()
# Run triage
process_alerts()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment