Skip to content

Instantly share code, notes, and snippets.

@tellmeY18
Created July 3, 2025 07:06
Show Gist options
  • Select an option

  • Save tellmeY18/7a9b17fdea3dd06fc6abd9a53237b65d to your computer and use it in GitHub Desktop.

Select an option

Save tellmeY18/7a9b17fdea3dd06fc6abd9a53237b65d to your computer and use it in GitHub Desktop.
from flask import Flask, request, jsonify
from flask_cors import CORS
from langchain_anthropic import ChatAnthropic
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from datetime import datetime, timedelta
from hashlib import md5
import os
import re
import logging
# Configuration
MAIN_CHROMA_DB_DIR = "chroma_db"      # persistent directory of the main document index
FAQ_CHROMA_DB_DIR = "faq_cache_db"    # separate Chroma store acting as a Q/A answer cache
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")  # required; no fallback if unset
CACHE_MAX_AGE_DAYS = 30        # cached answers older than this are ignored as stale
CACHE_MIN_SIMILARITY = 0.82    # minimum score for a cached answer to count as a hit
CACHE_ACCESS_THRESHOLD = 3     # NOTE(review): defined but never referenced in this file
# Security patterns
# Phrases commonly seen in prompt-injection attempts; sanitize_input() strips them.
INJECTION_PATTERNS = [
    r"(?i)ignore previous",
    r"(?i)system prompt",
    r"(?i)hidden command",
    r"(?i)internal only"
]
app = Flask(__name__)
CORS(app)  # flask_cors default: allows cross-origin requests from any origin
logging.basicConfig(level=logging.INFO)
# Global system components
# Populated lazily by the before_request hook on the first incoming request.
main_db = None
faq_db = None
qa_system = None
def sanitize_input(query, patterns=None):
    """Sanitize user input against prompt injections.

    Args:
        query: Raw user-supplied text.
        patterns: Optional iterable of regex patterns to strip; defaults to
            the module-level INJECTION_PATTERNS.

    Returns:
        Cleaned text: characters outside word chars, whitespace, '-', '?',
        '.' removed; whitespace collapsed; injection phrases removed to a
        fixed point; then stripped and truncated to 500 characters.
    """
    pats = INJECTION_PATTERNS if patterns is None else patterns
    query = re.sub(r'[^\w\s\-?.]', '', query)
    # Collapse whitespace runs so a removed phrase cannot leave a gap that
    # hides a reconstructed phrase from the single-space patterns.
    query = re.sub(r'\s+', ' ', query)
    # Fixed-point loop: a single substitution pass is bypassable because
    # removing one occurrence can splice surrounding text into a new one,
    # e.g. "ignore ignore previous previous" still contains an injection
    # after one pass.
    changed = True
    while changed:
        changed = False
        for pattern in pats:
            stripped = re.sub(pattern, '', query)
            if stripped != query:
                query = stripped
                changed = True
        if changed:
            query = re.sub(r'\s+', ' ', query)
    return query.strip()[:500]
# Prompt used by the RetrievalQA chain: constrains the model to the retrieved
# context and fixes a fallback phrase for when the context is insufficient.
# The template text is part of runtime behavior — do not reword casually.
CUSTOM_PROMPT = PromptTemplate(
    template="""Respond as a helpful Habitat representative using only this context. If unsure, say "Let me check that for you."
<context>
{context}
</context>
Question: {question}
Habitat Response:""",
    input_variables=["context", "question"]
)
def initialize_qa_system():
    """Build the vector stores and the retrieval-QA chain.

    Returns:
        tuple: ``(main_db, faq_db, qa_chain)`` where ``main_db`` indexes the
        source documents, ``faq_db`` holds cached Q/A pairs, and ``qa_chain``
        is a RetrievalQA "stuff" chain over ``main_db``.
    """
    # Both stores share one embedding model so their query vectors are comparable.
    embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    main_db = Chroma(
        persist_directory=MAIN_CHROMA_DB_DIR,
        embedding_function=embeddings
    )
    faq_db = Chroma(
        persist_directory=FAQ_CHROMA_DB_DIR,
        embedding_function=embeddings
    )
    llm = ChatAnthropic(
        anthropic_api_key=ANTHROPIC_API_KEY,
        model_name="claude-3-5-sonnet-latest",
        temperature=0.0,  # deterministic output improves cache hit quality
        max_tokens=512,
        model_kwargs={
            "system": "You are an expert assistant. Only use provided context."
        }
    )
    retriever = main_db.as_retriever(
        search_type="mmr",
        # "score_threshold" is only honored by the "similarity_score_threshold"
        # search type; it is not a valid kwarg for MMR search, so it was removed.
        search_kwargs={"k": 4}
    )
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        chain_type_kwargs={"prompt": CUSTOM_PROMPT},
        return_source_documents=True
    )
    return main_db, faq_db, qa_chain
def get_content_hash(docs):
    """Derive a deterministic MD5 fingerprint of the retrieved source content.

    Only the first 500 characters of each document are hashed, so trailing
    edits to long documents do not invalidate previously cached answers.
    """
    snippets = [doc.page_content[:500] for doc in docs]
    digest = md5("".join(snippets).encode())
    return digest.hexdigest()
def query_faq_cache(faq_db, query, source_hash):
    """Find cached responses with content validation.

    Looks up cache entries whose metadata ``source_hash`` matches the current
    source content, keeps only entries that score past CACHE_MIN_SIMILARITY
    and are younger than CACHE_MAX_AGE_DAYS, and returns the stored answer of
    the best-scoring entry, or None on a miss or any error.

    NOTE(review): Chroma's ``similarity_search_with_score`` typically returns
    a *distance* (lower = more similar), while this filter treats the score as
    a similarity (``>=`` threshold, max wins) — verify the score semantics of
    the configured embedding/collection before trusting cache hits.
    """
    try:
        results = faq_db.similarity_search_with_score(
            query,
            k=3,
            filter={"source_hash": source_hash}
        )
        now = datetime.now()
        # Keep (document, score) pairs passing both the score and age gates.
        # Timestamps were written by store_in_faq_cache() via isoformat().
        valid = [
            (r[0], r[1]) for r in results
            if r[1] >= CACHE_MIN_SIMILARITY and
            datetime.fromisoformat(r[0].metadata["timestamp"]) > now - timedelta(days=CACHE_MAX_AGE_DAYS)
        ]
        # Best-scoring valid hit wins; its stored answer is returned verbatim.
        return max(valid, key=lambda x: x[1])[0].metadata["answer"] if valid else None
    except Exception as e:
        # Cache lookups are best-effort: any failure degrades to a cache miss.
        logging.error(f"Cache query error: {str(e)}")
        return None
def store_in_faq_cache(faq_db, query, response, source_docs):
    """Persist a generated answer in the FAQ cache store.

    The entry id is an MD5 of the query plus a hash of the source content, so
    re-asking the same question against changed sources yields a distinct
    entry. Failures are logged and swallowed — caching is best-effort.
    """
    try:
        source_hash = get_content_hash(source_docs)
        doc_id = md5(f"{query}_{source_hash}".encode()).hexdigest()
        entry_meta = {
            "question": query,
            "answer": response,
            "timestamp": datetime.now().isoformat(),
            "source_hash": source_hash,
            "query_variations": [query],
            "access_count": 1,
        }
        # NOTE: reaches into Chroma's private client handle to upsert directly.
        collection = faq_db._client.get_collection(faq_db._collection.name)
        collection.upsert(
            ids=[doc_id],
            documents=[f"Q: {query}\nA: {response}"],
            metadatas=[entry_meta],
        )
    except Exception as e:
        logging.error(f"Cache storage error: {str(e)}")
def handle_cache(faq_db, query, response):
    """Consult and maintain the FAQ cache for one QA response.

    Returns a cached answer string when a valid hit exists for the same
    source content, otherwise stores the fresh answer (when it looks
    confident enough) and returns None.
    """
    source_docs = response["source_documents"]
    if not source_docs:
        return None
    source_hash = get_content_hash(source_docs)
    hit = query_faq_cache(faq_db, query, source_hash)
    if hit:
        return hit
    # Only cache answers backed by at least two documents that don't read
    # like a refusal / knowledge gap.
    answer = response["result"]
    looks_confident = (
        len(source_docs) >= 2
        and answer != "I don't know."
        and "I don't have" not in answer
    )
    if looks_confident:
        store_in_faq_cache(faq_db, query, answer, source_docs)
    return None
@app.before_request
def initialize_app():
    """Lazily build the vector stores and QA chain on first request.

    This hook fires before EVERY request; without the guard it rebuilt the
    embeddings model, both Chroma stores and the LLM chain per request. The
    guard makes initialization run exactly once.
    """
    global main_db, faq_db, qa_system
    if qa_system is None:
        main_db, faq_db, qa_system = initialize_qa_system()
def _humanize(answer):
    """Rewrite model-voice phrasing ("the context") into customer-facing wording."""
    answer = answer.replace("According to the context", "In our records")
    answer = answer.replace("the context shows", "we have")
    if answer.startswith("Answer:"):
        answer = answer.replace("Answer:", "Here's what we know:")
    return answer


@app.route('/query', methods=['POST'])
def process_query():
    """POST /query — answer a user question from the document index.

    Expects JSON ``{"query": "..."}``; returns ``{"answer": ..., "cached":
    bool}`` or an error payload with a 4xx/5xx status.
    """
    data = request.get_json()
    if not data or 'query' not in data:
        return jsonify({"error": "Missing query parameter"}), 400
    query = sanitize_input(data['query'])
    if not query:
        return jsonify({"error": "Invalid query after sanitization"}), 400
    try:
        response = qa_system.invoke({"query": query})
        # NOTE(review): the cache is consulted only AFTER the (expensive) LLM
        # call, so a hit saves no latency on this request; moving the lookup
        # before invoke() would change behavior, so it is left as-is.
        cached_answer = handle_cache(faq_db, query, response)
        if cached_answer:
            return jsonify({"answer": _humanize(cached_answer), "cached": True})
        # Post-processing was previously duplicated verbatim for the fresh and
        # cached branches; both now share _humanize().
        return jsonify({"answer": _humanize(response["result"]), "cached": False})
    except Exception as e:
        logging.error(f"Error processing query: {str(e)}")
        return jsonify({"error": "Internal server error"}), 500
if __name__ == '__main__':
    # Dev entry point: Flask's built-in server, bound on all interfaces
    # (0.0.0.0) at port 6969.
    # NOTE(review): no TLS and not a production server — front with a proper
    # WSGI server (gunicorn/uwsgi) before deploying.
    app.run(host='0.0.0.0', port=6969)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment