Skip to content

Instantly share code, notes, and snippets.

@unixsysdev
Created July 23, 2025 16:32
Show Gist options
  • Select an option

  • Save unixsysdev/27cab50974d6a98317c57b8a82cfab9d to your computer and use it in GitHub Desktop.

Select an option

Save unixsysdev/27cab50974d6a98317c57b8a82cfab9d to your computer and use it in GitHub Desktop.
Serena MCP Interface via MCPO is a Streamlit web app that bridges AI models to Serena's powerful coding tools by converting the complex MCP (Model Context Protocol) into simple HTTP calls, letting you have natural conversations with an AI assistant that can actually read, write, and analyze code in real projects.
#!/usr/bin/env python3
"""
Streamlit Interface for Serena MCP using MCPO Proxy
Uses MCPO to convert Serena MCP server to OpenAPI HTTP endpoints.
Downloads Serena directly from git repository
# streamlit run serena.py
"""
import streamlit as st
import json
import requests
import subprocess
import os
import time
import tempfile
import stat
from typing import List, Dict, Any, Optional
import logging
import atexit
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configure Streamlit
st.set_page_config(
page_title="Serena MCP Interface (MCPO)",
page_icon="🔧",
layout="wide",
initial_sidebar_state="expanded"
)
# Configuration
LITELLM_URL = "http://localhost:8000/v1/chat/completions"
MODEL_NAME = "kimi-k2-instruct-tools"
MCPO_PORT = 8001
MCPO_API_KEY = "serena-secret-key"
class MCPOManager:
"""Manages MCPO proxy server process for git repo approach only"""
def __init__(self, port: int = MCPO_PORT, api_key: str = MCPO_API_KEY):
self.port = port
self.api_key = api_key
self.process = None
self.base_url = f"http://localhost:{port}"
def start_mcpo(self) -> tuple[bool, str]:
"""Start MCPO proxy server with git repo approach only"""
try:
# Check if MCPO is available
mcpo_available = False
error_msg = ""
# Try uvx first
try:
result = subprocess.run(["uvx", "mcpo", "--help"],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
mcpo_available = True
else:
error_msg = f"uvx mcpo failed: {result.stderr}"
except (subprocess.TimeoutExpired, FileNotFoundError):
error_msg = "uvx not found, trying direct mcpo command"
# Try direct mcpo command as fallback
if not mcpo_available:
try:
result = subprocess.run(["mcpo", "--help"],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
mcpo_available = True
logger.info("Using direct mcpo command")
else:
error_msg += f"; mcpo direct failed: {result.stderr}"
except (subprocess.TimeoutExpired, FileNotFoundError):
error_msg += "; mcpo not found in PATH"
if not mcpo_available:
return False, f"MCPO not available. {error_msg}. Install with: pip install mcpo"
# Check if port is already in use
try:
response = requests.get(f"{self.base_url}/health", timeout=2)
if response.status_code == 200:
logger.info("MCPO already running, using existing instance")
return True, "MCPO already running"
except requests.exceptions.RequestException:
pass # Port is free, good to start
# Git repo approach only
logger.info("🚀 Starting MCPO with git repo approach (latest Serena)...")
success, message = self._start_git_repo_approach()
if success:
return True, f"✅ Git repo approach successful: {message}"
else:
return False, f"❌ Git repo approach failed: {message}"
except Exception as e:
logger.error(f"Failed to start MCPO: {e}")
return False, f"Failed to start MCPO: {str(e)}"
def _start_git_repo_approach(self) -> tuple[bool, str]:
"""Start MCPO with git repo Serena download - with verbose logging"""
try:
# Find Python 3.11
python311_path = self._find_python311()
# Build command for git repo approach
if python311_path:
mcpo_cmd = [
"uvx", "--python", python311_path, "mcpo",
"--port", str(self.port),
"--api-key", self.api_key,
"--",
"uvx", "--from", "git+https://github.com/oraios/serena", "serena-mcp-server", "--transport", "stdio"
]
logger.info(f"🐍 Using Python 3.11: {python311_path}")
else:
mcpo_cmd = [
"uvx", "mcpo",
"--port", str(self.port),
"--api-key", self.api_key,
"--",
"uvx", "--from", "git+https://github.com/oraios/serena", "serena-mcp-server", "--transport", "stdio"
]
logger.warning("⚠️ Python 3.11 not found, using default Python")
logger.info(f"🚀 Starting MCPO with command:")
logger.info(f" {' '.join(mcpo_cmd)}")
logger.info(f"📁 Working directory: {os.getcwd()}")
logger.info(f"🌐 Expected endpoint: {self.base_url}")
# Start MCPO process
logger.info("🔄 Launching subprocess...")
self.process = subprocess.Popen(
mcpo_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1, # Line buffered
universal_newlines=True
)
logger.info(f"📋 Process started with PID: {self.process.pid}")
logger.info("⏳ Waiting for MCPO to download Serena and start up...")
logger.info("💡 This may take 1-2 minutes for git download and build...")
# Wait for startup with verbose logging
if self._wait_for_startup():
logger.info("🎉 Git repo approach successful!")
return True, "Git repo approach successful"
else:
logger.error("❌ Git repo approach failed during startup")
self._cleanup_failed_process()
return False, "Git repo approach: startup timeout or failure"
except Exception as e:
self._cleanup_failed_process()
logger.error(f"💥 Git repo approach exception: {e}")
return False, f"Git repo approach failed: {str(e)}"
def _find_python311(self) -> Optional[str]:
"""Find Python 3.11 path"""
for possible_path in ["/Users/marcel/WorkSpace/UI/mcp_chat_env/bin/python3.11",
"python3.11",
"/usr/local/bin/python3.11",
"/opt/homebrew/bin/python3.11"]:
try:
result = subprocess.run([possible_path, "--version"],
capture_output=True, text=True, timeout=5)
if result.returncode == 0 and "3.11" in result.stdout:
logger.info(f"Found Python 3.11 at: {possible_path}")
return possible_path
except (subprocess.TimeoutExpired, FileNotFoundError):
continue
return None
def _wait_for_startup(self, timeout: int = 120) -> bool:
"""Wait for MCPO to start up - check multiple endpoints"""
logger.info(f"⏳ Waiting for MCPO startup (timeout: {timeout}s)...")
for attempt in range(timeout):
try:
time.sleep(1)
# Log every 10 seconds
if attempt % 10 == 0 and attempt > 0:
logger.info(f"⏳ Still waiting... {attempt}s elapsed (timeout in {timeout-attempt}s)")
# Try multiple MCPO endpoints
endpoints_to_check = [
f"{self.base_url}/health",
f"{self.base_url}/docs",
f"{self.base_url}/openapi.json",
f"{self.base_url}/"
]
for endpoint in endpoints_to_check:
try:
response = requests.get(endpoint, timeout=3)
if response.status_code in [200, 404]: # 404 is fine, means server is up
logger.info(f"✅ MCPO responding at {endpoint} (status: {response.status_code})")
# Double check with OpenAPI endpoint
try:
api_response = requests.get(f"{self.base_url}/openapi.json", timeout=3)
if api_response.status_code == 200:
logger.info(f"🎯 OpenAPI endpoint confirmed - MCPO fully ready!")
logger.info(f"✅ MCPO started successfully on port {self.port} after {attempt}s")
return True
else:
logger.info(f"⏳ MCPO responding but OpenAPI not ready yet...")
except requests.exceptions.RequestException:
logger.info(f"⏳ MCPO responding but OpenAPI not ready yet...")
break
except requests.exceptions.RequestException:
continue
except Exception as e:
if attempt % 30 == 0 and attempt > 0:
logger.info(f"🔄 Health check exception (expected during startup): {e}")
continue
# Check if process died
if self.process and self.process.poll() is not None:
stdout, stderr = self.process.communicate()
logger.error(f"💀 MCPO process died after {attempt}s")
logger.error(f"📤 STDOUT: {stdout}")
logger.error(f"📤 STDERR: {stderr}")
return False
# Check for Serena dashboard (means Serena is working)
if attempt > 30 and attempt % 20 == 0: # Check after 30s, then every 20s
try:
# Look for Serena dashboard
for port in range(24290, 24300): # Serena uses ports in this range
dashboard_url = f"http://127.0.0.1:{port}/dashboard/index.html"
dash_response = requests.get(dashboard_url, timeout=2)
if dash_response.status_code == 200:
logger.info(f"🎛️ Serena dashboard found at {dashboard_url} - Serena is running!")
break
except requests.exceptions.RequestException:
pass
# Timeout reached
logger.error(f"⏰ Startup timeout after {timeout}s")
if self.process and self.process.poll() is None:
logger.info("📊 Process still running, getting current output...")
# Process is alive, maybe MCPO is working but endpoints are different
logger.info("🔍 Trying alternative endpoint checks...")
# Try checking if any port is open
import socket
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('localhost', self.port))
sock.close()
if result == 0:
logger.info(f"🔌 Port {self.port} is open - MCPO might be running!")
# Force success since port is open
return True
else:
logger.error(f"🚫 Port {self.port} is not open")
except Exception as e:
logger.error(f"❌ Socket check failed: {e}")
return False
def _cleanup_failed_process(self):
"""Clean up failed process"""
if self.process:
try:
self.process.terminate()
self.process.wait(timeout=5)
except:
try:
self.process.kill()
self.process.wait()
except:
pass
self.process = None
def stop_mcpo(self):
"""Stop MCPO proxy server"""
try:
if self.process:
logger.info("Stopping MCPO process...")
self.process.terminate()
# Wait for graceful shutdown
try:
self.process.wait(timeout=5)
except subprocess.TimeoutExpired:
logger.warning("MCPO didn't stop gracefully, killing...")
self.process.kill()
self.process.wait()
self.process = None
logger.info("MCPO stopped")
except Exception as e:
logger.error(f"Error stopping MCPO: {e}")
def is_running(self) -> bool:
"""Check if MCPO is running - try multiple endpoints"""
endpoints_to_try = [
f"{self.base_url}/health",
f"{self.base_url}/docs",
f"{self.base_url}/openapi.json",
f"{self.base_url}/"
]
for endpoint in endpoints_to_try:
try:
response = requests.get(endpoint, timeout=3)
if response.status_code in [200, 404]: # Both indicate server is up
return True
except requests.exceptions.RequestException:
continue
# Final check: is the port open?
try:
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('localhost', self.port))
sock.close()
return result == 0 # 0 means connection successful
except:
return False
class SerenaMCPOClient:
"""Client for Serena via MCPO proxy"""
def __init__(self, mcpo_manager: MCPOManager):
self.mcpo_manager = mcpo_manager
self.tools = []
self.connected = False
def connect(self) -> tuple[bool, str, List[Dict[str, Any]]]:
"""Connect to Serena via MCPO"""
try:
# Start MCPO if not running
if not self.mcpo_manager.is_running():
success, message = self.mcpo_manager.start_mcpo()
if not success:
return False, message, []
# Get OpenAPI schema to discover tools
try:
response = requests.get(
f"{self.mcpo_manager.base_url}/openapi.json",
headers={"Authorization": f"Bearer {self.mcpo_manager.api_key}"},
timeout=10
)
response.raise_for_status()
openapi_schema = response.json()
# Extract tools from OpenAPI schema
self.tools = self._extract_tools_from_openapi(openapi_schema)
self.connected = True
logger.info(f"Connected to Serena via MCPO with {len(self.tools)} tools")
return True, f"Connected with {len(self.tools)} tools", self.tools
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get OpenAPI schema: {e}")
return False, f"Failed to get OpenAPI schema: {str(e)}", []
except Exception as e:
logger.error(f"Connection error: {e}")
return False, f"Connection failed: {str(e)}", []
def _extract_tools_from_openapi(self, schema: Dict) -> List[Dict[str, Any]]:
"""Extract tool definitions from OpenAPI schema with $ref resolution"""
tools = []
logger.info(f"🔍 Extracting tools from OpenAPI schema...")
# Get components for resolving $ref
components = schema.get("components", {})
schemas_components = components.get("schemas", {})
# Look for tool endpoints in paths
paths = schema.get("paths", {})
logger.info(f"📋 Found {len(paths)} paths in OpenAPI schema")
def resolve_ref(ref_schema):
"""Resolve $ref references to actual schema"""
if isinstance(ref_schema, dict) and "$ref" in ref_schema:
ref_path = ref_schema["$ref"]
if ref_path.startswith("#/components/schemas/"):
schema_name = ref_path.replace("#/components/schemas/", "")
resolved = schemas_components.get(schema_name, {})
logger.info(f"🔗 Resolved $ref {schema_name}: {resolved}")
return resolved
return ref_schema
# MCPO exposes Serena tools directly at root level (e.g., /read_file, /get_current_config)
for path, methods in paths.items():
logger.info(f"🔗 Checking path: {path}")
# Skip non-tool paths (health, docs, etc.)
if path in ["/", "/health", "/docs", "/openapi.json", "/redoc"]:
logger.info(f"⏭️ Skipping system path: {path}")
continue
# Extract tool name from path (remove leading slash)
tool_name = path.lstrip("/")
# Get POST method (tool execution)
post_method = methods.get("post", {})
if not post_method:
logger.warning(f"⚠️ No POST method for path: {path}")
continue
# Extract tool info
tool_info = {
"name": tool_name,
"description": post_method.get("summary", post_method.get("description", f"Execute {tool_name}")),
"inputSchema": {"type": "object", "properties": {}}
}
# Extract parameters from request body
request_body = post_method.get("requestBody", {})
content = request_body.get("content", {})
json_content = content.get("application/json", {})
schema_def = json_content.get("schema", {})
if schema_def:
# Resolve $ref if present
resolved_schema = resolve_ref(schema_def)
tool_info["inputSchema"] = resolved_schema
logger.info(f"🔧 Tool {tool_name} resolved schema: {resolved_schema}")
else:
logger.warning(f"⚠️ No input schema found for {tool_name}")
tools.append(tool_info)
logger.info(f"✅ Added tool: {tool_name}")
logger.info(f"🎯 Successfully extracted {len(tools)} tools")
if tools:
tool_names = [t["name"] for t in tools]
logger.info(f"📝 Tool names: {', '.join(tool_names[:10])}{'...' if len(tool_names) > 10 else ''}")
return tools
def _clean_error_message(self, content: str) -> str:
"""Clean up verbose error messages from Serena"""
# Handle multiple projects error
if "Multiple projects found with name" in content and "Traceback" in content:
lines = content.split('\n')
for line in lines:
if "Multiple projects found with name" in line and "Locations:" in line:
# Extract the clean error message
parts = line.split("Locations:")
if len(parts) == 2:
locations_str = parts[1].strip()
# Clean up the locations format
locations_str = locations_str.replace("PosixPath('", "").replace("')", "").replace("[", "").replace("]", "")
locations = [loc.strip() for loc in locations_str.split(",")]
project_name = parts[0].split("'")[1] if "'" in parts[0] else "project"
return f"Multiple projects found with name '{project_name}'. Please specify the full path instead.\n\nAvailable locations:\n" + "\n".join(f"• {loc}" for loc in locations)
# Fallback if parsing fails
return "Multiple projects found with that name. Please specify the full path instead."
# Handle other common error patterns
if "Traceback (most recent call last):" in content:
# Extract just the last meaningful error line before the traceback
lines = content.split('\n')
for i, line in enumerate(lines):
if "Traceback (most recent call last):" in line:
# Look for the actual error before the traceback
for j in range(i-1, -1, -1):
if lines[j].strip() and not lines[j].startswith("Error executing tool:"):
return lines[j].strip()
break
# If we find a ValueError at the end, use that
for line in reversed(lines):
if line.strip().startswith("ValueError:"):
return line.replace("ValueError:", "").strip()
elif line.strip().startswith("FileNotFoundError:"):
return line.replace("FileNotFoundError:", "File not found:").strip()
elif line.strip().startswith("PermissionError:"):
return line.replace("PermissionError:", "Permission denied:").strip()
return content
def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> tuple[bool, str]:
"""Call tool via MCPO proxy - using direct tool paths with better error handling"""
try:
if not self.connected:
return False, "Not connected to MCPO"
# Make HTTP request to MCPO - tools are at root level
url = f"{self.mcpo_manager.base_url}/{tool_name}"
headers = {
"Authorization": f"Bearer {self.mcpo_manager.api_key}",
"Content-Type": "application/json"
}
logger.info(f"Calling tool {tool_name} via MCPO: {url}")
response = requests.post(
url,
headers=headers,
json=arguments,
timeout=120 # 2 minute timeout
)
if response.status_code == 200:
result = response.json()
# Extract content from result and clean up error messages
content = ""
if isinstance(result, dict) and "content" in result:
content = str(result["content"])
else:
content = str(result)
# Clean up common Serena error patterns
content = self._clean_error_message(content)
return True, content
else:
# Parse error for common issues
error_text = response.text
# Handle common parameter errors
if "Field required" in error_text:
try:
error_json = response.json()
missing_fields = []
for detail in error_json.get("detail", []):
if detail.get("type") == "missing":
field_name = detail.get("loc", [])[-1] if detail.get("loc") else "unknown"
missing_fields.append(field_name)
if missing_fields:
return False, f"Missing required parameter(s): {', '.join(missing_fields)}"
except:
pass
# Handle Serena-specific errors more gracefully
if "Multiple projects found" in error_text:
# Extract project locations from error
try:
lines = error_text.split('\n')
for line in lines:
if "Locations:" in line and "[" in line:
locations_str = line.split("Locations:")[1].strip()
# Clean up the locations list
locations_str = locations_str.replace("PosixPath('", "").replace("')", "").replace("[", "").replace("]", "")
locations = [loc.strip() for loc in locations_str.split(",")]
return False, f"Multiple projects found. Please specify the full path. Available: {', '.join(locations)}"
except:
pass
return False, "Multiple projects found with that name. Please specify the full path instead."
# Handle other common errors
if "No such file or directory" in error_text:
return False, "File or directory not found."
if "Permission denied" in error_text:
return False, "Permission denied."
# Generic error with just the status code
error_msg = f"Tool error (HTTP {response.status_code})"
logger.error(f"Tool call failed: {error_text}")
return False, error_msg
except requests.exceptions.Timeout:
return False, "Tool call timed out after 2 minutes"
except requests.exceptions.RequestException as e:
logger.error(f"HTTP request failed: {e}")
return False, f"Connection error: {str(e)}"
except Exception as e:
logger.error(f"Tool call error: {e}")
return False, f"Unexpected error: {str(e)}"
def disconnect(self):
"""Disconnect from MCPO"""
self.connected = False
self.tools = []
class LLMClient:
"""Client for LiteLLM communication"""
def __init__(self, base_url: str, model_name: str):
self.base_url = base_url
self.model_name = model_name
def chat_completion(self, messages: List[Dict[str, str]], tools: Optional[List[Dict]] = None) -> Dict[str, Any]:
"""Call LLM with optional tools"""
payload = {
"model": self.model_name,
"messages": messages,
"stream": False,
"max_tokens": 2048,
"temperature": 0.7
}
if tools:
payload["tools"] = tools
payload["tool_choice"] = "auto"
try:
response = requests.post(
self.base_url,
headers={
"Content-Type": "application/json",
"Authorization": "Bearer any-key"
},
json=payload,
timeout=30
)
if response.status_code != 200:
error_detail = response.text
raise Exception(f"HTTP {response.status_code}: {error_detail}")
return response.json()
except requests.exceptions.RequestException as e:
raise Exception(f"LLM request failed: {str(e)}")
except Exception as e:
raise Exception(f"LLM error: {str(e)}")
def load_custom_css():
"""Load custom CSS for professional appearance"""
st.markdown("""
<style>
.main-header {
font-size: 2.5rem;
font-weight: 600;
color: #1f2937;
margin-bottom: 0.5rem;
}
.sub-header {
font-size: 1.1rem;
color: #6b7280;
margin-bottom: 2rem;
}
.status-card {
padding: 1rem;
border-radius: 0.5rem;
border-left: 4px solid;
margin: 0.5rem 0;
}
.status-connected {
background-color: #d1fae5;
color: #065f46;
border-left-color: #10b981;
}
.status-disconnected {
background-color: #fee2e2;
color: #991b1b;
border-left-color: #ef4444;
}
.status-warning {
background-color: #fef3c7;
color: #92400e;
border-left-color: #f59e0b;
}
.tool-execution {
background-color: #f8fafc;
border: 1px solid #e2e8f0;
border-radius: 0.375rem;
padding: 1rem;
margin: 0.5rem 0;
}
</style>
""", unsafe_allow_html=True)
def main():
"""Main application"""
load_custom_css()
# Header
st.markdown('<div class="main-header">Serena MCP Interface (MCPO)</div>', unsafe_allow_html=True)
st.markdown('<div class="sub-header">Professional coding assistant via MCPO proxy - Git Repo Only</div>', unsafe_allow_html=True)
# Configuration
st.sidebar.header("⚙️ Configuration")
# MCPO configuration only
mcpo_port = st.sidebar.number_input(
"MCPO Port",
value=MCPO_PORT,
min_value=8001,
max_value=9999,
help="Port for MCPO proxy server",
key="mcpo_port_input"
)
# Initialize clients (simplified - no local paths needed)
if ("mcpo_manager" not in st.session_state or
st.session_state.get("mcpo_port") != mcpo_port):
# Stop old MCPO if exists
if "mcpo_manager" in st.session_state:
try:
st.session_state.mcpo_manager.stop_mcpo()
except:
pass
st.session_state.mcpo_manager = MCPOManager(mcpo_port, MCPO_API_KEY)
st.session_state.serena_client = SerenaMCPOClient(st.session_state.mcpo_manager)
st.session_state.mcpo_port = mcpo_port
llm_client = LLMClient(LITELLM_URL, MODEL_NAME)
# Initialize session state
if "messages" not in st.session_state:
st.session_state.messages = [
{
"role": "assistant",
"content": f"""Welcome to Serena MCP Interface via MCPO! 🚀
I'm your coding assistant powered by MCPO proxy, which converts Serena's MCP protocol to standard HTTP OpenAPI.
**Git Repo Approach Only:**
- Uses `uvx --from git+https://github.com/oraios/serena serena-mcp-server`
- Always gets the latest Serena version
- Works anywhere without local installation
- Zero setup required!
**Current Configuration:**
- MCPO Port: `{mcpo_port}`
- MCPO API: `http://localhost:{mcpo_port}`
**How it works:**
1. MCPO pulls Serena from git repo automatically
2. Converts MCP stdio to HTTP OpenAPI endpoints
3. We use standard HTTP requests
4. Clean, portable, always up-to-date!
Click 'Connect' to start MCPO and download Serena from git automatically."""
}
]
# Sidebar
with st.sidebar:
st.header("🔧 Connection Management")
# Connection status
mcpo_manager = st.session_state.mcpo_manager
serena_client = st.session_state.serena_client
# Check MCPO status
mcpo_running = mcpo_manager.is_running()
if serena_client.connected and mcpo_running:
st.markdown(f'''
<div class="status-card status-connected">
✅ Connected via MCPO<br>
<small>{len(serena_client.tools)} tools available</small><br>
<small>Port: {mcpo_port}</small>
</div>
''', unsafe_allow_html=True)
elif mcpo_running:
st.markdown(f'''
<div class="status-card status-warning">
⚠️ MCPO running, not connected<br>
<small>Port: {mcpo_port}</small>
</div>
''', unsafe_allow_html=True)
else:
st.markdown(f'''
<div class="status-card status-disconnected">
❌ MCPO not running<br>
<small>Ready to start on port {mcpo_port}</small>
</div>
''', unsafe_allow_html=True)
# Connection controls
col1, col2 = st.columns(2)
with col1:
if st.button("Connect", type="primary", disabled=serena_client.connected, use_container_width=True):
with st.spinner("Starting MCPO proxy and downloading Serena from git (this may take 1-2 minutes)..."):
try:
success, message, tools = serena_client.connect()
if success:
st.success(f"✅ {message}")
st.rerun()
else:
st.error(f"❌ {message}")
st.info("💡 Check the console logs for detailed error information")
except Exception as e:
st.error(f"❌ Connection failed: {str(e)}")
st.info("💡 Check the console logs for detailed error information")
with col2:
if st.button("Disconnect", disabled=not serena_client.connected, use_container_width=True):
with st.spinner("Disconnecting..."):
try:
serena_client.disconnect()
st.info("ℹ️ Disconnected")
st.rerun()
except Exception as e:
st.error(f"❌ Disconnect failed: {str(e)}")
# MCPO controls
st.write("")
st.subheader("🔗 MCPO Proxy")
col1, col2 = st.columns(2)
with col1:
if st.button("Stop MCPO", disabled=not mcpo_running, use_container_width=True):
with st.spinner("Stopping MCPO..."):
mcpo_manager.stop_mcpo()
serena_client.disconnect()
st.info("MCPO stopped")
st.rerun()
with col2:
if st.button("OpenAPI Docs", disabled=not mcpo_running, use_container_width=True):
st.markdown(f"[Open Docs]({mcpo_manager.base_url}/docs)", unsafe_allow_html=True)
# Debug button
if mcpo_running:
if st.button("🔍 Debug OpenAPI", use_container_width=True):
try:
response = requests.get(
f"{mcpo_manager.base_url}/openapi.json",
headers={"Authorization": f"Bearer {mcpo_manager.api_key}"},
timeout=5
)
if response.status_code == 200:
schema = response.json()
st.json(schema.get("paths", {}))
else:
st.error(f"Failed to get OpenAPI schema: {response.status_code}")
except Exception as e:
st.error(f"Debug failed: {e}")
# Tools overview
if serena_client.connected and serena_client.tools:
st.write("")
st.subheader("🔧 Available Tools")
# Show tools by category
file_tools = [t for t in serena_client.tools if any(x in t['name'] for x in ['file', 'dir', 'read', 'write', 'create', 'find', 'delete', 'insert'])]
code_tools = [t for t in serena_client.tools if any(x in t['name'] for x in ['symbol', 'overview', 'referencing', 'replace'])]
memory_tools = [t for t in serena_client.tools if 'memory' in t['name']]
project_tools = [t for t in serena_client.tools if any(x in t['name'] for x in ['project', 'onboarding', 'config', 'activate'])]
shell_tools = [t for t in serena_client.tools if any(x in t['name'] for x in ['shell', 'execute'])]
other_tools = [t for t in serena_client.tools if t not in file_tools + code_tools + memory_tools + project_tools + shell_tools]
with st.expander(f"📁 File Operations ({len(file_tools)})", expanded=False):
for tool in file_tools[:8]:
st.text(f"• {tool['name']}")
if len(file_tools) > 8:
st.text(f"... +{len(file_tools) - 8} more")
with st.expander(f"🔍 Code Analysis ({len(code_tools)})", expanded=False):
for tool in code_tools:
st.text(f"• {tool['name']}")
with st.expander(f"💾 Memory ({len(memory_tools)})", expanded=False):
for tool in memory_tools:
st.text(f"• {tool['name']}")
with st.expander(f"⚙️ Project ({len(project_tools)})", expanded=False):
for tool in project_tools:
st.text(f"• {tool['name']}")
with st.expander(f"🖥️ Shell ({len(shell_tools)})", expanded=False):
for tool in shell_tools:
st.text(f"• {tool['name']}")
if other_tools:
with st.expander(f"🔧 Other Tools ({len(other_tools)})", expanded=False):
for tool in other_tools:
st.text(f"• {tool['name']}")
# Setup instructions
st.write("")
with st.expander("📋 Setup Instructions", expanded=False):
st.markdown(f"""
**1. Install requirements:**
```bash
pip install mcpo streamlit requests
# That's it! Serena will be downloaded from git automatically
```
**2. Verify Python 3.11 (required by MCPO):**
```bash
python3.11 --version # Should show 3.11.x
```
**3. Start LiteLLM:**
```bash
litellm --config litellm_config.yaml --port 8000
```
**4. Click 'Connect' above**
**Benefits:**
- ✅ Zero setup - no local Serena installation needed
- ✅ Always latest Serena version from git
- ✅ Works anywhere with internet connection
- ✅ Clean, portable solution
**Architecture:**
```
Streamlit → HTTP → MCPO → Git Repo → Serena
```
""")
# Main interface
st.subheader("💬 Conversation")
# Chat container
chat_container = st.container(height=500)
with chat_container:
for message in st.session_state.messages:
with st.chat_message(message["role"]):
if message.get("tool_call"):
st.markdown(f'''
<div class="tool-execution">
<strong>🔧 Tool: {message["tool_call"]["name"]} (via MCPO)</strong>
</div>
''', unsafe_allow_html=True)
with st.expander("Parameters", expanded=False):
st.json(message["tool_call"]["arguments"])
if message.get("tool_result"):
with st.expander("Result", expanded=False): # Collapsed by default
# Try to determine if it's code and highlight accordingly
result_text = message["tool_result"]
if any(keyword in result_text.lower() for keyword in ['def ', 'class ', 'import ', 'function', 'const ', 'let ', 'var ', '#!/']):
# Determine language
if any(kw in result_text for kw in ['def ', 'import ', '#!/usr/bin/env python']):
language = "python"
elif any(kw in result_text for kw in ['const ', 'let ', 'var ', 'function']):
language = "javascript"
elif '#!/bin/bash' in result_text or '#!/bin/sh' in result_text:
language = "bash"
else:
language = "python" # default
st.code(result_text, language=language)
else:
st.text(result_text)
else:
st.markdown(message["content"])
# Chat input
if prompt := st.chat_input("Enter your request..."):
# Validate connection
if not st.session_state.serena_client.connected:
st.error("⚠️ Please connect to MCPO/Serena first.")
st.stop()
# Add user message
st.session_state.messages.append({"role": "user", "content": prompt})
# Prepare tools for LLM
tools = []
for tool in st.session_state.serena_client.tools:
tools.append({
"type": "function",
"function": {
"name": tool["name"],
"description": tool["description"],
"parameters": tool["inputSchema"]
}
})
# Process with LLM
with st.spinner("Processing request..."):
try:
response = llm_client.chat_completion(st.session_state.messages, tools)
assistant_message = response["choices"][0]["message"]
# Handle tool calls
if assistant_message.get("tool_calls"):
for tool_call in assistant_message["tool_calls"]:
tool_name = tool_call["function"]["name"]
try:
tool_args = json.loads(tool_call["function"]["arguments"])
except json.JSONDecodeError:
tool_args = {}
# Add tool call to conversation
st.session_state.messages.append({
"role": "assistant",
"content": f"Executing {tool_name} via MCPO...",
"tool_call": {
"name": tool_name,
"arguments": tool_args
}
})
# Execute tool via MCPO
with st.spinner(f"Executing {tool_name} via MCPO..."):
success, result = st.session_state.serena_client.call_tool(tool_name, tool_args)
if success:
tool_result = result
else:
tool_result = f"Error: {result}"
# Add result
st.session_state.messages[-1]["tool_result"] = tool_result
# Add to LLM conversation
st.session_state.messages.append({
"role": "tool",
"tool_call_id": tool_call["id"],
"content": tool_result
})
# Get final response
final_response = llm_client.chat_completion(st.session_state.messages)
final_content = final_response["choices"][0]["message"]["content"]
st.session_state.messages.append({
"role": "assistant",
"content": final_content
})
else:
# Direct response
content = assistant_message["content"]
st.session_state.messages.append({
"role": "assistant",
"content": content
})
st.rerun()
except Exception as e:
st.error(f"Request failed: {str(e)}")
# Cleanup on app exit
def cleanup():
"""Clean up MCPO process on exit"""
if "mcpo_manager" in st.session_state:
try:
st.session_state.mcpo_manager.stop_mcpo()
except:
pass
atexit.register(cleanup)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment