Last active
July 1, 2025 06:30
-
-
Save tpdns90321/6bbe63ab5ec8b18644c3b6ba74ea3b46 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| title: Anthropic Manifold Pipeline | |
| author: justinh-rahb, sriparashiva | |
| date: 2024-06-20 | |
| version: 1.4 | |
| license: MIT | |
| description: A pipeline for generating text and processing images using the Anthropic API. | |
| requirements: requests, sseclient-py | |
| environment_variables: ANTHROPIC_API_KEY, ANTHROPIC_THINKING_BUDGET_TOKENS, ANTHROPIC_ENABLE_THINKING | |
| """ | |
| import os | |
| import requests | |
| import json | |
| from typing import List, Union, Generator, Iterator | |
| from pydantic import BaseModel | |
| import sseclient | |
| from utils.pipelines.main import pop_system_message | |
| REASONING_EFFORT_BUDGET_TOKEN_MAP = { | |
| "none": None, | |
| "low": 1024, | |
| "medium": 4096, | |
| "high": 16384, | |
| "max": 32768, | |
| } | |
| # Maximum combined token limit for Claude 3.7 | |
| MAX_COMBINED_TOKENS = 64000 | |
| class Pipeline: | |
| class Valves(BaseModel): | |
| ANTHROPIC_API_KEY: str = "" | |
| def __init__(self): | |
| self.type = "manifold" | |
| self.id = "anthropic" | |
| self.name = "anthropic/" | |
| self.valves = self.Valves( | |
| **{ | |
| "ANTHROPIC_API_KEY": os.getenv( | |
| "ANTHROPIC_API_KEY", "your-api-key-here" | |
| ), | |
| } | |
| ) | |
| self.url = "https://api.anthropic.com/v1/messages" | |
| self.update_headers() | |
| def update_headers(self): | |
| self.headers = { | |
| "anthropic-version": "2023-06-01", | |
| "content-type": "application/json", | |
| "x-api-key": self.valves.ANTHROPIC_API_KEY, | |
| } | |
| def get_anthropic_models(self): | |
| return [ | |
| {"id": "claude-3-haiku-20240307", "name": "claude-3-haiku"}, | |
| {"id": "claude-3-opus-20240229", "name": "claude-3-opus"}, | |
| {"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"}, | |
| {"id": "claude-3-5-haiku-20241022", "name": "claude-3.5-haiku"}, | |
| {"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet"}, | |
| {"id": "claude-3-7-sonnet-20250219", "name": "claude-3.7-sonnet"}, | |
| {"id": "claude-opus-4-20250514", "name": "claude-4-opus"}, | |
| {"id": "claude-sonnet-4-20250514", "name": "claude-4-sonnet"}, | |
| ] | |
| async def on_startup(self): | |
| print(f"on_startup:{__name__}") | |
| pass | |
| async def on_shutdown(self): | |
| print(f"on_shutdown:{__name__}") | |
| pass | |
| async def on_valves_updated(self): | |
| self.update_headers() | |
| def pipelines(self) -> List[dict]: | |
| return self.get_anthropic_models() | |
| def process_image(self, image_data): | |
| if image_data["url"].startswith("data:image"): | |
| mime_type, base64_data = image_data["url"].split(",", 1) | |
| media_type = mime_type.split(":")[1].split(";")[0] | |
| return { | |
| "type": "image", | |
| "source": { | |
| "type": "base64", | |
| "media_type": media_type, | |
| "data": base64_data, | |
| }, | |
| } | |
| else: | |
| return { | |
| "type": "image", | |
| "source": {"type": "url", "url": image_data["url"]}, | |
| } | |
| def pipe( | |
| self, user_message: str, model_id: str, messages: List[dict], body: dict | |
| ) -> Union[str, Generator, Iterator]: | |
| try: | |
| # Remove unnecessary keys | |
| for key in ["user", "chat_id", "title"]: | |
| body.pop(key, None) | |
| system_message, messages = pop_system_message(messages) | |
| processed_messages = [] | |
| image_count = 0 | |
| total_image_size = 0 | |
| for message in messages: | |
| processed_content = [] | |
| if isinstance(message.get("content"), list): | |
| for item in message["content"]: | |
| if item["type"] == "text": | |
| processed_content.append( | |
| {"type": "text", "text": item["text"]} | |
| ) | |
| elif item["type"] == "image_url": | |
| if image_count >= 5: | |
| raise ValueError( | |
| "Maximum of 5 images per API call exceeded" | |
| ) | |
| processed_image = self.process_image(item["image_url"]) | |
| processed_content.append(processed_image) | |
| if processed_image["source"]["type"] == "base64": | |
| image_size = ( | |
| len(processed_image["source"]["data"]) * 3 / 4 | |
| ) | |
| else: | |
| image_size = 0 | |
| total_image_size += image_size | |
| if total_image_size > 100 * 1024 * 1024: | |
| raise ValueError( | |
| "Total size of images exceeds 100 MB limit" | |
| ) | |
| image_count += 1 | |
| else: | |
| processed_content = [ | |
| {"type": "text", "text": message.get("content", "")} | |
| ] | |
| processed_messages.append( | |
| {"role": message["role"], "content": processed_content} | |
| ) | |
| # Prepare the payload | |
| payload = { | |
| "model": model_id, | |
| "messages": processed_messages, | |
| "max_tokens": body.get("max_tokens", 4096), | |
| "temperature": body.get("temperature", 0.8), | |
| "top_k": body.get("top_k", 40), | |
| "top_p": body.get("top_p", 0.9), | |
| "stop_sequences": body.get("stop", []), | |
| **({"system": str(system_message)} if system_message else {}), | |
| "stream": body.get("stream", False), | |
| } | |
| supports_thinking = "claude-3-7" in model_id or "-4-" in model_id | |
| reasoning_effort = body.get("reasoning_effort", "none") | |
| budget_tokens = REASONING_EFFORT_BUDGET_TOKEN_MAP.get(reasoning_effort) | |
| # Allow users to input an integer value representing budget tokens | |
| if ( | |
| not budget_tokens | |
| and reasoning_effort is not None | |
| and reasoning_effort not in REASONING_EFFORT_BUDGET_TOKEN_MAP.keys() | |
| ): | |
| try: | |
| budget_tokens = int(reasoning_effort) | |
| except ValueError as e: | |
| print("Failed to convert reasoning effort to int", e) | |
| budget_tokens = None | |
| if supports_thinking and budget_tokens: | |
| # Check if the combined tokens (budget_tokens + max_tokens) exceeds the limit | |
| max_tokens = payload.get("max_tokens", 4096) | |
| combined_tokens = budget_tokens + max_tokens | |
| if combined_tokens > MAX_COMBINED_TOKENS: | |
| error_message = f"Error: Combined tokens (budget_tokens {budget_tokens} + max_tokens {max_tokens} = {combined_tokens}) exceeds the maximum limit of {MAX_COMBINED_TOKENS}" | |
| print(error_message) | |
| return error_message | |
| payload["max_tokens"] = combined_tokens | |
| payload["thinking"] = { | |
| "type": "enabled", | |
| "budget_tokens": budget_tokens, | |
| } | |
| # Thinking requires temperature 1.0 and does not support top_p, top_k | |
| payload["temperature"] = 1.0 | |
| if "top_k" in payload: | |
| del payload["top_k"] | |
| if "top_p" in payload: | |
| del payload["top_p"] | |
| if body.get("stream", False): | |
| return self.stream_response(payload) | |
| else: | |
| return self.get_completion(payload) | |
| except Exception as e: | |
| return f"Error: {e}" | |
| def stream_response(self, payload: dict) -> Generator: | |
| """Used for title and tag generation""" | |
| try: | |
| response = requests.post( | |
| self.url, headers=self.headers, json=payload, stream=True | |
| ) | |
| print(f"{response} for {payload}") | |
| if response.status_code == 200: | |
| client = sseclient.SSEClient(response) | |
| for event in client.events(): | |
| try: | |
| data = json.loads(event.data) | |
| if data["type"] == "content_block_start": | |
| if data["content_block"]["type"] == "thinking": | |
| yield "<think>" | |
| else: | |
| yield data["content_block"]["text"] | |
| elif data["type"] == "content_block_delta": | |
| if data["delta"]["type"] == "thinking_delta": | |
| yield data["delta"]["thinking"] | |
| elif data["delta"]["type"] == "signature_delta": | |
| yield "\n </think> \n\n" | |
| else: | |
| yield data["delta"]["text"] | |
| elif data["type"] == "message_stop": | |
| break | |
| except json.JSONDecodeError: | |
| print(f"Failed to parse JSON: {event.data}") | |
| yield f"Error: Failed to parse JSON response" | |
| except KeyError as e: | |
| print(f"Unexpected data structure: {e} for payload {payload}") | |
| print(f"Full data: {data}") | |
| yield f"Error: Unexpected data structure: {e}" | |
| else: | |
| error_message = f"Error: {response.status_code} - {response.text}" | |
| print(error_message) | |
| yield error_message | |
| except Exception as e: | |
| error_message = f"Error: {str(e)}" | |
| print(error_message) | |
| yield error_message | |
| def get_completion(self, payload: dict) -> str: | |
| try: | |
| response = requests.post(self.url, headers=self.headers, json=payload) | |
| print(response, payload) | |
| if response.status_code == 200: | |
| res = response.json() | |
| for content in res["content"]: | |
| if not content.get("text"): | |
| continue | |
| return content["text"] | |
| return "" | |
| else: | |
| error_message = f"Error: {response.status_code} - {response.text}" | |
| print(error_message) | |
| return error_message | |
| except Exception as e: | |
| error_message = f"Error: {str(e)}" | |
| print(error_message) | |
| return error_message |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment