OpenWebUI pipe to connect to a local FastAPI PydanticAI Server
| """ | |
| title: FastAPI Razor Pipe | |
| author: Jim Pringle | |
| version: 0.0.3 | |
| This function connects to a local FastAPI server running an Ollama model. | |
| """ | |
| from typing import Optional, Callable, Awaitable | |
| from pydantic import BaseModel, Field | |
| import time | |
| import requests | |
| class Pipe: | |
| class Valves(BaseModel): | |
| fastapi_url: str = Field(default="http://localhost:8000/v1/chat/completions") | |
| model: str = Field(default="llama3.2:latest") | |
| emit_interval: float = Field( | |
| default=2.0, description="Interval in seconds between status emissions" | |
| ) | |
| enable_status_indicator: bool = Field( | |
| default=True, description="Enable or disable status indicator emissions" | |
| ) | |
| def __init__(self): | |
| self.type = "pipe" | |
| self.id = "fastapi_razor_pipe" | |
| self.name = "FastAPI Razor Pipe" | |
| self.valves = self.Valves() | |
| self.last_emit_time = 0 | |
| async def emit_status( | |
| self, | |
| __event_emitter__: Callable[[dict], Awaitable[None]], | |
| level: str, | |
| message: str, | |
| done: bool, | |
| ): | |
| current_time = time.time() | |
| if ( | |
| __event_emitter__ | |
| and self.valves.enable_status_indicator | |
| and ( | |
| current_time - self.last_emit_time >= self.valves.emit_interval or done | |
| ) | |
| ): | |
| await __event_emitter__( | |
| { | |
| "type": "status", | |
| "data": { | |
| "status": "complete" if done else "in_progress", | |
| "level": level, | |
| "description": message, | |
| "done": done, | |
| }, | |
| } | |
| ) | |
| self.last_emit_time = current_time | |
| async def pipe( | |
| self, | |
| body: dict, | |
| __user__: Optional[dict] = None, | |
| __event_emitter__: Callable[[dict], Awaitable[None]] = None, | |
| __event_call__: Callable[[dict], Awaitable[dict]] = None, | |
| ) -> Optional[dict]: | |
| await self.emit_status( | |
| __event_emitter__, "info", "Calling FastAPI Ollama server...", False | |
| ) | |
| messages = body.get("messages", []) | |
| if messages: | |
| user_message = messages[-1]["content"] | |
| try: | |
| # Construct request payload | |
| payload = { | |
| "model": self.valves.model, | |
| "messages": [{"role": "user", "content": user_message}], | |
| } | |
| # Send request to FastAPI server | |
| headers = {"Content-Type": "application/json"} | |
| response = requests.post( | |
| self.valves.fastapi_url, json=payload, headers=headers | |
| ) | |
| # Handle response | |
| if response.status_code == 200: | |
| api_response = response.json() | |
| print(api_response) | |
| assistant_reply = api_response["choices"][0]["message"]["content"] | |
| print(assistant_reply) | |
| else: | |
| raise Exception(f"Error: {response.status_code} - {response.text}") | |
| # Append assistant's response to the message list | |
| body["messages"].append( | |
| {"role": "assistant", "content": assistant_reply} | |
| ) | |
| except Exception as e: | |
| error_msg = f"Error during request: {str(e)}" | |
| await self.emit_status(__event_emitter__, "error", error_msg, True) | |
| # Ensure the error is appended to messages, so it's visible in Open WebUI | |
| body["messages"].append({"role": "assistant", "content": error_msg}) | |
| else: | |
| error_msg = "No messages found in the request body" | |
| await self.emit_status(__event_emitter__, "error", error_msg, True) | |
| # Append a system error message to chat history | |
| body["messages"].append({"role": "assistant", "content": error_msg}) | |
| await self.emit_status(__event_emitter__, "info", "Request complete", True) | |
| return assistant_reply # Always return the modified body |
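
For reference, the pipe only assumes the server exposes an OpenAI-style /v1/chat/completions endpoint whose JSON response contains choices[0].message.content. Below is a minimal, hypothetical FastAPI stub illustrating that contract so the pipe can be tested end to end; it does not use PydanticAI or Ollama, and the names ChatRequest and chat_completions are illustrative, not from the gist.

# server.py - hypothetical stub of the endpoint the pipe calls.
# It echoes the last user message instead of running a model.
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Message(BaseModel):
    role: str
    content: str


class ChatRequest(BaseModel):
    model: str
    messages: list[Message]


@app.post("/v1/chat/completions")
def chat_completions(req: ChatRequest):
    last_user = req.messages[-1].content if req.messages else ""
    # Return the minimal OpenAI-style structure the pipe reads:
    # api_response["choices"][0]["message"]["content"]
    return {
        "choices": [
            {"message": {"role": "assistant", "content": f"Echo: {last_user}"}}
        ]
    }

Assuming the file is saved as server.py, run it with uvicorn server:app --port 8000 so it listens on the pipe's default fastapi_url.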