@akiraaisha
Last active November 16, 2025 18:25
"""
title: Paperless-ngx RAG Pipeline
author: Sherlock Think Alpha
author_url: https://example.com
git_url: https://github.com/open-webui/pipelines/
description: RAG for Paperless-ngx. Retrieves relevant docs via API, injects formatted context into messages for LLM. Configurable via valves.
required_open_webui_version: 0.4.3
requirements: requests,pydantic
version: 1.1
license: MIT
"""
from typing import List, Union, Generator, Iterator, Dict, Any, Optional
from pydantic import BaseModel, Field
import requests
import os
import re
import json
from datetime import datetime
import time
from logging import getLogger
logger = getLogger(__name__)
logger.setLevel("DEBUG")
class Pipeline:
    class Valves(BaseModel):
        # Core Config
        PAPERLESS_URL: str = Field(default="http://192.168.0.84:8000", description="Paperless-ngx base URL (e.g., http://192.168.0.84:8000)")
        PAPERLESS_TOKEN: str = Field(default="05cffa1b34db869d9a143584fe2a77b57c74b8f5", description="Paperless-ngx API Token")
        # Retrieval
        TOP_K: int = Field(default=10, description="Max docs to retrieve (1-10)")
        MAX_CONTENT_LEN: int = Field(default=3000000, description="Max chars per doc content")
        ENABLED: bool = Field(default=True, description="Enable this pipeline")
        # Filters
        TAGS_SLUG: Optional[str] = Field(default=None, description="Filter by tag slug (e.g., invoice)")
        DATE_AFTER: Optional[str] = Field(default=None, description="Filter date_after (YYYY-MM-DD)")
        # Debug & Rate
        DEBUG_MODE: bool = Field(default=True, description="Show full logs/stats/links in chat UI")
        FALLBACK_RECENT: bool = Field(default=True, description="If 0 matches, show top recent docs")
        RATE_LIMIT: int = Field(default=10, description="Rate limit (ops/minute)")
        # Mode
        MODE: str = Field(default="augment", description="augment (inject context) | direct")
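    # A minimal sketch of overriding valves via environment variables (assumption:
    # the variables are set in the process environment before the pipeline loads;
    # the values shown are placeholders, not real credentials):
    #
    #   export PAPERLESS_URL="https://paperless.example.com"
    #   export PAPERLESS_TOKEN="<your-api-token>"
    #   export TOP_K=5
    #
    # _update_valves() below reads each field with os.getenv() and pydantic
    # coerces the string values to the declared types.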
    def __init__(self):
        self.name = "Paperless-ngx RAG (v1.4 Fixed)"
        self._update_valves()

    def _update_valves(self):
        self.valves = self.Valves(
            **{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()}
        )
        self.valves.PAPERLESS_URL = self.valves.PAPERLESS_URL.rstrip('/')
        logger.info(f"Valves: URL={self.valves.PAPERLESS_URL}, TOP_K={self.valves.TOP_K}")
    def _get_headers(self):
        return {'Authorization': f'Token {self.valves.PAPERLESS_TOKEN}'}

    def _get_health(self) -> Dict[str, Any]:
        try:
            resp = requests.get(f"{self.valves.PAPERLESS_URL}/api/documents/", headers=self._get_headers(), timeout=5)
            resp.raise_for_status()
            data = resp.json()
            return {'status': '✅ OK', 'total_docs': data.get('count', 0), 'error': None}
        except Exception as e:
            return {'status': '❌ ERROR', 'total_docs': 0, 'error': str(e)}
    def _build_query_params(self, query: str, recent: bool = False) -> str:
        if recent or not query.strip():
            return f"page_size={self.valves.TOP_K}&ordering=-created"
        params = f"query={requests.utils.quote(query)}&page_size={self.valves.TOP_K}&ordering=-created"
        if self.valves.TAGS_SLUG:
            params += f"&tags__slug={self.valves.TAGS_SLUG}"
        if self.valves.DATE_AFTER:
            params += f"&date_after={self.valves.DATE_AFTER}"
        return params
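    # Example of the query string produced with the defaults above (TOP_K=10) and
    # TAGS_SLUG="invoice" set; the search term is illustrative only:
    #   query=water%20bill&page_size=10&ordering=-created&tags__slug=invoice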
    def _format_context(self, contexts: List[str], title: str = "Context") -> str:
        return f"**{title}** ({len(contexts)} docs):\n\n" + "\n\n---\n\n".join(contexts)
    def _retrieve_docs(self, query: str, recent_fallback: bool = False) -> tuple[str, int, str]:
        if not self.valves.ENABLED:
            return "Pipeline disabled.", 0, "Disabled."

        health = self._get_health()
        paperless_ui = f"{self.valves.PAPERLESS_URL}"
        paperless_search = f"{paperless_ui}/?q={requests.utils.quote(query) if query else ''}"

        debug_parts = []
        if self.valves.DEBUG_MODE:
            debug_parts.extend([
                "🔍 **Paperless Debug**",
                f"- URL: [{paperless_ui}]({paperless_ui})",
                f"- Search: [{paperless_search}]({paperless_search})",
                f"- Health: {health['status']} (Total docs: {health['total_docs']})"
            ])
            if health['error']:
                debug_parts.append(f"- Error: {health['error']}")
        try:
            params = self._build_query_params(query, recent=recent_fallback)
            search_url = f"{self.valves.PAPERLESS_URL}/api/documents/?{params}"
            if self.valves.DEBUG_MODE:
                debug_parts.append(f"- API Call: `{search_url}`")

            search_resp = requests.get(search_url, headers=self._get_headers(), timeout=10)
            search_resp.raise_for_status()
            data = search_resp.json()
            docs = data.get('results', [])
            matches = len(docs)
            if self.valves.DEBUG_MODE:
                debug_parts.append(f"- Matches: {matches}/{data.get('count', 0)}")

            contexts = []
            doc_debugs = []
            for doc in docs[:self.valves.TOP_K]:
                try:
                    doc_url = f"{self.valves.PAPERLESS_URL}/api/documents/{doc['id']}/"
                    doc_resp = requests.get(doc_url, headers=self._get_headers(), timeout=10)
                    doc_resp.raise_for_status()
                    doc_data = doc_resp.json()
                    # Guard against null content before truncating to MAX_CONTENT_LEN
                    content = (doc_data.get('content') or '')[:self.valves.MAX_CONTENT_LEN].strip()
                    doc_id = doc['id']
                    if content:
                        # Build a web UI link; fall back to the numeric id if no slug field is present
                        doc_slug = doc_data.get('document', str(doc_id))
                        view_url = f"{self.valves.PAPERLESS_URL}/documents/{doc_slug}/"
                        title = doc_data.get('title', f'Doc {doc_id}')
                        contexts.append(
                            f"**{title}** (ID: {doc_id})\n\n{content}\n\n*Source: [View]({view_url})*"
                        )
                        doc_debugs.append(f"- Doc {doc_id}: OK ({len(content)} chars)")
                    else:
                        doc_debugs.append(f"- Doc {doc_id}: Empty content (re-OCR?)")
                except Exception as doc_err:
                    doc_debugs.append(f"- Doc {doc['id']}: Fetch failed - {str(doc_err)}")
            if self.valves.DEBUG_MODE:
                debug_parts.extend(doc_debugs)

            if contexts:
                title = "Recent Docs" if recent_fallback else "Context"
                context = self._format_context(contexts, title)
                tips = "\n💡 **Tips**: Use exact titles/filenames. Re-OCR empty docs (Admin > OCR)."
                return context + tips, len(contexts), "\n".join(debug_parts)
            else:
                no_content_msg = "No content in docs. Check OCR (Admin > OCR > Re-process all)."
                if recent_fallback or not self.valves.FALLBACK_RECENT:
                    return no_content_msg, 0, "\n".join(debug_parts)
                # Recursive recent fallback
                logger.info("No content → Recent fallback")
                return self._retrieve_docs(query, recent_fallback=True)
        except Exception as e:
            error_msg = f"API Error: {str(e)}"
            logger.error(error_msg)
            return error_msg, 0, "\n".join(debug_parts + [f"- Full Error: {str(e)}"])
    async def on_startup(self):
        logger.info(f"🚀 {self.name} starting...")
        self._update_valves()
        health = self._get_health()
        logger.info(f"Health: {health}")
        # Smoke-test retrieval on startup
        _, _, debug = self._retrieve_docs("test")
        logger.info(f"Test debug: {debug[:200]}...")

    async def on_shutdown(self):
        logger.info(f"🛑 {self.name} shutdown")

    async def on_valves_updated(self) -> None:
        self._update_valves()
    def rate_check(self, dt_start: datetime) -> bool:
        dt_end = datetime.now()
        time_diff = (dt_end - dt_start).total_seconds()
        time_buffer = 60 / self.valves.RATE_LIMIT
        if time_diff >= time_buffer:
            return False
        time.sleep(time_buffer - time_diff)
        return True
    def pipe(
        self,
        user_message: str,
        model_id: str,
        messages: List[dict],
        body: dict
    ) -> Union[str, Generator, Iterator, List[dict]]:
        # Return a plain value (augment mode / non-streaming) or a generator (streamed
        # direct mode). Mixing `yield` and `return <value>` in the same function would
        # make the whole method a generator and discard the returned values.
        if not self.valves.ENABLED:
            return "Pipeline disabled."

        dt_start = datetime.now()
        streaming = body.get("stream", False)
        logger.info(f"Query: '{user_message}'")
        self.rate_check(dt_start)
        context, num_docs, debug_md = self._retrieve_docs(user_message)

        if self.valves.MODE == "augment":
            augmented_messages = messages.copy()
            last_msg = augmented_messages[-1]
            if last_msg.get('role') == 'user':
                last_msg['content'] = (
                    f"{debug_md}\n\n"
                    f"{context}\n\n"
                    f"---\n\n"
                    f"**Question**: {last_msg['content']}\n\n"
                    f"Answer using ONLY context. Cite [Doc ID]."
                )
            return augmented_messages

        # "direct" mode: send the retrieved context straight back to the user
        if streaming or self.valves.DEBUG_MODE:
            def stream_output() -> Generator[str, None, None]:
                if debug_md.strip():
                    yield f"{debug_md}\n\n"
                yield f"📄 {context}\n\n"
                yield "🧠 Switch to 'augment' for LLM RAG.\n"
            return stream_output()
        return f"{debug_md}\n\n{context}"