Created
March 12, 2026 21:18
-
-
Save aspotton/46970970fa6a978abc68e5d7ba45f264 to your computer and use it in GitHub Desktop.
A lightweight Python reverse proxy for vLLM/OpenAI-compatible endpoints that automatically toggles “thinking” per request to reduce latency without sacrificing quality when deeper reasoning is needed.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| # MIT License | |
| # | |
| # Copyright (c) 2026 Adam Spotton | |
| # | |
| # Permission is hereby granted, free of charge, to any person obtaining a copy | |
| # of this software and associated documentation files (the "Software"), to deal | |
| # in the Software without restriction, including without limitation the rights | |
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
| # copies of the Software, and to permit persons to whom the Software is | |
| # furnished to do so, subject to the following conditions: | |
| # | |
| # The above copyright notice and this permission notice shall be included in all | |
| # copies or substantial portions of the Software. | |
| # | |
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
| # SOFTWARE. | |
| Adaptive thinking proxy for vLLM (OpenAI-compatible). | |
| - Listens on :8889 | |
| - Proxies to upstream http://127.0.0.1:8888 | |
| - For /v1/chat/completions and /v1/responses: | |
| 1) Runs a small classifier request (thinking disabled) to decide THINK? YES/NO | |
| 2) Forwards the original request with thinking enabled/disabled accordingly | |
| - Prints access log line + "THINK? YES/NO" to stdout | |
| Notes: | |
| - Classification adds overhead; keep it tiny and cache decisions. | |
| - Works with streaming (SSE) passthrough for the main request. | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import time | |
| from typing import Any, Dict, Iterable, Optional, Tuple | |
| from aiohttp import web, ClientSession, ClientTimeout | |
# --- Listener / upstream configuration (all overridable via environment) ---
LISTEN_HOST = os.environ.get("PROXY_LISTEN_HOST", "0.0.0.0")
LISTEN_PORT = int(os.environ.get("PROXY_LISTEN_PORT", "8889"))
UPSTREAM_BASE = os.environ.get("VLLM_UPSTREAM", "http://127.0.0.1:8888")

# Which routes should get adaptive thinking
ADAPTIVE_PATH_PREFIXES = (
    "/v1/chat/completions",
    "/v1/responses",
)

# If you want to force adaptive only for specific models, set comma-separated IDs:
# e.g. "Intel/Qwen3.5-122B-A10B-int4-AutoRound"
MODEL_ALLOWLIST = {
    m.strip() for m in os.environ.get("MODEL_ALLOWLIST", "").split(",") if m.strip()
}

# Classifier config
CLASSIFIER_MODEL = os.environ.get("CLASSIFIER_MODEL", "")  # empty => reuse request's model
CLASSIFIER_TIMEOUT_S = float(os.environ.get("CLASSIFIER_TIMEOUT_S", "2.5"))
CLASSIFIER_MAX_INPUT_CHARS = int(os.environ.get("CLASSIFIER_MAX_INPUT_CHARS", "1200"))
CLASSIFIER_MAX_TOKENS = int(os.environ.get("CLASSIFIER_MAX_TOKENS", "1"))

# Decision caching to reduce classifier calls
DECISION_CACHE_TTL_S = int(os.environ.get("DECISION_CACHE_TTL_S", "60"))
DECISION_CACHE_MAX = int(os.environ.get("DECISION_CACHE_MAX", "2048"))

# Default decision if classifier fails:
#   "no"  => disable thinking (faster)
#   "yes" => enable thinking (safer)
DEFAULT_DECISION = os.environ.get("DEFAULT_DECISION", "no").strip().lower()
# The comparison already yields a bool; no need for `True if ... else False`.
DEFAULT_DECISION_BOOL = DEFAULT_DECISION == "yes"

# Hop-by-hop + headers that must not be forwarded (RFC 7230 section 6.1).
HOP_BY_HOP_HEADERS = {
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailers",
    "transfer-encoding",
    "upgrade",
    # critical when mutating body: the proxied length differs after JSON edits
    "content-length",
}
def _filtered_headers(headers: Iterable[Tuple[str, str]]) -> Dict[str, str]:
    """Copy (name, value) pairs, dropping hop-by-hop headers and Host.

    Host must be regenerated for the upstream connection, and hop-by-hop
    headers (including Content-Length, since bodies may be rewritten)
    must never be forwarded by a proxy.
    """
    return {
        name: value
        for name, value in headers
        if name.lower() not in HOP_BY_HOP_HEADERS and name.lower() != "host"
    }
def _http_version_str(request: web.Request) -> str:
    """Render the request's HTTP version as "major.minor" (e.g. "1.1")."""
    version = request.version
    return f"{version.major}.{version.minor}"
def _peer_str(request: web.Request) -> str:
    """Best-effort "ip:port" of the connected client, or "-:-" if unknown."""
    transport = request.transport
    peer = transport.get_extra_info("peername") if transport else None
    # peername is normally an (ip, port) tuple; anything else is unusable.
    if isinstance(peer, (tuple, list)) and len(peer) >= 2:
        return f"{peer[0]}:{peer[1]}"
    return "-:-"
| def _is_json(content_type: str) -> bool: | |
| return bool(content_type) and ("application/json" in content_type.lower()) | |
| def _load_json(raw: bytes) -> Optional[Dict[str, Any]]: | |
| try: | |
| obj = json.loads(raw.decode("utf-8")) | |
| return obj if isinstance(obj, dict) else None | |
| except Exception: | |
| return None | |
| def _dump_json(obj: Dict[str, Any]) -> bytes: | |
| return json.dumps(obj, separators=(",", ":"), ensure_ascii=False).encode("utf-8") | |
| def _ensure_chat_template_kwargs(payload: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Ensure payload has chat_template_kwargs dict somewhere. | |
| Supports both top-level and the common wrappers extra_body / extraBody. | |
| Returns the dict that should be modified (the container holding chat_template_kwargs). | |
| """ | |
| # Prefer extra_body if present | |
| if isinstance(payload.get("extra_body"), dict): | |
| container = payload["extra_body"] | |
| elif isinstance(payload.get("extraBody"), dict): | |
| container = payload["extraBody"] | |
| else: | |
| container = payload | |
| ctk = container.get("chat_template_kwargs") | |
| if not isinstance(ctk, dict): | |
| ctk = {} | |
| container["chat_template_kwargs"] = ctk | |
| return container | |
def set_thinking(payload: Dict[str, Any], enabled: bool) -> None:
    """
    For Qwen3/3.5 on vLLM: set chat_template_kwargs.enable_thinking in place.

    Mirrors _ensure_chat_template_kwargs placement rules: the flag goes into
    extra_body / extraBody when present, otherwise at the payload top level.
    """
    if isinstance(payload.get("extra_body"), dict):
        container = payload["extra_body"]
    elif isinstance(payload.get("extraBody"), dict):
        container = payload["extraBody"]
    else:
        container = payload
    if not isinstance(container.get("chat_template_kwargs"), dict):
        container["chat_template_kwargs"] = {}
    container["chat_template_kwargs"]["enable_thinking"] = bool(enabled)
def extract_user_text_for_classification(payload: Dict[str, Any], path: str) -> str:
    """
    Pull a compact text sample of the user's request out of *payload*.

    Understands both request shapes:
      - /v1/chat/completions: "messages" list (last user turn preferred)
      - /v1/responses: "input" as a string or list of content items
    The joined text is truncated to CLASSIFIER_MAX_INPUT_CHARS.
    """

    def _texts(parts: Any) -> list:
        # OpenAI-style content arrays: keep "text"-typed (or untyped) parts.
        found = []
        for part in parts:
            if not isinstance(part, dict):
                continue
            if part.get("type") not in (None, "text"):
                continue
            if isinstance(part.get("text"), str):
                found.append(part["text"])
        return found

    pieces: list[str] = []
    if path.startswith("/v1/chat/completions"):
        msgs = payload.get("messages")
        if isinstance(msgs, list):
            # Primary signal: most recent user message, else the final message.
            target = next(
                (m for m in reversed(msgs) if isinstance(m, dict) and m.get("role") == "user"),
                msgs[-1] if msgs else None,
            )
            if isinstance(target, dict):
                content = target.get("content")
                # OpenAI-style content can be a string or an array of parts.
                if isinstance(content, str):
                    pieces.append(content)
                elif isinstance(content, list):
                    pieces.extend(_texts(content))
    elif path.startswith("/v1/responses"):
        inp = payload.get("input")
        # Responses API input can be a string or an array of content items.
        if isinstance(inp, str):
            pieces.append(inp)
        elif isinstance(inp, list):
            for item in inp:
                if not isinstance(item, dict):
                    continue
                content = item.get("content")
                if isinstance(content, str):
                    pieces.append(content)
                elif isinstance(content, list):
                    pieces.extend(_texts(content))
    text = "\n".join(p.strip() for p in pieces if p and p.strip())
    return text[:CLASSIFIER_MAX_INPUT_CHARS]
def classifier_prompt(user_text: str) -> str:
    """Build the single-token YES/NO routing prompt around *user_text*."""
    instructions = (
        "You are a router that decides whether a user request needs deep reasoning.\n"
        "Return exactly one token: YES or NO.\n"
        "Answer YES only if the request requires multi-step reasoning, math/logic proofs, "
        "non-trivial planning, or careful analysis.\n"
        "Answer NO for simple chat, trivial questions, short edits, formatting, or straightforward lookups.\n"
    )
    return f"{instructions}\nUSER_REQUEST:\n{user_text}\n"
class DecisionCache:
    """
    Tiny TTL-bounded cache for think/no-think routing decisions.

    Keys map to (decision, absolute expiry time); entries live for
    DECISION_CACHE_TTL_S seconds and the cache never exceeds
    DECISION_CACHE_MAX entries. Not thread-safe; fine for a single
    asyncio event loop, where access is never concurrent.
    """

    def __init__(self) -> None:
        # key -> (decision, expiry timestamp from time.time())
        self._data: Dict[str, Tuple[bool, float]] = {}

    def get(self, key: str) -> Optional[bool]:
        """Return the cached decision for *key*, or None if absent/expired."""
        item = self._data.get(key)
        if not item:
            return None
        value, expires = item
        if expires < time.time():
            # Lazily drop the stale entry so it does not linger.
            self._data.pop(key, None)
            return None
        return value

    def put(self, key: str, value: bool) -> None:
        """Store *value* for *key* with a fresh TTL, evicting when full."""
        now = time.time()
        if len(self._data) >= DECISION_CACHE_MAX:
            # Evict only expired entries first: keeping live decisions avoids
            # needlessly re-running the classifier for hot keys (the previous
            # behavior cleared the whole cache, discarding valid entries).
            expired = [k for k, (_, exp) in self._data.items() if exp < now]
            for k in expired:
                self._data.pop(k, None)
            if len(self._data) >= DECISION_CACHE_MAX:
                # Entirely full of live entries: fall back to a full reset.
                self._data.clear()
        self._data[key] = (value, now + DECISION_CACHE_TTL_S)
async def ask_should_think(
    session: ClientSession,
    upstream_base: str,
    model: str,
    user_text: str,
) -> Optional[bool]:
    """
    Ask the upstream model whether *user_text* needs deep reasoning.

    Issues one tiny, non-streaming /v1/chat/completions call with thinking
    forced off and a 1-token budget. Returns True for YES, False for NO,
    and None on any failure (HTTP error, timeout, unparseable answer).
    """
    request_body: Dict[str, Any] = {
        "model": model,
        "temperature": 0,
        "top_p": 1,
        "max_tokens": CLASSIFIER_MAX_TOKENS,
        "stream": False,
        "messages": [
            {"role": "system", "content": "Return YES or NO only."},
            {"role": "user", "content": classifier_prompt(user_text)},
        ],
    }
    # The router call itself must never think -- that would defeat the purpose.
    set_thinking(request_body, enabled=False)
    budget = ClientTimeout(
        total=CLASSIFIER_TIMEOUT_S,
        connect=CLASSIFIER_TIMEOUT_S,
        sock_connect=CLASSIFIER_TIMEOUT_S,
        sock_read=CLASSIFIER_TIMEOUT_S,
    )
    try:
        async with session.post(
            f"{upstream_base}/v1/chat/completions", json=request_body, timeout=budget
        ) as resp:
            if resp.status != 200:
                return None
            data = await resp.json()
    except Exception:
        # Timeouts / connection errors: caller falls back to the default.
        return None
    # Pull the single-token verdict out of the OpenAI-compatible reply.
    try:
        choices = data.get("choices")
        if not isinstance(choices, list) or not choices:
            return None
        content = choices[0].get("message", {}).get("content", "")
        if not isinstance(content, str):
            return None
        verdict = content.strip().upper()
        if verdict.startswith("YES"):
            return True
        if verdict.startswith("NO"):
            return False
    except Exception:
        pass
    return None
async def handle(request: web.Request) -> web.StreamResponse:
    """Catch-all proxy handler.

    For adaptive JSON routes (/v1/chat/completions, /v1/responses) it first
    decides whether the request needs thinking (cache hit or a tiny classifier
    call), rewrites the body's chat_template_kwargs.enable_thinking flag, then
    forwards upstream and streams the response back (SSE passthrough works
    because chunks are relayed as they arrive). All other routes are proxied
    unchanged.
    """
    app = request.app
    session: ClientSession = app["session"]
    cache: DecisionCache = app["decision_cache"]
    # rel_url preserves path + query string for the upstream call.
    upstream_url = f"{UPSTREAM_BASE}{request.rel_url}"
    # Read body (empty bytes for body-less methods like GET)
    orig_body = await request.read()
    content_type = request.headers.get("Content-Type", "")
    # Default: pass through unchanged
    forward_body = orig_body
    # For adaptive paths, parse JSON and decide thinking
    think_decision: Optional[bool] = None
    should_adapt = any(request.path.startswith(p) for p in ADAPTIVE_PATH_PREFIXES) and _is_json(content_type)
    payload = _load_json(orig_body) if should_adapt else None
    if should_adapt and payload is not None:
        req_model = payload.get("model")
        if isinstance(req_model, str):
            if MODEL_ALLOWLIST and req_model not in MODEL_ALLOWLIST:
                should_adapt = False  # not allowed -> just proxy
        if should_adapt:
            user_text = extract_user_text_for_classification(payload, request.path)
            cache_key = f"{req_model}|{user_text}"
            # Only consult the cache when there is actual text to key on.
            cached = cache.get(cache_key) if user_text else None
            if cached is not None:
                think_decision = cached
            else:
                # Choose model for classifier: configured override, else the
                # request's own model.
                cls_model = CLASSIFIER_MODEL.strip() or (req_model if isinstance(req_model, str) else "")
                if not cls_model or not user_text:
                    # Nothing to classify with -- fall back to the default.
                    think_decision = DEFAULT_DECISION_BOOL
                else:
                    think_decision = await ask_should_think(session, UPSTREAM_BASE, cls_model, user_text)
                if think_decision is None:
                    # Classifier failed (timeout/HTTP error/garbage answer).
                    think_decision = DEFAULT_DECISION_BOOL
                if user_text:
                    cache.put(cache_key, think_decision)
            # Apply decision to forwarded request
            # - YES => enable_thinking True
            # - NO  => enable_thinking False
            set_thinking(payload, enabled=bool(think_decision))
            # Re-serialize: Content-Length was filtered out, so aiohttp will
            # set the correct length for the mutated body.
            forward_body = _dump_json(payload)
    # Forward request upstream (hop-by-hop headers and Host stripped)
    headers = _filtered_headers(request.headers.items())
    # Long streaming supported: disable total timeout; sock_read=None keeps
    # idle-but-alive SSE streams open indefinitely.
    timeout = ClientTimeout(total=None, connect=30, sock_connect=30, sock_read=None)
    async with session.request(
        method=request.method,
        url=upstream_url,
        headers=headers,
        data=forward_body if forward_body else None,
        allow_redirects=False,
        timeout=timeout,
    ) as upstream_resp:
        # Access log
        client = _peer_str(request)
        http_ver = _http_version_str(request)
        # Print decision for adaptive routes.
        # NOTE(review): non-adaptive routes also log "Think Mode: YES" because
        # of this default -- presumably intentional shorthand, but it can
        # mislead; confirm the desired label for plain proxied requests.
        deep_mode = "Think Mode: YES"
        if should_adapt and think_decision is not None:
            if think_decision:
                deep_mode = "Think Mode: YES"
            else:
                deep_mode = "Think Mode: NO"
        print(
            f'{client} - {deep_mode} - "{request.method} {request.path_qs} HTTP/{http_ver}" '
            f"{upstream_resp.status} {upstream_resp.reason}"
        )
        # Stream response back, chunk by chunk (works for SSE and plain JSON).
        resp_headers = _filtered_headers(upstream_resp.headers.items())
        downstream = web.StreamResponse(
            status=upstream_resp.status,
            reason=upstream_resp.reason,
            headers=resp_headers,
        )
        await downstream.prepare(request)
        async for chunk in upstream_resp.content.iter_chunked(8192):
            await downstream.write(chunk)
        await downstream.write_eof()
        return downstream
async def on_startup(app: web.Application) -> None:
    """Create per-application shared state before serving."""
    app["decision_cache"] = DecisionCache()
    # One shared ClientSession so upstream connections are pooled and reused.
    app["session"] = ClientSession()
async def on_cleanup(app: web.Application) -> None:
    """Close the shared upstream HTTP session on shutdown."""
    await app["session"].close()
def main() -> None:
    """Build the aiohttp application and serve it until interrupted."""
    # Generous body limit (256 MB) so large prompts pass through the proxy.
    app = web.Application(client_max_size=256 * 1024 * 1024)  # 256MB
    app.on_startup.append(on_startup)
    app.on_cleanup.append(on_cleanup)
    # Catch-all route: every method/path is proxied; adaptive paths get
    # the thinking classifier inside handle().
    app.router.add_route("*", "/{tail:.*}", handle)
    web.run_app(app, host=LISTEN_HOST, port=LISTEN_PORT)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment