Last active
November 16, 2025 18:25
-
-
Save akiraaisha/b1e50befa7b28f00abff14b216e5d549 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| title: Paperless-ngx RAG Pipeline | |
| author: Sherlock Think Alpha | |
| author_url: https://example.com | |
| git_url: https://github.com/open-webui/pipelines/ | |
| description: RAG for Paperless-ngx. Retrieves relevant docs via API, injects formatted context into messages for LLM. Configurable via valves. | |
| required_open_webui_version: 0.4.3 | |
| requirements: requests,pydantic | |
| version: 1.1 | |
| license: MIT | |
| """ | |
| from typing import List, Union, Generator, Iterator, Dict, Any, Optional | |
| from pydantic import BaseModel, Field | |
| import requests | |
| import os | |
| import re | |
| import json | |
| from datetime import datetime | |
| import time | |
| from logging import getLogger | |
| logger = getLogger(__name__) | |
| logger.setLevel("DEBUG") | |
class Pipeline:
    """Open WebUI pipeline that pulls Paperless-ngx documents in as RAG context.

    For each request the pipeline queries the Paperless-ngx REST API, fetches
    the text content of the matching documents and formats it as markdown.
    Depending on the valves the result is streamed as text chunks, returned as
    one string, or injected into the last user message.
    """

    class Valves(BaseModel):
        """Runtime configuration of the pipeline.

        ``Pipeline._update_valves`` seeds every field from the environment
        variable of the same name, falling back to the default declared here.
        """

        # Core Config
        PAPERLESS_URL: str = Field(default="http://192.168.0.84:8000", description="Paperless-ngx base URL (e.g., http://192.168.0.84:8000)")
        # SECURITY: never ship a real API token as a source-code default.
        # Provide it through the PAPERLESS_TOKEN environment variable or the
        # valves UI instead.
        PAPERLESS_TOKEN: str = Field(default="", description="Paperless-ngx API Token")
        # Retrieval
        TOP_K: int = Field(default=10, description="Max docs to retrieve (1-10)")
        MAX_CONTENT_LEN: int = Field(default=3000000, description="Max chars per doc content")
        ENABLED: bool = Field(default=True, description="Enable this pipeline")
        # Filters
        TAGS_SLUG: Optional[str] = Field(default=None, description="Filter by tag slug (e.g., invoice)")
        DATE_AFTER: Optional[str] = Field(default=None, description="Filter date_after (YYYY-MM-DD)")
        # Debug & Rate
        DEBUG_MODE: bool = Field(default=True, description="Show full logs/stats/links in chat UI")
        FALLBACK_RECENT: bool = Field(default=True, description="If 0 matches, show top recent docs")
        RATE_LIMIT: int = Field(default=10, description="Rate limit (ops/minute)")
        # Mode
        MODE: str = Field(default="augment", description="augment (inject context) | direct")

    def __init__(self):
        self.name = "Paperless-ngx RAG (v1.4 Fixed)"
        # Start time of the previous pipe() call; None until the first request.
        self._last_request_at: Optional[datetime] = None
        self._update_valves()

    def _update_valves(self):
        """Rebuild ``self.valves`` from environment variables and field defaults.

        This discards whatever ``self.valves`` held before, so it is only
        called from ``__init__``.
        """
        self.valves = self.Valves(
            **{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()}
        )
        self._normalize_valves()

    def _normalize_valves(self):
        """Strip trailing slashes from the base URL of the current valves and log them."""
        self.valves.PAPERLESS_URL = self.valves.PAPERLESS_URL.rstrip('/')
        logger.info(f"Valves: URL={self.valves.PAPERLESS_URL}, TOP_K={self.valves.TOP_K}")

    def _get_headers(self):
        """Return the token ``Authorization`` header sent with every API request."""
        return {'Authorization': f'Token {self.valves.PAPERLESS_TOKEN}'}

    def _get_health(self) -> Dict[str, Any]:
        """Probe the documents endpoint and report reachability.

        Returns:
            A dict with ``status`` (display string), ``total_docs`` (the
            ``count`` field of the response, 0 on failure) and ``error``
            (``None`` on success, otherwise the exception text).
        """
        try:
            # Only the ``count`` field is needed, so ask for a single result
            # instead of downloading a full page of documents.
            resp = requests.get(
                f"{self.valves.PAPERLESS_URL}/api/documents/?page_size=1",
                headers=self._get_headers(),
                timeout=5,
            )
            resp.raise_for_status()
            data = resp.json()
            return {'status': 'β OK', 'total_docs': data.get('count', 0), 'error': None}
        except Exception as e:
            # Best-effort probe: any failure is reported, never raised.
            return {'status': 'β ERROR', 'total_docs': 0, 'error': str(e)}

    def _build_query_params(self, query: str, recent: bool = False) -> str:
        """Build the query string for the documents list endpoint.

        Args:
            query: Search text; a blank query is treated like ``recent=True``.
            recent: When true, ignore the query and the filter valves and just
                request the newest documents.

        Returns:
            The URL query string, without a leading ``?``.
        """
        if recent or not query.strip():
            return f"page_size={self.valves.TOP_K}&ordering=-created"
        params = f"query={requests.utils.quote(query)}&page_size={self.valves.TOP_K}&ordering=-created"
        # Filter values come from user configuration; percent-encode them so a
        # stray '&', '=' or space cannot corrupt the query string.
        if self.valves.TAGS_SLUG:
            params += f"&tags__slug={requests.utils.quote(str(self.valves.TAGS_SLUG), safe='')}"
        if self.valves.DATE_AFTER:
            params += f"&date_after={requests.utils.quote(str(self.valves.DATE_AFTER), safe='')}"
        return params

    def _format_context(self, contexts: List[str], title: str = "Context") -> str:
        """Join per-document snippets under a bold heading that states the count."""
        return f"**{title}** ({len(contexts)} docs):\n\n" + "\n\n---\n\n".join(contexts)

    def _fetch_doc_context(self, doc: Dict[str, Any]) -> tuple[Optional[str], str]:
        """Fetch one document and turn it into a context snippet.

        Args:
            doc: One entry of the ``results`` list of a documents search.

        Returns:
            ``(snippet, debug_line)``. ``snippet`` is ``None`` when the
            document has no text content or could not be fetched; the debug
            line always describes the outcome.
        """
        doc_id = doc.get('id')
        if doc_id is None:
            return None, "- Doc ?: Fetch failed - missing id"
        try:
            doc_url = f"{self.valves.PAPERLESS_URL}/api/documents/{doc_id}/"
            doc_resp = requests.get(doc_url, headers=self._get_headers(), timeout=10)
            doc_resp.raise_for_status()
            doc_data = doc_resp.json()
            # ``content`` may be missing or null; treat both as empty text.
            content = (doc_data.get('content') or '')[:self.valves.MAX_CONTENT_LEN].strip()
            if not content:
                return None, f"- Doc {doc_id}: Empty content (re-OCR?)"
            # Falls back to the numeric id when the payload has no 'document' key.
            doc_slug = doc_data.get('document', str(doc_id))
            view_url = f"{self.valves.PAPERLESS_URL}/documents/{doc_slug}/"
            title = doc_data.get('title', f'Doc {doc_id}')
            snippet = f"**{title}** (ID: {doc_id})\n\n{content}\n\n*Source: [View]({view_url})*"
            return snippet, f"- Doc {doc_id}: OK ({len(content)} chars)"
        except Exception as doc_err:
            # One broken document must not abort the whole retrieval.
            return None, f"- Doc {doc_id}: Fetch failed - {str(doc_err)}"

    def _retrieve_docs(self, query: str, recent_fallback: bool = False) -> tuple[str, int, str]:
        """Search Paperless-ngx and assemble the context block.

        Args:
            query: The user's search text.
            recent_fallback: When true, list the newest documents instead of
                searching for ``query``.

        Returns:
            ``(context_or_message, number_of_docs_with_content, debug_markdown)``.
            Errors are reported through the first element; nothing is raised.
        """
        if not self.valves.ENABLED:
            return "Pipeline disabled.", 0, "Disabled."

        debug_parts: List[str] = []
        if self.valves.DEBUG_MODE:
            # The health probe only feeds the debug output, so skip the extra
            # HTTP round trip when debugging is off.
            health = self._get_health()
            paperless_ui = f"{self.valves.PAPERLESS_URL}"
            paperless_search = f"{paperless_ui}/?q={requests.utils.quote(query) if query else ''}"
            debug_parts.extend([
                "π **Paperless Debug**",
                f"- URL: [{paperless_ui}]({paperless_ui})",
                f"- Search: [{paperless_search}]({paperless_search})",
                f"- Health: {health['status']} (Total docs: {health['total_docs']})",
            ])
            if health['error']:
                debug_parts.append(f"- Error: {health['error']}")

        try:
            params = self._build_query_params(query, recent=recent_fallback)
            search_url = f"{self.valves.PAPERLESS_URL}/api/documents/?{params}"
            if self.valves.DEBUG_MODE:
                debug_parts.append(f"- API Call: `{search_url}`")
            search_resp = requests.get(search_url, headers=self._get_headers(), timeout=10)
            search_resp.raise_for_status()
            data = search_resp.json()
            docs = data.get('results', [])
            if self.valves.DEBUG_MODE:
                debug_parts.append(f"- Matches: {len(docs)}/{data.get('count', 0)}")

            contexts: List[str] = []
            doc_debugs: List[str] = []
            for doc in docs[:self.valves.TOP_K]:
                snippet, debug_line = self._fetch_doc_context(doc)
                if snippet is not None:
                    contexts.append(snippet)
                doc_debugs.append(debug_line)
            if self.valves.DEBUG_MODE:
                debug_parts.extend(doc_debugs)

            if contexts:
                title = "Recent Docs" if recent_fallback else "Context"
                context = self._format_context(contexts, title)
                tips = "\nπ‘ **Tips**: Use exact titles/filenames. Re-OCR empty docs (Admin > OCR)."
                return context + tips, len(contexts), "\n".join(debug_parts)

            no_content_msg = "No content in docs. Check OCR (Admin > OCR > Re-process all)."
            if recent_fallback or not self.valves.FALLBACK_RECENT:
                return no_content_msg, 0, "\n".join(debug_parts)
            # Retry once with the newest documents; recent_fallback=True stops
            # the recursion after that single retry.
            logger.info("No content β Recent fallback")
            return self._retrieve_docs(query, recent_fallback=True)
        except Exception as e:
            error_msg = f"API Error: {str(e)}"
            logger.error(error_msg)
            return error_msg, 0, "\n".join(debug_parts + [f"- Full Error: {str(e)}"])

    async def on_startup(self):
        """Log a health probe and a smoke-test retrieval when the pipeline starts."""
        logger.info(f"π {self.name} starting...")
        # Do not rebuild the valves from the environment here: that would
        # throw away values the host assigned to ``self.valves`` after
        # construction. NOTE(review): assumes the host sets ``valves`` before
        # calling this hook - confirm against the Pipelines server.
        self._normalize_valves()
        health = self._get_health()
        logger.info(f"Health: {health}")
        # Smoke test with a fixed query
        _, _, debug = self._retrieve_docs("test")
        logger.info(f"Test debug: {debug[:200]}...")

    async def on_shutdown(self):
        """Log that the pipeline is shutting down."""
        logger.info(f"π {self.name} shutdown")

    async def on_valves_updated(self) -> None:
        """Re-normalise the valves after they were changed.

        The new values already live in ``self.valves`` at this point;
        reloading from the environment would silently revert them.
        NOTE(review): assumes the host assigns ``valves`` before this hook.
        """
        self._normalize_valves()

    def rate_check(self, dt_start: datetime) -> bool:
        """Sleep until ``60 / RATE_LIMIT`` seconds have passed since ``dt_start``.

        Args:
            dt_start: Reference time the minimum interval is measured from.

        Returns:
            ``True`` if the call had to sleep, ``False`` otherwise. A
            non-positive ``RATE_LIMIT`` disables throttling.
        """
        if self.valves.RATE_LIMIT <= 0:
            return False
        elapsed = (datetime.now() - dt_start).total_seconds()
        min_interval = 60 / self.valves.RATE_LIMIT
        if elapsed >= min_interval:
            return False
        time.sleep(min_interval - elapsed)
        return True

    def _stream_chunks(self, context: str, debug_md: str) -> Iterator[str]:
        """Yield the debug block (if any), the context and the mode hint as text chunks."""
        if debug_md.strip():
            yield f"{debug_md}\n\n"
        yield f"π {context}\n\n"
        if self.valves.MODE == "direct":
            yield "π§ Switch to 'augment' for LLM RAG.\n"

    def _augment_messages(self, messages: List[dict], context: str, debug_md: str) -> List[dict]:
        """Return a copy of ``messages`` whose last user message carries the context."""
        augmented_messages = list(messages)
        if augmented_messages and augmented_messages[-1].get('role') == 'user':
            last_msg = augmented_messages[-1]
            # Replace the last entry with a new dict so the caller's message
            # object is left untouched.
            augmented_messages[-1] = {
                **last_msg,
                'content': (
                    f"{debug_md}\n\n"
                    f"{context}\n\n"
                    f"---\n\n"
                    f"**Question**: {last_msg['content']}\n\n"
                    f"Answer using ONLY context. Cite [Doc ID]."
                ),
            }
        return augmented_messages

    def pipe(
        self,
        user_message: str,
        model_id: str,
        messages: List[dict],
        body: dict
    ) -> Union[str, Generator, Iterator, List[dict]]:
        """Handle one chat request.

        This is deliberately a plain function, not a generator: a function
        containing ``yield`` can never hand a ``return`` value to its caller,
        which would make the string and message-list results unreachable.

        Args:
            user_message: Text of the current user turn, used as the search query.
            model_id: Unused.
            messages: Conversation so far; never mutated.
            body: Request body; only its ``stream`` flag is read.

        Returns:
            A generator of text chunks when streaming or ``DEBUG_MODE`` is on;
            otherwise the augmented message list in ``augment`` mode, or the
            debug and context text as one string in any other mode.
        """
        streaming = body.get("stream", False)
        if not self.valves.ENABLED:
            if streaming:
                return (chunk for chunk in ("β Pipeline disabled.",))
            return "Pipeline disabled."

        logger.info(f"Query: '{user_message}'")
        # Throttle against the start of the previous request, so a request
        # only waits when it arrives too soon after the last one.
        previous_start = getattr(self, "_last_request_at", None)
        if previous_start is not None:
            self.rate_check(previous_start)
        self._last_request_at = datetime.now()

        context, _, debug_md = self._retrieve_docs(user_message)
        if streaming or self.valves.DEBUG_MODE:
            return self._stream_chunks(context, debug_md)
        if self.valves.MODE == "augment":
            return self._augment_messages(messages, context, debug_md)
        return f"{debug_md}\n\n{context}"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment