LLM Alert Triage - Automated Security Alert Classification with Ollama
#!/usr/bin/env python3
"""
LLM-Powered Security Alert Triage
Automated alert classification using a local Ollama LLM
"""
import json

import ollama
import requests

WAZUH_API = "https://wazuh-manager:55000"
WAZUH_TOKEN = "your_wazuh_token"
OLLAMA_MODEL = "llama3.1:8b"
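# NOTE: WAZUH_API, WAZUH_TOKEN, and the Slack webhook URL further down are
# placeholders; substitute values for your own environment. The token could,
# for example, be read from the environment rather than hard-coded
# (hypothetical alternative):
#   import os
#   WAZUH_TOKEN = os.environ.get("WAZUH_TOKEN", WAZUH_TOKEN)
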
def get_wazuh_alerts(hours=1):
    """Fetch recent alerts from Wazuh SIEM."""
    headers = {"Authorization": f"Bearer {WAZUH_TOKEN}"}
    params = {
        "limit": 100,
        "time_range": f"{hours}h",
        "sort": "-timestamp"
    }
    response = requests.get(
        f"{WAZUH_API}/security/alerts",
        headers=headers,
        params=params,
        verify=False,  # skips TLS verification (e.g. self-signed lab certificate); enable verification in production
        timeout=30     # avoid hanging indefinitely if the manager is unreachable
    )
    return response.json().get("data", {}).get("alerts", [])
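# For reference, the minimal alert shape the rest of this script relies on,
# inferred from the fields accessed below (example values are made up; real
# Wazuh alerts carry many more keys):
#   {
#     "timestamp": "2025-11-18T01:00:00Z",
#     "rule": {"id": "5710", "description": "sshd: Attempt to login using a non-existent user", "level": 5},
#     "data": {"srcip": "203.0.113.7", "dstip": "10.0.0.5"},
#     "agent": {"name": "web-01"},
#     "full_log": "Nov 18 01:00:00 web-01 sshd[1234]: Invalid user admin from 203.0.113.7"
#   }
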
def create_triage_prompt(alert):
    """Generate LLM prompt from alert JSON."""
    prompt = f"""You are a security analyst triaging alerts. Classify this alert:

**Alert Details:**
- Rule ID: {alert['rule']['id']}
- Rule Description: {alert['rule']['description']}
- Severity Level: {alert['rule']['level']}
- Source IP: {alert.get('data', {}).get('srcip', 'N/A')}
- Destination IP: {alert.get('data', {}).get('dstip', 'N/A')}
- Timestamp: {alert['timestamp']}
- Agent: {alert.get('agent', {}).get('name', 'N/A')}
- Full Log: {alert.get('full_log', '')[:500]}

**Your task:**
1. Classify severity: Critical, High, Medium, or Low
2. Explain reasoning (2-3 sentences)
3. Suggest next investigation step
4. Indicate if false positive likely (yes/no)

**Format response as JSON:**
{{
    "severity": "High",
    "reasoning": "SSH brute force from external IP with 50 failed attempts",
    "next_step": "Block source IP, review SSH logs for successful logins",
    "false_positive": "no"
}}

Respond ONLY with valid JSON, no other text.
"""
    return prompt
def classify_alert(prompt):
    """Send prompt to local Ollama LLM."""
    response = ollama.chat(
        model=OLLAMA_MODEL,
        messages=[{
            'role': 'system',
            'content': 'You are a security analyst. Always respond with valid JSON.'
        }, {
            'role': 'user',
            'content': prompt
        }],
        options={
            'temperature': 0.1,  # Low temperature for consistent classification
            'top_p': 0.9
        }
    )
    return response['message']['content']
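# Local models occasionally wrap their reply in markdown code fences despite the
# system prompt. If that shows up in practice, a small cleanup before json.loads()
# helps (hypothetical helper, Python 3.9+ for removesuffix):
#   def strip_fences(text: str) -> str:
#       text = text.strip()
#       if text.startswith("```"):
#           text = text.split("\n", 1)[-1].removesuffix("```").strip()
#       return text
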
def send_slack_notification(alert, classification):
    """Send high-severity alerts to Slack."""
    webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
    message = {
        "text": f"🚨 {classification['severity']}: {alert['rule']['description']}",
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*Severity:* {classification['severity']}\n*Reasoning:* {classification['reasoning']}\n*Next Step:* {classification['next_step']}"
                }
            }
        ]
    }
    requests.post(webhook_url, json=message, timeout=10)  # timeout keeps a slow webhook from blocking triage
def log_to_database(alert, classification):
    """Log alert and classification to SQLite database."""
    import sqlite3
    conn = sqlite3.connect('/var/lib/alert-triage/alerts.db')
    cursor = conn.cursor()
    cursor.execute('''
        INSERT INTO alerts (timestamp, rule_id, rule_description, severity,
                            llm_classification, reasoning, false_positive)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    ''', (
        alert['timestamp'],
        alert['rule']['id'],
        alert['rule']['description'],
        alert['rule']['level'],
        classification['severity'],
        classification['reasoning'],
        classification['false_positive']
    ))
    conn.commit()
    conn.close()
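# Stored triage results can be reviewed later with the sqlite3 CLI, e.g.:
#   sqlite3 /var/lib/alert-triage/alerts.db \
#     "SELECT timestamp, rule_id, llm_classification, false_positive FROM alerts ORDER BY timestamp DESC LIMIT 20;"
# (column names match the CREATE TABLE statement in the __main__ block below)
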
def process_alerts():
    """Main triage loop."""
    alerts = get_wazuh_alerts(hours=1)
    print(f"Processing {len(alerts)} alerts...")

    for alert in alerts:
        # Generate prompt
        prompt = create_triage_prompt(alert)

        # Get LLM classification
        llm_response = classify_alert(prompt)
        try:
            classification = json.loads(llm_response)
        except json.JSONDecodeError:
            print(f"Failed to parse LLM response for alert {alert['rule']['id']}")
            continue

        # Log to database
        log_to_database(alert, classification)

        # High-severity alerts trigger notifications
        if classification['severity'] in ['Critical', 'High']:
            if classification['false_positive'] == 'no':
                send_slack_notification(alert, classification)
                print(f"[{classification['severity']}] {alert['rule']['description']}")

    print(f"Triage complete. {len(alerts)} alerts processed.")
if __name__ == "__main__":
    # Setup database
    import os
    import sqlite3
    os.makedirs('/var/lib/alert-triage', exist_ok=True)  # ensure the data directory exists before connecting
    conn = sqlite3.connect('/var/lib/alert-triage/alerts.db')
    conn.execute('''
        CREATE TABLE IF NOT EXISTS alerts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT,
            rule_id TEXT,
            rule_description TEXT,
            severity INTEGER,
            llm_classification TEXT,
            reasoning TEXT,
            false_positive TEXT
        )
    ''')
    conn.commit()
    conn.close()

    # Run triage
    process_alerts()
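# Since get_wazuh_alerts() defaults to the last hour of alerts, the script is
# meant to run on a schedule. A cron entry along these lines would work
# (paths are hypothetical):
#   0 * * * * /usr/bin/python3 /opt/alert-triage/triage.py >> /var/log/alert-triage.log 2>&1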