Paperless-ngx_and_Openwebui_pipeline
| """ | |
| title: TestPipeline | |
| author: 0xThresh | |
| date: 2024-08-11 | |
| version: 1.1 | |
| license: MIT | |
| description: A pipeline for using text-to-SQL for retrieving relevant information from a database using the Llama Index library. | |
| requirements: llama_index, sqlalchemy, psycopg2-binary | |
| """ | |
from pydantic import BaseModel, ConfigDict, Field
from typing import List, Dict, Any
import os
import torch
import openai
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from qdrant_client import QdrantClient
# Global config (from env)
EMBED_LOCAL = os.getenv('EMBED_LOCAL', 'true').lower() == 'true'
RERANK_LOCAL = os.getenv('RERANK_LOCAL', 'true').lower() == 'true'
OPENROUTER_API_KEY = os.getenv('OPENROUTER_API_KEY')
OPENROUTER_BASE = "https://openrouter.ai/api/v1"
EMBED_MODEL_OPENROUTER = os.getenv('EMBED_MODEL_OPENROUTER', 'google/gemini-embedding-exp-03-07')
RERANK_MODEL_OPENROUTER = os.getenv('RERANK_MODEL_OPENROUTER', 'BAAI/bge-reranker-v2-gemma')
COLLECTION_NAME = "paperless_docs"

# Module-level configuration for the legacy (pre-1.0) openai SDK, routed to OpenRouter
openai.api_key = OPENROUTER_API_KEY
openai.api_base = OPENROUTER_BASE
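# Illustrative environment for the remote (OpenRouter) path; example values,
# not defaults:
#   EMBED_LOCAL=false
#   RERANK_LOCAL=false
#   OPENROUTER_API_KEY=<your OpenRouter key>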
class Valves(BaseModel):
    # QdrantClient is not a pydantic-native type, so arbitrary types must be
    # allowed; default_factory avoids pydantic copying a live client instance
    model_config = ConfigDict(arbitrary_types_allowed=True)
    qdrant_client: QdrantClient = Field(default_factory=lambda: QdrantClient("http://qdrant:6333"))
class Pipe:
    def __init__(self):
        self.valves = Valves()
        # Lazily initialized models
        self.embedder = None
        self.reranker_tokenizer = None
        self.reranker_model = None
    async def _embed_query(self, query: str) -> List[float]:
        if EMBED_LOCAL:
            # Load the local embedding model on first use
            if self.embedder is None:
                self.embedder = SentenceTransformer("BAAI/bge-large-en-v1.5")
            return self.embedder.encode(query).tolist()
        else:
            # Legacy (pre-1.0) openai SDK embedding call, served by OpenRouter
            resp = openai.Embedding.create(input=query, model=EMBED_MODEL_OPENROUTER)
            return resp['data'][0]['embedding']
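    # Note: the two embedding paths return vectors of different dimensionality
    # (bge-large-en-v1.5 is 1024-dim), so the Qdrant collection schema must
    # match whichever embedder is active.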
    async def _rerank(self, query: str, docs: List[Dict]) -> List[Dict]:
        reranked_docs = []
        texts = [doc['text'] for doc in docs]
        if RERANK_LOCAL:
            # Load the local cross-encoder reranker on first use
            if self.reranker_tokenizer is None:
                self.reranker_tokenizer = AutoTokenizer.from_pretrained("BAAI/bge-reranker-large")
                self.reranker_model = AutoModelForSequenceClassification.from_pretrained("BAAI/bge-reranker-large")
            for text, doc in zip(texts, docs):
                inputs = self.reranker_tokenizer(query, text, return_tensors='pt', truncation=True, max_length=512)
                with torch.no_grad():
                    score = self.reranker_model(**inputs).logits[0].numpy()[0]
                reranked_docs.append({'doc': doc, 'score': float(score)})
        else:
            # OpenRouter rerank: many rerank models take [{"query": q, "text": t}]
            rerank_inputs = [{"query": query, "text": t} for t in texts]
            resp = openai.ChatCompletion.create(
                model=RERANK_MODEL_OPENROUTER,
                messages=[{"role": "user", "content": "Rerank these by relevance: " + str(rerank_inputs)}],
                temperature=0.0
            )
            # Placeholder scoring that just preserves retrieval order; the chat
            # response above is not parsed yet - adjust once the actual API
            # response format is handled
            scores = [0.9 - i * 0.02 for i in range(len(texts))]
            for text, doc, score in zip(texts, docs, scores):
                reranked_docs.append({'doc': doc, 'score': score})
        return sorted(reranked_docs, key=lambda x: x['score'], reverse=True)
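    # A possible replacement for the placeholder scores above (a sketch that
    # assumes the prompt is tightened so the model returns a bare JSON array
    # of floats, one per input):
    #   import json
    #   scores = json.loads(resp['choices'][0]['message']['content'])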
    async def pipe(self, body: Dict[str, Any]) -> Dict[str, Any]:
        # Latest user message from the OpenAI-style chat body
        query = body['messages'][-1]['content']
        # Embed the query and retrieve candidate chunks from Qdrant
        q_vector = await self._embed_query(query)
        hits = self.valves.qdrant_client.search(
            COLLECTION_NAME,
            query_vector=q_vector,
            limit=50
        )
        contexts = [{'text': hit.payload['text'], **hit.payload} for hit in hits]
        # Rerank, then keep the top 5 contexts
        top_contexts = (await self._rerank(query, contexts))[:5]
        # Format each context with a citation header the model can reference
        formatted_context = []
        for item in top_contexts:
            payload = item['doc']
            citation = f"[Doc ID: {payload['doc_id']} | Title: {payload['doc_title']} | Page: {payload['page']} | Score: {item['score']:.2f}]"
            formatted_context.append(f"{citation}\n{payload['text']}\n")
        body['context'] = "\n\n".join(formatted_context)
        body['body'] = f"Answer using these sources (cite with [Doc ID]):\n{body['context']}\n\nQuestion: {query}"
        return body
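The pipe assumes a `paperless_docs` collection already exists in Qdrant with `text`, `doc_id`, `doc_title`, and `page` in each point's payload. Below is a minimal companion sketch for populating that collection from the Paperless-ngx REST API with local embeddings; `PAPERLESS_URL`, `PAPERLESS_TOKEN`, the fixed-size chunking, and the sequential point IDs are illustrative assumptions, not part of the gist.

# index_paperless.py - illustrative companion script, not part of the pipe.
import os
import requests
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams, PointStruct

PAPERLESS_URL = os.getenv("PAPERLESS_URL", "http://paperless:8000")  # hypothetical names
PAPERLESS_TOKEN = os.getenv("PAPERLESS_TOKEN")

client = QdrantClient("http://qdrant:6333")
embedder = SentenceTransformer("BAAI/bge-large-en-v1.5")

# Create the collection the pipe searches; 1024 matches bge-large-en-v1.5
client.recreate_collection(
    collection_name="paperless_docs",
    vectors_config=VectorParams(size=1024, distance=Distance.COSINE),
)

# First page of results only; follow the 'next' link for a full corpus
resp = requests.get(
    f"{PAPERLESS_URL}/api/documents/",
    headers={"Authorization": f"Token {PAPERLESS_TOKEN}"},
)
resp.raise_for_status()

points = []
for doc in resp.json()["results"]:
    # Naive fixed-size chunking of the OCR text; one "page" per chunk here
    content = doc.get("content") or ""
    chunks = [content[i:i + 1000] for i in range(0, len(content), 1000)]
    for page, chunk in enumerate(chunks, start=1):
        points.append(PointStruct(
            id=len(points),
            vector=embedder.encode(chunk).tolist(),
            payload={
                "text": chunk,            # read by the pipe for context
                "doc_id": doc["id"],      # used in the citation line
                "doc_title": doc["title"],
                "page": page,
            },
        ))

client.upsert(collection_name="paperless_docs", points=points)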