| title | component | tags | level | scope | phase | lines | related | last_updated |
|---|---|---|---|---|---|---|---|---|
| MCP Server Lifecycle Management | mcp | | implementation | v1.0.0 | implementation | 3900 | | 2025-01-15 |

- Last Updated: January 2025
- Status: Implementation Guide
- Prerequisite Reading: MCP Server Schema

Contents:
- Overview
- MCP Protocol Fundamentals
- Transport Types
- MCPServerManager Implementation
- Stdio Transport Implementation
- HTTP Transport Implementation
- SSE Transport Implementation
- Server Discovery and Registration
- Health Check Implementation
- Connection Pool Management
- Automatic Reconnection
- Graceful Shutdown
- Resource Cleanup
- Tool Loading Integration
- Configuration Examples
- Complete Working Example
- Testing
- Performance Considerations
- Troubleshooting

## Overview

The MCP Server Lifecycle Management system handles the complete lifecycle of Model Context Protocol (MCP) servers in the LIVE+AI framework: initialization, health monitoring, automatic reconnection, and graceful shutdown across multiple transport types.

Its purpose is to manage the MCP servers that provide tools and resources to AI agents, ensuring:
- Reliable Connectivity: Maintain stable connections across all transport types
- Health Monitoring: Continuous health checks with automatic recovery
- Resource Management: Proper cleanup and connection pooling
- Tool Discovery: Automatic loading and registration of server tools
- Graceful Degradation: Fallback mechanisms when servers fail

MCP servers integrate with the Agent Framework to provide external tools:

```
ChatAgent → AgentFrameworkAdapter → MCPServerManager → MCP Server
                                           ↓
                                     Tool Discovery
                                     Health Checks
                                     Connection Pooling
```

Key components:

- `MCPServerManager`: Central lifecycle coordinator
- Transport Handlers: Protocol-specific communication (stdio, HTTP, SSE)
- `ServerHealthChecker`: Health monitoring across transports
- `MCPConnectionPool`: Connection pooling for HTTP/SSE
- `MCPToolLoader`: Integration with Agent Framework

## MCP Protocol Fundamentals

The Model Context Protocol (MCP) is an open protocol that standardizes how AI applications communicate with external tools and data sources. Key concepts:

1. Tools: Functions that agents can invoke with structured inputs
2. Resources: Data sources that agents can read
3. Prompts: Reusable prompt templates
4. JSON-RPC 2.0: Message format for client-server communication

The basic message exchange:

```
Client                          Server
  |                               |
  |----(initialize)-------------->|
  |<---(capabilities)-------------|
  |                               |
  |----(tools/list)-------------->|
  |<---(tool definitions)---------|
  |                               |
  |----(tools/call)-------------->|
  |<---(tool result)--------------|
```
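
The `initialize` step in the diagram carries version and capability negotiation. A minimal sketch of the two messages a client sends to open a session, following the lifecycle described in the MCP specification; the `protocolVersion` value, the function names, and the client name used in the test are assumptions, so use the revision your servers actually support:

```python
from typing import Any, Dict

# One published MCP protocol revision (assumption: pick the one your servers support).
PROTOCOL_VERSION = "2024-11-05"


def build_initialize_request(
    request_id: int, client_name: str, client_version: str
) -> Dict[str, Any]:
    """First message of a session: the `initialize` request in the diagram."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "initialize",
        "params": {
            "protocolVersion": PROTOCOL_VERSION,
            "capabilities": {},  # this client advertises no optional capabilities
            "clientInfo": {"name": client_name, "version": client_version},
        },
    }


def build_initialized_notification() -> Dict[str, Any]:
    """Sent once the server's `initialize` response has arrived.

    It is a notification: it has no "id", so the server sends no reply.
    """
    return {"jsonrpc": "2.0", "method": "notifications/initialized"}
```

Only after this exchange would the client send `tools/list`. Servers that enforce the handshake may reject a `tools/list` request that arrives before it.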

## Transport Types

Based on a review of the MCP Python SDK, LIVE+AI supports three transport mechanisms:

| Transport | Protocol | Use Case | Bidirectional |
|---|---|---|---|
| stdio | Standard I/O | Local processes, development | ✅ Yes |
| HTTP | Request-Response | Remote servers, stateless | ❌ No |
| SSE | Server-Sent Events | Streaming, long-running | ❌ No (server → client only) |

### JSON-RPC Message Format

Request:

```json
{
  "jsonrpc": "2.0",
  "method": "tools/list",
  "params": {},
  "id": 1
}
```

Response:

```json
{
  "jsonrpc": "2.0",
  "result": {
    "tools": [
      {
        "name": "get-weather",
        "description": "Get current weather",
        "inputSchema": {
          "type": "object",
          "properties": {
            "city": {"type": "string"}
          }
        }
      }
    ]
  },
  "id": 1
}
```

Error:

```json
{
  "jsonrpc": "2.0",
  "error": {
    "code": -32601,
    "message": "Method not found"
  },
  "id": 1
}
```

### stdio

Mechanism: Local subprocess communicating via stdin/stdout streams.

Characteristics:
- Fast startup (~100ms)
- Direct process control
- No network overhead
- Suitable for development

Process Lifecycle (synchronous sketch; the initialize handshake is omitted for brevity):

```python
import json
import subprocess

# 1. Spawn subprocess
process = subprocess.Popen(
    ["npx", "-y", "@upstash/context7-mcp"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

# 2. Write JSON-RPC to stdin (one message per line)
request = {"jsonrpc": "2.0", "method": "tools/list", "id": 1}
process.stdin.write(json.dumps(request).encode() + b"\n")
process.stdin.flush()

# 3. Read JSON-RPC from stdout
response = process.stdout.readline()
result = json.loads(response)

# 4. Cleanup on shutdown
process.terminate()
process.wait(timeout=5)
```

### HTTP

Mechanism: JSON-RPC requests sent as HTTP POST bodies to a remote MCP server.

Characteristics:
- Stateless by default
- Firewall-friendly (port 443)
- Supports connection pooling
- Production-ready
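
Request Flow:

```python
# POST to /mcp/tools/list (base_url is assumed to end in /mcp).
# http_client is an async HTTP client such as httpx.AsyncClient;
# base_url and token come from the server configuration.
response = await http_client.post(
    f"{base_url}/tools/list",
    json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
    headers={"Authorization": f"Bearer {token}"},
)
result = response.json()
```

### SSE

Mechanism: Server-Sent Events for streaming responses.
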
Characteristics:
- One-way: Server → Client
- Long-lived HTTP connection
- Automatic reconnection
- Progress updates for long tasks

Connection Pattern:

```python
import json

import aiohttp

# Establish SSE connection (base_url comes from the server configuration)
async with aiohttp.ClientSession() as session:
    async with session.get(f"{base_url}/sse") as response:
        # Listen for server-sent events, one line at a time
        async for line in response.content:
            if line.startswith(b"data: "):
                event_data = json.loads(line[6:])
                # Process event
```

## MCPServerManager Implementation

The `MCPServerManager` class orchestrates the complete lifecycle of all MCP servers in the LIVE+AI system.
"""Manage MCP server lifecycle and coordination.
This manager handles initialization, health monitoring, reconnection,
and shutdown for all registered MCP servers across multiple transports.
Thread Safety:
All public methods are async and thread-safe via asyncio locks.
Resource Management:
- Stdio servers: Process handles tracked and terminated on shutdown
- HTTP/SSE servers: Connection pools managed with max limits
- All resources cleaned up in graceful shutdown sequence
"""
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Set
from enum import Enum
import aiohttp
from azure.ai.projects.aio import AIProjectClient
class TransportType(Enum):
"""MCP transport protocol types."""
STDIO = "stdio"
HTTP = "http"
SSE = "sse"
class ServerStatus(Enum):
"""MCP server lifecycle status."""
PENDING = "pending" # Initializing
RUNNING = "running" # Active but not verified
HEALTHY = "healthy" # Verified healthy
UNHEALTHY = "unhealthy" # Health check failed
RECONNECTING = "reconnecting" # Attempting reconnect
STOPPED = "stopped" # Intentionally stopped
FAILED = "failed" # Unrecoverable error
@dataclass
class MCPServerConfig:
"""Configuration for a single MCP server.
Attributes:
name: Unique server identifier
transport: Transport type (stdio, http, sse)
command: Command for stdio servers
args: Arguments for stdio command
env: Environment variables for stdio
base_url: URL for HTTP/SSE servers
headers: Custom headers for HTTP/SSE
timeout: Request timeout in seconds
auth_token: Bearer token for authentication
"""
name: str
transport: TransportType
# Stdio-specific
command: Optional[str] = None
args: Optional[List[str]] = None
env: Optional[Dict[str, str]] = None
# HTTP/SSE-specific
base_url: Optional[str] = None
headers: Optional[Dict[str, str]] = None
timeout: int = 30
auth_token: Optional[str] = None
@dataclass
class MCPServerState:
"""Runtime state for an MCP server.
Tracks connection status, health metrics, and resources.
"""
config: MCPServerConfig
status: ServerStatus = ServerStatus.PENDING
process: Optional[asyncio.subprocess.Process] = None
http_session: Optional[aiohttp.ClientSession] = None
last_health_check: Optional[datetime] = None
consecutive_failures: int = 0
tools_loaded: int = 0
error_message: Optional[str] = None
class MCPServerManager:
"""Central manager for MCP server lifecycle.
Responsibilities:
- Initialize servers based on configuration
- Maintain connection pools for HTTP/SSE
- Monitor server health continuously
- Handle automatic reconnection with backoff
- Gracefully shutdown all servers
- Load tools into Agent Framework
Example:
>>> manager = MCPServerManager(db_pool, config)
>>> await manager.initialize_servers()
>>> health = await manager.health_check_all()
>>> await manager.shutdown_all()
"""
def __init__(
self,
ai_client: AIProjectClient,
health_check_interval: int = 60,
max_reconnect_attempts: int = 3
):
"""Initialize the MCP server manager.
Args:
ai_client: Azure AI Project client for agent integration
health_check_interval: Seconds between health checks
max_reconnect_attempts: Maximum reconnection retries
"""
self._ai_client = ai_client
self._health_check_interval = health_check_interval
self._max_reconnect_attempts = max_reconnect_attempts
# Server registry
self._servers: Dict[str, MCPServerState] = {}
self._lock = asyncio.Lock()
# Connection pooling
self._http_sessions: Dict[str, aiohttp.ClientSession] = {}
# Background tasks
self._health_check_task: Optional[asyncio.Task] = None
self._shutdown_event = asyncio.Event()
async def initialize_servers(
self,
configs: List[MCPServerConfig]
) -> Dict[str, bool]:
"""Initialize all configured MCP servers.
Starts servers based on their transport type and performs
initial health verification.
Args:
configs: List of server configurations
Returns:
Dict mapping server name to initialization success
Raises:
ValueError: If server name is duplicate
RuntimeError: If critical initialization fails
"""
async with self._lock:
results = {}
for config in configs:
if config.name in self._servers:
raise ValueError(
f"Duplicate server name: {config.name}"
)
try:
# Create server state
state = MCPServerState(config=config)
self._servers[config.name] = state
# Initialize based on transport
if config.transport == TransportType.STDIO:
success = await self._start_stdio_server(state)
elif config.transport == TransportType.HTTP:
success = await self._connect_http_server(state)
elif config.transport == TransportType.SSE:
success = await self._connect_sse_server(state)
else:
raise ValueError(
f"Unsupported transport: {config.transport}"
)
results[config.name] = success
except Exception as e:
state.status = ServerStatus.FAILED
state.error_message = str(e)
results[config.name] = False
# Start health monitoring
if not self._health_check_task:
self._health_check_task = asyncio.create_task(
self._health_check_loop()
)
return resultsAsync-First Architecture: All I/O operations use async/await to prevent blocking. This is critical for:
- Stdio: Non-blocking process communication
- HTTP/SSE: Concurrent request handling
- Health checks: Parallel server monitoring
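
Resource Tracking: each `MCPServerState` tracks transport-specific resources:

- Stdio: a `StdioMCPServer` wrapper (stored in `process`) that owns the `asyncio.subprocess.Process` handle
- HTTP/SSE: an `HTTPMCPServer` or `SSEMCPServer` wrapper (stored in `http_session`) that owns an `aiohttp.ClientSession` for connection pooling

Status State Machine:

```
PENDING → RUNNING → HEALTHY
             │         │
             └────┬────┘
                  ↓
              UNHEALTHY → RECONNECTING → HEALTHY
                               ↓
                             FAILED
```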
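
The diagram can be enforced in code so that an accidental jump (for example `STOPPED → HEALTHY`) fails loudly. A sketch under assumptions: the transition table below is one reading of the diagram plus the transitions the initialization code performs (straight to `FAILED` on startup errors, `UNHEALTHY → HEALTHY` when a check passes again), with `STOPPED` added for intentional shutdown. `ServerStatus` is redeclared so the sketch runs standalone, and `transition` is a hypothetical helper:

```python
from enum import Enum
from typing import Dict, FrozenSet


class ServerStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    RECONNECTING = "reconnecting"
    STOPPED = "stopped"
    FAILED = "failed"


S = ServerStatus
# Assumed transition table (one interpretation of the diagram above)
ALLOWED_TRANSITIONS: Dict[ServerStatus, FrozenSet[ServerStatus]] = {
    S.PENDING: frozenset({S.RUNNING, S.FAILED, S.STOPPED}),
    S.RUNNING: frozenset({S.HEALTHY, S.UNHEALTHY, S.FAILED, S.STOPPED}),
    S.HEALTHY: frozenset({S.UNHEALTHY, S.STOPPED}),
    S.UNHEALTHY: frozenset({S.RECONNECTING, S.HEALTHY, S.STOPPED}),
    S.RECONNECTING: frozenset({S.HEALTHY, S.FAILED, S.STOPPED}),
    S.FAILED: frozenset({S.STOPPED}),
    S.STOPPED: frozenset(),  # terminal
}


def transition(current: ServerStatus, target: ServerStatus) -> ServerStatus:
    """Return `target` if the move is allowed, otherwise raise ValueError."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"Illegal transition {current.value} -> {target.value}")
    return target
```

A table like this also documents the lifecycle in one place; if the health checker or reconnection logic needs a new edge, the change is explicit.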

Error Resilience:

- Individual server failures don't crash the manager
- Failed servers are marked `FAILED` but kept in the registry
- Reconnection attempts are configurable per deployment
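
The manager takes `max_reconnect_attempts` and its docstring mentions reconnection "with backoff", but the backoff formula is not specified here. A sketch that swaps in exponential backoff with full jitter (each delay drawn uniformly from `[0, min(cap, base * 2**attempt)]`); the function names and the `base`/`cap` defaults are assumptions, not LIVE+AI settings:

```python
import asyncio
import random
from typing import Awaitable, Callable, List, Optional


def backoff_delays(
    max_attempts: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Seconds to wait before each reconnection attempt (full jitter)."""
    rng = rng or random.Random()
    return [rng.uniform(0.0, min(cap, base * 2 ** attempt)) for attempt in range(max_attempts)]


async def reconnect_with_backoff(
    connect: Callable[[], Awaitable[bool]],
    max_attempts: int = 3,
    base: float = 1.0,
    cap: float = 30.0,
) -> bool:
    """Try `connect` up to `max_attempts` times, sleeping before each try."""
    for delay in backoff_delays(max_attempts, base, cap):
        await asyncio.sleep(delay)
        try:
            if await connect():
                return True
        except Exception:
            # Any error counts as a failed attempt; keep retrying
            continue
    return False
```

Jitter spreads reconnections out so that many clients recovering from the same outage do not all hit the server at the same moment.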

## Stdio Transport Implementation

The stdio transport manages MCP servers as local subprocesses, communicating via standard input/output streams.
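
The class below relies on newline-delimited framing: each JSON-RPC message travels as one line of UTF-8 JSON. A minimal, transport-independent sketch of that framing (the helper names `encode_message` and `decode_lines` are hypothetical), useful for unit-testing reader logic without spawning a process:

```python
import json
from typing import Any, Dict, Iterable, List


def encode_message(message: Dict[str, Any]) -> bytes:
    """Frame one JSON-RPC message as a single newline-terminated line.

    json.dumps escapes newlines inside strings, so the only raw newline
    in the output is the terminator.
    """
    return (json.dumps(message, separators=(",", ":")) + "\n").encode("utf-8")


def decode_lines(chunks: Iterable[bytes]) -> List[Dict[str, Any]]:
    """Reassemble newline-delimited messages from arbitrary byte chunks."""
    buffer = b""
    messages: List[Dict[str, Any]] = []
    for chunk in chunks:
        buffer += chunk
        # A chunk may hold part of a line, or several lines
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():  # ignore blank lines
                messages.append(json.loads(line))
    return messages
```

`asyncio.StreamReader.readline()`, used by the class below, performs the same reassembly internally; the sketch only makes the framing rule explicit.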
"""Stdio-based MCP server implementation.
Manages a subprocess running an MCP server, handling JSON-RPC 2.0
communication over stdin/stdout streams.
Invariants:
- Process must remain alive for the duration of the session
- Each request gets a unique incrementing ID
- Responses matched to requests via ID field
Resource Management:
- Process terminated on cleanup
- Streams closed gracefully
- No orphaned processes allowed
"""
import asyncio
import json
import subprocess
from typing import Any, Dict, List, Optional
class StdioMCPServer:
"""MCP server using stdio (subprocess) transport.
Spawns a subprocess and communicates via JSON-RPC 2.0 over
stdin/stdout. Handles process lifecycle and stream management.
Example:
>>> server = StdioMCPServer()
>>> await server.start(["npx", "-y", "@upstash/context7-mcp"], {})
>>> result = await server.send_request("tools/list", {})
>>> await server.shutdown()
"""
def __init__(self):
"""Initialize stdio server (process not started yet)."""
self._process: Optional[asyncio.subprocess.Process] = None
self._request_id = 0
self._lock = asyncio.Lock()
self._response_queue: asyncio.Queue = asyncio.Queue()
self._reader_task: Optional[asyncio.Task] = None
async def start(
self,
command: List[str],
env: Optional[Dict[str, str]] = None
) -> None:
"""Start subprocess and establish stdio communication.
Args:
command: Command and arguments to execute
env: Environment variables for subprocess
Raises:
RuntimeError: If process fails to start
FileNotFoundError: If command not found
"""
full_env = {**os.environ, **(env or {})}
try:
self._process = await asyncio.create_subprocess_exec(
*command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=full_env
)
except FileNotFoundError as e:
raise FileNotFoundError(
f"Command not found: {command[0]}"
) from e
# Verify process started
await asyncio.sleep(0.1)
if self._process.returncode is not None:
stderr = await self._process.stderr.read()
raise RuntimeError(
f"Process exited immediately: {stderr.decode()}"
)
# Start background reader
self._reader_task = asyncio.create_task(self._read_responses())
async def _read_responses(self) -> None:
"""Background task to read JSON-RPC responses from stdout.
Reads line-by-line, parses JSON, and queues responses.
Terminates when process exits or stdout closes.
"""
while True:
try:
line = await self._process.stdout.readline()
if not line:
break # EOF
# Parse JSON-RPC response
response = json.loads(line.decode().strip())
await self._response_queue.put(response)
except json.JSONDecodeError as e:
# Log malformed response but continue
print(f"Invalid JSON from server: {e}")
except Exception as e:
print(f"Error reading response: {e}")
break
async def send_request(
self,
method: str,
params: Dict[str, Any],
timeout: float = 30.0
) -> Dict[str, Any]:
"""Send JSON-RPC request over stdin and await response.
Args:
method: JSON-RPC method name
params: Method parameters
timeout: Response timeout in seconds
Returns:
JSON-RPC response result
Raises:
TimeoutError: If response not received within timeout
RuntimeError: If process died or JSON-RPC error returned
"""
async with self._lock:
# Verify process is alive
if self._process.returncode is not None:
raise RuntimeError("Process has terminated")
# Create request
self._request_id += 1
request = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": self._request_id
}
# Send request
request_json = json.dumps(request) + "\n"
self._process.stdin.write(request_json.encode())
await self._process.stdin.drain()
# Wait for response
request_id = self._request_id
# Wait for matching response (outside lock)
deadline = asyncio.get_event_loop().time() + timeout
while True:
remaining = deadline - asyncio.get_event_loop().time()
if remaining <= 0:
raise TimeoutError(
f"No response for {method} after {timeout}s"
)
try:
response = await asyncio.wait_for(
self._response_queue.get(),
timeout=remaining
)
except asyncio.TimeoutError:
raise TimeoutError(
f"No response for {method} after {timeout}s"
)
# Check if this is our response
if response.get("id") == request_id:
if "error" in response:
raise RuntimeError(
f"JSON-RPC error: {response['error']}"
)
return response.get("result", {})
# Wrong ID, put back and continue
await self._response_queue.put(response)
await asyncio.sleep(0.01)
async def shutdown(self, timeout: float = 5.0) -> None:
"""Gracefully shutdown subprocess.
Sends SIGTERM, waits for exit, sends SIGKILL if necessary.
Args:
timeout: Seconds to wait before SIGKILL
"""
if not self._process:
return
# Stop reader task
if self._reader_task:
self._reader_task.cancel()
try:
await self._reader_task
except asyncio.CancelledError:
pass
# Terminate process
if self._process.returncode is None:
self._process.terminate()
try:
await asyncio.wait_for(
self._process.wait(),
timeout=timeout
)
except asyncio.TimeoutError:
# Force kill
self._process.kill()
await self._process.wait()# In MCPServerManager._start_stdio_server()
async def _start_stdio_server(self, state: MCPServerState) -> bool:
"""Start a stdio-based MCP server.
Args:
state: Server state with stdio configuration
Returns:
True if started successfully
"""
config = state.config
try:
# Create stdio server
server = StdioMCPServer()
# Start subprocess
await server.start(
command=[config.command] + (config.args or []),
env=config.env
)
# Store in state
state.process = server
state.status = ServerStatus.RUNNING
# Verify with initial request
tools = await server.send_request("tools/list", {})
state.tools_loaded = len(tools.get("tools", []))
state.status = ServerStatus.HEALTHY
return True
except Exception as e:
state.status = ServerStatus.FAILED
state.error_message = str(e)
return FalseThe HTTP transport connects to remote MCP servers over HTTP/HTTPS using request-response pattern.
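
aiohttp does not retry failed requests by itself, so retrying transient failures (HTTP 429, 502, 503, 504) has to be layered on top of `send_request`. A hedged sketch of such a wrapper; `TransientHTTPError`, `send_with_retry`, and the backoff parameters are hypothetical names chosen for illustration, and the wrapper is written against a plain async callable so it can be tested without a server:

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

# Status codes treated as transient for the HTTP transport
RETRYABLE_STATUSES = frozenset({429, 502, 503, 504})


class TransientHTTPError(Exception):
    """Raised by a request callable for a failed HTTP status (hypothetical)."""

    def __init__(self, status: int, retry_after: Optional[float] = None):
        super().__init__(f"HTTP {status}")
        self.status = status
        self.retry_after = retry_after


async def send_with_retry(
    send: Callable[[], Awaitable[Dict[str, Any]]],
    max_retries: int = 3,
    base_delay: float = 0.5,
) -> Dict[str, Any]:
    """Call `send`, retrying transient HTTP failures with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return await send()
        except TransientHTTPError as e:
            if e.status not in RETRYABLE_STATUSES or attempt == max_retries:
                raise
            # Honour a server-provided Retry-After value when present
            delay = e.retry_after if e.retry_after is not None else base_delay * 2 ** attempt
            await asyncio.sleep(delay)
    raise AssertionError("unreachable")
```

In `HTTPMCPServer.send_request`, `response.raise_for_status()` raises `aiohttp.ClientResponseError`, whose `status` attribute could be translated into `TransientHTTPError`; how the two are wired together is left open here.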
"""HTTP-based MCP server implementation.
Communicates with remote MCP servers via HTTP POST requests.
Uses connection pooling for efficiency and supports authentication.
Performance:
- Connection pooling reduces latency (~50ms per request)
- Keep-alive connections maintained by aiohttp
- Automatic retry on transient failures (429, 502, 503, 504)
"""
import aiohttp
from typing import Any, Dict, Optional
class HTTPMCPServer:
"""MCP server using HTTP transport.
Connects to remote MCP server via HTTP, sending JSON-RPC
requests as POST bodies.
Example:
>>> server = HTTPMCPServer()
>>> await server.connect(
... "https://api.example.com/mcp",
... {"Authorization": "Bearer token"}
... )
>>> result = await server.send_request("tools/list", {})
>>> await server.close()
"""
def __init__(self):
"""Initialize HTTP server (not connected yet)."""
self._base_url: Optional[str] = None
self._session: Optional[aiohttp.ClientSession] = None
self._request_id = 0
async def connect(
self,
base_url: str,
headers: Optional[Dict[str, str]] = None,
timeout: int = 30
) -> None:
"""Establish HTTP connection with connection pooling.
Args:
base_url: Base URL of MCP server
headers: Custom HTTP headers (auth, etc.)
timeout: Default request timeout
"""
self._base_url = base_url.rstrip('/')
# Create session with connection pooling
timeout_config = aiohttp.ClientTimeout(total=timeout)
connector = aiohttp.TCPConnector(
limit=50, # Max 50 concurrent connections
limit_per_host=10, # Max 10 per host
ttl_dns_cache=300, # DNS cache for 5 minutes
keepalive_timeout=60 # Keep connections alive
)
self._session = aiohttp.ClientSession(
headers=headers or {},
timeout=timeout_config,
connector=connector
)
async def send_request(
self,
method: str,
params: Dict[str, Any],
timeout: Optional[float] = None
) -> Dict[str, Any]:
"""Send HTTP POST request with JSON-RPC payload.
Args:
method: JSON-RPC method name
params: Method parameters
timeout: Request timeout (override default)
Returns:
JSON-RPC response result
Raises:
ConnectionError: If server unreachable
TimeoutError: If request times out
RuntimeError: If JSON-RPC error returned
"""
if not self._session:
raise RuntimeError("Not connected - call connect() first")
self._request_id += 1
request = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": self._request_id
}
# Determine endpoint (convention: /tools/list → POST /tools/list)
endpoint = f"{self._base_url}/{method.replace('/', '/')}"
try:
async with self._session.post(
endpoint,
json=request,
timeout=aiohttp.ClientTimeout(total=timeout)
) as response:
response.raise_for_status()
result = await response.json()
if "error" in result:
raise RuntimeError(
f"JSON-RPC error: {result['error']}"
)
return result.get("result", {})
except aiohttp.ClientConnectorError as e:
raise ConnectionError(
f"Cannot connect to {self._base_url}: {e}"
) from e
except asyncio.TimeoutError as e:
raise TimeoutError(
f"Request to {endpoint} timed out"
) from e
async def health_check(self) -> bool:
"""GET /health endpoint check.
Returns:
True if server responds with 200
"""
if not self._session:
return False
try:
async with self._session.get(
f"{self._base_url}/health",
timeout=aiohttp.ClientTimeout(total=5)
) as response:
return response.status == 200
except Exception:
return False
async def close(self) -> None:
"""Close HTTP session and release connections."""
if self._session:
await self._session.close()
self._session = None# In MCPServerManager._connect_http_server()
async def _connect_http_server(self, state: MCPServerState) -> bool:
"""Connect to HTTP-based MCP server.
Args:
state: Server state with HTTP configuration
Returns:
True if connected successfully
"""
config = state.config
try:
# Create HTTP server
server = HTTPMCPServer()
# Prepare headers
headers = dict(config.headers or {})
if config.auth_token:
headers["Authorization"] = f"Bearer {config.auth_token}"
# Connect
await server.connect(
base_url=config.base_url,
headers=headers,
timeout=config.timeout
)
# Store in state
state.http_session = server
state.status = ServerStatus.RUNNING
# Verify with health check
healthy = await server.health_check()
if healthy:
# Load tools
tools = await server.send_request("tools/list", {})
state.tools_loaded = len(tools.get("tools", []))
state.status = ServerStatus.HEALTHY
else:
state.status = ServerStatus.UNHEALTHY
state.error_message = "Health check failed"
return healthy
except Exception as e:
state.status = ServerStatus.FAILED
state.error_message = str(e)
return FalseThe SSE (Server-Sent Events) transport receives streaming updates from MCP servers over a persistent HTTP connection.
"""SSE-based MCP server implementation.
Establishes a long-lived HTTP connection to receive server-sent events.
Useful for streaming responses or long-running tool executions.
Characteristics:
- One-way: Server → Client only
- Automatic reconnection on disconnect
- Line-based event stream format
- Client sends requests via separate HTTP POST
Note:
SSE is read-only. Requests still sent via HTTP POST,
but responses/updates streamed back via SSE connection.
"""
import aiohttp
from typing import Any, Callable, Dict, Optional
class SSEMCPServer:
"""MCP server using Server-Sent Events transport.
Maintains persistent connection for streaming server updates
while sending requests via standard HTTP POST.
Example:
>>> server = SSEMCPServer()
>>> await server.connect("https://api.example.com/sse")
>>>
>>> # Start listening for events
>>> async def handle_event(event):
... print(f"Received: {event}")
>>>
>>> listen_task = asyncio.create_task(
... server.listen_events(handle_event)
... )
>>>
>>> # Send request via HTTP
>>> result = await server.send_request("tools/call", {...})
>>>
>>> await server.close()
"""
def __init__(self):
"""Initialize SSE server (not connected yet)."""
self._sse_url: Optional[str] = None
self._http_session: Optional[aiohttp.ClientSession] = None
self._sse_response: Optional[aiohttp.ClientResponse] = None
self._request_id = 0
async def connect(
self,
sse_url: str,
http_base_url: Optional[str] = None,
headers: Optional[Dict[str, str]] = None
) -> None:
"""Establish SSE connection for streaming.
Args:
sse_url: URL for SSE event stream
http_base_url: Base URL for HTTP requests (default: sse_url)
headers: Custom headers for both SSE and HTTP
"""
self._sse_url = sse_url
self._http_base_url = http_base_url or sse_url.rsplit('/sse', 1)[0]
# Create session
self._http_session = aiohttp.ClientSession(
headers=headers or {}
)
# Establish SSE connection
self._sse_response = await self._http_session.get(
sse_url,
timeout=aiohttp.ClientTimeout(total=None) # No timeout for SSE
)
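
`listen_events`, used in the docstring example, has to turn the raw byte stream into events. In the event-stream format defined by the WHATWG HTML standard, an event is a run of `field: value` lines terminated by a blank line; several `data:` lines are joined with newlines, the default event type is `message`, and lines starting with `:` are comments (often keep-alives). A minimal incremental parser under those rules; `SSEEvent` and `SSEParser` are hypothetical names, and the `retry:` field is ignored:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SSEEvent:
    """One dispatched server-sent event."""
    event: str
    data: str
    id: Optional[str] = None


class SSEParser:
    """Incremental event-stream parser: feed decoded lines, get events back."""

    def __init__(self) -> None:
        self._event_type: Optional[str] = None
        self._data: List[str] = []
        self._last_id: Optional[str] = None  # persists across events

    def feed_line(self, line: str) -> Optional[SSEEvent]:
        """Process one line; return an SSEEvent when a blank line completes one."""
        line = line.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the buffered event if it has any data
            event = None
            if self._data:
                event = SSEEvent(
                    event=self._event_type or "message",
                    data="\n".join(self._data),
                    id=self._last_id,
                )
            self._event_type, self._data = None, []
            return event
        if line.startswith(":"):
            return None  # comment line, often used as a keep-alive
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # one leading space after the colon is dropped
        if name == "data":
            self._data.append(value)
        elif name == "event":
            self._event_type = value
        elif name == "id":
            self._last_id = value
        # Other fields such as "retry" are ignored in this sketch
        return None
```

Inside `listen_events`, each line read from `self._sse_response.content` would be decoded and passed to `feed_line`, with JSON-RPC payloads parsed from `event.data`.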
---
## Server Discovery and Registration
Server discovery loads MCP server configurations from the database and YAML files, then registers them with the manager.
### Discovery Flow
```python
"""Load and register MCP servers from configuration sources.

Discovery Order:
    1. Database configurations (highest priority)
    2. YAML files in registry directories
    3. Environment-based overrides

Validation:
    - Schema validation against mcp-server.schema.json
    - Duplicate name detection
    - Transport type verification
"""
from __future__ import annotations

from pathlib import Path
from typing import Dict, List

import yaml

# MCPServerManager, MCPServerConfig and TransportType come from the manager
# module above; DatabasePool and apply_env_overrides are provided elsewhere
# in LIVE+AI.


async def discover_and_register_servers(
    manager: MCPServerManager,
    db_pool: DatabasePool,
    config_dirs: List[Path]
) -> Dict[str, bool]:
    """Discover MCP servers and register with manager.

    Args:
        manager: MCPServerManager instance
        db_pool: Database connection pool
        config_dirs: Directories to scan for YAML configs

    Returns:
        Dict mapping server name to registration success
    """
    configs: List[MCPServerConfig] = []
    # 1. Load from database
    configs.extend(await load_from_database(db_pool))
    # 2. Load from YAML files
    for config_dir in config_dirs:
        configs.extend(await load_from_yaml_dir(config_dir))

    # Enforce the priority order: the first config seen for a name wins,
    # so database entries shadow YAML entries with the same name.
    # (Passing both would make initialize_servers raise ValueError.)
    unique: Dict[str, MCPServerConfig] = {}
    for config in configs:
        unique.setdefault(config.name, config)

    # 3. Apply environment overrides
    merged = apply_env_overrides(list(unique.values()))
    # 4. Register all servers
    return await manager.initialize_servers(merged)


async def load_from_database(
    db_pool: DatabasePool
) -> List[MCPServerConfig]:
    """Load MCP server configs from database.

    Query:
        SELECT name, spec FROM mcp_servers WHERE enabled = true
    """
    async with db_pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT name, spec FROM mcp_servers WHERE enabled = true"
        )
    configs = []
    for row in rows:
        # Assumes `spec` is decoded to a dict (e.g. a JSONB codec is registered)
        config = MCPServerConfig(
            name=row["name"],
            transport=TransportType(row["spec"]["transport"]),
            # ... parse spec fields
        )
        configs.append(config)
    return configs


async def load_from_yaml_dir(config_dir: Path) -> List[MCPServerConfig]:
    """Load MCP server configs from YAML directory.

    Scans for *.yml and *.yaml files and checks the resource kind.
    File reads are blocking, which is acceptable at startup.
    """
    configs = []
    yaml_files = sorted([*config_dir.glob("*.yml"), *config_dir.glob("*.yaml")])
    for yaml_file in yaml_files:
        with open(yaml_file) as f:
            data = yaml.safe_load(f)
        # Skip empty files and non-MCPServer documents
        # (full JSON Schema validation is not shown here)
        if not isinstance(data, dict) or data.get("kind") != "MCPServer":
            continue
        config = MCPServerConfig(
            name=data["metadata"]["name"],
            transport=TransportType(data["spec"]["provider"]),
            # ... parse configuration
        )
        configs.append(config)
    return configs
```

## Health Check Implementation

Continuous health monitoring ensures servers remain responsive and automatically detects failures.
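
The thresholds in the module docstring below (3 consecutive failures mark a server UNHEALTHY, 5 trigger reconnection) can be expressed as a pure function, which makes them easy to unit-test separately from any transport. A sketch with hypothetical names; statuses are plain strings here to keep it standalone:

```python
from dataclasses import dataclass

UNHEALTHY_THRESHOLD = 3  # consecutive failures before marking UNHEALTHY
RECONNECT_THRESHOLD = 5  # consecutive failures before reconnecting


@dataclass(frozen=True)
class HealthDecision:
    consecutive_failures: int
    status: str      # "healthy", "unhealthy", or the unchanged status
    reconnect: bool


def evaluate_health(
    current_status: str, consecutive_failures: int, check_passed: bool
) -> HealthDecision:
    """Apply one health-check result to the failure counter."""
    if check_passed:
        # Any success resets the counter
        return HealthDecision(0, "healthy", False)
    failures = consecutive_failures + 1
    status = "unhealthy" if failures >= UNHEALTHY_THRESHOLD else current_status
    return HealthDecision(failures, status, failures >= RECONNECT_THRESHOLD)
```

Below the first threshold a failing server keeps its previous status, so a single dropped ping does not flap it to UNHEALTHY.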
"""Health checking for all MCP server types.
Implements transport-specific health verification with
configurable intervals and failure thresholds.
Failure Detection:
- Stdio: Process alive + responsive to ping
- HTTP: GET /health returns 200
- SSE: Connection active + events flowing
Thresholds:
- 3 consecutive failures → Mark UNHEALTHY
- 5 consecutive failures → Trigger reconnection
"""
from datetime import datetime, timedelta
from typing import Dict, Optional
class ServerHealthChecker:
"""Health checking for all MCP server types.
Monitors server health via transport-specific checks,
tracks failure counts, and triggers recovery actions.
Example:
>>> checker = ServerHealthChecker(manager)
>>> await checker.check_all_servers()
>>> health_status = checker.get_health_summary()
"""
def __init__(
self,
manager: MCPServerManager,
check_interval: int = 60,
failure_threshold: int = 3
):
"""Initialize health checker.
Args:
manager: MCPServerManager to monitor
check_interval: Seconds between checks
failure_threshold: Failures before marking unhealthy
"""
self._manager = manager
self._check_interval = check_interval
self._failure_threshold = failure_threshold
async def check_stdio_server(
self,
server: StdioMCPServer,
state: MCPServerState
) -> bool:
"""Verify subprocess is alive and responsive.
Checks:
1. Process returncode is None (still running)
2. Can send/receive JSON-RPC ping
Args:
server: Stdio server instance
state: Server state tracking
Returns:
True if healthy
"""
# Check process alive
if server._process.returncode is not None:
return False
try:
# Send ping request
result = await server.send_request(
"ping",
{},
timeout=5.0
)
return True
except Exception as e:
print(f"Stdio health check failed: {e}")
return False
async def check_http_server(
self,
server: HTTPMCPServer,
state: MCPServerState
) -> bool:
"""HTTP GET /health endpoint check.
Expected Response:
HTTP 200 OK
{"status": "healthy"}
Args:
server: HTTP server instance
state: Server state tracking
Returns:
True if returns 200
"""
return await server.health_check()
async def check_sse_server(
self,
server: SSEMCPServer,
state: MCPServerState
) -> bool:
"""Verify SSE connection is active.
Checks:
1. HTTP session exists
2. SSE response stream open
3. Received event in last 2 minutes (heartbeat)
Args:
server: SSE server instance
state: Server state tracking
Returns:
True if connection active
"""
if not server._sse_response:
return False
if server._sse_response.closed:
return False
# Check recent activity
if state.last_health_check:
age = datetime.now() - state.last_health_check
if age > timedelta(minutes=2):
# No events for 2 minutes, likely stale
return False
return True
async def check_all_servers(self) -> Dict[str, bool]:
"""Check health of all registered servers.
Updates server state with health status and
increments failure counters.
Returns:
Dict mapping server name to health status
"""
results = {}
async with self._manager._lock:
for name, state in self._manager._servers.items():
healthy = await self._check_server(state)
results[name] = healthy
# Update state
state.last_health_check = datetime.now()
if healthy:
state.consecutive_failures = 0
if state.status != ServerStatus.HEALTHY:
state.status = ServerStatus.HEALTHY
else:
state.consecutive_failures += 1
# Mark unhealthy after threshold
if state.consecutive_failures >= self._failure_threshold:
state.status = ServerStatus.UNHEALTHY
return results
async def _check_server(self, state: MCPServerState) -> bool:
"""Check single server based on transport type."""
config = state.config
try:
if config.transport == TransportType.STDIO:
return await self.check_stdio_server(
state.process,
state
)
elif config.transport == TransportType.HTTP:
return await self.check_http_server(
state.http_session,
state
)
elif config.transport == TransportType.SSE:
return await self.check_sse_server(
state.http_session,
state
)
except Exception as e:
print(f"Health check error for {config.name}: {e}")
return False# In MCPServerManager
async def _health_check_loop(self) -> None:
"""Background task for continuous health monitoring.
Runs until shutdown event is set. Checks all servers
at configured interval and triggers reconnection for
unhealthy servers.
"""
checker = ServerHealthChecker(
self,
check_interval=self._health_check_interval,
failure_threshold=3
)
while not self._shutdown_event.is_set():
try:
# Check all servers
health_status = await checker.check_all_servers()
# Trigger reconnection for failed servers
for name, healthy in health_status.items():
if not healthy:
state = self._servers[name]
if state.consecutive_failures >= 5:
# Reconnect after 5 failures
await self.reconnect_server(name)
# Wait for next interval
await asyncio.sleep(self._health_check_interval)
except asyncio.CancelledError:
break
except Exception as e:
print(f"Health check loop error: {e}")
await asyncio.sleep(5) # Brief pause on errorConnection pooling for HTTP/SSE servers reduces latency and manages resource limits.
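
The pool below evicts connections by two limits: `max_lifetime` (age since creation) and `idle_timeout` (time since last use). The eviction rule on its own, as a hedged sketch mirroring the `PooledConnection` timestamps; `is_stale` is a hypothetical helper, since the pool's `_cleanup_stale` is not shown in this section:

```python
from datetime import datetime, timedelta


def is_stale(
    created_at: datetime,
    last_used: datetime,
    now: datetime,
    max_lifetime: int = 3600,
    idle_timeout: int = 300,
) -> bool:
    """True if a pooled connection is too old or has been idle too long.

    Defaults match MCPConnectionPool: 1 hour lifetime, 5 minutes idle.
    """
    too_old = now - created_at >= timedelta(seconds=max_lifetime)
    too_idle = now - last_used >= timedelta(seconds=idle_timeout)
    return too_old or too_idle
```

A cleanup pass would close (with `await session.close()`) the `ClientSession` of every entry for which this returns true and drop it from the per-server list. Passing `now` explicitly keeps the rule deterministic in tests.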
"""Connection pool for HTTP/SSE MCP servers.

Maintains a pool of reusable connections with:
- Maximum connection limits
- Connection lifetime management
- Automatic cleanup of stale connections
- Pool statistics and monitoring

Performance:
    Reduces connection overhead from ~50ms to <5ms for
    subsequent requests by reusing TCP connections.
"""
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import aiohttp


@dataclass
class PooledConnection:
    """A single pooled connection."""
    session: aiohttp.ClientSession
    created_at: datetime
    last_used: datetime
    use_count: int = 0
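
# ---- Standalone sketch (hypothetical) ----
# One way _cleanup_stale (called in acquire() but not shown in this guide)
# could decide that a pooled connection should be closed. Assumption: a
# connection is stale once it has lived longer than max_lifetime OR sat idle
# longer than idle_timeout. Times are passed in explicitly so the rule is easy
# to test; is_stale is an illustrative name.
from datetime import datetime, timedelta


def is_stale(
    created_at: datetime,
    last_used: datetime,
    now: datetime,
    max_lifetime: int = 3600,  # seconds, same default as MCPConnectionPool
    idle_timeout: int = 300,   # seconds, same default as MCPConnectionPool
) -> bool:
    too_old = now - created_at >= timedelta(seconds=max_lifetime)
    too_idle = now - last_used >= timedelta(seconds=idle_timeout)
    return too_old or too_idle
# ---- end of sketch ----
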
class MCPConnectionPool:
    """Connection pool for HTTP/SSE MCP servers.

    Manages connection lifecycle with configurable limits
    and automatic cleanup of stale connections.

    Example:
        >>> pool = MCPConnectionPool(max_per_server=10)
        >>> session = await pool.acquire("weather-api")
        >>> # ... use session ...
        >>> await pool.release("weather-api", session)
    """

    def __init__(
        self,
        max_per_server: int = 10,
        max_lifetime: int = 3600,  # 1 hour
        idle_timeout: int = 300    # 5 minutes
    ):
        """Initialize connection pool.

        Args:
            max_per_server: Max connections per server
            max_lifetime: Max connection age in seconds
            idle_timeout: Max idle time in seconds before cleanup
        """
        self._max_per_server = max_per_server
        self._max_lifetime = max_lifetime
        self._idle_timeout = idle_timeout

        # Pool storage: server_name → [PooledConnection]
        self._pools: Dict[str, List[PooledConnection]] = defaultdict(list)
        self._lock = asyncio.Lock()
    async def acquire(
        self,
        server_name: str,
        headers: Optional[Dict[str, str]] = None
    ) -> aiohttp.ClientSession:
        """Acquire a connection from the pool.

        Returns an existing connection if one is available, creates a new
        one if the pool is not full, and raises an error if at the limit.

        Args:
            server_name: Server identifier
            headers: Headers for new connections

        Returns:
            Active ClientSession

        Raises:
            RuntimeError: If the pool is exhausted
        """
        async with self._lock:
            # Clean up stale connections first (must not re-acquire self._lock),
            # then read the pool so we never iterate a replaced list
            await self._cleanup_stale(server_name)
            pool = self._pools[server_name]
            now = datetime.now()

            # Try to reuse an existing connection; an aiohttp ClientSession
            # can be shared by concurrent requests
            for conn in pool:
                idle = now - conn.last_used
                if idle.total_seconds() < self._idle_timeout:
                    conn.last_used = now
                    conn.use_count += 1
                    return conn.session

            # Create a new connection if under the limit
            if len(pool) < self._max_per_server:
                session = aiohttp.ClientSession(headers=headers or {})
                conn = PooledConnection(
                    session=session,
                    created_at=now,
                    last_used=now,
                    use_count=1
                )
                pool.append(conn)
                return session

            # Pool exhausted
            raise RuntimeError(
                f"Connection pool exhausted for {server_name}"
            )

    async def release(
        self,
        server_name: str,
        session: aiohttp.ClientSession
```
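
Every `acquire()` has to be paired with a `release()`. Callers can wrap the pair in an async context manager so the session goes back to the pool even when the request raises. The sketch below assumes only that the pool exposes `acquire(server_name)` and `release(server_name, session)` coroutines, as in the `MCPConnectionPool` docstring example. `pooled_session` and the `FakePool` stand-in are hypothetical names. `FakePool` exists only so the example runs without aiohttp.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, List


@asynccontextmanager
async def pooled_session(pool: Any, server_name: str) -> AsyncIterator[Any]:
    """Hypothetical helper: acquire a session and always release it."""
    session = await pool.acquire(server_name)
    try:
        yield session
    finally:
        # Runs on normal exit and when the body raises
        await pool.release(server_name, session)


class FakePool:
    """Stand-in for MCPConnectionPool that records acquire/release calls."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def acquire(self, server_name: str) -> str:
        self.events.append(f"acquire:{server_name}")
        return f"session-for-{server_name}"

    async def release(self, server_name: str, session: str) -> None:
        self.events.append(f"release:{server_name}")


async def demo() -> List[str]:
    pool = FakePool()
    async with pooled_session(pool, "weather-api") as session:
        assert session == "session-for-weather-api"
    try:
        async with pooled_session(pool, "weather-api"):
            raise ValueError("request failed")
    except ValueError:
        pass
    return pool.events
```

The `try`/`finally` inside the generator is what guarantees the release. The failing request in `demo()` still records a `release` event before the `ValueError` reaches the caller.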
---
## Graceful Shutdown
Graceful shutdown ensures all MCP servers and resources are cleanly terminated without data loss or orphaned processes.
### Shutdown Sequence
```python
"""Graceful shutdown of all MCP servers.

Shutdown Order (critical for correctness):
1. Stop accepting new requests (set the shutdown event)
2. Stop the health check loop so no reconnects start mid-shutdown
3. Wait for active requests to complete (timeout: 30s)
4. Close HTTP/SSE connections
5. Terminate stdio subprocesses (SIGTERM → SIGKILL)
6. Close connection pools
7. Update server status in database

Invariants:
- No orphaned processes after shutdown
- All network connections closed
- No resource leaks
- Database state reflects shutdown
"""

# In MCPServerManager
    async def shutdown_all(self, timeout: float = 30.0) -> None:
        """Gracefully shut down all MCP servers.

        Implements a multi-phase shutdown to ensure clean termination:
        1. Signal shutdown to prevent new operations
        2. Stop background tasks (health checks)
        3. Wait for active operations to complete
        4. Close all server connections
        5. Release all resources

        Args:
            timeout: Maximum seconds to wait for active operations (phase 3)

        Example:
            >>> manager = MCPServerManager(...)
            >>> await manager.initialize_servers(configs)
            >>> # ... use servers ...
            >>> await manager.shutdown_all()
        """
        print("Starting graceful shutdown of MCP servers...")
        loop = asyncio.get_running_loop()

        # Phase 1: Signal shutdown
        self._shutdown_event.set()

        # Phase 2: Stop health check loop
        if self._health_check_task:
            self._health_check_task.cancel()
            try:
                await asyncio.wait_for(self._health_check_task, timeout=5.0)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                pass

        # Phase 3: Wait for active requests to complete
        deadline = loop.time() + timeout
        while await self._has_active_operations():
            if loop.time() >= deadline:
                print("Warning: Timeout waiting for active operations")
                break
            await asyncio.sleep(0.5)

        # Phase 4: Shut down all servers concurrently
        async with self._lock:
            names = list(self._servers.keys())
            results = await asyncio.gather(
                *(self._shutdown_server(self._servers[name]) for name in names),
                return_exceptions=True
            )

        # Log any shutdown errors
        for name, result in zip(names, results):
            if isinstance(result, Exception):
                print(f"Error shutting down {name}: {result}")

        # Phase 5: Close connection pool
        if hasattr(self, "_connection_pool"):
            await self._connection_pool.close_all()

        print("MCP server shutdown complete")
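
# ---- Standalone sketch (hypothetical, not part of MCPServerManager) ----
# _has_active_operations below always returns False because this guide does
# not track in-flight requests. One way to add tracking is a counter that every
# request path enters and exits, plus an Event that shutdown can wait on.
# ActiveRequestTracker, track() and wait_idle() are illustrative names.
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class ActiveRequestTracker:
    def __init__(self) -> None:
        self._active = 0
        self._idle = asyncio.Event()
        self._idle.set()  # no requests in flight initially

    @property
    def active(self) -> int:
        return self._active

    @asynccontextmanager
    async def track(self) -> AsyncIterator[None]:
        """Wrap each MCP request in `async with tracker.track():`."""
        self._active += 1
        self._idle.clear()
        try:
            yield
        finally:
            self._active -= 1
            if self._active == 0:
                self._idle.set()

    async def wait_idle(self, timeout: float) -> bool:
        """Return True if all requests finished within `timeout` seconds."""
        try:
            await asyncio.wait_for(self._idle.wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False

# Usage: _has_active_operations could return `self._tracker.active > 0`, or
# shutdown_all could replace its polling loop with `await tracker.wait_idle(timeout)`.
# ---- end of sketch ----
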
    async def _has_active_operations(self) -> bool:
        """Check whether any servers have active operations.

        Used during shutdown to wait for ongoing requests.

        Returns:
            True if any server is processing requests
        """
        # Implementation depends on request tracking. This guide does not
        # track in-flight requests, so it reports none and shutdown proceeds.
        return False

    async def _shutdown_server(self, state: MCPServerState) -> None:
        """Shut down a single MCP server using transport-specific cleanup.

        Args:
            state: Server state to shut down
        """
        config = state.config
        try:
            if config.transport == TransportType.STDIO:
                await self._shutdown_stdio_server(state)
            elif config.transport == TransportType.HTTP:
                await self._shutdown_http_server(state)
            elif config.transport == TransportType.SSE:
                await self._shutdown_sse_server(state)
            state.status = ServerStatus.STOPPED
        except Exception as e:
            print(f"Error shutting down {config.name}: {e}")
            state.status = ServerStatus.FAILED
            state.error_message = str(e)
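
# ---- Standalone sketch (assumption, not the guide's StdioMCPServer code) ----
# The SIGTERM → wait → SIGKILL → reap sequence described in the
# _shutdown_stdio_server docstring below, written directly against an asyncio
# subprocess. The guide delegates this to `server.shutdown(timeout=5.0)`, whose
# internals are not shown. terminate_then_kill is a hypothetical helper.
import asyncio


async def terminate_then_kill(
    process: asyncio.subprocess.Process, timeout: float = 5.0
) -> int:
    """Stop a child process and return its exit code."""
    if process.returncode is not None:
        return process.returncode  # already exited and reaped
    process.terminate()  # SIGTERM on POSIX
    try:
        # wait() also reaps the child, so no zombie is left behind
        return await asyncio.wait_for(process.wait(), timeout)
    except asyncio.TimeoutError:
        process.kill()  # SIGKILL on POSIX
        return await process.wait()
# ---- end of sketch ----
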
    async def _shutdown_stdio_server(self, state: MCPServerState) -> None:
        """Shut down a stdio server subprocess.

        Termination sequence (delegated to the server's shutdown()):
        1. Send SIGTERM
        2. Wait up to 5 seconds
        3. Send SIGKILL if still alive
        4. Reap the process so no zombie remains
        """
        if not state.process:
            return

        server = state.process
        try:
            await server.shutdown(timeout=5.0)
            print(f"Stdio server {state.config.name} terminated")
        except Exception as e:
            print(f"Error terminating {state.config.name}: {e}")
        finally:
            state.process = None

    async def _shutdown_http_server(self, state: MCPServerState) -> None:
        """Shut down an HTTP server connection.

        Closes the client session gracefully, releasing all connections.
        """
        if not state.http_session:
            return

        server = state.http_session
        try:
            await server.close()
            print(f"HTTP server {state.config.name} disconnected")
        except Exception as e:
            print(f"Error closing HTTP {state.config.name}: {e}")
        finally:
            state.http_session = None

    async def _shutdown_sse_server(self, state: MCPServerState) -> None:
        """Shut down an SSE server connection.

        Closes both the SSE stream and the HTTP session.
        """
        if not state.http_session:
            return

        server = state.http_session
        try:
            await server.close()
            print(f"SSE server {state.config.name} disconnected")
        except Exception as e:
            print(f"Error closing SSE {state.config.name}: {e}")
        finally:
            state.http_session = None
```

```python
"""Handle OS signals for graceful shutdown.

Catches SIGTERM and SIGINT to trigger a clean shutdown.
Critical for production deployments where container
orchestrators send termination signals.
"""
import asyncio
import signal


def setup_signal_handlers(manager: MCPServerManager) -> None:
    """Set up OS signal handlers for graceful shutdown.

    Registers handlers for SIGTERM and SIGINT that signal the manager
    to shut down. Must be called from inside the running event loop
    (loop.add_signal_handler is only available on Unix).

    Args:
        manager: MCPServerManager to shut down on signal
    """
    loop = asyncio.get_running_loop()

    def signal_handler(sig: signal.Signals) -> None:
        print(f"Received signal {sig.name}, initiating shutdown...")
        # Only set the event here; the coroutine waiting on it runs
        # shutdown_all(). A fire-and-forget shutdown task would be
        # cancelled when asyncio.run() returns after the event is set.
        manager._shutdown_event.set()

    # Handle SIGTERM (container orchestrator)
    loop.add_signal_handler(signal.SIGTERM, signal_handler, signal.SIGTERM)

    # Handle SIGINT (Ctrl+C)
    loop.add_signal_handler(signal.SIGINT, signal_handler, signal.SIGINT)


# In main application
async def main():
    manager = MCPServerManager(...)

    # Set up signal handlers (requires the running loop)
    setup_signal_handlers(manager)

    # Initialize servers
    await manager.initialize_servers(configs)

    # Keep running until a signal requests shutdown, then shut down
    # inside main so the work is not cancelled by asyncio.run()
    await manager._shutdown_event.wait()
    await manager.shutdown_all()
```

---

## Resource Cleanup

Comprehensive resource cleanup prevents leaks and ensures proper termination.

```python
"""Resource cleanup on server failure or shutdown.

Handles cleanup for:
- Zombie subprocesses (stdio)
- Stale HTTP connections
- Connection pool resources
- Database connection state
- File descriptors and sockets
"""
import asyncio


async def cleanup_failed_server(
    state: MCPServerState,
    connection_pool: MCPConnectionPool
) -> None:
    """Clean up resources after a server failure.

    Comprehensive cleanup ensuring no resource leaks:
    - Terminate processes forcefully if needed
    - Close all network connections
    - Remove from connection pool
    - Update database status

    Args:
        state: Failed server state
        connection_pool: Connection pool to clean from
    """
    config = state.config

    # 1. Clean up lingering processes (stdio). Uses the wrapper's private
    #    _process attribute to reach the underlying asyncio subprocess.
    if config.transport == TransportType.STDIO and state.process:
        try:
            proc = state.process._process
            if proc.returncode is None:
                # Force kill if still alive, then reap so no zombie remains
                proc.kill()
                await proc.wait()
                print(f"Force-killed lingering process: {config.name}")
        except Exception as e:
            print(f"Error cleaning up process: {e}")
        finally:
            state.process = None

    # 2. Close stale HTTP connections, giving up after 2 seconds
    if state.http_session:
        try:
            await asyncio.wait_for(state.http_session.close(), timeout=2.0)
        except asyncio.TimeoutError:
            print(f"Timed out closing stale connection: {config.name}")
        except Exception as e:
            print(f"Error closing connection: {e}")
        finally:
            state.http_session = None

    # 3. Release connection pool resources (uses the pool's private _pools)
    if config.transport in (TransportType.HTTP, TransportType.SSE):
        pool = connection_pool._pools.pop(config.name, [])
        for conn in pool:
            try:
                await conn.session.close()
            except Exception:
                pass

    # 4. Update database status
    await update_server_status_in_db(config.name, ServerStatus.FAILED)
    print(f"Cleaned up resources for {config.name}")
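
# ---- Standalone sketch (alternative technique, not the guide's code) ----
# contextlib.AsyncExitStack guarantees that every registered cleanup step runs,
# in reverse order of registration, even when one step raises. The error is
# re-raised only after the remaining steps have run. The resource names below
# are placeholders for the subprocess, HTTP session and pool entry above.
import asyncio
from contextlib import AsyncExitStack
from typing import List


async def cleanup_in_order(log: List[str]) -> None:
    async def close(name: str) -> None:
        log.append(f"closed {name}")

    async def failing_close() -> None:
        log.append("closed http-session (raised)")
        raise RuntimeError("close failed")

    try:
        async with AsyncExitStack() as stack:
            # Registered first, so closed last
            stack.push_async_callback(close, "subprocess")
            stack.push_async_callback(failing_close)
            stack.push_async_callback(close, "pool-entry")
    except RuntimeError as e:
        log.append(f"error surfaced: {e}")
# ---- end of sketch ----
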


async def update_server_status_in_db(
    server_name: str,
    status: ServerStatus
) -> None:
    """Update MCP server status in the database.

    Persists server state for auditing and monitoring.

    Args:
        server_name: Server identifier
        status: New status to persist
    """
    # Pseudo-code: `db_pool` is an application-provided, asyncpg-style pool
    # (acquire() context manager, $1/$2 placeholders) not defined in this guide
    async with db_pool.acquire() as conn:
        await conn.execute(
            """
            UPDATE mcp_servers
            SET status = $1, updated_at = NOW()
            WHERE name = $2
            """,
            status.value,
            server_name
        )
```

---

## Tool Loading Integration

The Agent Framework integration loads MCP tools into ChatAgent instances.

```python
"""Load tools from MCP servers into Agent Framework.

Bridges the MCP protocol with Microsoft Agent Framework by:
- Discovering tools via the MCP protocol
- Converting them to Agent Framework tool format
- Registering them with ChatAgent instances
- Handling tool invocation delegation
"""
from typing import Any, Dict, List

# The FunctionTool constructor used below (name/description/parameters/
# function) follows this guide's design; check it against the tool class
# in your installed Agent Framework / Azure AI SDK version.
from azure.ai.projects.models import FunctionTool


class MCPToolLoader:
    """Load tools from MCP servers into Agent Framework.

    Discovers tools from MCP servers and registers them with
    ChatAgent instances for agent use.

    Example:
        >>> loader = MCPToolLoader(manager)
        >>> tools = await loader.load_tools_from_server("context7")
        >>> await loader.register_tools_with_agent(agent, tools)
    """
```
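
The loader's conversion step can be shown without any SDK. The sketch below turns one entry from a `tools/list` result into an OpenAI-style function-tool dictionary. MCP tools carry a `name`, a `description`, and a JSON Schema `inputSchema`. Like the loader, the sketch prefixes the tool name with the server name. `to_function_tool` is a hypothetical helper. The name sanitization assumes the target accepts only letters, digits, `_` and `-` in tool names, with a 64-character limit. Adjust it to the rules of the agent runtime you actually use.

```python
import re
from typing import Any, Dict

_INVALID_NAME_CHARS = re.compile(r"[^A-Za-z0-9_-]")


def to_function_tool(mcp_tool: Dict[str, Any], server_name: str) -> Dict[str, Any]:
    """Convert one MCP tools/list entry into a function-tool dict."""
    qualified = f"{server_name}_{mcp_tool['name']}"
    # Replace characters the target may reject, then enforce the length cap
    safe_name = _INVALID_NAME_CHARS.sub("_", qualified)[:64]
    # Fall back to an empty object schema when the server omits inputSchema
    parameters = mcp_tool.get("inputSchema") or {"type": "object", "properties": {}}
    return {
        "type": "function",
        "function": {
            "name": safe_name,
            "description": mcp_tool.get("description", ""),
            "parameters": parameters,
        },
    }


example = {
    "name": "get-current-weather",
    "description": "Get current weather for a city",
    "inputSchema": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
}
print(to_function_tool(example, "weather-api")["function"]["name"])
# weather-api_get-current-weather
```

The MCP `inputSchema` is passed through unchanged because it is already JSON Schema. Only the name needs adapting.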
---
## Configuration Examples
Complete YAML configuration examples for all transport types.
### Stdio Server Configuration
```yaml
# examples/mcp-servers/local-filesystem.yml
apiVersion: mcp-servers.liveai.io/v1alpha1
kind: MCPServer
metadata:
  name: filesystem-tools
  labels:
    type: utility
    environment: development
    transport: stdio
spec:
  displayName: "Filesystem Tools Server"
  description: "Local filesystem access via MCP"
  provider: stdio
  configuration:
    command: "npx"
    args: ["-y", "@modelcontextprotocol/server-filesystem", "/workspace"]
    workingDir: "/home/user/projects"
    environment:
      NODE_ENV: "development"
      LOG_LEVEL: "debug"
  discovery:
    enabled: true
    refreshInterval: "5m"
    onStartup: true
    fallbackToStatic: false
```

```yaml
# examples/mcp-servers/weather-api-http.yml
apiVersion: mcp-servers.liveai.io/v1alpha1
kind: MCPServer
metadata:
  name: weather-api
  labels:
    type: api
    environment: production
    transport: http
spec:
  displayName: "Weather API Server"
  description: "Weather data via HTTP MCP server"
  provider: http
  configuration:
    baseUrl: "https://weather-mcp.example.com/api"
    timeout: 60
    headers:
      X-API-Version: "v1"
      X-Client-ID: "liveai-framework"
    retries:
      maxAttempts: 3
      backoff: "exponential"
      initialDelay: 1000
    authentication:
      type: "bearer"
      credentials:
        token: "${WEATHER_API_TOKEN}"
  discovery:
    enabled: true
    refreshInterval: "1h"
    onStartup: true
    fallbackToStatic: true
  tools:
    - name: "get-current-weather"
      description: "Get current weather for a city"
      parameters:
        city:
          type: "string"
          description: "City name"
          required: true
        units:
          type: "string"
          description: "Temperature units"
          required: false
          default: "metric"
          enum: ["metric", "imperial"]
```

```yaml
# examples/mcp-servers/event-stream-sse.yml
apiVersion: mcp-servers.liveai.io/v1alpha1
kind: MCPServer
metadata:
  name: analytics-stream
  labels:
    type: streaming
    environment: production
    transport: sse
spec:
  displayName: "Analytics Event Stream"
  description: "Real-time analytics via SSE"
  provider: http  # SSE uses HTTP transport
  configuration:
    baseUrl: "https://analytics-mcp.example.com"
    transport: "sse"  # Specify SSE explicitly
    timeout: 300      # Longer for streaming
    headers:
      X-Stream-Type: "analytics"
    authentication:
      type: "api-key"
      credentials:
        key: "X-API-Key"
        value: "${ANALYTICS_API_KEY}"
  discovery:
    enabled: true
    refreshInterval: "30m"
    onStartup: true
    fallbackToStatic: true
```

```bash
# .env - Local development
WEATHER_API_TOKEN=your-weather-token-here
ANALYTICS_API_KEY=your-analytics-key-here
CONTEXT7_API_KEY=your-context7-key-here
# Database
DATABASE_URL=postgresql://localhost:5432/liveai
# Azure AI
AZURE_AI_PROJECT_ENDPOINT=https://your-project.cognitiveservices.azure.com/
```

---

## Complete Working Example

End-to-end example demonstrating the full MCP server lifecycle.

```python
"""Complete MCP server lifecycle example.

Demonstrates:
- Server initialization with multiple transports
- Health monitoring
- Tool loading into agents
- Graceful shutdown

Run:
    python examples/mcp_lifecycle_demo.py
"""
import asyncio
import os

from azure.ai.projects.aio import AIProjectClient
from azure.identity.aio import DefaultAzureCredential

# Import MCP components (pseudo-imports; adjust to your package layout)
from mcp_server_lifecycle import (
    MCPServerManager,
    MCPServerConfig,
    TransportType,
    MCPToolLoader,
    setup_signal_handlers
)


async def main():
    """Run the complete MCP lifecycle demonstration."""
    # 1. Initialize Azure AI client
    credential = DefaultAzureCredential()
    ai_client = AIProjectClient(
        endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
        credential=credential
    )

    # 2. Create MCP server manager
    manager = MCPServerManager(
        ai_client=ai_client,
        health_check_interval=60,
        max_reconnect_attempts=3
    )

    # 3. Set up graceful shutdown (signal handlers need the running loop)
    setup_signal_handlers(manager)

    # 4. Define server configurations
    configs = [
        # Stdio server - local development
        MCPServerConfig(
            name="context7",
            transport=TransportType.STDIO,
            command="npx",
            args=["-y", "@upstash/context7-mcp"],
            env={"NODE_ENV": "production"}
        ),
        # HTTP server - remote API
        MCPServerConfig(
            name="weather-api",
            transport=TransportType.HTTP,
            base_url="https://weather-mcp.example.com/api",
            timeout=60,
            headers={"X-Client-ID": "liveai"},
            auth_token=os.environ.get("WEATHER_API_TOKEN")
        ),
        # SSE server - streaming
        MCPServerConfig(
            name="analytics",
            transport=TransportType.SSE,
            base_url="https://analytics.example.com",
            timeout=300,
            auth_token=os.environ.get("ANALYTICS_API_KEY")
        )
    ]

    # 5. Initialize all servers
    print("Initializing MCP servers...")
    init_results = await manager.initialize_servers(configs)
    for server_name, success in init_results.items():
        status = "✓" if success else "✗"
        print(f" {status} {server_name}")

    # 6. Verify health
    print("\nChecking server health...")
    health_status = await manager.health_check_all()
    for server_name, healthy in health_status.items():
        status = "Healthy" if healthy else "Unhealthy"
        print(f" {server_name}: {status}")

    # 7. Load tools into agent
    print("\nLoading MCP tools into agent...")
    agent = await ai_client.agents.create_agent(
        model="gpt-4",
        name="MCP Demo Agent",
        instructions="""
        You are an assistant with access to documentation,
        weather data, and analytics via MCP tools.
        """
    )

    tool_loader = MCPToolLoader(manager)
    for server_name in ["context7", "weather-api"]:
        tools = await tool_loader.load_tools_from_server(server_name)
        await tool_loader.register_tools_with_agent(agent, tools)
        print(f" Loaded {len(tools)} tools from {server_name}")

    # 8. Use agent with MCP tools
    print("\nRunning agent with MCP tools...")
    thread = await ai_client.agents.create_thread()
    await ai_client.agents.create_message(
        thread_id=thread.id,
        role="user",
        content="What's the weather in San Francisco and find React documentation on hooks?"
    )

    run = await ai_client.agents.create_and_process_run(
        thread_id=thread.id,
        assistant_id=agent.id
    )

    messages = await ai_client.agents.list_messages(thread.id)
    print(f"\nAgent response:\n{messages.data[0].content[0].text.value}")

    # 9. Monitor until Ctrl+C / SIGTERM. With the handlers from step 3
    #    installed, these signals no longer raise KeyboardInterrupt.
    print("\nMonitoring servers (press Ctrl+C to stop)...")
    await manager._shutdown_event.wait()
    print("\nShutdown signal received...")

    # 10. Graceful shutdown
    print("\nShutting down MCP servers...")
    await manager.shutdown_all()
    print("✓ Shutdown complete")

    await ai_client.close()
    await credential.close()


if __name__ == "__main__":
    asyncio.run(main())
```

Example output:

```text
Initializing MCP servers...
✓ context7
✓ weather-api
✓ analytics
Checking server health...
context7: Healthy
weather-api: Healthy
analytics: Healthy
Loading MCP tools into agent...
Loaded 2 tools from context7
Loaded 3 tools from weather-api
Running agent with MCP tools...
Agent response:
The current weather in San Francisco is 18°C (64°F) with partly cloudy
skies. Regarding React hooks, they are functions that let you use state
and lifecycle features in functional components. The most common hooks
are useState for state management and useEffect for side effects...
Monitoring servers (press Ctrl+C to stop)...
^C
Shutdown signal received...
Shutting down MCP servers...
Stdio server context7 terminated
HTTP server weather-api disconnected
SSE server analytics disconnected
✓ Shutdown complete
```

---

## Testing

A comprehensive testing strategy for the MCP server lifecycle, covering unit and integration tests.

```python
"""Unit tests for MCP server lifecycle components.

Run:
    pytest tests/test_mcp_lifecycle.py -v

The @pytest.mark.asyncio tests require the pytest-asyncio plugin.
"""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from mcp_server_lifecycle import (
    MCPServerManager,
    MCPServerConfig,
    TransportType,
    ServerStatus,
    StdioMCPServer
)


@pytest.fixture
def mock_ai_client():
    """Mock Azure AI client."""
    return AsyncMock()


@pytest.fixture
def manager(mock_ai_client):
    """Create MCPServerManager for testing."""
    return MCPServerManager(
        ai_client=mock_ai_client,
        health_check_interval=1,
        max_reconnect_attempts=2
    )


@pytest.mark.asyncio
async def test_stdio_server_initialization(manager):
    """Test stdio server starts successfully."""
    config = MCPServerConfig(
        name="test-stdio",
        transport=TransportType.STDIO,
        command="echo",
        args=["test"]
    )

    # patch() substitutes an AsyncMock because create_subprocess_exec is async
    with patch("asyncio.create_subprocess_exec") as mock_subprocess:
        # Mock a running process that answers with an empty JSON-RPC result
        mock_process = AsyncMock()
        mock_process.returncode = None
        mock_process.stdout.readline.return_value = b'{"result": {}}\n'
        mock_subprocess.return_value = mock_process

        results = await manager.initialize_servers([config])

    assert results["test-stdio"] is True
    assert manager._servers["test-stdio"].status == ServerStatus.HEALTHY


@pytest.mark.asyncio
async def test_http_server_connection(manager):
    """Test HTTP server connects successfully."""
    config = MCPServerConfig(
        name="test-http",
        transport=TransportType.HTTP,
        base_url="https://test.example.com",
        timeout=30
    )

    with patch("aiohttp.ClientSession") as mock_session:
        # Mock successful connection
        mock_response = AsyncMock()
        mock_response.status = 200
        mock_session.return_value.get.return_value.__aenter__.return_value = mock_response

        results = await manager.initialize_servers([config])

    assert results["test-http"] is True
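

@pytest.mark.asyncio
async def test_health_check_failure_triggers_reconnect(manager):
    """Test automatic reconnection on health check failure."""
    config = MCPServerConfig(
        name="test-server",
        transport=TransportType.HTTP,
        base_url="https://test.example.com"
    )

    # Initialize server
    await manager.initialize_servers([config])
    state = manager._servers["test-server"]

    # Simulate health check failures
    state.consecutive_failures = 5
    state.status = ServerStatus.UNHEALTHY

    def stop_after_reconnect(name):
        # _health_check_loop runs until the shutdown event is set; end the
        # loop after the first reconnect instead of hanging the test
        manager._shutdown_event.set()
        return True

    # Assumes ServerHealthChecker is importable from mcp_server_lifecycle
    failing_check = AsyncMock(return_value={"test-server": False})
    with patch.object(manager, "reconnect_server", side_effect=stop_after_reconnect) as mock_reconnect:
        with patch("mcp_server_lifecycle.ServerHealthChecker.check_all_servers", new=failing_check):
            await asyncio.wait_for(manager._health_check_loop(), timeout=5)

    # Should trigger reconnection
    mock_reconnect.assert_called_once_with("test-server")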


@pytest.mark.asyncio
async def test_graceful_shutdown_all_servers(manager):
    """Test graceful shutdown terminates all servers."""
    configs = [
        MCPServerConfig(
            name="stdio-server",
            transport=TransportType.STDIO,
            command="echo"
        ),
        MCPServerConfig(
            name="http-server",
            transport=TransportType.HTTP,
            base_url="https://test.example.com"
        )
    ]

    await manager.initialize_servers(configs)

    # Shutdown
    await manager.shutdown_all()

    # Verify all stopped and every process/session handle released
    for state in manager._servers.values():
        assert state.status == ServerStatus.STOPPED
        assert state.process is None and state.http_session is None
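

# ---- Standalone sketch (hypothetical) ----
# The delay schedule the next test expects from reconnect_with_backoff, whose
# implementation is not shown in this guide. Assumption: the first attempt runs
# immediately and the client sleeps only *between* attempts, doubling each time
# (base_delay * 2**n), optionally capped at max_delay.
from typing import List, Optional


def backoff_delays(
    max_retries: int,
    base_delay: float = 1.0,
    max_delay: Optional[float] = None,
) -> List[float]:
    """Return the sleeps between `max_retries` attempts."""
    delays = []
    for n in range(max_retries - 1):  # no sleep after the final attempt
        delay = base_delay * (2 ** n)
        if max_delay is not None:
            delay = min(delay, max_delay)
        delays.append(delay)
    return delays

# backoff_delays(3) -> [1.0, 2.0], matching the assertion in the next test;
# backoff_delays(5, max_delay=5.0) -> [1.0, 2.0, 4.0, 5.0]
# ---- end of sketch ----
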


@pytest.mark.asyncio
async def test_exponential_backoff_reconnection():
    """Test reconnection uses exponential backoff."""
    from mcp_server_lifecycle import reconnect_with_backoff

    manager = AsyncMock()

    with patch("asyncio.sleep") as mock_sleep:
        # Simulate all retries failing
        manager._servers = {
            "test": MagicMock(config=MagicMock(transport=TransportType.HTTP))
        }
        manager._start_stdio_server.return_value = False
        manager._connect_http_server.return_value = False

        result = await reconnect_with_backoff(
            "test",
            manager,
            max_retries=3,
            base_delay=1.0
        )

    # Verify exponential delays between the 3 attempts: 1s, then 2s
    assert mock_sleep.call_count == 2
    delays = [call.args[0] for call in mock_sleep.call_args_list]
    assert delays == [1.0, 2.0]
    assert result is False

# Run tests
# pytest tests/test_mcp_lifecycle.py -v --cov=mcp_server_lifecycle
```

```python
"""Integration tests with real MCP servers.

Requires:
- npx installed
- @upstash/context7-mcp package available
"""
import pytest

from mcp_server_lifecycle import (
    MCPServerManager,
    MCPServerConfig,
    TransportType,
    ServerStatus
)


@pytest.mark.integration
@pytest.mark.asyncio
async def test_real_stdio_server_lifecycle(mock_ai_client):
    """Test with an actual stdio MCP server."""
    manager = MCPServerManager(mock_ai_client)

    config = MCPServerConfig(
        name="context7-real",
        transport=TransportType.STDIO,
        command="npx",
        args=["-y", "@upstash/context7-mcp"]
    )

    # Initialize
    results = await manager.initialize_servers([config])
    assert results["context7-real"] is True

    # Verify tools loaded
    state = manager._servers["context7-real"]
    assert state.tools_loaded > 0

    # Health check
    health = await manager.health_check_all()
    assert health["context7-real"] is True

    # Shutdown
    await manager.shutdown_all()
    assert state.status == ServerStatus.STOPPED

# Run integration tests
# pytest tests/test_mcp_lifecycle.py -v -m integration
```

---

## Performance Considerations

```python
"""Optimize connection pool for your workload.
Rules of thumb:
- max_per_server: 10-20 for typical workloads
- max_lifetime: 1 hour (3600s) default
- idle_timeout: 5 minutes (300s) default
Adjust based on:
- Request rate
- Server capacity
- Network latency
"""
# Low traffic (< 10 req/min)
pool = MCPConnectionPool(
    max_per_server=5,
    max_lifetime=1800,  # 30 min
    idle_timeout=180    # 3 min
)

# Medium traffic (10-100 req/min)
pool = MCPConnectionPool(
    max_per_server=10,
    max_lifetime=3600,  # 1 hour
    idle_timeout=300    # 5 min
)

# High traffic (> 100 req/min)
pool = MCPConnectionPool(
    max_per_server=20,
    max_lifetime=7200,  # 2 hours
    idle_timeout=600    # 10 min
)
```

```python
"""Balance between responsiveness and overhead.
Typical values:
- Development: 10-30 seconds
- Production: 60-300 seconds
Consider:
- Server reliability (more checks for unreliable servers)
- Network latency (longer intervals for slow networks)
- Cost (each check is a request)
"""
# Quick failure detection (development)
manager = MCPServerManager(
    ai_client=client,
    health_check_interval=10  # Every 10 seconds
)

# Balanced (production)
manager = MCPServerManager(
    ai_client=client,
    health_check_interval=60  # Every minute
)

# Conservative (stable servers)
manager = MCPServerManager(
    ai_client=client,
    health_check_interval=300  # Every 5 minutes
)
```

```python
"""Prevent resource exhaustion.
Set limits on:
- Maximum concurrent servers
- Maximum connections per server
- Request timeouts
- Subprocess limits
"""
# Global limits
MAX_MCP_SERVERS = 50
MAX_CONNECTIONS_PER_SERVER = 20
MAX_SUBPROCESS_COUNT = 10
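
# ---- Standalone sketch (hypothetical, not part of the guide's manager) ----
# An asyncio.Semaphore bounds how many server startups run at once. With it,
# at most `limit` spawns are in flight and the rest wait instead of failing.
# The slot here is held only during startup. To cap *live* subprocesses
# (MAX_SUBPROCESS_COUNT), hold the slot for the whole process lifetime instead.
# start_with_limit is an illustrative name.
import asyncio
from typing import Awaitable, Callable, List


async def start_with_limit(
    starters: List[Callable[[], Awaitable[bool]]],
    limit: int,
) -> List[bool]:
    semaphore = asyncio.Semaphore(limit)

    async def guarded(start: Callable[[], Awaitable[bool]]) -> bool:
        async with semaphore:  # waits while `limit` startups are in flight
            return await start()

    return await asyncio.gather(*(guarded(s) for s in starters))

# Usage: await start_with_limit([...one starter per stdio config...],
#                               limit=MAX_SUBPROCESS_COUNT)
# ---- end of sketch ----
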

# Enforce in manager
class MCPServerManager:
    def __init__(self, ai_client, max_servers=50):
        self._max_servers = max_servers
        # ...

    async def initialize_servers(self, configs):
        if len(configs) > self._max_servers:
            raise ValueError(
                f"Too many servers: {len(configs)} > {self._max_servers}"
            )
        # ...
```

---

## Troubleshooting

Symptom:

```text
RuntimeError: Process exited immediately: command not found
```

Causes:
- Command not in PATH
- Missing dependencies
- Incorrect arguments

Solutions:

```bash
# 1. Verify the command exists
which npx
command -v npx

# 2. Test the command manually
npx -y @upstash/context7-mcp
```

```yaml
# 3. Use the full path
spec:
  configuration:
    command: "/usr/local/bin/npx"
```

```bash
# 4. Check the environment
env | grep PATH
```

Symptom:

```text
TimeoutError: Request to https://api.example.com/tools/list timed out
```

Causes:
- Network issues
- Server overloaded
- Firewall blocking

Solutions:

```python
# 1. Increase the timeout
config = MCPServerConfig(
    name="slow-server",
    transport=TransportType.HTTP,
    base_url="https://api.example.com",
    timeout=120  # 2 minutes
)
```

```bash
# 2. Check connectivity
curl -v https://api.example.com/health

# 3. Verify no proxy issues
export no_proxy=api.example.com
```

Symptom:

```text
Server marked UNHEALTHY after 3 consecutive failures
```

Causes:
- Server actually down
- Intermittent network issues
- Incorrect health endpoint

Solutions:

```bash
# 1. Verify the health endpoint
curl https://api.example.com/health
```

```python
# 2. Adjust the failure threshold.
#    _health_check_loop builds its own checker with failure_threshold=3,
#    so change that value to affect the background loop.
checker = ServerHealthChecker(
    manager,
    failure_threshold=5  # More tolerant
)

# 3. Check server logs
# Look for actual server issues
```

Symptom:

```text
Reconnection attempt 1/5 for weather-api
Reconnection attempt 2/5 for weather-api
...
Reconnection failed for weather-api
```

Causes:
- Server permanently down
- Authentication failed
- Network unreachable

Solutions:

```bash
# 1. Check server status manually
curl -H "Authorization: Bearer $TOKEN" \
  https://api.example.com/health

# 2. Verify credentials
echo $WEATHER_API_TOKEN
```

```python
# 3. Disable auto-reconnect if needed
manager._max_reconnect_attempts = 0

# 4. Check server logs for auth errors
```

Symptom:

```text
ValueError: Server not healthy: context7 (status: unhealthy)
```

Causes:
- Server not initialized
- Discovery failed
- Server crashed

Solutions:

```python
# 1. Check server status
health = await manager.health_check_all()
print(health)

# 2. Manually reconnect
await manager.reconnect_server("context7")
```

```yaml
# 3. Use static tool definitions as fallback
spec:
  discovery:
    enabled: true
    fallbackToStatic: true
  tools:
    - name: "resolve-library-id"
      # ... static definition
```

```python
"""Enable detailed logging for troubleshooting."""
import logging

# Enable debug logging
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

# Log all MCP traffic
logger = logging.getLogger("mcp_server_lifecycle")
logger.setLevel(logging.DEBUG)


# Trace server state changes
async def log_state_change(old_state, new_state):
    logger.info(
        f"Server {old_state.config.name}: "
        f"{old_state.status.value} → {new_state.status.value}"
    )
```

- MCP Server Schema - Configuration reference
- MCP Server Examples - Example configurations
- Agent Framework Integration - Using MCP tools in agents
- Model Context Protocol Specification - Official MCP spec

This implementation guide covered:

- ✅ MCP Protocol Fundamentals - JSON-RPC 2.0 over multiple transports
- ✅ Transport Implementations - Stdio, HTTP, and SSE with complete code
- ✅ Lifecycle Management - Initialization, health checks, reconnection
- ✅ Resource Management - Connection pooling, cleanup, graceful shutdown
- ✅ Agent Integration - Tool loading into Microsoft Agent Framework
- ✅ Production Ready - Testing, performance tuning, troubleshooting

Key takeaways:

- Transport Selection: Use stdio for development, HTTP/SSE for production
- Health Monitoring: Continuous checks with automatic recovery
- Connection Pooling: Essential for HTTP/SSE performance
- Graceful Shutdown: Multi-phase shutdown prevents resource leaks
- Error Resilience: Exponential backoff and fallback mechanisms

Next steps:

- Implement `MCPServerManager` following this guide
- Add transport-specific handlers for your deployment
- Configure health checks and reconnection policies
- Integrate with your Agent Framework
- Deploy and monitor in production

For questions or contributions:
- GitHub Issues: LIVE+AI Framework
- MCP Community: Model Context Protocol Discussions

End of MCP Server Lifecycle Management Documentation

```python
# MCPToolLoader methods (continued)
    def __init__(self, manager: MCPServerManager):
        """Initialize tool loader.

        Args:
            manager: MCPServerManager with active servers
        """
        self._manager = manager

    async def load_tools_from_server(
        self,
        server_name: str
    ) -> List[FunctionTool]:
        """Query a server for available tools and load them.

        Sends a "tools/list" request to the MCP server and converts
        the response to Agent Framework tool definitions.

        Args:
            server_name: MCP server to load tools from

        Returns:
            List of FunctionTool instances

        Raises:
            ValueError: If the server is not found or not healthy
        """
        # Read the server state under the lock, but release it before
        # network I/O so other manager operations are not blocked
        async with self._manager._lock:
            state = self._manager._servers.get(server_name)

        if not state:
            raise ValueError(f"Server not found: {server_name}")

        if state.status != ServerStatus.HEALTHY:
            raise ValueError(
                f"Server not healthy: {server_name} "
                f"(status: {state.status.value})"
            )

        # Query tools from server
        tools_response = await self._call_server_method(
            state,
            "tools/list",
            {}
        )

        # Convert MCP tools to Agent Framework format
        agent_tools = []
        for mcp_tool in tools_response.get("tools", []):
            agent_tool = self._convert_mcp_to_agent_tool(
                mcp_tool,
                server_name
            )
            agent_tools.append(agent_tool)

        return agent_tools

    async def _call_server_method(
        self,
        state: MCPServerState,
        method: str,
        params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Call an MCP server method based on transport type."""
        config = state.config

        if config.transport == TransportType.STDIO:
            return await state.process.send_request(method, params)
        elif config.transport in (TransportType.HTTP, TransportType.SSE):
            return await state.http_session.send_request(method, params)
        else:
            raise ValueError(f"Unsupported transport: {config.transport}")
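
# ---- Standalone sketch (assumption about what send_request does underneath) ----
# MCP messages are JSON-RPC 2.0. A request carries "jsonrpc", an "id", "method"
# and "params". The matching response echoes the same id with either "result"
# or "error". On the stdio transport each message is one line of JSON with no
# embedded newlines. build_request, encode_stdio and parse_response are
# hypothetical helpers, not the guide's transport classes.
import itertools
import json
from typing import Any, Dict

_ids = itertools.count(1)


def build_request(method: str, params: Dict[str, Any]) -> Dict[str, Any]:
    return {"jsonrpc": "2.0", "id": next(_ids), "method": method, "params": params}


def encode_stdio(message: Dict[str, Any]) -> bytes:
    # Compact JSON never contains a raw newline, so "\n" can delimit messages
    return (json.dumps(message, separators=(",", ":")) + "\n").encode("utf-8")


def parse_response(line: Any, expected_id: int) -> Dict[str, Any]:
    message = json.loads(line)
    if message.get("id") != expected_id:
        raise ValueError(f"Unexpected response id: {message.get('id')}")
    if "error" in message:
        err = message["error"]
        raise RuntimeError(f"JSON-RPC error {err.get('code')}: {err.get('message')}")
    return message["result"]
# ---- end of sketch ----
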
def _convert_mcp_to_agent_tool(
self,
mcp_tool: Dict[str, Any],
server_name: str
) -> FunctionTool:
"""Convert MCP tool definition to Agent Framework tool.
Args:
mcp_tool: MCP tool definition from tools/list
server_name: Source MCP server name
Returns:
FunctionTool for Agent Framework
"""
# Create tool function that delegates to MCP server
async def tool_function(**kwargs):
"""Dynamically created tool that calls MCP server."""
state = self._manager._servers[server_name]
result = await self._call_server_method(
state,
"tools/call",
{
"name": mcp_tool["name"],
"arguments": kwargs
}
)
return result.get("content", [{}])[0].get("text", "")
# Convert input schema to Agent Framework format
parameters = mcp_tool.get("inputSchema", {})
return FunctionTool(
name=f"{server_name}_{mcp_tool['name']}",
description=mcp_tool.get("description", ""),
parameters=parameters,
function=tool_function
)
async def register_tools_with_agent(
self,
agent: ChatAgent,
tools: List[FunctionTool]
) -> None:
"""Register loaded tools with ChatAgent.
Args:
agent: ChatAgent instance to register tools with
tools: Tools to register
"""
for tool in tools:
agent.register_tool(tool)
print(f"Registered {len(tools)} tools with agent")
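The two conversions in `_convert_mcp_to_agent_tool()` can be exercised without a running server or the Agent Framework. The sketch below uses a hypothetical `ToolSpec` dataclass in place of `FunctionTool` and assumes the `tools/call` result carries a `content` list of `{"type": "text", "text": ...}` items. One deliberate difference: it joins all text items instead of returning only the first.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ToolSpec:
    """Hypothetical stand-in for an Agent Framework FunctionTool."""
    name: str
    description: str
    parameters: Dict[str, Any] = field(default_factory=dict)


def to_tool_spec(mcp_tool: Dict[str, Any], server_name: str) -> ToolSpec:
    """Build a namespaced tool description from a tools/list entry."""
    return ToolSpec(
        # Server prefix keeps same-named tools from different servers apart
        name=f"{server_name}_{mcp_tool['name']}",
        description=mcp_tool.get("description", ""),
        # inputSchema is JSON Schema and is passed through unchanged
        parameters=mcp_tool.get("inputSchema", {}),
    )


def extract_text(result: Dict[str, Any]) -> str:
    """Join the text items of a tools/call result; "" if there are none."""
    parts: List[str] = [
        item.get("text", "")
        for item in result.get("content") or []
        if item.get("type") == "text"
    ]
    return "\n".join(parts)


if __name__ == "__main__":
    spec = to_tool_spec({"name": "get-docs", "inputSchema": {"type": "object"}}, "context7")
    print(spec.name)  # context7_get-docs
    print(extract_text({"content": [{"type": "text", "text": "hello"}]}))  # hello
```

Joining every text item avoids silently dropping output from tools that return several content blocks; non-text items (images, resources) are skipped rather than rendered.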
### Agent Framework Integration Example
```python
"""Complete integration showing MCP tools in a ChatAgent workflow."""
import asyncio
from typing import List

from azure.ai.projects.aio import AIProjectClient
from azure.ai.projects.models import ChatAgent


async def create_agent_with_mcp_tools(
    ai_client: AIProjectClient,
    mcp_manager: MCPServerManager,
    server_names: List[str]
) -> ChatAgent:
    """Create a ChatAgent with tools from MCP servers.

    Args:
        ai_client: Azure AI Project client
        mcp_manager: Initialized MCP server manager
        server_names: MCP servers to load tools from

    Returns:
        ChatAgent with MCP tools registered
    """
    # Create agent
    agent = await ai_client.agents.create_agent(
        model="gpt-4",
        name="Documentation Assistant",
        instructions="""
        You are a helpful assistant with access to up-to-date
        documentation via MCP tools. Use the tools to fetch
        accurate information.
        """
    )

    # Load and register tools from each MCP server
    tool_loader = MCPToolLoader(mcp_manager)
    for server_name in server_names:
        tools = await tool_loader.load_tools_from_server(server_name)
        await tool_loader.register_tools_with_agent(agent, tools)
        print(f"Loaded {len(tools)} tools from {server_name}")

    return agent


# Usage example
async def main():
    # Initialize MCP manager
    ai_client = AIProjectClient(...)
    mcp_manager = MCPServerManager(ai_client)

    configs = [
        MCPServerConfig(
            name="context7",
            transport=TransportType.STDIO,
            command="npx",
            args=["-y", "@upstash/context7-mcp"]
        )
    ]

    try:
        await mcp_manager.initialize_servers(configs)

        # Create agent with MCP tools
        agent = await create_agent_with_mcp_tools(
            ai_client,
            mcp_manager,
            ["context7"]
        )

        # Use agent with MCP tools
        thread = await ai_client.agents.create_thread()
        await ai_client.agents.create_message(
            thread_id=thread.id,
            role="user",
            content="How do I use React hooks?"
        )

        # The agent can call the registered MCP tools during the run
        run = await ai_client.agents.create_and_process_run(
            thread_id=thread.id,
            assistant_id=agent.id
        )

        messages = await ai_client.agents.list_messages(thread.id)
        print(messages.data[0].content[0].text.value)
    finally:
        # Cleanup: stop MCP servers even if a step above fails
        await mcp_manager.shutdown_all()


if __name__ == "__main__":
    asyncio.run(main())
```
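Because every tool is registered as `{server_name}_{tool_name}`, anything that sees tool calls only by name (logs, dispatch tables, run steps that report the called tool) may need to map the name back to its server. A minimal sketch, assuming the set of server names is known; `split_tool_name` is a hypothetical helper. Server and tool names may both contain underscores, so it prefers the longest matching server name, which is a heuristic rather than a guarantee.

```python
from typing import Iterable, Optional, Tuple


def split_tool_name(
    qualified: str, server_names: Iterable[str]
) -> Optional[Tuple[str, str]]:
    """Map "<server>_<tool>" back to (server, tool), or None if no server matches."""
    # Longest names first, so "weather_api" wins over "weather"
    for server in sorted(server_names, key=len, reverse=True):
        prefix = f"{server}_"
        # Require a non-empty tool part after the prefix
        if qualified.startswith(prefix) and len(qualified) > len(prefix):
            return server, qualified[len(prefix):]
    return None


if __name__ == "__main__":
    print(split_tool_name("weather_api_get_forecast", ["weather", "weather_api"]))
    # ('weather_api', 'get_forecast')
```

A separator that cannot occur in server names would remove the ambiguity entirely; the longest-prefix rule only makes the common case predictable.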
```python
# In the connection pool class (continued)
async def release(self, server_name: str, session: Any) -> None:
    """Return connection to pool.

    Connection remains in pool for reuse unless stale.

    Args:
        server_name: Server identifier
        session: Session to release
    """
    async with self._lock:
        # Connection stays in pool, just update timestamp
        for conn in self._pools[server_name]:
            if conn.session is session:
                conn.last_used = datetime.now()
                break

async def _cleanup_stale(self, server_name: str) -> None:
    """Remove stale connections from pool.

    Closes and removes connections that exceed:
    - Max lifetime
    - Idle timeout
    """
    pool = self._pools[server_name]
    now = datetime.now()
    to_remove = []

    for conn in pool:
        lifetime = (now - conn.created_at).total_seconds()
        idle_time = (now - conn.last_used).total_seconds()

        if lifetime > self._max_lifetime or idle_time > self._idle_timeout:
            await conn.session.close()
            to_remove.append(conn)

    # Remove after iterating so the list is not mutated mid-loop
    for conn in to_remove:
        pool.remove(conn)

async def health_check_pool(self) -> Dict[str, Dict[str, int]]:
    """Get pool statistics.

    Returns:
        Dict with stats per server:
        - total: Total connections
        - active: Recently used connections
        - stale: Stale connections
    """
    stats = {}
    now = datetime.now()

    async with self._lock:
        for server_name, pool in self._pools.items():
            active = sum(
                1 for c in pool
                if (now - c.last_used).total_seconds() < self._idle_timeout
            )
            stats[server_name] = {
                "total": len(pool),
                "active": active,
                "stale": len(pool) - active
            }

    return stats

async def close_all(self) -> None:
    """Close all pooled connections.

    Used during shutdown to release all resources.
    """
    async with self._lock:
        for pool in self._pools.values():
            for conn in pool:
                await conn.session.close()
        self._pools.clear()
```
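The eviction rule in `_cleanup_stale()` can be isolated as a pure function, which makes it testable without real sessions. This sketch uses a hypothetical `PooledConn` record with the same `created_at`/`last_used` fields and passes `now` explicitly; the 3600 s lifetime and 300 s idle limits in the example are illustrative values, not defaults taken from the pool.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Tuple


@dataclass
class PooledConn:
    """Hypothetical stand-in for a pooled connection record."""
    name: str
    created_at: datetime
    last_used: datetime


def is_stale(conn: PooledConn, now: datetime, max_lifetime: float, idle_timeout: float) -> bool:
    # Same rule as _cleanup_stale(): too old overall, or idle for too long
    lifetime = (now - conn.created_at).total_seconds()
    idle_time = (now - conn.last_used).total_seconds()
    return lifetime > max_lifetime or idle_time > idle_timeout


def partition_pool(
    pool: List[PooledConn], now: datetime, max_lifetime: float, idle_timeout: float
) -> Tuple[List[PooledConn], List[PooledConn]]:
    """Split a pool into (keep, evict) without mutating it."""
    keep: List[PooledConn] = []
    evict: List[PooledConn] = []
    for conn in pool:
        (evict if is_stale(conn, now, max_lifetime, idle_timeout) else keep).append(conn)
    return keep, evict


if __name__ == "__main__":
    now = datetime(2025, 1, 15, 12, 0, 0)
    pool = [
        PooledConn("fresh", now - timedelta(minutes=10), now - timedelta(seconds=10)),
        PooledConn("old", now - timedelta(hours=2), now - timedelta(seconds=5)),
        PooledConn("idle", now - timedelta(minutes=10), now - timedelta(minutes=6)),
    ]
    keep, evict = partition_pool(pool, now, max_lifetime=3600, idle_timeout=300)
    print([c.name for c in keep], [c.name for c in evict])  # ['fresh'] ['old', 'idle']
```

The caller would then close each evicted session and replace the pool list with `keep`, which matches what `_cleanup_stale()` does in place.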
---
## Automatic Reconnection
Automatic reconnection with exponential backoff handles transient failures gracefully.
### Reconnection with Exponential Backoff
```python
"""Reconnect to failed MCP server with exponential backoff.

Backoff Strategy:
- Attempt 1: Immediate
- Attempt 2: 1 second delay
- Attempt 3: 2 seconds delay
- Attempt 4: 4 seconds delay
- Attempt 5: 8 seconds delay
- Max delay: 60 seconds

Failure Handling:
- After max retries, mark server FAILED
- Log all reconnection attempts
- Preserve server configuration for manual restart
"""
import asyncio
from typing import Dict, Optional


async def reconnect_with_backoff(
    server_name: str,
    manager: MCPServerManager,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0
) -> bool:
    """Reconnect to failed MCP server with exponential backoff.

    Args:
        server_name: Server to reconnect
        manager: MCPServerManager instance
        max_retries: Maximum reconnection attempts
        base_delay: Initial delay in seconds
        max_delay: Maximum delay cap

    Returns:
        True if reconnection succeeded
    """
    async with manager._lock:
        state = manager._servers.get(server_name)
        if not state:
            raise ValueError(f"Unknown server: {server_name}")
        state.status = ServerStatus.RECONNECTING

    for attempt in range(1, max_retries + 1):
        try:
            print(
                f"Reconnection attempt {attempt}/{max_retries} "
                f"for {server_name}"
            )

            # Cleanup old connection
            await _cleanup_server_resources(state)

            # Attempt reconnection based on transport
            success = False
            if state.config.transport == TransportType.STDIO:
                success = await manager._start_stdio_server(state)
            elif state.config.transport == TransportType.HTTP:
                success = await manager._connect_http_server(state)
            elif state.config.transport == TransportType.SSE:
                success = await manager._connect_sse_server(state)

            if success:
                print(f"Reconnected to {server_name}")
                state.consecutive_failures = 0
                return True

        except Exception as e:
            print(f"Reconnection attempt {attempt} failed: {e}")

        # Exponential backoff: base_delay, 2x, 4x, ... capped at max_delay
        if attempt < max_retries:
            delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
            print(f"Waiting {delay}s before retry...")
            await asyncio.sleep(delay)

    # All retries exhausted
    async with manager._lock:
        state.status = ServerStatus.FAILED
        state.error_message = f"Failed after {max_retries} reconnection attempts"

    print(f"Reconnection failed for {server_name}")
    return False


async def _cleanup_server_resources(state: MCPServerState) -> None:
    """Clean up resources before reconnection attempt.

    Ensures clean slate by:
    - Terminating stdio processes
    - Closing HTTP sessions
    - Releasing connection pool resources
    """
    if state.process:
        try:
            await state.process.shutdown(timeout=2.0)
        except Exception:
            pass
        state.process = None

    if state.http_session:
        try:
            await state.http_session.close()
        except Exception:
            pass
        state.http_session = None


# In MCPServerManager
async def reconnect_server(
    self,
    server_name: str,
    max_retries: Optional[int] = None
) -> bool:
    """Reconnect to a failed server.

    Public method for manual or automatic reconnection.
    Uses exponential backoff strategy internally.

    Args:
        server_name: Server to reconnect
        max_retries: Maximum reconnection attempts
            (defaults to self._max_reconnect_attempts)

    Returns:
        True if reconnection succeeded

    Example:
        >>> success = await manager.reconnect_server("weather-api")
        >>> if success:
        ...     print("Server reconnected")
    """
    return await reconnect_with_backoff(
        server_name,
        self,
        max_retries=(
            max_retries if max_retries is not None
            else self._max_reconnect_attempts
        )
    )

async def health_check_all(self) -> Dict[str, bool]:
    """Check health of all registered servers.

    Returns:
        Dict mapping server name to health status
    """
    checker = ServerHealthChecker(self)
    return await checker.check_all_servers()
```
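The delays that `reconnect_with_backoff()` sleeps through can be computed on their own, which helps check that `max_retries`, `base_delay`, and `max_delay` give an acceptable worst-case wait. `backoff_delays` is a hypothetical helper; its optional `jitter` parameter is not part of the function above and shows one common variant ("full jitter") that spreads out retries from many clients.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: Optional[random.Random] = None,
) -> List[float]:
    """Delays slept between attempts, following reconnect_with_backoff()."""
    delays: List[float] = []
    # A delay follows every failed attempt except the last one
    for attempt in range(1, max_retries):
        delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
        if jitter is not None:
            # Optional "full jitter" variant: uniform in [0, delay]
            delay = jitter.uniform(0, delay)
        delays.append(delay)
    return delays


if __name__ == "__main__":
    print(backoff_delays(5))       # [1.0, 2.0, 4.0, 8.0]
    print(sum(backoff_delays(5)))  # 15.0
```

With the defaults (5 attempts, 1 s base) a server that never recovers costs about 15 s of sleep plus the time spent in the attempts themselves before it is marked FAILED.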
```python
# In SSEMCPServer.connect(), after opening the SSE stream
if self._sse_response.status != 200:
    raise ConnectionError(
        f"SSE connection failed: {self._sse_response.status}"
    )

# In SSEMCPServer
async def listen_events(
    self,
    callback: Callable[[Dict[str, Any]], Awaitable[None]]
) -> None:
    """Listen for server-sent events.

    Blocks and calls callback for each event received.
    Runs until the connection closes or the task is cancelled.

    Args:
        callback: Async function called for each event
    """
    if not self._sse_response:
        raise RuntimeError("Not connected")

    try:
        async for line in self._sse_response.content:
            line = line.decode().strip()

            # SSE format: "data: {json}" (one JSON document per line assumed)
            if line.startswith('data: '):
                try:
                    event_data = json.loads(line[6:])
                    await callback(event_data)
                except json.JSONDecodeError as e:
                    print(f"Invalid SSE data: {e}")
            # Heartbeat or comment
            elif line.startswith(':'):
                pass  # Ignore comments
    except asyncio.CancelledError:
        raise
    except Exception as e:
        print(f"SSE stream error: {e}")

async def send_request(
    self,
    method: str,
    params: Dict[str, Any]
) -> Dict[str, Any]:
    """Send a JSON-RPC request over HTTP (results may arrive via SSE).

    Args:
        method: JSON-RPC method
        params: Method parameters

    Returns:
        JSON body of the HTTP response (an immediate acknowledgment;
        the actual result may arrive via SSE)
    """
    if not self._http_session:
        raise RuntimeError("Not connected")

    self._request_id += 1
    request = {
        "jsonrpc": "2.0",
        "method": method,
        "params": params,
        "id": self._request_id
    }

    endpoint = f"{self._http_base_url}/{method}"
    async with self._http_session.post(
        endpoint,
        json=request
    ) as response:
        response.raise_for_status()
        return await response.json()

async def close(self) -> None:
    """Close SSE connection and HTTP session."""
    if self._sse_response:
        self._sse_response.close()
        self._sse_response = None

    if self._http_session:
        await self._http_session.close()
        self._http_session = None
```
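`listen_events()` treats every `data: ` line as a complete JSON document. In the SSE format an event can span several `data:` lines (joined with newlines) and ends at a blank line, so a multi-line payload would be reported as invalid JSON. A sketch of event framing over already-decoded lines; `iter_sse_events` is hypothetical, and feeding it from `self._sse_response.content` would mean decoding each bytes line first. The `id:` and `retry:` fields are ignored here.

```python
import json
from typing import Iterable, Iterator, List, Tuple


def iter_sse_events(lines: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield (event_type, data) for each complete SSE event.

    A blank line ends an event, several data lines are joined with
    newlines, and lines starting with ":" are comments (heartbeats).
    """
    event_type = "message"
    data_lines: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Dispatch only if the event carried data, then reset
            if data_lines:
                yield event_type, "\n".join(data_lines)
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # one leading space is not part of the value
        if name == "data":
            data_lines.append(value)
        elif name == "event":
            event_type = value
        # "id" and "retry" fields are ignored in this sketch
    # An event without a terminating blank line is discarded


if __name__ == "__main__":
    stream = [": keep-alive\n", 'data: {"jsonrpc": "2.0",\n', 'data: "id": 1}\n', "\n"]
    for event_type, data in iter_sse_events(stream):
        print(event_type, json.loads(data))  # message {'jsonrpc': '2.0', 'id': 1}
```

JSON decoding happens only after an event is complete, so a payload split across lines parses correctly and a malformed event is rejected as a whole.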
### SSE Server Lifecycle
```python
# In MCPServerManager._connect_sse_server()
async def _connect_sse_server(self, state: MCPServerState) -> bool:
    """Connect to SSE-based MCP server.

    Args:
        state: Server state with SSE configuration

    Returns:
        True if connected successfully
    """
    config = state.config
    server = None

    try:
        # Create SSE server
        server = SSEMCPServer()

        # Prepare headers
        headers = dict(config.headers or {})
        if config.auth_token:
            headers["Authorization"] = f"Bearer {config.auth_token}"

        # Connect to SSE stream
        sse_url = f"{config.base_url}/sse"
        await server.connect(
            sse_url=sse_url,
            http_base_url=config.base_url,
            headers=headers
        )

        # Start event listener in background
        async def handle_event(event: Dict[str, Any]) -> None:
            # Process SSE events (tool results, notifications, etc.)
            print(f"SSE event from {config.name}: {event}")

        # Keep a reference to the task: the event loop only holds weak
        # references, so an unreferenced task can be garbage-collected.
        # (event_task is an attribute introduced here for that purpose.)
        server.event_task = asyncio.create_task(
            server.listen_events(handle_event)
        )

        # Store in state
        state.http_session = server
        state.status = ServerStatus.RUNNING

        # Verify with initial request
        tools = await server.send_request("tools/list", {})
        state.tools_loaded = len(tools.get("tools", []))
        state.status = ServerStatus.HEALTHY

        return True

    except Exception as e:
        state.status = ServerStatus.FAILED
        state.error_message = str(e)
        if server is not None:
            # Release the SSE stream and HTTP session opened above
            await server.close()
        return False
```
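`_connect_sse_server()` starts `listen_events()` with `asyncio.create_task()`. The asyncio documentation advises keeping a reference to created tasks because the event loop holds only weak references, and a listener should also be cancelled when its connection closes. A minimal sketch of that ownership pattern; `EventListener` and the simulated stream are hypothetical stand-ins for `SSEMCPServer` and a real SSE response.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Optional


class EventListener:
    """Hypothetical helper that owns a background event-listener task."""

    def __init__(self) -> None:
        self._task: Optional[asyncio.Task] = None

    def start(
        self,
        source: AsyncIterator[Dict[str, Any]],
        callback: Callable[[Dict[str, Any]], Awaitable[None]],
    ) -> None:
        async def run() -> None:
            async for event in source:
                await callback(event)

        # Hold a strong reference: the event loop keeps only weak ones
        self._task = asyncio.create_task(run())

    async def close(self) -> None:
        if self._task is None:
            return
        self._task.cancel()
        try:
            await self._task  # wait until the cancellation has finished
        except asyncio.CancelledError:
            pass
        self._task = None


async def demo() -> List[Dict[str, Any]]:
    received: List[Dict[str, Any]] = []

    async def fake_stream() -> AsyncIterator[Dict[str, Any]]:
        # Simulated stream: two events, then idle like an open SSE connection
        yield {"method": "notifications/tools/list_changed"}
        yield {"id": 1, "result": {}}
        await asyncio.Event().wait()  # never set, so this blocks until cancelled

    async def on_event(event: Dict[str, Any]) -> None:
        received.append(event)

    listener = EventListener()
    listener.start(fake_stream(), on_event)
    await asyncio.sleep(0.05)  # give the listener time to drain both events
    await listener.close()
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Awaiting the cancelled task inside `close()` guarantees the listener has actually stopped before the caller closes the underlying session.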
Connection pooling for MCP servers reduces latency and manages resource limits by reusing connections across multiple tool calls.

```python
"""Connection pool for MCP servers.

Maintains a pool of reusable connections to reduce overhead.
Particularly effective for HTTP/SSE transports where connection
establishment is expensive (~50ms per connection).

Performance Impact:
- First request: ~50ms (connection establishment)
- Pooled requests: ~5ms (reuse existing connection)
- Reduces server load and client latency

Invariants:
- Idle pool size never exceeds max_connections per server
- Unhealthy idle connections are discarded on acquire
- Safe for concurrent tasks via per-server asyncio locks (not thread-safe)
"""
import asyncio
from dataclasses import dataclass
from logging import getLogger
from typing import Dict, List, Optional

# MCPClient, StdioMCPClient and HttpMCPClient are not defined in this snippet.

logger = getLogger(__name__)


@dataclass
class MCPServerConfig:
    """Configuration for MCP server connection."""
    name: str
    transport: str  # "stdio" or "http"
    command: Optional[str] = None
    args: Optional[List[str]] = None
    env: Optional[Dict[str, str]] = None
    url: Optional[str] = None
    headers: Optional[Dict[str, str]] = None


class MCPConnectionPool:
    """Pool connections to MCP servers.

    Maintains a pool of connections for each MCP server to reduce
    connection overhead and improve performance for frequent tool calls.
    """

    def __init__(self, max_connections: int = 5):
        """Initialize connection pool.

        Args:
            max_connections: Maximum idle connections kept per server
        """
        self._pools: Dict[str, List[MCPClient]] = {}
        self._max_connections = max_connections
        self._locks: Dict[str, asyncio.Lock] = {}
        logger.info(f"Initialized MCP connection pool (max={max_connections})")

    async def acquire(self, server_name: str, config: MCPServerConfig) -> MCPClient:
        """Acquire connection from pool.

        Returns an existing healthy idle connection if one is available;
        otherwise creates a new connection. max_connections caps how many
        idle connections are retained (see release()), not how many can
        be open at the same time.

        Args:
            server_name: Name of MCP server
            config: Server configuration for new connections

        Returns:
            Connected MCP client ready for use

        Raises:
            ConnectionError: If unable to establish connection
        """
        if server_name not in self._locks:
            self._locks[server_name] = asyncio.Lock()

        async with self._locks[server_name]:
            # Try to get existing connection
            if server_name in self._pools and self._pools[server_name]:
                client = self._pools[server_name].pop()

                # Verify connection is still healthy
                if await self._check_health(client):
                    logger.debug(f"Reusing connection to '{server_name}'")
                    return client
                else:
                    logger.warning(f"Stale connection to '{server_name}', reconnecting")
                    await client.disconnect()

            # Create new connection
            if server_name not in self._pools:
                self._pools[server_name] = []

            client = await self._connect(config)
            logger.info(f"Created new connection to '{server_name}'")
            return client

    async def release(self, server_name: str, client: MCPClient):
        """Return connection to pool.

        Connection is returned to pool for reuse if pool is not full.
        Otherwise, connection is closed.

        Args:
            server_name: Name of MCP server
            client: Client connection to return
        """
        async with self._locks[server_name]:
            if len(self._pools.get(server_name, [])) < self._max_connections:
                self._pools[server_name].append(client)
                logger.debug(f"Returned connection to '{server_name}' pool")
            else:
                await client.disconnect()
                logger.debug(f"Pool full, closed connection to '{server_name}'")

    async def _connect(self, config: MCPServerConfig) -> MCPClient:
        """Establish new MCP connection.

        Args:
            config: Server configuration

        Returns:
            Connected MCP client

        Raises:
            ConnectionError: If connection fails
        """
        if config.transport == "stdio":
            return await StdioMCPClient.connect(
                command=config.command,
                args=config.args,
                env=config.env
            )
        elif config.transport == "http":
            return HttpMCPClient(
                base_url=config.url,
                headers=config.headers
            )
        else:
            raise ValueError(f"Unsupported transport: {config.transport}")

    async def _check_health(self, client: MCPClient) -> bool:
        """Check if connection is healthy.

        Args:
            client: MCP client to check

        Returns:
            True if connection is healthy, False otherwise
        """
        try:
            await asyncio.wait_for(client.ping(), timeout=2.0)
            return True
        except Exception:  # includes asyncio.TimeoutError
            return False

    async def close_all(self):
        """Close all pooled connections.

        Called during shutdown to clean up resources.
        """
        for server_name, clients in self._pools.items():
            for client in clients:
                await client.disconnect()
            logger.info(f"Closed {len(clients)} connections to '{server_name}'")
        self._pools.clear()
```

Transport-Specific Recommendations:
- stdio transport: Pool size 1-2 (process overhead)
  - Each connection spawns a subprocess
  - High memory overhead per connection
  - Use minimal pooling for development
- HTTP transport: Pool size 3-5 (network connections)
  - Balances connection reuse with memory
  - Suitable for moderate request rates
  - Default for production deployments

Adjust based on:
- Tool call frequency (higher = larger pool)
- Latency requirements (lower latency = larger pool)
- Available memory (constrain for resource-limited systems)
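In `MCPConnectionPool`, `max_connections` limits how many released connections are kept idle; `acquire()` still opens a new connection whenever none is idle. That distinction matters when picking the sizes above. A self-contained sketch of the same acquire/release rule, with hypothetical `IdlePool` and `FakeClient` classes and the health check on reuse omitted:

```python
import asyncio
import itertools
from typing import Dict, List, Tuple


class FakeClient:
    """Hypothetical client, used only to observe pool behavior."""

    _ids = itertools.count(1)

    def __init__(self) -> None:
        self.id = next(self._ids)
        self.closed = False

    async def disconnect(self) -> None:
        self.closed = True


class IdlePool:
    """Keeps at most max_idle released clients per server for reuse."""

    def __init__(self, max_idle: int) -> None:
        self._idle: Dict[str, List[FakeClient]] = {}
        self._max_idle = max_idle
        self._lock = asyncio.Lock()

    async def acquire(self, server: str) -> FakeClient:
        async with self._lock:
            idle = self._idle.setdefault(server, [])
            if idle:
                return idle.pop()  # reuse the most recently released client
        return FakeClient()  # nothing idle: open a new client (no cap here)

    async def release(self, server: str, client: FakeClient) -> None:
        async with self._lock:
            idle = self._idle.setdefault(server, [])
            if len(idle) < self._max_idle:
                idle.append(client)
                return
        await client.disconnect()  # idle list is full: close instead


async def demo() -> Tuple[bool, bool]:
    pool = IdlePool(max_idle=1)
    a = await pool.acquire("context7")
    b = await pool.acquire("context7")  # a is still in use, so b is new
    await pool.release("context7", a)   # kept for reuse
    await pool.release("context7", b)   # idle list already full: b is closed
    c = await pool.acquire("context7")
    return c is a, b.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, True)
```

If a hard limit on concurrent connections is also needed, wrapping acquire/release with an `asyncio.Semaphore` is one option; the pool above does not do this.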
Example Configurations:

```python
# Development (low traffic)
pool = MCPConnectionPool(max_connections=2)

# Production (moderate traffic)
pool = MCPConnectionPool(max_connections=5)

# High-throughput (many concurrent requests)
pool = MCPConnectionPool(max_connections=10)
```

Health monitoring detects failing MCP servers early, so that reconnection or a FAILED status can be triggered automatically.

```python
"""Health checking for MCP servers.

Performs multi-level health verification:
1. Basic connectivity (ping/heartbeat)
2. Functional validation (tool listing)
3. Latency measurement

Thresholds:
- 2 consecutive failures → Log warning
- 5 consecutive failures → Trigger reconnection
- 10 consecutive failures → Mark as FAILED
"""
import asyncio
import time
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from logging import getLogger
from typing import List, Optional

logger = getLogger(__name__)


class HealthStatus(Enum):
    """Health check status."""
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    DEGRADED = "degraded"


@dataclass(frozen=True)
class HealthCheckResult:
    """Result of health check."""
    name: str
    status: HealthStatus
    message: str
    latency_ms: Optional[float] = None
    # default_factory gives each result its own timestamp; a plain
    # `= datetime.now()` default would be evaluated once at import time
    timestamp: datetime = field(default_factory=datetime.now)


async def check_mcp_server_health(
    server: MCPServer,
    timeout: float = 5.0
) -> HealthCheckResult:
    """Comprehensive MCP server health check.

    Performs multiple checks to verify server is operational:
    1. Basic connectivity (ping)
    2. Tool listing (verifies server is functional)
    3. Latency measurement

    Args:
        server: MCP server to check
        timeout: Maximum time for health check (seconds), split
            evenly between the ping and the tool listing

    Returns:
        HealthCheckResult with detailed status information

    Example:
        result = await check_mcp_server_health(mcp_server)
        if result.status == HealthStatus.HEALTHY:
            print(f"Server OK: {result.message}")
        else:
            print(f"Server issue: {result.message}")
    """
    # Monotonic clock, unaffected by system clock adjustments
    start = time.perf_counter()

    try:
        # Basic connectivity check
        await asyncio.wait_for(server.client.ping(), timeout=timeout / 2)

        # Functional check - list tools
        tools = await asyncio.wait_for(
            server.client.list_tools(),
            timeout=timeout / 2
        )

        # Calculate latency
        latency = (time.perf_counter() - start) * 1000

        return HealthCheckResult(
            name=f"mcp_{server.name}",
            status=HealthStatus.HEALTHY,
            message=f"Server responsive, {len(tools)} tools available",
            latency_ms=latency
        )

    except asyncio.TimeoutError:
        return HealthCheckResult(
            name=f"mcp_{server.name}",
            status=HealthStatus.UNHEALTHY,
            message=f"Health check timeout after {timeout}s"
        )
    except ConnectionError as e:
        return HealthCheckResult(
            name=f"mcp_{server.name}",
            status=HealthStatus.UNHEALTHY,
            message=f"Connection error: {str(e)}"
        )
    except Exception as e:
        return HealthCheckResult(
            name=f"mcp_{server.name}",
            status=HealthStatus.DEGRADED,
            message=f"Unexpected error: {str(e)}"
        )


async def periodic_health_checks(
    servers: List[MCPServer],
    interval: int = 60
) -> None:
    """Run periodic health checks on all MCP servers.

    Continuously monitors server health and logs issues.

    Args:
        servers: List of MCP servers to monitor
        interval: Seconds between health check rounds

    Example:
        # Start background health monitoring
        asyncio.create_task(periodic_health_checks(mcp_servers, interval=30))
    """
    while True:
        for server in servers:
            result = await check_mcp_server_health(server)

            if result.status == HealthStatus.HEALTHY:
                logger.debug(f"{result.name}: {result.message} ({result.latency_ms:.1f}ms)")
            elif result.status == HealthStatus.DEGRADED:
                logger.warning(f"{result.name}: {result.message}")
            else:
                logger.error(f"{result.name}: {result.message}")

        await asyncio.sleep(interval)
```

1. Timeout Configuration
   - Ping/connectivity: 2-5 seconds
     - Quick check for basic connectivity
     - Fails fast on network issues
   - Functional checks: 5-10 seconds
     - Tool listing or resource queries
     - Verifies server logic is working
2. Check Frequency
   - Development: 10-30 seconds (quick feedback)
   - Production: 30-60 seconds (balanced)
   - Stable systems: 60-300 seconds (reduced overhead)
3. Alert Thresholds
   - 2 consecutive failures: Log warning (transient issue)
   - 5 consecutive failures: Trigger reconnection (persistent problem)
   - 10 consecutive failures: Mark FAILED (server down)
4. Graceful Degradation
   - Continue operation with degraded servers
   - Disable only on complete failure
   - Provide fallback mechanisms where possible
   - Alert operations team for manual intervention

The following variant of MCPServerManager combines connection pooling with health monitoring.

```python
from typing import Dict, Optional


class MCPServerManager:
    """Combine connection pooling with health monitoring."""

    def __init__(self):
        self.pool = MCPConnectionPool(max_connections=5)
        # Latest HealthCheckResult per server name; nothing in this class
        # writes to it, so the caller's monitoring loop must keep it updated
        self.health_status: Dict[str, HealthCheckResult] = {}

    async def get_client(
        self,
        server_name: str,
        config: MCPServerConfig
    ) -> Optional[MCPClient]:
        """Get client only if server is healthy.

        Args:
            server_name: Server name
            config: Server configuration

        Returns:
            Client if healthy, None if unhealthy
        """
        # Check the last recorded health status
        if server_name in self.health_status:
            status = self.health_status[server_name]
            if status.status == HealthStatus.UNHEALTHY:
                logger.warning(f"Skipping unhealthy server '{server_name}'")
                return None

        # Acquire connection
        return await self.pool.acquire(server_name, config)
```

Usage Example:
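```python
import asyncio


async def main(config: MCPServerConfig) -> None:
    # Initialize manager with pooling and health checks
    manager = MCPServerManager()

    # Start background health monitoring; keep the task so it can be stopped.
    # get_all_servers() is assumed here; the class above does not define it.
    monitor_task = asyncio.create_task(
        periodic_health_checks(
            servers=manager.get_all_servers(),
            interval=60  # Check every minute
        )
    )

    # Get client (returns None if the last recorded status is UNHEALTHY)
    client = await manager.get_client("context7", config)
    if client:
        try:
            result = await client.call_tool("resolve-library-id", {"libraryName": "react"})
        finally:
            # Return the connection even if the tool call fails
            await manager.pool.release("context7", client)
    else:
        logger.error("Server unavailable, using fallback")

    # Stop background monitoring before exiting
    monitor_task.cancel()
```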
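The alert thresholds (2, 5, and 10 consecutive failures) are not applied by `periodic_health_checks()`, which only logs each result. A sketch of a per-server counter that turns check outcomes into actions; `FailureTracker` and `HealthAction` are hypothetical names, and whether a DEGRADED result counts as a failure is left to the caller.

```python
from enum import Enum
from typing import Dict


class HealthAction(Enum):
    """Hypothetical actions derived from consecutive failures."""
    NONE = "none"
    WARN = "warn"
    RECONNECT = "reconnect"
    MARK_FAILED = "mark_failed"


class FailureTracker:
    """Counts consecutive failed checks per server (defaults: 2 / 5 / 10)."""

    def __init__(self, warn_at: int = 2, reconnect_at: int = 5, fail_at: int = 10) -> None:
        self.warn_at = warn_at
        self.reconnect_at = reconnect_at
        self.fail_at = fail_at
        self._failures: Dict[str, int] = {}

    def record(self, server: str, healthy: bool) -> HealthAction:
        if healthy:
            self._failures[server] = 0  # any success resets the streak
            return HealthAction.NONE
        count = self._failures.get(server, 0) + 1
        self._failures[server] = count
        # Each action fires once, on the check that reaches its threshold
        if count == self.fail_at:
            return HealthAction.MARK_FAILED
        if count == self.reconnect_at:
            return HealthAction.RECONNECT
        if count == self.warn_at:
            return HealthAction.WARN
        return HealthAction.NONE


if __name__ == "__main__":
    tracker = FailureTracker()
    actions = [tracker.record("context7", healthy=False) for _ in range(10)]
    print([(i + 1, a.value) for i, a in enumerate(actions) if a is not HealthAction.NONE])
    # [(2, 'warn'), (5, 'reconnect'), (10, 'mark_failed')]
```

Firing each action only when its threshold is first reached means a server that keeps failing is not reconnected on every check. In a monitoring loop, a RECONNECT result could, for example, call `manager.reconnect_server(...)`.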