Created
November 18, 2025 01:44
-
-
Save williamzujkowski/81c7b4914517758e7a7fdc0c61aeb699 to your computer and use it in GitHub Desktop.
RAG-Enhanced LLM Triage - Historical Context with ChromaDB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| RAG-Enhanced LLM Alert Triage | |
| Uses ChromaDB vector database for historical context | |
| """ | |
| import ollama | |
| import chromadb | |
| from chromadb.utils import embedding_functions | |
| import json | |
| # Initialize vector database | |
| client = chromadb.Client() | |
| embedding_fn = embedding_functions.SentenceTransformerEmbeddingFunction() | |
| collection = client.create_collection( | |
| name="alert_history", | |
| embedding_function=embedding_fn | |
| ) | |
| def add_historical_alerts(): | |
| """Populate vector DB with historical alert data.""" | |
| historical_alerts = [ | |
| { | |
| "id": "alert_001", | |
| "log": "SSH login from 203.0.113.5", | |
| "severity": "Low", | |
| "context": "VPN endpoint, routine connection" | |
| }, | |
| { | |
| "id": "alert_002", | |
| "log": "Port scan from 198.51.100.42", | |
| "severity": "High", | |
| "context": "Known scanner, block recommended" | |
| } | |
| # ... more historical alerts | |
| ] | |
| for alert in historical_alerts: | |
| collection.add( | |
| documents=[alert['log']], | |
| metadatas=[{ | |
| 'severity': alert['severity'], | |
| 'context': alert['context'] | |
| }], | |
| ids=[alert['id']] | |
| ) | |
| def get_similar_alerts(new_alert_log, limit=5): | |
| """Find historically similar alerts using vector similarity.""" | |
| results = collection.query( | |
| query_texts=[new_alert_log], | |
| n_results=limit | |
| ) | |
| return results['documents'][0], results['metadatas'][0] | |
| def create_rag_prompt(alert): | |
| """Generate prompt with RAG context.""" | |
| # Find similar historical alerts | |
| similar_logs, similar_metadata = get_similar_alerts(alert['full_log']) | |
| # Build context string | |
| context_lines = [] | |
| for log, meta in zip(similar_logs, similar_metadata): | |
| context_lines.append(f"- {log} (severity: {meta['severity']}, context: {meta['context']})") | |
| context = "\n".join(context_lines) | |
| prompt = f"""Classify this alert. Consider similar historical alerts for context: | |
| **Current Alert:** | |
| {alert['full_log']} | |
| **Similar Historical Alerts:** | |
| {context} | |
| **Task:** | |
| Classify severity: Critical, High, Medium, or Low | |
| Explain reasoning considering historical context | |
| **JSON Response:** | |
| {{ | |
| "severity": "...", | |
| "reasoning": "...", | |
| "next_step": "..." | |
| }} | |
| """ | |
| return prompt | |
| def classify_with_rag(alert): | |
| """Classify alert using RAG-enhanced LLM.""" | |
| prompt = create_rag_prompt(alert) | |
| response = ollama.chat( | |
| model='llama3.1:8b', | |
| messages=[{'role': 'user', 'content': prompt}], | |
| options={'temperature': 0.1} | |
| ) | |
| return json.loads(response['message']['content']) | |
| if __name__ == "__main__": | |
| # Setup historical context | |
| add_historical_alerts() | |
| # Example: Classify new alert with RAG context | |
| new_alert = { | |
| "full_log": "SSH login from 203.0.113.5", | |
| "rule": {"id": "5710", "description": "SSH authentication success"} | |
| } | |
| classification = classify_with_rag(new_alert) | |
| print(json.dumps(classification, indent=2)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment