Created
July 23, 2025 16:32
-
-
Save unixsysdev/27cab50974d6a98317c57b8a82cfab9d to your computer and use it in GitHub Desktop.
Serena MCP Interface via MCPO is a Streamlit web app that bridges AI models to Serena's powerful coding tools by converting the complex MCP (Model Context Protocol) into simple HTTP calls, letting you have natural conversations with an AI assistant that can actually read, write, and analyze code in real projects.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Streamlit Interface for Serena MCP using MCPO Proxy | |
| Uses MCPO to convert Serena MCP server to OpenAPI HTTP endpoints. | |
| Downloads Serena directly from git repository | |
| # streamlit run serena.py | |
| """ | |
| import streamlit as st | |
| import json | |
| import requests | |
| import subprocess | |
| import os | |
| import time | |
| import tempfile | |
| import stat | |
| from typing import List, Dict, Any, Optional | |
| import logging | |
| import atexit | |
# Logging: INFO-level root configuration plus a logger named after this module
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Streamlit page settings: title, icon, wide layout, sidebar open on load
st.set_page_config(
    **{
        "page_title": "Serena MCP Interface (MCPO)",
        "page_icon": "🔧",
        "layout": "wide",
        "initial_sidebar_state": "expanded",
    }
)
# Configuration
# The endpoint, model name and API key can be overridden through environment
# variables of the same name; the literals are the defaults used otherwise.
LITELLM_URL = os.environ.get("LITELLM_URL", "http://localhost:8000/v1/chat/completions")
MODEL_NAME = os.environ.get("MODEL_NAME", "kimi-k2-instruct-tools")
MCPO_PORT = 8001
# Bearer token handed to MCPO via --api-key and sent with every tool call.
MCPO_API_KEY = os.environ.get("MCPO_API_KEY", "serena-secret-key")
class MCPOManager:
    """Manage the MCPO proxy process that exposes Serena over HTTP.

    MCPO is started as a child process (through ``uvx`` when that works,
    otherwise through a directly installed ``mcpo`` executable) and is told
    to run the Serena MCP server straight from its git repository.  The
    manager launches the process, waits until its HTTP endpoints respond,
    and stops it again.

    Attributes:
        port: TCP port MCPO is asked to listen on.
        api_key: Bearer token passed to MCPO via ``--api-key``.
        process: ``subprocess.Popen`` handle of the child started by this
            manager, or ``None`` when this manager has not started one.
        base_url: ``http://localhost:<port>``.
    """

    # Command MCPO runs (after ``--``) to obtain and start Serena.
    _SERENA_COMMAND = (
        "uvx", "--from", "git+https://github.com/oraios/serena",
        "serena-mcp-server", "--transport", "stdio",
    )
    # Paths tried, in order, by the HTTP liveness probes.
    _PROBE_PATHS = ("/health", "/docs", "/openapi.json", "/")
    # Interpreters tried, in order, when looking for Python 3.11.
    _PYTHON311_CANDIDATES = (
        "/Users/marcel/WorkSpace/UI/mcp_chat_env/bin/python3.11",
        "python3.11",
        "/usr/local/bin/python3.11",
        "/opt/homebrew/bin/python3.11",
    )

    def __init__(self, port: int = MCPO_PORT, api_key: str = MCPO_API_KEY):
        self.port = port
        self.api_key = api_key
        self.process = None
        self.base_url = f"http://localhost:{port}"
        # Command prefix that invokes MCPO; refined by start_mcpo().
        self._launcher = ["uvx", "mcpo"]

    def start_mcpo(self) -> tuple[bool, str]:
        """Start the MCPO proxy unless one already answers on the port.

        Returns:
            ``(success, message)``; ``message`` is suitable for showing to
            the user.  Never raises: unexpected errors are logged and
            reported through the return value.
        """
        try:
            launcher, error_msg = self._detect_launcher()
            if launcher is None:
                return False, f"MCPO not available. {error_msg}. Install with: pip install mcpo"
            self._launcher = launcher

            # Reuse an instance that already answers the health endpoint.
            try:
                response = requests.get(f"{self.base_url}/health", timeout=2)
                if response.status_code == 200:
                    logger.info("MCPO already running, using existing instance")
                    return True, "MCPO already running"
            except requests.exceptions.RequestException:
                pass  # Nothing answered; go ahead and start our own.

            logger.info("🚀 Starting MCPO with git repo approach (latest Serena)...")
            success, message = self._start_git_repo_approach()
            if success:
                return True, f"✅ Git repo approach successful: {message}"
            return False, f"❌ Git repo approach failed: {message}"
        except Exception as e:
            logger.error(f"Failed to start MCPO: {e}")
            return False, f"Failed to start MCPO: {str(e)}"

    def _detect_launcher(self) -> tuple[Optional[List[str]], str]:
        """Find a working way to invoke MCPO.

        Tries ``uvx mcpo --help`` first, then ``mcpo --help``.

        Returns:
            ``(command_prefix, "")`` on success, or ``(None, reasons)`` when
            neither invocation works.
        """
        error_msg = ""
        try:
            result = subprocess.run(["uvx", "mcpo", "--help"],
                                    capture_output=True, text=True, timeout=10)
            if result.returncode == 0:
                return ["uvx", "mcpo"], ""
            error_msg = f"uvx mcpo failed: {result.stderr}"
        except (subprocess.TimeoutExpired, OSError):
            error_msg = "uvx not found, trying direct mcpo command"

        try:
            result = subprocess.run(["mcpo", "--help"],
                                    capture_output=True, text=True, timeout=10)
            if result.returncode == 0:
                logger.info("Using direct mcpo command")
                return ["mcpo"], ""
            error_msg += f"; mcpo direct failed: {result.stderr}"
        except (subprocess.TimeoutExpired, OSError):
            error_msg += "; mcpo not found in PATH"
        return None, error_msg

    def _build_command(self) -> List[str]:
        """Assemble the full MCPO command line for the detected launcher."""
        if self._launcher[0] == "uvx":
            python311_path = self._find_python311()
            if python311_path:
                logger.info(f"🐍 Using Python 3.11: {python311_path}")
                launcher = ["uvx", "--python", python311_path, "mcpo"]
            else:
                logger.warning("⚠️ Python 3.11 not found, using default Python")
                launcher = ["uvx", "mcpo"]
        else:
            # Direct executable: use it as detected instead of assuming uvx.
            launcher = list(self._launcher)
        return [
            *launcher,
            "--port", str(self.port),
            "--api-key", self.api_key,
            "--",
            *self._SERENA_COMMAND,
        ]

    def _start_git_repo_approach(self) -> tuple[bool, str]:
        """Launch MCPO with Serena pulled from git and wait for readiness.

        Returns:
            ``(success, message)``.  On failure the child process, if any,
            has been terminated.
        """
        try:
            mcpo_cmd = self._build_command()

            # Never write the bearer token to the log.
            shown_cmd = list(mcpo_cmd)
            shown_cmd[shown_cmd.index("--api-key") + 1] = "***"
            logger.info("🚀 Starting MCPO with command:")
            logger.info(f" {' '.join(shown_cmd)}")
            logger.info(f"📁 Working directory: {os.getcwd()}")
            logger.info(f"🌐 Expected endpoint: {self.base_url}")

            logger.info("🔄 Launching subprocess...")
            # The child inherits this process's stdout/stderr.  Capturing
            # them in pipes that nobody drains would eventually block the
            # server once the pipe buffer fills up.
            self.process = subprocess.Popen(mcpo_cmd, stdin=subprocess.DEVNULL)
            logger.info(f"📋 Process started with PID: {self.process.pid}")
            logger.info("⏳ Waiting for MCPO to download Serena and start up...")
            logger.info("💡 This may take 1-2 minutes for git download and build...")

            if self._wait_for_startup():
                logger.info("🎉 Git repo approach successful!")
                return True, "Git repo approach successful"

            logger.error("❌ Git repo approach failed during startup")
            self._cleanup_failed_process()
            return False, "Git repo approach: startup timeout or failure"
        except Exception as e:
            self._cleanup_failed_process()
            logger.error(f"💥 Git repo approach exception: {e}")
            return False, f"Git repo approach failed: {str(e)}"

    def _find_python311(self) -> Optional[str]:
        """Return the first candidate interpreter reporting version 3.11.

        Returns:
            The candidate string that worked, or ``None`` if none did.
        """
        for possible_path in self._PYTHON311_CANDIDATES:
            try:
                result = subprocess.run([possible_path, "--version"],
                                        capture_output=True, text=True, timeout=5)
            except (subprocess.TimeoutExpired, OSError):
                # OSError covers missing files as well as non-executable ones.
                continue
            if result.returncode == 0 and "3.11" in f"{result.stdout}{result.stderr}":
                logger.info(f"Found Python 3.11 at: {possible_path}")
                return possible_path
        return None

    def _auth_headers(self) -> Dict[str, str]:
        """Return the bearer-token header used for requests to MCPO."""
        return {"Authorization": f"Bearer {self.api_key}"}

    def _first_responding_endpoint(self, timeout: float = 3) -> Optional[tuple[str, int]]:
        """Return ``(url, status)`` of the first probe path answering 200/404.

        A 404 still proves that an HTTP server is listening.  Returns
        ``None`` when no probe path answers with one of those codes.
        """
        for path in self._PROBE_PATHS:
            url = f"{self.base_url}{path}"
            try:
                response = requests.get(url, headers=self._auth_headers(), timeout=timeout)
            except requests.exceptions.RequestException:
                continue
            if response.status_code in (200, 404):
                return url, response.status_code
        return None

    def _openapi_ready(self) -> bool:
        """Return True when ``/openapi.json`` answers with HTTP 200."""
        try:
            response = requests.get(f"{self.base_url}/openapi.json",
                                    headers=self._auth_headers(), timeout=3)
        except requests.exceptions.RequestException:
            return False
        return response.status_code == 200

    def _port_is_open(self) -> bool:
        """Return True when a TCP connection to ``localhost:port`` succeeds."""
        import socket

        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(2)
                return sock.connect_ex(("localhost", self.port)) == 0
        except OSError as e:
            logger.error(f"❌ Socket check failed: {e}")
            return False

    def _log_serena_dashboard(self) -> None:
        """Log the Serena dashboard URL if one answers (informational only).

        Scans ports 24290-24299, the range the original author noted Serena
        uses.  Each port is tried independently so one refused connection
        does not end the scan.
        """
        for port in range(24290, 24300):
            dashboard_url = f"http://127.0.0.1:{port}/dashboard/index.html"
            try:
                dash_response = requests.get(dashboard_url, timeout=2)
            except requests.exceptions.RequestException:
                continue
            if dash_response.status_code == 200:
                logger.info(f"🎛️ Serena dashboard found at {dashboard_url} - Serena is running!")
                return

    def _wait_for_startup(self, timeout: int = 120) -> bool:
        """Poll once per second until MCPO serves its OpenAPI schema.

        Args:
            timeout: Number of one-second polling rounds before giving up.

        Returns:
            True when ``/openapi.json`` answers 200, or when the rounds are
            exhausted but the child is alive and the port accepts
            connections.  False when the child exits or nothing is listening.
        """
        logger.info(f"⏳ Waiting for MCPO startup (timeout: {timeout}s)...")
        for attempt in range(timeout):
            time.sleep(1)
            if attempt % 10 == 0 and attempt > 0:
                logger.info(f"⏳ Still waiting... {attempt}s elapsed (timeout in {timeout-attempt}s)")

            hit = self._first_responding_endpoint()
            if hit is not None:
                endpoint, status = hit
                logger.info(f"✅ MCPO responding at {endpoint} (status: {status})")
                if self._openapi_ready():
                    logger.info("🎯 OpenAPI endpoint confirmed - MCPO fully ready!")
                    logger.info(f"✅ MCPO started successfully on port {self.port} after {attempt}s")
                    return True
                logger.info("⏳ MCPO responding but OpenAPI not ready yet...")

            # Stop waiting as soon as the child has exited.
            if self.process and self.process.poll() is not None:
                logger.error(
                    f"💀 MCPO process died after {attempt}s "
                    f"(exit code {self.process.returncode}); see its console output above"
                )
                return False

            # After 30s, look for the Serena dashboard every 20s.
            if attempt > 30 and attempt % 20 == 0:
                self._log_serena_dashboard()

        logger.error(f"⏰ Startup timeout after {timeout}s")
        if self.process and self.process.poll() is None:
            logger.info("📊 Process still running, getting current output...")
            logger.info("🔍 Trying alternative endpoint checks...")
            if self._port_is_open():
                # Deliberate leniency: an open port is accepted as success.
                logger.info(f"🔌 Port {self.port} is open - MCPO might be running!")
                return True
            logger.error(f"🚫 Port {self.port} is not open")
        return False

    def _terminate_process(self, grace: float = 5) -> None:
        """Terminate the child, killing it if it ignores the request.

        ``self.process`` is cleared before any signal is sent, so a failure
        here never leaves a stale handle behind.
        """
        proc, self.process = self.process, None
        if proc is None:
            return
        proc.terminate()
        try:
            proc.wait(timeout=grace)
        except subprocess.TimeoutExpired:
            logger.warning("MCPO didn't stop gracefully, killing...")
            proc.kill()
            proc.wait()

    def _cleanup_failed_process(self):
        """Best-effort cleanup of a child whose startup failed."""
        try:
            self._terminate_process()
        except OSError as e:
            logger.warning(f"Could not clean up MCPO process: {e}")

    def stop_mcpo(self):
        """Stop the MCPO child started by this manager, if any."""
        try:
            if self.process:
                logger.info("Stopping MCPO process...")
                self._terminate_process()
            logger.info("MCPO stopped")
        except Exception as e:
            logger.error(f"Error stopping MCPO: {e}")

    def is_running(self) -> bool:
        """Return True when something answers on the MCPO port.

        First tries the HTTP probe paths; if none answers with 200/404,
        falls back to a plain TCP connection test.
        """
        if self._first_responding_endpoint() is not None:
            return True
        return self._port_is_open()
class SerenaMCPOClient:
    """HTTP client for Serena's tools as exposed by the MCPO proxy.

    Attributes:
        mcpo_manager: Manager providing ``base_url``, ``api_key``,
            ``is_running()`` and ``start_mcpo()``.
        tools: Tool descriptors (``name``, ``description``, ``inputSchema``)
            discovered from the proxy's OpenAPI schema.
        connected: True once ``connect()`` has succeeded.
    """

    # Fallback input schema for tools without a usable request schema.
    _EMPTY_INPUT_SCHEMA = {"type": "object", "properties": {}}
    # Paths in the OpenAPI schema that are not tools.
    _SYSTEM_PATHS = frozenset(["/", "/health", "/docs", "/openapi.json", "/redoc"])
    _REF_PREFIX = "#/components/schemas/"

    def __init__(self, mcpo_manager: "MCPOManager"):
        self.mcpo_manager = mcpo_manager
        self.tools: List[Dict[str, Any]] = []
        self.connected = False

    def connect(self) -> tuple[bool, str, List[Dict[str, Any]]]:
        """Ensure MCPO is running and discover the available tools.

        Returns:
            ``(success, message, tools)``; ``tools`` is empty on failure.
            Never raises: errors are logged and reported via the tuple.
        """
        try:
            # Start MCPO if not running
            if not self.mcpo_manager.is_running():
                success, message = self.mcpo_manager.start_mcpo()
                if not success:
                    return False, message, []

            # Get OpenAPI schema to discover tools
            try:
                response = requests.get(
                    f"{self.mcpo_manager.base_url}/openapi.json",
                    headers={"Authorization": f"Bearer {self.mcpo_manager.api_key}"},
                    timeout=10,
                )
                response.raise_for_status()
                openapi_schema = response.json()
            except requests.exceptions.RequestException as e:
                logger.error(f"Failed to get OpenAPI schema: {e}")
                return False, f"Failed to get OpenAPI schema: {str(e)}", []

            self.tools = self._extract_tools_from_openapi(openapi_schema)
            self.connected = True
            logger.info(f"Connected to Serena via MCPO with {len(self.tools)} tools")
            return True, f"Connected with {len(self.tools)} tools", self.tools
        except Exception as e:
            logger.error(f"Connection error: {e}")
            return False, f"Connection failed: {str(e)}", []

    @classmethod
    def _resolve_refs(cls, node: Any, schemas: Dict[str, Any], _active: tuple = ()) -> Any:
        """Return ``node`` with ``#/components/schemas/*`` refs inlined.

        Refs are resolved recursively.  A ref that is unknown, or that would
        re-enter a schema currently being expanded (a cycle), becomes ``{}``.
        Refs with any other prefix are left untouched.
        """
        if isinstance(node, dict):
            ref = node.get("$ref")
            if isinstance(ref, str) and ref.startswith(cls._REF_PREFIX):
                name = ref[len(cls._REF_PREFIX):]
                if name in _active or name not in schemas:
                    return {}
                return cls._resolve_refs(schemas[name], schemas, _active + (name,))
            return {key: cls._resolve_refs(value, schemas, _active) for key, value in node.items()}
        if isinstance(node, list):
            return [cls._resolve_refs(item, schemas, _active) for item in node]
        return node

    def _extract_tools_from_openapi(self, schema: Dict) -> List[Dict[str, Any]]:
        """Build tool descriptors from the proxy's OpenAPI schema.

        Every non-system path with a POST operation becomes one tool, named
        after the path without its leading slash.  The request body's JSON
        schema, with ``$ref``s inlined, becomes ``inputSchema``.  When it is
        missing or cannot be resolved, an empty object schema is used so the
        descriptor stays a valid tool definition.

        Args:
            schema: Parsed OpenAPI document.

        Returns:
            List of dicts with ``name``, ``description`` and ``inputSchema``.
        """
        tools = []
        logger.info("🔍 Extracting tools from OpenAPI schema...")

        schemas_components = schema.get("components", {}).get("schemas", {})
        paths = schema.get("paths", {})
        logger.info(f"📋 Found {len(paths)} paths in OpenAPI schema")

        # MCPO exposes Serena tools directly at root level (e.g., /read_file)
        for path, methods in paths.items():
            logger.info(f"🔗 Checking path: {path}")
            if path in self._SYSTEM_PATHS:
                logger.info(f"⏭️ Skipping system path: {path}")
                continue

            tool_name = path.lstrip("/")
            post_method = methods.get("post") if isinstance(methods, dict) else None
            if not post_method:
                logger.warning(f"⚠️ No POST method for path: {path}")
                continue

            tool_info = {
                "name": tool_name,
                # ``or`` also skips keys that are present but empty/None.
                "description": (
                    post_method.get("summary")
                    or post_method.get("description")
                    or f"Execute {tool_name}"
                ),
                "inputSchema": dict(self._EMPTY_INPUT_SCHEMA, properties={}),
            }

            schema_def = (
                post_method.get("requestBody", {})
                .get("content", {})
                .get("application/json", {})
                .get("schema", {})
            )
            if schema_def:
                resolved_schema = self._resolve_refs(schema_def, schemas_components)
                if isinstance(resolved_schema, dict) and resolved_schema:
                    tool_info["inputSchema"] = resolved_schema
                    logger.info(f"🔧 Tool {tool_name} resolved schema: {resolved_schema}")
                else:
                    logger.warning(f"⚠️ Could not resolve input schema for {tool_name}")
            else:
                logger.warning(f"⚠️ No input schema found for {tool_name}")

            tools.append(tool_info)
            logger.info(f"✅ Added tool: {tool_name}")

        logger.info(f"🎯 Successfully extracted {len(tools)} tools")
        if tools:
            tool_names = [t["name"] for t in tools]
            logger.info(f"📝 Tool names: {', '.join(tool_names[:10])}{'...' if len(tool_names) > 10 else ''}")
        return tools

    @staticmethod
    def _parse_locations(locations_str: str) -> List[str]:
        """Extract paths from a ``[PosixPath('…'), …]`` style listing.

        Quoted ``*Path('…')`` entries are taken verbatim, so commas inside a
        path and ``WindowsPath`` entries are handled.  Text without such
        entries falls back to stripping brackets and splitting on commas.
        """
        import re

        quoted = re.findall(r"\w*Path\('([^']*)'\)", locations_str)
        if quoted:
            return quoted
        cleaned = (
            locations_str.replace("PosixPath('", "")
            .replace("')", "")
            .replace("[", "")
            .replace("]", "")
        )
        return [loc.strip() for loc in cleaned.split(",")]

    def _clean_error_message(self, content: str) -> str:
        """Condense verbose Serena error output into a short message.

        Text without a recognised pattern is returned unchanged.
        """
        # Handle multiple projects error
        if "Multiple projects found with name" in content and "Traceback" in content:
            for line in content.split("\n"):
                if "Multiple projects found with name" in line and "Locations:" in line:
                    parts = line.split("Locations:")
                    if len(parts) == 2:
                        locations = self._parse_locations(parts[1].strip())
                        project_name = parts[0].split("'")[1] if "'" in parts[0] else "project"
                        return (
                            f"Multiple projects found with name '{project_name}'. "
                            "Please specify the full path instead.\n\nAvailable locations:\n"
                            + "\n".join(f"• {loc}" for loc in locations)
                        )
            # Fallback if parsing fails
            return "Multiple projects found with that name. Please specify the full path instead."

        # Handle other common error patterns
        if "Traceback (most recent call last):" in content:
            lines = content.split("\n")
            for i, line in enumerate(lines):
                if "Traceback (most recent call last):" in line:
                    # Prefer the last meaningful line before the traceback.
                    for j in range(i - 1, -1, -1):
                        if lines[j].strip() and not lines[j].startswith("Error executing tool:"):
                            return lines[j].strip()
                    break
            # Otherwise use the final exception line, if it is a known type.
            for line in reversed(lines):
                stripped = line.strip()
                if stripped.startswith("ValueError:"):
                    return line.replace("ValueError:", "").strip()
                if stripped.startswith("FileNotFoundError:"):
                    return line.replace("FileNotFoundError:", "File not found:").strip()
                if stripped.startswith("PermissionError:"):
                    return line.replace("PermissionError:", "Permission denied:").strip()
        return content

    @staticmethod
    def _missing_fields(response) -> List[str]:
        """Return names of fields a validation error body marks as missing."""
        try:
            details = response.json().get("detail", [])
        except (ValueError, AttributeError):
            return []
        if not isinstance(details, list):
            return []
        fields = []
        for detail in details:
            if isinstance(detail, dict) and detail.get("type") == "missing":
                loc = detail.get("loc") or []
                fields.append(str(loc[-1]) if loc else "unknown")
        return fields

    def _summarize_http_error(self, response) -> str:
        """Turn a non-200 tool response into a short user-facing message."""
        error_text = response.text

        # Handle common parameter errors
        if "Field required" in error_text:
            missing_fields = self._missing_fields(response)
            if missing_fields:
                return f"Missing required parameter(s): {', '.join(missing_fields)}"

        # Handle Serena-specific errors more gracefully
        if "Multiple projects found" in error_text:
            for line in error_text.split("\n"):
                if "Locations:" in line and "[" in line:
                    locations = self._parse_locations(line.split("Locations:")[1].strip())
                    return (
                        "Multiple projects found. Please specify the full path. "
                        f"Available: {', '.join(locations)}"
                    )
            return "Multiple projects found with that name. Please specify the full path instead."

        # Handle other common errors
        if "No such file or directory" in error_text:
            return "File or directory not found."
        if "Permission denied" in error_text:
            return "Permission denied."

        # Generic error: full text goes to the log, only the code to the user.
        logger.error(f"Tool call failed: {error_text}")
        return f"Tool error (HTTP {response.status_code})"

    def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> tuple[bool, str]:
        """Invoke a Serena tool through MCPO.

        Args:
            tool_name: Tool name; it is POSTed to ``<base_url>/<tool_name>``.
            arguments: JSON-serialisable tool arguments.  ``None`` is sent
                as an empty object.

        Returns:
            ``(success, text)``: the cleaned tool output on HTTP 200,
            otherwise a short error description.  Never raises.
        """
        try:
            if not self.connected:
                return False, "Not connected to MCPO"

            # Tools are exposed at root level
            url = f"{self.mcpo_manager.base_url}/{tool_name}"
            headers = {
                "Authorization": f"Bearer {self.mcpo_manager.api_key}",
                "Content-Type": "application/json",
            }
            logger.info(f"Calling tool {tool_name} via MCPO: {url}")
            response = requests.post(
                url,
                headers=headers,
                json=arguments if arguments is not None else {},
                timeout=120,  # 2 minute timeout
            )

            if response.status_code != 200:
                return False, self._summarize_http_error(response)

            result = response.json()
            if isinstance(result, dict) and "content" in result:
                content = str(result["content"])
            else:
                content = str(result)
            # Clean up common Serena error patterns
            return True, self._clean_error_message(content)
        except requests.exceptions.Timeout:
            return False, "Tool call timed out after 2 minutes"
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP request failed: {e}")
            return False, f"Connection error: {str(e)}"
        except Exception as e:
            logger.error(f"Tool call error: {e}")
            return False, f"Unexpected error: {str(e)}"

    def disconnect(self):
        """Forget the discovered tools; the MCPO process is left untouched."""
        self.connected = False
        self.tools = []
class LLMClient:
    """Minimal client for an OpenAI-style chat-completions HTTP endpoint.

    Attributes:
        base_url: Full URL of the chat-completions endpoint.
        model_name: Model identifier sent in every request.
        api_key: Bearer token sent in the ``Authorization`` header.
        timeout: Request timeout in seconds.
        max_tokens: ``max_tokens`` value sent in every request.
        temperature: ``temperature`` value sent in every request.
    """

    def __init__(
        self,
        base_url: str,
        model_name: str,
        *,
        api_key: str = "any-key",
        timeout: float = 30,
        max_tokens: int = 2048,
        temperature: float = 0.7,
    ):
        self.base_url = base_url
        self.model_name = model_name
        self.api_key = api_key
        self.timeout = timeout
        self.max_tokens = max_tokens
        self.temperature = temperature

    def _build_payload(
        self,
        messages: List[Dict[str, Any]],
        tools: Optional[List[Dict]] = None,
    ) -> Dict[str, Any]:
        """Return the JSON request body for a non-streaming completion."""
        payload: Dict[str, Any] = {
            "model": self.model_name,
            "messages": messages,
            "stream": False,
            "max_tokens": self.max_tokens,
            "temperature": self.temperature,
        }
        # Tool fields are only sent when at least one tool is offered.
        if tools:
            payload["tools"] = tools
            payload["tool_choice"] = "auto"
        return payload

    def chat_completion(
        self,
        messages: List[Dict[str, Any]],
        tools: Optional[List[Dict]] = None,
    ) -> Dict[str, Any]:
        """Call the LLM, optionally offering tools.

        Args:
            messages: Chat messages in the endpoint's message format.
            tools: Optional tool definitions; when non-empty they are sent
                with ``tool_choice`` set to ``"auto"``.

        Returns:
            The decoded JSON response body.

        Raises:
            Exception: On transport failure (``"LLM request failed: …"``) or
                on a non-200 status or other error (``"LLM error: …"``).
                The underlying exception is attached as ``__cause__``.
        """
        payload = self._build_payload(messages, tools)
        try:
            response = requests.post(
                self.base_url,
                headers={
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {self.api_key}",
                },
                json=payload,
                timeout=self.timeout,
            )
            if response.status_code != 200:
                raise Exception(f"HTTP {response.status_code}: {response.text}")
            return response.json()
        except requests.exceptions.RequestException as e:
            raise Exception(f"LLM request failed: {str(e)}") from e
        except Exception as e:
            raise Exception(f"LLM error: {str(e)}") from e
def load_custom_css():
    """Inject the app's stylesheet (headers, status cards, tool box) into the page."""
    stylesheet = """
    <style>
    .main-header {
    font-size: 2.5rem;
    font-weight: 600;
    color: #1f2937;
    margin-bottom: 0.5rem;
    }
    .sub-header {
    font-size: 1.1rem;
    color: #6b7280;
    margin-bottom: 2rem;
    }
    .status-card {
    padding: 1rem;
    border-radius: 0.5rem;
    border-left: 4px solid;
    margin: 0.5rem 0;
    }
    .status-connected {
    background-color: #d1fae5;
    color: #065f46;
    border-left-color: #10b981;
    }
    .status-disconnected {
    background-color: #fee2e2;
    color: #991b1b;
    border-left-color: #ef4444;
    }
    .status-warning {
    background-color: #fef3c7;
    color: #92400e;
    border-left-color: #f59e0b;
    }
    .tool-execution {
    background-color: #f8fafc;
    border: 1px solid #e2e8f0;
    border-radius: 0.375rem;
    padding: 1rem;
    margin: 0.5rem 0;
    }
    </style>
    """
    # Raw HTML must be allowed for the <style> element to reach the page.
    st.markdown(stylesheet, unsafe_allow_html=True)
| def main(): | |
| """Main application""" | |
| load_custom_css() | |
| # Header | |
| st.markdown('<div class="main-header">Serena MCP Interface (MCPO)</div>', unsafe_allow_html=True) | |
| st.markdown('<div class="sub-header">Professional coding assistant via MCPO proxy - Git Repo Only</div>', unsafe_allow_html=True) | |
| # Configuration | |
| st.sidebar.header("⚙️ Configuration") | |
| # MCPO configuration only | |
| mcpo_port = st.sidebar.number_input( | |
| "MCPO Port", | |
| value=MCPO_PORT, | |
| min_value=8001, | |
| max_value=9999, | |
| help="Port for MCPO proxy server", | |
| key="mcpo_port_input" | |
| ) | |
| # Initialize clients (simplified - no local paths needed) | |
| if ("mcpo_manager" not in st.session_state or | |
| st.session_state.get("mcpo_port") != mcpo_port): | |
| # Stop old MCPO if exists | |
| if "mcpo_manager" in st.session_state: | |
| try: | |
| st.session_state.mcpo_manager.stop_mcpo() | |
| except: | |
| pass | |
| st.session_state.mcpo_manager = MCPOManager(mcpo_port, MCPO_API_KEY) | |
| st.session_state.serena_client = SerenaMCPOClient(st.session_state.mcpo_manager) | |
| st.session_state.mcpo_port = mcpo_port | |
| llm_client = LLMClient(LITELLM_URL, MODEL_NAME) | |
| # Initialize session state | |
| if "messages" not in st.session_state: | |
| st.session_state.messages = [ | |
| { | |
| "role": "assistant", | |
| "content": f"""Welcome to Serena MCP Interface via MCPO! 🚀 | |
| I'm your coding assistant powered by MCPO proxy, which converts Serena's MCP protocol to standard HTTP OpenAPI. | |
| **Git Repo Approach Only:** | |
| - Uses `uvx --from git+https://github.com/oraios/serena serena-mcp-server` | |
| - Always gets the latest Serena version | |
| - Works anywhere without local installation | |
| - Zero setup required! | |
| **Current Configuration:** | |
| - MCPO Port: `{mcpo_port}` | |
| - MCPO API: `http://localhost:{mcpo_port}` | |
| **How it works:** | |
| 1. MCPO pulls Serena from git repo automatically | |
| 2. Converts MCP stdio to HTTP OpenAPI endpoints | |
| 3. We use standard HTTP requests | |
| 4. Clean, portable, always up-to-date! | |
| Click 'Connect' to start MCPO and download Serena from git automatically.""" | |
| } | |
| ] | |
| # Sidebar | |
| with st.sidebar: | |
| st.header("🔧 Connection Management") | |
| # Connection status | |
| mcpo_manager = st.session_state.mcpo_manager | |
| serena_client = st.session_state.serena_client | |
| # Check MCPO status | |
| mcpo_running = mcpo_manager.is_running() | |
| if serena_client.connected and mcpo_running: | |
| st.markdown(f''' | |
| <div class="status-card status-connected"> | |
| ✅ Connected via MCPO<br> | |
| <small>{len(serena_client.tools)} tools available</small><br> | |
| <small>Port: {mcpo_port}</small> | |
| </div> | |
| ''', unsafe_allow_html=True) | |
| elif mcpo_running: | |
| st.markdown(f''' | |
| <div class="status-card status-warning"> | |
| ⚠️ MCPO running, not connected<br> | |
| <small>Port: {mcpo_port}</small> | |
| </div> | |
| ''', unsafe_allow_html=True) | |
| else: | |
| st.markdown(f''' | |
| <div class="status-card status-disconnected"> | |
| ❌ MCPO not running<br> | |
| <small>Ready to start on port {mcpo_port}</small> | |
| </div> | |
| ''', unsafe_allow_html=True) | |
| # Connection controls | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| if st.button("Connect", type="primary", disabled=serena_client.connected, use_container_width=True): | |
| with st.spinner("Starting MCPO proxy and downloading Serena from git (this may take 1-2 minutes)..."): | |
| try: | |
| success, message, tools = serena_client.connect() | |
| if success: | |
| st.success(f"✅ {message}") | |
| st.rerun() | |
| else: | |
| st.error(f"❌ {message}") | |
| st.info("💡 Check the console logs for detailed error information") | |
| except Exception as e: | |
| st.error(f"❌ Connection failed: {str(e)}") | |
| st.info("💡 Check the console logs for detailed error information") | |
| with col2: | |
| if st.button("Disconnect", disabled=not serena_client.connected, use_container_width=True): | |
| with st.spinner("Disconnecting..."): | |
| try: | |
| serena_client.disconnect() | |
| st.info("ℹ️ Disconnected") | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"❌ Disconnect failed: {str(e)}") | |
| # MCPO controls | |
| st.write("") | |
| st.subheader("🔗 MCPO Proxy") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| if st.button("Stop MCPO", disabled=not mcpo_running, use_container_width=True): | |
| with st.spinner("Stopping MCPO..."): | |
| mcpo_manager.stop_mcpo() | |
| serena_client.disconnect() | |
| st.info("MCPO stopped") | |
| st.rerun() | |
| with col2: | |
| if st.button("OpenAPI Docs", disabled=not mcpo_running, use_container_width=True): | |
| st.markdown(f"[Open Docs]({mcpo_manager.base_url}/docs)", unsafe_allow_html=True) | |
| # Debug button | |
| if mcpo_running: | |
| if st.button("🔍 Debug OpenAPI", use_container_width=True): | |
| try: | |
| response = requests.get( | |
| f"{mcpo_manager.base_url}/openapi.json", | |
| headers={"Authorization": f"Bearer {mcpo_manager.api_key}"}, | |
| timeout=5 | |
| ) | |
| if response.status_code == 200: | |
| schema = response.json() | |
| st.json(schema.get("paths", {})) | |
| else: | |
| st.error(f"Failed to get OpenAPI schema: {response.status_code}") | |
| except Exception as e: | |
| st.error(f"Debug failed: {e}") | |
| # Tools overview | |
| if serena_client.connected and serena_client.tools: | |
| st.write("") | |
| st.subheader("🔧 Available Tools") | |
| # Show tools by category | |
| file_tools = [t for t in serena_client.tools if any(x in t['name'] for x in ['file', 'dir', 'read', 'write', 'create', 'find', 'delete', 'insert'])] | |
| code_tools = [t for t in serena_client.tools if any(x in t['name'] for x in ['symbol', 'overview', 'referencing', 'replace'])] | |
| memory_tools = [t for t in serena_client.tools if 'memory' in t['name']] | |
| project_tools = [t for t in serena_client.tools if any(x in t['name'] for x in ['project', 'onboarding', 'config', 'activate'])] | |
| shell_tools = [t for t in serena_client.tools if any(x in t['name'] for x in ['shell', 'execute'])] | |
| other_tools = [t for t in serena_client.tools if t not in file_tools + code_tools + memory_tools + project_tools + shell_tools] | |
| with st.expander(f"📁 File Operations ({len(file_tools)})", expanded=False): | |
| for tool in file_tools[:8]: | |
| st.text(f"• {tool['name']}") | |
| if len(file_tools) > 8: | |
| st.text(f"... +{len(file_tools) - 8} more") | |
| with st.expander(f"🔍 Code Analysis ({len(code_tools)})", expanded=False): | |
| for tool in code_tools: | |
| st.text(f"• {tool['name']}") | |
| with st.expander(f"💾 Memory ({len(memory_tools)})", expanded=False): | |
| for tool in memory_tools: | |
| st.text(f"• {tool['name']}") | |
| with st.expander(f"⚙️ Project ({len(project_tools)})", expanded=False): | |
| for tool in project_tools: | |
| st.text(f"• {tool['name']}") | |
| with st.expander(f"🖥️ Shell ({len(shell_tools)})", expanded=False): | |
| for tool in shell_tools: | |
| st.text(f"• {tool['name']}") | |
| if other_tools: | |
| with st.expander(f"🔧 Other Tools ({len(other_tools)})", expanded=False): | |
| for tool in other_tools: | |
| st.text(f"• {tool['name']}") | |
| # Setup instructions | |
| st.write("") | |
| with st.expander("📋 Setup Instructions", expanded=False): | |
| st.markdown(f""" | |
| **1. Install requirements:** | |
| ```bash | |
| pip install mcpo streamlit requests | |
| # That's it! Serena will be downloaded from git automatically | |
| ``` | |
| **2. Verify Python 3.11 (required by MCPO):** | |
| ```bash | |
| python3.11 --version # Should show 3.11.x | |
| ``` | |
| **3. Start LiteLLM:** | |
| ```bash | |
| litellm --config litellm_config.yaml --port 8000 | |
| ``` | |
| **4. Click 'Connect' above** | |
| **Benefits:** | |
| - ✅ Zero setup - no local Serena installation needed | |
| - ✅ Always latest Serena version from git | |
| - ✅ Works anywhere with internet connection | |
| - ✅ Clean, portable solution | |
| **Architecture:** | |
| ``` | |
| Streamlit → HTTP → MCPO → Git Repo → Serena | |
| ``` | |
| """) | |
| # Main interface | |
| st.subheader("💬 Conversation") | |
| # Chat container | |
| chat_container = st.container(height=500) | |
| with chat_container: | |
| for message in st.session_state.messages: | |
| with st.chat_message(message["role"]): | |
| if message.get("tool_call"): | |
| st.markdown(f''' | |
| <div class="tool-execution"> | |
| <strong>🔧 Tool: {message["tool_call"]["name"]} (via MCPO)</strong> | |
| </div> | |
| ''', unsafe_allow_html=True) | |
| with st.expander("Parameters", expanded=False): | |
| st.json(message["tool_call"]["arguments"]) | |
| if message.get("tool_result"): | |
| with st.expander("Result", expanded=False): # Collapsed by default | |
| # Try to determine if it's code and highlight accordingly | |
| result_text = message["tool_result"] | |
| if any(keyword in result_text.lower() for keyword in ['def ', 'class ', 'import ', 'function', 'const ', 'let ', 'var ', '#!/']): | |
| # Determine language | |
| if any(kw in result_text for kw in ['def ', 'import ', '#!/usr/bin/env python']): | |
| language = "python" | |
| elif any(kw in result_text for kw in ['const ', 'let ', 'var ', 'function']): | |
| language = "javascript" | |
| elif '#!/bin/bash' in result_text or '#!/bin/sh' in result_text: | |
| language = "bash" | |
| else: | |
| language = "python" # default | |
| st.code(result_text, language=language) | |
| else: | |
| st.text(result_text) | |
| else: | |
| st.markdown(message["content"]) | |
| # Chat input | |
| if prompt := st.chat_input("Enter your request..."): | |
| # Validate connection | |
| if not st.session_state.serena_client.connected: | |
| st.error("⚠️ Please connect to MCPO/Serena first.") | |
| st.stop() | |
| # Add user message | |
| st.session_state.messages.append({"role": "user", "content": prompt}) | |
| # Prepare tools for LLM | |
| tools = [] | |
| for tool in st.session_state.serena_client.tools: | |
| tools.append({ | |
| "type": "function", | |
| "function": { | |
| "name": tool["name"], | |
| "description": tool["description"], | |
| "parameters": tool["inputSchema"] | |
| } | |
| }) | |
| # Process with LLM | |
| with st.spinner("Processing request..."): | |
| try: | |
| response = llm_client.chat_completion(st.session_state.messages, tools) | |
| assistant_message = response["choices"][0]["message"] | |
| # Handle tool calls | |
| if assistant_message.get("tool_calls"): | |
| for tool_call in assistant_message["tool_calls"]: | |
| tool_name = tool_call["function"]["name"] | |
| try: | |
| tool_args = json.loads(tool_call["function"]["arguments"]) | |
| except json.JSONDecodeError: | |
| tool_args = {} | |
| # Add tool call to conversation | |
| st.session_state.messages.append({ | |
| "role": "assistant", | |
| "content": f"Executing {tool_name} via MCPO...", | |
| "tool_call": { | |
| "name": tool_name, | |
| "arguments": tool_args | |
| } | |
| }) | |
| # Execute tool via MCPO | |
| with st.spinner(f"Executing {tool_name} via MCPO..."): | |
| success, result = st.session_state.serena_client.call_tool(tool_name, tool_args) | |
| if success: | |
| tool_result = result | |
| else: | |
| tool_result = f"Error: {result}" | |
| # Add result | |
| st.session_state.messages[-1]["tool_result"] = tool_result | |
| # Add to LLM conversation | |
| st.session_state.messages.append({ | |
| "role": "tool", | |
| "tool_call_id": tool_call["id"], | |
| "content": tool_result | |
| }) | |
| # Get final response | |
| final_response = llm_client.chat_completion(st.session_state.messages) | |
| final_content = final_response["choices"][0]["message"]["content"] | |
| st.session_state.messages.append({ | |
| "role": "assistant", | |
| "content": final_content | |
| }) | |
| else: | |
| # Direct response | |
| content = assistant_message["content"] | |
| st.session_state.messages.append({ | |
| "role": "assistant", | |
| "content": content | |
| }) | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Request failed: {str(e)}") | |
| # Cleanup on app exit | |
def cleanup():
    """Stop the MCPO proxy process on application exit (best effort).

    Looks up ``mcpo_manager`` in ``st.session_state`` and, when present,
    calls its ``stop_mcpo()`` method. Ordinary failures are logged and
    suppressed so that an exit handler never raises; ``KeyboardInterrupt``
    and ``SystemExit`` are deliberately *not* swallowed.
    """
    try:
        # The session-state lookup is inside the ``try`` too, so a failure
        # there is handled like any other cleanup error instead of escaping
        # from the exit handler.
        if "mcpo_manager" in st.session_state:
            st.session_state.mcpo_manager.stop_mcpo()
    except Exception:
        # Narrower than a bare ``except:`` and leaves a trace of what failed.
        logger.warning("MCPO cleanup failed during exit", exc_info=True)
# Register cleanup() to run at normal interpreter termination.
# NOTE(review): if this script is re-executed within one process (as
# Streamlit reruns presumably do), the handler is registered again each
# time -- confirm repeated stop_mcpo() calls are harmless.
atexit.register(cleanup)

# Script entry point: run the app only when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment