Created
July 3, 2025 07:06
-
-
Save tellmeY18/7a9b17fdea3dd06fc6abd9a53237b65d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| from langchain_anthropic import ChatAnthropic | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import Chroma | |
| from langchain.chains import RetrievalQA | |
| from langchain.prompts import PromptTemplate | |
| from datetime import datetime, timedelta | |
| from hashlib import md5 | |
| import os | |
| import re | |
| import logging | |
# Configuration
MAIN_CHROMA_DB_DIR = "chroma_db"        # persistent store for the main knowledge base
FAQ_CHROMA_DB_DIR = "faq_cache_db"      # separate store used as a semantic answer cache
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")  # read from environment; never hard-coded
CACHE_MAX_AGE_DAYS = 30       # cache entries older than this are ignored on lookup
CACHE_MIN_SIMILARITY = 0.82   # minimum score for a cache hit in query_faq_cache
CACHE_ACCESS_THRESHOLD = 3    # NOTE(review): defined but never referenced in this file — confirm intended use
# Security patterns: phrases stripped from user input to blunt prompt injection
INJECTION_PATTERNS = [
    r"(?i)ignore previous",
    r"(?i)system prompt",
    r"(?i)hidden command",
    r"(?i)internal only"
]
app = Flask(__name__)
CORS(app)  # NOTE(review): default CORS allows all origins — confirm acceptable for deployment
logging.basicConfig(level=logging.INFO)
# Global system components, assigned by initialize_app() via initialize_qa_system()
main_db = None
faq_db = None
qa_system = None
def sanitize_input(query, patterns=None):
    """Sanitize user input against prompt injections.

    Strips characters outside a conservative whitelist, then repeatedly
    removes known injection phrases until the text is stable. The loop closes
    a bypass in the single-pass approach: deleting one occurrence can splice
    the surrounding text into a brand-new occurrence (e.g.
    "ignignore previousore previous" collapses to "ignore previous" after a
    single pass).

    Args:
        query: Raw user-supplied text.
        patterns: Optional list of regex patterns to strip; defaults to the
            module-level INJECTION_PATTERNS.

    Returns:
        The sanitized query, stripped of surrounding whitespace and truncated
        to 500 characters.
    """
    if patterns is None:
        patterns = INJECTION_PATTERNS
    # Whitelist: word characters, whitespace, hyphen, question mark, period.
    query = re.sub(r'[^\w\s\-?.]', '', query)
    # Re-apply every pattern until nothing changes, so a removal cannot
    # reassemble a banned phrase out of the surrounding characters.
    changed = True
    while changed:
        changed = False
        for pattern in patterns:
            cleaned = re.sub(pattern, '', query)
            if cleaned != query:
                query = cleaned
                changed = True
    return query.strip()[:500]
# Prompt wrapping the retrieved context and user question; instructs the model
# to answer only from the provided context and to deflect when unsure.
CUSTOM_PROMPT = PromptTemplate(
    template="""Respond as a helpful Habitat representative using only this context. If unsure, say "Let me check that for you."
<context>
{context}
</context>
Question: {question}
Habitat Response:""",
    input_variables=["context", "question"]
)
def initialize_qa_system():
    """Initialize embeddings, both vector stores, and the retrieval QA chain.

    Returns:
        Tuple of (main_db, faq_db, qa_chain): the main knowledge-base Chroma
        store, the FAQ cache Chroma store, and a RetrievalQA chain that
        retrieves from main_db with CUSTOM_PROMPT.

    Raises:
        ValueError: if ANTHROPIC_API_KEY is not set — fail fast at startup
            instead of surfacing an opaque auth error on the first request.
    """
    if not ANTHROPIC_API_KEY:
        raise ValueError("ANTHROPIC_API_KEY environment variable is not set")
    embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    main_db = Chroma(
        persist_directory=MAIN_CHROMA_DB_DIR,
        embedding_function=embeddings
    )
    faq_db = Chroma(
        persist_directory=FAQ_CHROMA_DB_DIR,
        embedding_function=embeddings
    )
    llm = ChatAnthropic(
        anthropic_api_key=ANTHROPIC_API_KEY,
        model_name="claude-3-5-sonnet-latest",
        temperature=0.0,  # deterministic output keeps cached answers stable
        max_tokens=512,
        model_kwargs={
            "system": "You are a expert assistant. Only use provided context."
        }
    )
    return main_db, faq_db, RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=main_db.as_retriever(
            search_type="mmr",
            # NOTE(review): "score_threshold" is honored by the
            # "similarity_score_threshold" search type, not by "mmr" — confirm
            # which retrieval behavior was intended here.
            search_kwargs={"k": 4, "score_threshold": 0.7}
        ),
        chain_type_kwargs={"prompt": CUSTOM_PROMPT},
        return_source_documents=True
    )
def get_content_hash(docs):
    """Return an MD5 fingerprint of the retrieved documents' content.

    Hashes the first 500 characters of each document, concatenated in order,
    so cached answers can be tied to the exact source content that produced
    them and rejected when the corpus changes.
    """
    snippets = (doc.page_content[:500] for doc in docs)
    digest = md5("".join(snippets).encode())
    return digest.hexdigest()
def query_faq_cache(faq_db, query, source_hash):
    """Find cached responses with content validation.

    Searches the FAQ cache for entries whose source_hash matches the current
    retrieval content, keeps results meeting the similarity threshold that are
    younger than CACHE_MAX_AGE_DAYS, and returns the stored answer of the
    best-scoring valid entry, or None when there is no usable hit.
    """
    try:
        results = faq_db.similarity_search_with_score(
            query,
            k=3,
            # Only consider entries derived from the same source content.
            filter={"source_hash": source_hash}
        )
        now = datetime.now()
        # NOTE(review): this treats the score as a similarity (higher is
        # better); Chroma's similarity_search_with_score can return a distance
        # (lower is better) depending on configuration — confirm orientation,
        # otherwise the >= threshold test selects the WORST matches.
        valid = [
            (r[0], r[1]) for r in results
            if r[1] >= CACHE_MIN_SIMILARITY and
            datetime.fromisoformat(r[0].metadata["timestamp"]) > now - timedelta(days=CACHE_MAX_AGE_DAYS)
        ]
        return max(valid, key=lambda x: x[1])[0].metadata["answer"] if valid else None
    except Exception as e:
        # Cache lookups are best-effort: any failure falls back to a fresh answer.
        logging.error(f"Cache query error: {str(e)}")
        return None
def store_in_faq_cache(faq_db, query, response, source_docs):
    """Store response with content validation metadata.

    Upserts a Q/A document keyed by (query, source content hash), so asking
    the same question against unchanged sources overwrites in place rather
    than accumulating duplicates. Failures are logged and swallowed — caching
    is best-effort and must never break the request path.
    """
    try:
        source_hash = get_content_hash(source_docs)
        # Deterministic id: same question + same source content -> same entry.
        doc_id = md5(f"{query}_{source_hash}".encode()).hexdigest()
        metadata = {
            "question": query,
            "answer": response,
            "timestamp": datetime.now().isoformat(),  # used for age-based eviction on lookup
            "source_hash": source_hash,
            "query_variations": [query],
            "access_count": 1
        }
        # NOTE(review): reaches into Chroma's private _client/_collection
        # attributes to upsert directly — fragile across langchain/chromadb
        # versions; confirm no public upsert API is available here.
        faq_db._client.get_collection(faq_db._collection.name).upsert(
            ids=[doc_id],
            documents=[f"Q: {query}\nA: {response}"],
            metadatas=[metadata]
        )
    except Exception as e:
        logging.error(f"Cache storage error: {str(e)}")
def handle_cache(faq_db, query, response):
    """Intelligent cache management workflow.

    Returns a cached answer when one exists for this query and source content;
    otherwise stores the fresh answer (when it looks substantive) and returns
    None so the caller serves the freshly generated response.
    """
    docs = response["source_documents"]
    if not docs:
        return None
    content_hash = get_content_hash(docs)
    hit = query_faq_cache(faq_db, query, content_hash)
    if hit:
        return hit
    answer = response["result"]
    # Only cache confident answers grounded in at least two source documents.
    substantive = (
        len(docs) >= 2
        and answer != "I don't know."
        and "I don't have" not in answer
    )
    if substantive:
        store_in_faq_cache(faq_db, query, answer, docs)
    return None
@app.before_request
def initialize_app():
    """Lazily build the QA system once, before the first request is served.

    before_request fires on EVERY request; without the guard below the whole
    stack (embeddings model, both Chroma stores, the LLM chain) was rebuilt
    per request. Guarding on qa_system makes initialization happen exactly
    once for the process lifetime.
    """
    global main_db, faq_db, qa_system
    if qa_system is None:
        main_db, faq_db, qa_system = initialize_qa_system()
def _format_answer(text):
    """Rewrite retrieval-flavored phrasing into customer-facing language."""
    text = text.replace("According to the context", "In our records")
    text = text.replace("the context shows", "we have")
    if text.startswith("Answer:"):
        # Replace only the leading prefix (count=1); the original replaced
        # every occurrence even though only the prefix was checked.
        text = text.replace("Answer:", "Here's what we know:", 1)
    return text


@app.route('/query', methods=['POST'])
def process_query():
    """Answer a user question via the retrieval QA chain.

    Expects JSON {"query": "..."}; returns {"answer": ..., "cached": bool}.
    Responds 400 on a missing or fully-sanitized-away query, 500 on any
    internal failure (details are logged, not leaked to the client).
    """
    data = request.get_json()
    if not data or 'query' not in data:
        return jsonify({"error": "Missing query parameter"}), 400
    query = sanitize_input(data['query'])
    if not query:
        return jsonify({"error": "Invalid query after sanitization"}), 400
    try:
        # NOTE(review): the cache is consulted only AFTER the (expensive) LLM
        # call, so it dedupes answers but never saves a model invocation —
        # confirm whether a pre-invoke cache lookup was intended.
        response = qa_system.invoke({"query": query})
        cached_answer = handle_cache(faq_db, query, response)
        if cached_answer:
            return jsonify({"answer": _format_answer(cached_answer), "cached": True})
        return jsonify({"answer": _format_answer(response["result"]), "cached": False})
    except Exception as e:
        logging.error(f"Error processing query: {str(e)}")
        return jsonify({"error": "Internal server error"}), 500
# NOTE(review): binds Flask's built-in development server to all interfaces on
# port 6969 — fine for local testing; use a production WSGI server when deploying.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=6969)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment