Skip to content

Instantly share code, notes, and snippets.

@williamzujkowski
Created November 18, 2025 01:44
Show Gist options
  • Select an option

  • Save williamzujkowski/81c7b4914517758e7a7fdc0c61aeb699 to your computer and use it in GitHub Desktop.

Select an option

Save williamzujkowski/81c7b4914517758e7a7fdc0c61aeb699 to your computer and use it in GitHub Desktop.
RAG-Enhanced LLM Triage - Historical Context with ChromaDB
#!/usr/bin/env python3
"""
RAG-Enhanced LLM Alert Triage
Uses ChromaDB vector database for historical context
"""
import ollama
import chromadb
from chromadb.utils import embedding_functions
import json
# Initialize the vector database used for alert-history lookups.
# chromadb.Client() is an in-process, ephemeral client (data lives for the
# lifetime of this process only).
client = chromadb.Client()

# Sentence-transformer embedding function (ChromaDB's default ST model) turns
# alert log lines into vectors for similarity search.
embedding_fn = embedding_functions.SentenceTransformerEmbeddingFunction()

# get_or_create_collection is idempotent: create_collection raises if a
# collection with this name already exists (e.g. module re-imported, or a
# persistent client is swapped in later), so this is the safe form.
collection = client.get_or_create_collection(
    name="alert_history",
    embedding_function=embedding_fn,
)
def add_historical_alerts():
    """Populate the vector DB with historical alert data.

    Uses a single batched ``collection.add`` call instead of one call per
    alert: each ``add`` triggers an embedding pass, so batching is both
    faster and the API's intended usage. No-ops on an empty history list.
    """
    historical_alerts = [
        {
            "id": "alert_001",
            "log": "SSH login from 203.0.113.5",
            "severity": "Low",
            "context": "VPN endpoint, routine connection"
        },
        {
            "id": "alert_002",
            "log": "Port scan from 198.51.100.42",
            "severity": "High",
            "context": "Known scanner, block recommended"
        }
        # ... more historical alerts
    ]
    if not historical_alerts:
        return  # nothing to index; collection.add rejects empty batches
    collection.add(
        documents=[a['log'] for a in historical_alerts],
        metadatas=[
            {'severity': a['severity'], 'context': a['context']}
            for a in historical_alerts
        ],
        ids=[a['id'] for a in historical_alerts],
    )
def get_similar_alerts(new_alert_log, limit=5):
    """Find historically similar alerts using vector similarity.

    Returns a (documents, metadatas) pair for the single query text:
    the ``limit`` closest historical log lines and their stored metadata.
    """
    hits = collection.query(
        query_texts=[new_alert_log],
        n_results=limit,
    )
    # query() returns one result list per query text; we sent exactly one.
    docs = hits['documents'][0]
    metas = hits['metadatas'][0]
    return docs, metas
def create_rag_prompt(alert):
    """Build the LLM prompt, enriched with similar historical alerts.

    Looks up the alert's log line in the vector store and formats each
    match as a bullet so the model can weigh past severity decisions.
    """
    # Retrieve the most similar past alerts from the vector store.
    past_logs, past_meta = get_similar_alerts(alert['full_log'])

    # One bullet line per historical match.
    context = "\n".join(
        f"- {doc} (severity: {meta['severity']}, context: {meta['context']})"
        for doc, meta in zip(past_logs, past_meta)
    )

    return f"""Classify this alert. Consider similar historical alerts for context:
**Current Alert:**
{alert['full_log']}
**Similar Historical Alerts:**
{context}
**Task:**
Classify severity: Critical, High, Medium, or Low
Explain reasoning considering historical context
**JSON Response:**
{{
"severity": "...",
"reasoning": "...",
"next_step": "..."
}}
"""
def classify_with_rag(alert):
    """Classify an alert using the RAG-enhanced LLM.

    Args:
        alert: dict with at least a 'full_log' key holding the raw log line.

    Returns:
        dict parsed from the model's JSON reply (expected keys:
        'severity', 'reasoning', 'next_step').

    Raises:
        ValueError: if the model response contains no JSON object.
    """
    prompt = create_rag_prompt(alert)
    response = ollama.chat(
        model='llama3.1:8b',
        messages=[{'role': 'user', 'content': prompt}],
        options={'temperature': 0.1}  # low temperature for consistent JSON
    )
    return _extract_json(response['message']['content'])


def _extract_json(text):
    """Parse the first JSON object found in *text*.

    LLMs frequently wrap JSON in markdown fences or surrounding prose; a
    bare json.loads on the raw reply raises JSONDecodeError in that case,
    so fall back to slicing out the outermost {...} span before parsing.
    """
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        start = text.find('{')
        end = text.rfind('}')
        if start == -1 or end <= start:
            raise ValueError(
                f"No JSON object found in model response: {text!r}"
            )
        return json.loads(text[start:end + 1])
if __name__ == "__main__":
    # Seed the vector store with historical context before classifying.
    add_historical_alerts()

    # Demo: triage a fresh alert using the RAG pipeline.
    sample = {
        "full_log": "SSH login from 203.0.113.5",
        "rule": {"id": "5710", "description": "SSH authentication success"}
    }
    result = classify_with_rag(sample)
    print(json.dumps(result, indent=2))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment