Skip to content

Instantly share code, notes, and snippets.

@aspotton
Created March 12, 2026 21:18
Show Gist options
  • Select an option

  • Save aspotton/46970970fa6a978abc68e5d7ba45f264 to your computer and use it in GitHub Desktop.

Select an option

Save aspotton/46970970fa6a978abc68e5d7ba45f264 to your computer and use it in GitHub Desktop.
A lightweight Python reverse proxy for vLLM/OpenAI-compatible endpoints that automatically toggles “thinking” per request to reduce latency without sacrificing quality when deeper reasoning is needed.
#!/usr/bin/env python3
"""
# MIT License
#
# Copyright (c) 2026 Adam Spotton
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
Adaptive thinking proxy for vLLM (OpenAI-compatible).
- Listens on :8889
- Proxies to upstream http://127.0.0.1:8888
- For /v1/chat/completions and /v1/responses:
1) Runs a small classifier request (thinking disabled) to decide THINK? YES/NO
2) Forwards the original request with thinking enabled/disabled accordingly
- Prints access log line + "THINK? YES/NO" to stdout
Notes:
- Classification adds overhead; keep it tiny and cache decisions.
- Works with streaming (SSE) passthrough for the main request.
"""
import asyncio
import json
import os
import time
from typing import Any, Dict, Iterable, Optional, Tuple
from aiohttp import web, ClientSession, ClientTimeout
# --- Proxy listen/upstream configuration (all overridable via environment) ---
LISTEN_HOST = os.environ.get("PROXY_LISTEN_HOST", "0.0.0.0")
LISTEN_PORT = int(os.environ.get("PROXY_LISTEN_PORT", "8889"))
# Base URL of the OpenAI-compatible vLLM server requests are forwarded to.
UPSTREAM_BASE = os.environ.get("VLLM_UPSTREAM", "http://127.0.0.1:8888")
# Which routes should get adaptive thinking
ADAPTIVE_PATH_PREFIXES = (
    "/v1/chat/completions",
    "/v1/responses",
)
# If you want to force adaptive only for specific models, set comma-separated IDs:
# e.g. "Intel/Qwen3.5-122B-A10B-int4-AutoRound"
# Empty set => adaptive thinking applies to every model.
MODEL_ALLOWLIST = set(
    m.strip() for m in os.environ.get("MODEL_ALLOWLIST", "").split(",") if m.strip()
)
# Classifier config
CLASSIFIER_MODEL = os.environ.get("CLASSIFIER_MODEL", "")  # empty => reuse request's model
CLASSIFIER_TIMEOUT_S = float(os.environ.get("CLASSIFIER_TIMEOUT_S", "2.5"))
# Only the first N chars of user text are sent to the classifier (keeps it cheap).
CLASSIFIER_MAX_INPUT_CHARS = int(os.environ.get("CLASSIFIER_MAX_INPUT_CHARS", "1200"))
# 1 completion token is enough for a YES/NO verdict.
CLASSIFIER_MAX_TOKENS = int(os.environ.get("CLASSIFIER_MAX_TOKENS", "1"))
# Decision caching to reduce classifier calls
DECISION_CACHE_TTL_S = int(os.environ.get("DECISION_CACHE_TTL_S", "60"))
DECISION_CACHE_MAX = int(os.environ.get("DECISION_CACHE_MAX", "2048"))
# Default decision if classifier fails:
# "no" => disable thinking (faster)
# "yes" => enable thinking (safer)
DEFAULT_DECISION = os.environ.get("DEFAULT_DECISION", "no").strip().lower()
DEFAULT_DECISION_BOOL = True if DEFAULT_DECISION == "yes" else False
# Hop-by-hop + headers that must not be forwarded
HOP_BY_HOP_HEADERS = {
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailers",
    "transfer-encoding",
    "upgrade",
    # critical when mutating body:
    "content-length",
}
def _filtered_headers(headers: Iterable[Tuple[str, str]]) -> Dict[str, str]:
    """Copy *headers* into a plain dict, dropping Host and every hop-by-hop header.

    Content-Length is in the drop set, which matters because the proxy may
    re-serialize the body; later duplicates of a name overwrite earlier ones.
    """
    return {
        name: value
        for name, value in headers
        if name.lower() != "host" and name.lower() not in HOP_BY_HOP_HEADERS
    }
def _http_version_str(request: web.Request) -> str:
    """Render the request's HTTP version as "major.minor" for the access log."""
    version = request.version
    return "{}.{}".format(version.major, version.minor)
def _peer_str(request: web.Request) -> str:
    """Best-effort "ip:port" of the connected client; "-:-" when unavailable."""
    transport = request.transport
    peer = transport.get_extra_info("peername") if transport else None
    if isinstance(peer, (tuple, list)) and len(peer) >= 2:
        return f"{peer[0]}:{peer[1]}"
    return "-:-"
def _is_json(content_type: str) -> bool:
return bool(content_type) and ("application/json" in content_type.lower())
def _load_json(raw: bytes) -> Optional[Dict[str, Any]]:
try:
obj = json.loads(raw.decode("utf-8"))
return obj if isinstance(obj, dict) else None
except Exception:
return None
def _dump_json(obj: Dict[str, Any]) -> bytes:
return json.dumps(obj, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
def _ensure_chat_template_kwargs(payload: Dict[str, Any]) -> Dict[str, Any]:
"""
Ensure payload has chat_template_kwargs dict somewhere.
Supports both top-level and the common wrappers extra_body / extraBody.
Returns the dict that should be modified (the container holding chat_template_kwargs).
"""
# Prefer extra_body if present
if isinstance(payload.get("extra_body"), dict):
container = payload["extra_body"]
elif isinstance(payload.get("extraBody"), dict):
container = payload["extraBody"]
else:
container = payload
ctk = container.get("chat_template_kwargs")
if not isinstance(ctk, dict):
ctk = {}
container["chat_template_kwargs"] = ctk
return container
def set_thinking(payload: Dict[str, Any], enabled: bool) -> None:
    """Toggle Qwen3/3.5 thinking on vLLM via chat_template_kwargs.enable_thinking.

    The flag lands inside extra_body / extraBody when one of those wrappers is
    already a dict on *payload*, otherwise at the top level; a missing or
    malformed chat_template_kwargs is created as a fresh dict first.
    """
    target: Dict[str, Any] = payload
    for wrapper in ("extra_body", "extraBody"):
        nested = payload.get(wrapper)
        if isinstance(nested, dict):
            target = nested
            break
    kwargs = target.get("chat_template_kwargs")
    if not isinstance(kwargs, dict):
        kwargs = {}
        target["chat_template_kwargs"] = kwargs
    kwargs["enable_thinking"] = bool(enabled)
def extract_user_text_for_classification(payload: Dict[str, Any], path: str) -> str:
    """Pull a compact text sample out of the request body for the YES/NO classifier.

    Understands both payload shapes:
      - /v1/chat/completions: "messages" — the most recent user turn (falling
        back to the final message) is the signal.
      - /v1/responses: "input" — a plain string or a list of content items.
    Text parts are stripped, joined with newlines, and capped at
    CLASSIFIER_MAX_INPUT_CHARS.
    """
    collected: list[str] = []

    def harvest(content: Any) -> None:
        # Accept a plain string or an OpenAI-style list of typed content parts.
        if isinstance(content, str):
            collected.append(content)
        elif isinstance(content, list):
            for piece in content:
                if not isinstance(piece, dict):
                    continue
                if piece.get("type") not in (None, "text"):
                    continue
                if isinstance(piece.get("text"), str):
                    collected.append(piece["text"])

    if path.startswith("/v1/chat/completions"):
        messages = payload.get("messages")
        if isinstance(messages, list):
            # Primary signal: last user message; otherwise the last message overall.
            chosen = next(
                (m for m in reversed(messages) if isinstance(m, dict) and m.get("role") == "user"),
                messages[-1] if messages else None,
            )
            if isinstance(chosen, dict):
                harvest(chosen.get("content"))
    elif path.startswith("/v1/responses"):
        source = payload.get("input")
        if isinstance(source, str):
            collected.append(source)
        elif isinstance(source, list):
            # Pull any text chunks out of the content items.
            for item in source:
                if isinstance(item, dict):
                    harvest(item.get("content"))

    text = "\n".join(part.strip() for part in collected if part and part.strip())
    return text[:CLASSIFIER_MAX_INPUT_CHARS] if len(text) > CLASSIFIER_MAX_INPUT_CHARS else text
def classifier_prompt(user_text: str) -> str:
    """Build the routing prompt that asks the model for a single YES/NO token."""
    lines = [
        "You are a router that decides whether a user request needs deep reasoning.",
        "Return exactly one token: YES or NO.",
        "Answer YES only if the request requires multi-step reasoning, math/logic proofs, non-trivial planning, or careful analysis.",
        "Answer NO for simple chat, trivial questions, short edits, formatting, or straightforward lookups.",
        "",
        "USER_REQUEST:",
        user_text,
    ]
    return "\n".join(lines) + "\n"
class DecisionCache:
    """Small TTL cache mapping "model|user_text" keys to think/no-think decisions.

    Avoids re-running the classifier for identical prompts seen within
    DECISION_CACHE_TTL_S seconds; bounded at DECISION_CACHE_MAX entries.
    Single-threaded asyncio access is assumed — there is no locking.
    """

    def __init__(self) -> None:
        # key -> (decision, absolute expiry timestamp)
        self._data: Dict[str, Tuple[bool, float]] = {}

    def get(self, key: str) -> Optional[bool]:
        """Return the cached decision for *key*, or None if absent or expired."""
        now = time.time()
        item = self._data.get(key)
        if not item:
            return None
        value, expires = item
        if expires < now:
            # Lazily drop stale entries on read.
            self._data.pop(key, None)
            return None
        return value

    def put(self, key: str, value: bool) -> None:
        """Store *value* for *key* with a fresh TTL, evicting as needed.

        Bugfix: the previous implementation cleared the ENTIRE cache whenever
        the size limit was hit, discarding every still-valid decision and
        causing a burst of classifier calls. Now expired entries are purged
        first, and only the soonest-to-expire entry is evicted if the cache
        is still full after that.
        """
        now = time.time()
        if len(self._data) >= DECISION_CACHE_MAX:
            # First pass: purge everything that has already expired.
            for stale in [k for k, (_, exp) in self._data.items() if exp < now]:
                self._data.pop(stale, None)
            if len(self._data) >= DECISION_CACHE_MAX:
                # Still full: evict the entry closest to expiry, not the whole cache.
                victim = min(self._data, key=lambda k: self._data[k][1])
                self._data.pop(victim, None)
        self._data[key] = (value, now + DECISION_CACHE_TTL_S)
async def ask_should_think(
    session: ClientSession,
    upstream_base: str,
    model: str,
    user_text: str,
) -> Optional[bool]:
    """Ask the upstream model whether this request deserves deep reasoning.

    Sends a tiny non-streaming chat completion (thinking disabled,
    temperature 0, CLASSIFIER_MAX_TOKENS output tokens) and reads a YES/NO
    verdict. Returns True for YES, False for NO, and None on any failure:
    non-200 status, timeout, network error, or an unparsable reply.
    """
    request_body: Dict[str, Any] = {
        "model": model,
        "temperature": 0,
        "top_p": 1,
        "max_tokens": CLASSIFIER_MAX_TOKENS,
        "stream": False,
        "messages": [
            {"role": "system", "content": "Return YES or NO only."},
            {"role": "user", "content": classifier_prompt(user_text)},
        ],
    }
    # The classifier itself must never spend time thinking.
    set_thinking(request_body, enabled=False)
    budget = ClientTimeout(
        total=CLASSIFIER_TIMEOUT_S,
        connect=CLASSIFIER_TIMEOUT_S,
        sock_connect=CLASSIFIER_TIMEOUT_S,
        sock_read=CLASSIFIER_TIMEOUT_S,
    )
    try:
        async with session.post(
            f"{upstream_base}/v1/chat/completions", json=request_body, timeout=budget
        ) as resp:
            if resp.status != 200:
                return None
            data = await resp.json()
    except Exception:
        # Treat all transport/timeout problems as "no decision".
        return None
    # Parse the OpenAI-compatible response defensively.
    try:
        choices = data.get("choices")
        if not isinstance(choices, list) or not choices:
            return None
        content = choices[0].get("message", {}).get("content", "")
    except Exception:
        return None
    if not isinstance(content, str):
        return None
    verdict = content.strip().upper()
    if verdict.startswith("YES"):
        return True
    if verdict.startswith("NO"):
        return False
    return None
async def handle(request: web.Request) -> web.StreamResponse:
    """Proxy one request upstream, toggling enable_thinking on adaptive routes.

    For /v1/chat/completions and /v1/responses JSON bodies, a cached or freshly
    classified YES/NO decision sets chat_template_kwargs.enable_thinking before
    forwarding; every other request is passed through unchanged. The upstream
    response is streamed back chunk by chunk, so SSE streaming keeps working.
    """
    app = request.app
    session: ClientSession = app["session"]
    cache: DecisionCache = app["decision_cache"]
    # rel_url keeps path + query string; only the origin is swapped for upstream.
    upstream_url = f"{UPSTREAM_BASE}{request.rel_url}"
    # Read body
    orig_body = await request.read()
    content_type = request.headers.get("Content-Type", "")
    # Default: pass through unchanged
    forward_body = orig_body
    # For adaptive paths, parse JSON and decide thinking
    think_decision: Optional[bool] = None
    should_adapt = any(request.path.startswith(p) for p in ADAPTIVE_PATH_PREFIXES) and _is_json(content_type)
    payload = _load_json(orig_body) if should_adapt else None
    if should_adapt and payload is not None:
        req_model = payload.get("model")
        if isinstance(req_model, str):
            if MODEL_ALLOWLIST and req_model not in MODEL_ALLOWLIST:
                should_adapt = False  # not allowed -> just proxy
        if should_adapt:
            user_text = extract_user_text_for_classification(payload, request.path)
            # Cache key couples the model id with the (truncated) user text.
            cache_key = f"{req_model}|{user_text}"
            cached = cache.get(cache_key) if user_text else None
            if cached is not None:
                think_decision = cached
            else:
                # Choose model for classifier
                cls_model = CLASSIFIER_MODEL.strip() or (req_model if isinstance(req_model, str) else "")
                if not cls_model or not user_text:
                    # Nothing to classify with: fall back to the configured default.
                    think_decision = DEFAULT_DECISION_BOOL
                else:
                    think_decision = await ask_should_think(session, UPSTREAM_BASE, cls_model, user_text)
                    if think_decision is None:
                        # Classifier failed or timed out: use the configured default.
                        think_decision = DEFAULT_DECISION_BOOL
                if user_text:
                    cache.put(cache_key, think_decision)
            # Apply decision to forwarded request
            # - YES => enable_thinking True
            # - NO => enable_thinking False
            set_thinking(payload, enabled=bool(think_decision))
            # Re-serialize; Content-Length is stripped by _filtered_headers below,
            # so the mutated body length cannot conflict with a stale header.
            forward_body = _dump_json(payload)
    # Forward request upstream
    headers = _filtered_headers(request.headers.items())
    # Long streaming supported: disable total timeout
    timeout = ClientTimeout(total=None, connect=30, sock_connect=30, sock_read=None)
    async with session.request(
        method=request.method,
        url=upstream_url,
        headers=headers,
        data=forward_body if forward_body else None,
        allow_redirects=False,
        timeout=timeout,
    ) as upstream_resp:
        # Access log
        client = _peer_str(request)
        http_ver = _http_version_str(request)
        # Print decision for adaptive routes
        # NOTE(review): the label defaults to "YES" even for pass-through
        # requests where no decision was made — consider a neutral marker.
        deep_mode = "Think Mode: YES"
        if should_adapt and think_decision is not None:
            if think_decision:
                deep_mode = "Think Mode: YES"
            else:
                deep_mode = "Think Mode: NO"
        print(
            f'{client} - {deep_mode} - "{request.method} {request.path_qs} HTTP/{http_ver}" '
            f"{upstream_resp.status} {upstream_resp.reason}"
        )
        # Stream response back
        resp_headers = _filtered_headers(upstream_resp.headers.items())
        downstream = web.StreamResponse(
            status=upstream_resp.status,
            reason=upstream_resp.reason,
            headers=resp_headers,
        )
        await downstream.prepare(request)
        # Relay the upstream body in fixed-size chunks (works for SSE too).
        async for chunk in upstream_resp.content.iter_chunked(8192):
            await downstream.write(chunk)
        await downstream.write_eof()
        return downstream
async def on_startup(app: web.Application) -> None:
    """Create per-application shared state before the server starts serving."""
    # One shared ClientSession reuses upstream connections across all requests.
    app["decision_cache"] = DecisionCache()
    app["session"] = ClientSession()
async def on_cleanup(app: web.Application) -> None:
    """Close the shared upstream HTTP session on shutdown."""
    await app["session"].close()
def main() -> None:
    """Build the aiohttp application and serve it until interrupted."""
    # Generous body limit so large chat payloads are not rejected at the proxy.
    application = web.Application(client_max_size=256 * 1024 * 1024)  # 256MB
    application.on_startup.append(on_startup)
    application.on_cleanup.append(on_cleanup)
    # Catch-all route: every method on every path goes through handle().
    application.router.add_route("*", "/{tail:.*}", handle)
    web.run_app(application, host=LISTEN_HOST, port=LISTEN_PORT)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment