---
title: MCP Server Lifecycle Management
component: mcp
tags: [async, guide, integration, lifecycle, mcp, v1.0.0]
level: implementation
scope: v1.0.0
phase: implementation
lines: 3900
related:
  - tag: async
    docs:
      - path: architecture/adrs/ADR-003-async-only-architecture.md
        title: "ADR-003: Async-Only Architecture"
        tags: [adr, architecture, async, asyncio, design, v1.0.0]
      - path: implementation/async-execution-patterns.md
        title: Async Execution Patterns
        tags: [async, asyncio, guide, python, v1.0.0]
      - path: implementation/teams-bot-webhook-handler.md
        title: Teams Bot Webhook Handler Implementation
        tags: [async, azure, fastapi, guide, teams, v1.0.0]
      - path: implementation/workflow-thread-manager.md
        title: Workflow Thread Manager
        tags: [async, v1.0.0, workflow]
  - tag: integration
    docs:
      - path: architecture/adrs/ADR-011-real-api-testing-only.md
        title: "ADR-011: No LLM Mocking - Real AI Model Testing Required"
        tags: [adr, architecture, azure, integration, pytest, testing]
      - path: architecture/microsoft-agent-framework.md
        title: Microsoft Agent Framework Integration
        tags: [agent, architecture, azure, design, integration, v1.0.0]
      - path: deployment/teams-integration.md
        title: Teams Bot Integration Setup
        tags: [azure, deployment, integration, operations, teams, v1.0.0]
      - path: implementation/agent-framework-integration.md
        title: Agent Framework Integration Guide
        tags: [agent, azure, design, guide, integration, v1.0.0]
  - tag: mcp
    docs:
      - path: examples/mcp-servers/README.md
        title: MCP Server Examples Guide
        tags: [example, mcp, guide, reference, v1.0.0]
      - path: schemas/mcp-servers.md
        title: MCP Server Configuration Schema
        tags: [configuration, mcp, reference, schema, v1.0.0]
last_updated: 2025-01-15
---

MCP Server Lifecycle Management

Last Updated: January 2025
Status: Implementation Guide
Prerequisite Reading: MCP Server Schema


Table of Contents

  1. Overview
  2. MCP Protocol Fundamentals
  3. Transport Types
  4. MCPServerManager Implementation
  5. Stdio Transport Implementation
  6. HTTP Transport Implementation
  7. SSE Transport Implementation
  8. Server Discovery and Registration
  9. Health Check Implementation
  10. Connection Pool Management
  11. Automatic Reconnection
  12. Graceful Shutdown
  13. Resource Cleanup
  14. Tool Loading Integration
  15. Configuration Examples
  16. Complete Working Example
  17. Testing
  18. Performance Considerations
  19. Troubleshooting

Overview

The MCP Server Lifecycle Management system handles the complete lifecycle of Model Context Protocol (MCP) servers in the LIVE+AI framework. This includes initialization, health monitoring, automatic reconnection, and graceful shutdown across multiple transport types.

Purpose

Manage MCP servers that provide tools and resources to AI agents, ensuring:

  • Reliable Connectivity: Maintain stable connections across all transport types
  • Health Monitoring: Continuous health checks with automatic recovery
  • Resource Management: Proper cleanup and connection pooling
  • Tool Discovery: Automatic loading and registration of server tools
  • Graceful Degradation: Fallback mechanisms when servers fail

Integration with LIVE+AI

MCP servers integrate with the Agent Framework to provide external tools:

ChatAgent → AgentFrameworkAdapter → MCPServerManager → MCP Server
                                           ↓
                                    Tool Discovery
                                    Health Checks
                                    Connection Pooling

Key Components


MCP Protocol Fundamentals

What is MCP?

The Model Context Protocol (MCP) is an open protocol that standardizes how AI applications communicate with external tools and data sources. Key concepts:

  1. Tools: Functions that agents can invoke with structured inputs
  2. Resources: Data sources that agents can read
  3. Prompts: Reusable prompt templates
  4. JSON-RPC 2.0: Message format for client-server communication

MCP Message Flow

Client                          Server
  |                               |
  |----(initialize)-------------->|
  |<---(capabilities)-------------|
  |                               |
  |----(tools/list)-------------->|
  |<---(tool definitions)---------|
  |                               |
  |----(tools/call)-------------->|
  |<---(tool result)--------------|
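
As a rough illustration of the first step in this flow, the initialize request a client sends might look like the following Python dictionary. The protocol revision string and capability fields shown here are illustrative assumptions; the exact values depend on the MCP revision in use.

# Sketch of the opening handshake payload (field values illustrative)
initialize_request = {
    "jsonrpc": "2.0",
    "method": "initialize",
    "params": {
        "protocolVersion": "2024-11-05",  # assumed MCP protocol revision
        "capabilities": {},               # client capabilities negotiated here
        "clientInfo": {"name": "liveai", "version": "1.0.0"},
    },
    "id": 1,
}

# Once the server replies with its capabilities, tools can be listed
tools_list_request = {"jsonrpc": "2.0", "method": "tools/list", "params": {}, "id": 2}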

Supported Transports

Based on MCP Python SDK research, LIVE+AI supports three transport mechanisms:

| Transport | Protocol | Use Case | Bidirectional |
|-----------|----------|----------|---------------|
| stdio | Standard I/O | Local processes, development | ✅ Yes |
| HTTP | Request-Response | Remote servers, stateless | ❌ No |
| SSE | Server-Sent Events | Streaming, long-running | ⚠️ Server→Client only |

JSON-RPC 2.0 Message Format

Request:

{
  "jsonrpc": "2.0",
  "method": "tools/list",
  "params": {},
  "id": 1
}

Response:

{
  "jsonrpc": "2.0",
  "result": {
    "tools": [
      {
        "name": "get-weather",
        "description": "Get current weather",
        "inputSchema": {
          "type": "object",
          "properties": {
            "city": {"type": "string"}
          }
        }
      }
    ]
  },
  "id": 1
}

Error:

{
  "jsonrpc": "2.0",
  "error": {
    "code": -32601,
    "message": "Method not found"
  },
  "id": 1
}

Transport Types

Stdio Transport

Mechanism: Local subprocess communicating via stdin/stdout streams.

Characteristics:

  • Fast startup (~100ms)
  • Direct process control
  • No network overhead
  • Suitable for development

Process Lifecycle:

import json
import subprocess

# 1. Spawn subprocess
process = subprocess.Popen(
    ["npx", "-y", "@upstash/context7-mcp"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE
)

# 2. Write JSON-RPC to stdin
request = {"jsonrpc": "2.0", "method": "tools/list", "id": 1}
process.stdin.write(json.dumps(request).encode() + b'\n')
process.stdin.flush()

# 3. Read JSON-RPC from stdout
response = process.stdout.readline()
result = json.loads(response)

# 4. Cleanup on shutdown
process.terminate()
process.wait(timeout=5)

HTTP Transport

Mechanism: RESTful HTTP requests to remote MCP server.

Characteristics:

  • Stateless by default
  • Firewall-friendly (port 443)
  • Supports connection pooling
  • Production-ready

Request Flow:

# POST to /mcp/tools/list
response = await http_client.post(
    f"{base_url}/tools/list",
    json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
    headers={"Authorization": f"Bearer {token}"}
)
result = response.json()

SSE Transport

Mechanism: Server-Sent Events for streaming responses.

Characteristics:

  • One-way: Server → Client
  • Long-lived HTTP connection
  • Automatic reconnection
  • Progress updates for long tasks

Connection Pattern:

# Establish SSE connection
async with aiohttp.ClientSession() as session:
    async with session.get(f"{base_url}/sse") as response:
        # Listen for server-sent events
        async for line in response.content:
            if line.startswith(b'data: '):
                event_data = json.loads(line[6:])
                # Process event

MCPServerManager Implementation

The MCPServerManager class orchestrates the complete lifecycle of all MCP servers in the LIVE+AI system.

Class Definition

"""Manage MCP server lifecycle and coordination.

This manager handles initialization, health monitoring, reconnection,
and shutdown for all registered MCP servers across multiple transports.

Thread Safety:
    All public methods are async and thread-safe via asyncio locks.

Resource Management:
    - Stdio servers: Process handles tracked and terminated on shutdown
    - HTTP/SSE servers: Connection pools managed with max limits
    - All resources cleaned up in graceful shutdown sequence
"""

import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Set
from enum import Enum

import aiohttp
from azure.ai.projects.aio import AIProjectClient


class TransportType(Enum):
    """MCP transport protocol types."""
    STDIO = "stdio"
    HTTP = "http"
    SSE = "sse"


class ServerStatus(Enum):
    """MCP server lifecycle status."""
    PENDING = "pending"       # Initializing
    RUNNING = "running"       # Active but not verified
    HEALTHY = "healthy"       # Verified healthy
    UNHEALTHY = "unhealthy"   # Health check failed
    RECONNECTING = "reconnecting"  # Attempting reconnect
    STOPPED = "stopped"       # Intentionally stopped
    FAILED = "failed"         # Unrecoverable error


@dataclass
class MCPServerConfig:
    """Configuration for a single MCP server.
    
    Attributes:
        name: Unique server identifier
        transport: Transport type (stdio, http, sse)
        command: Command for stdio servers
        args: Arguments for stdio command
        env: Environment variables for stdio
        base_url: URL for HTTP/SSE servers
        headers: Custom headers for HTTP/SSE
        timeout: Request timeout in seconds
        auth_token: Bearer token for authentication
    """
    name: str
    transport: TransportType
    
    # Stdio-specific
    command: Optional[str] = None
    args: Optional[List[str]] = None
    env: Optional[Dict[str, str]] = None
    
    # HTTP/SSE-specific
    base_url: Optional[str] = None
    headers: Optional[Dict[str, str]] = None
    timeout: int = 30
    auth_token: Optional[str] = None


@dataclass
class MCPServerState:
    """Runtime state for an MCP server.
    
    Tracks connection status, health metrics, and resources.
    """
    config: MCPServerConfig
    status: ServerStatus = ServerStatus.PENDING
    process: Optional[asyncio.subprocess.Process] = None
    http_session: Optional[aiohttp.ClientSession] = None
    last_health_check: Optional[datetime] = None
    consecutive_failures: int = 0
    tools_loaded: int = 0
    error_message: Optional[str] = None


class MCPServerManager:
    """Central manager for MCP server lifecycle.
    
    Responsibilities:
        - Initialize servers based on configuration
        - Maintain connection pools for HTTP/SSE
        - Monitor server health continuously
        - Handle automatic reconnection with backoff
        - Gracefully shutdown all servers
        - Load tools into Agent Framework
    
    Example:
        >>> manager = MCPServerManager(ai_client)
        >>> await manager.initialize_servers(configs)
        >>> health = await manager.health_check_all()
        >>> await manager.shutdown_all()
    """
    
    def __init__(
        self,
        ai_client: AIProjectClient,
        health_check_interval: int = 60,
        max_reconnect_attempts: int = 3
    ):
        """Initialize the MCP server manager.
        
        Args:
            ai_client: Azure AI Project client for agent integration
            health_check_interval: Seconds between health checks
            max_reconnect_attempts: Maximum reconnection retries
        """
        self._ai_client = ai_client
        self._health_check_interval = health_check_interval
        self._max_reconnect_attempts = max_reconnect_attempts
        
        # Server registry
        self._servers: Dict[str, MCPServerState] = {}
        self._lock = asyncio.Lock()
        
        # Connection pooling
        self._http_sessions: Dict[str, aiohttp.ClientSession] = {}
        
        # Background tasks
        self._health_check_task: Optional[asyncio.Task] = None
        self._shutdown_event = asyncio.Event()
    
    async def initialize_servers(
        self,
        configs: List[MCPServerConfig]
    ) -> Dict[str, bool]:
        """Initialize all configured MCP servers.
        
        Starts servers based on their transport type and performs
        initial health verification.
        
        Args:
            configs: List of server configurations
            
        Returns:
            Dict mapping server name to initialization success
            
        Raises:
            ValueError: If server name is duplicate
            RuntimeError: If critical initialization fails
        """
        async with self._lock:
            results = {}
            
            for config in configs:
                if config.name in self._servers:
                    raise ValueError(
                        f"Duplicate server name: {config.name}"
                    )
                
                try:
                    # Create server state
                    state = MCPServerState(config=config)
                    self._servers[config.name] = state
                    
                    # Initialize based on transport
                    if config.transport == TransportType.STDIO:
                        success = await self._start_stdio_server(state)
                    elif config.transport == TransportType.HTTP:
                        success = await self._connect_http_server(state)
                    elif config.transport == TransportType.SSE:
                        success = await self._connect_sse_server(state)
                    else:
                        raise ValueError(
                            f"Unsupported transport: {config.transport}"
                        )
                    
                    results[config.name] = success
                    
                except Exception as e:
                    state.status = ServerStatus.FAILED
                    state.error_message = str(e)
                    results[config.name] = False
            
            # Start health monitoring
            if not self._health_check_task:
                self._health_check_task = asyncio.create_task(
                    self._health_check_loop()
                )
            
            return results

Key Design Decisions

Async-First Architecture: All I/O operations use async/await to prevent blocking. This is critical for:

  • Stdio: Non-blocking process communication
  • HTTP/SSE: Concurrent request handling
  • Health checks: Parallel server monitoring

Resource Tracking: Each MCPServerState tracks transport-specific resources:

  • Stdio: subprocess.Process handle
  • HTTP/SSE: aiohttp.ClientSession for connection pooling

Status State Machine:

PENDING → RUNNING → HEALTHY
              ↓         ↓
           UNHEALTHY → RECONNECTING → HEALTHY
                           ↓
                        FAILED
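
The diagram can be enforced in code with an explicit transition table. The following is a minimal sketch rather than part of the manager shown above; the ALLOWED_TRANSITIONS mapping and the transition helper are illustrative.

# Hypothetical guard for the status state machine shown above
ALLOWED_TRANSITIONS: Dict[ServerStatus, Set[ServerStatus]] = {
    ServerStatus.PENDING: {ServerStatus.RUNNING, ServerStatus.FAILED},
    ServerStatus.RUNNING: {ServerStatus.HEALTHY, ServerStatus.UNHEALTHY, ServerStatus.STOPPED},
    ServerStatus.HEALTHY: {ServerStatus.UNHEALTHY, ServerStatus.STOPPED},
    ServerStatus.UNHEALTHY: {ServerStatus.RECONNECTING, ServerStatus.STOPPED},
    ServerStatus.RECONNECTING: {ServerStatus.HEALTHY, ServerStatus.FAILED},
    ServerStatus.STOPPED: set(),
    ServerStatus.FAILED: set(),
}


def transition(state: MCPServerState, new_status: ServerStatus) -> None:
    """Apply a status change, rejecting moves the diagram does not allow."""
    if new_status not in ALLOWED_TRANSITIONS[state.status]:
        raise ValueError(
            f"Illegal transition: {state.status.value} -> {new_status.value}"
        )
    state.status = new_status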

Error Resilience:

  • Individual server failures don't crash the manager
  • Failed servers marked FAILED but kept in registry
  • Reconnection attempts configurable per deployment


Stdio Transport Implementation

The stdio transport manages MCP servers as local subprocesses, communicating via standard input/output streams.

StdioMCPServer Class

"""Stdio-based MCP server implementation.

Manages a subprocess running an MCP server, handling JSON-RPC 2.0
communication over stdin/stdout streams.

Invariants:
    - Process must remain alive for the duration of the session
    - Each request gets a unique incrementing ID
    - Responses matched to requests via ID field

Resource Management:
    - Process terminated on cleanup
    - Streams closed gracefully
    - No orphaned processes allowed
"""

import asyncio
import json
import os
from typing import Any, Dict, List, Optional


class StdioMCPServer:
    """MCP server using stdio (subprocess) transport.
    
    Spawns a subprocess and communicates via JSON-RPC 2.0 over
    stdin/stdout. Handles process lifecycle and stream management.
    
    Example:
        >>> server = StdioMCPServer()
        >>> await server.start(["npx", "-y", "@upstash/context7-mcp"], {})
        >>> result = await server.send_request("tools/list", {})
        >>> await server.shutdown()
    """
    
    def __init__(self):
        """Initialize stdio server (process not started yet)."""
        self._process: Optional[asyncio.subprocess.Process] = None
        self._request_id = 0
        self._lock = asyncio.Lock()
        self._response_queue: asyncio.Queue = asyncio.Queue()
        self._reader_task: Optional[asyncio.Task] = None
    
    async def start(
        self,
        command: List[str],
        env: Optional[Dict[str, str]] = None
    ) -> None:
        """Start subprocess and establish stdio communication.
        
        Args:
            command: Command and arguments to execute
            env: Environment variables for subprocess
            
        Raises:
            RuntimeError: If process fails to start
            FileNotFoundError: If command not found
        """
        full_env = {**os.environ, **(env or {})}
        
        try:
            self._process = await asyncio.create_subprocess_exec(
                *command,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=full_env
            )
        except FileNotFoundError as e:
            raise FileNotFoundError(
                f"Command not found: {command[0]}"
            ) from e
        
        # Verify process started
        await asyncio.sleep(0.1)
        if self._process.returncode is not None:
            stderr = await self._process.stderr.read()
            raise RuntimeError(
                f"Process exited immediately: {stderr.decode()}"
            )
        
        # Start background reader
        self._reader_task = asyncio.create_task(self._read_responses())
    
    async def _read_responses(self) -> None:
        """Background task to read JSON-RPC responses from stdout.
        
        Reads line-by-line, parses JSON, and queues responses.
        Terminates when process exits or stdout closes.
        """
        while True:
            try:
                line = await self._process.stdout.readline()
                if not line:
                    break  # EOF
                
                # Parse JSON-RPC response
                response = json.loads(line.decode().strip())
                await self._response_queue.put(response)
                
            except json.JSONDecodeError as e:
                # Log malformed response but continue
                print(f"Invalid JSON from server: {e}")
            except Exception as e:
                print(f"Error reading response: {e}")
                break
    
    async def send_request(
        self,
        method: str,
        params: Dict[str, Any],
        timeout: float = 30.0
    ) -> Dict[str, Any]:
        """Send JSON-RPC request over stdin and await response.
        
        Args:
            method: JSON-RPC method name
            params: Method parameters
            timeout: Response timeout in seconds
            
        Returns:
            JSON-RPC response result
            
        Raises:
            TimeoutError: If response not received within timeout
            RuntimeError: If process died or JSON-RPC error returned
        """
        async with self._lock:
            # Verify process exists and is alive
            if self._process is None or self._process.returncode is not None:
                raise RuntimeError("Process is not running")
            
            # Create request
            self._request_id += 1
            request = {
                "jsonrpc": "2.0",
                "method": method,
                "params": params,
                "id": self._request_id
            }
            
            # Send request
            request_json = json.dumps(request) + "\n"
            self._process.stdin.write(request_json.encode())
            await self._process.stdin.drain()
            
            # Wait for response
            request_id = self._request_id
        
        # Wait for matching response (outside lock)
        deadline = asyncio.get_event_loop().time() + timeout
        while True:
            remaining = deadline - asyncio.get_event_loop().time()
            if remaining <= 0:
                raise TimeoutError(
                    f"No response for {method} after {timeout}s"
                )
            
            try:
                response = await asyncio.wait_for(
                    self._response_queue.get(),
                    timeout=remaining
                )
            except asyncio.TimeoutError:
                raise TimeoutError(
                    f"No response for {method} after {timeout}s"
                )
            
            # Check if this is our response
            if response.get("id") == request_id:
                if "error" in response:
                    raise RuntimeError(
                        f"JSON-RPC error: {response['error']}"
                    )
                return response.get("result", {})
            
            # Wrong ID, put back and continue
            await self._response_queue.put(response)
            await asyncio.sleep(0.01)
    
    async def shutdown(self, timeout: float = 5.0) -> None:
        """Gracefully shutdown subprocess.
        
        Sends SIGTERM, waits for exit, sends SIGKILL if necessary.
        
        Args:
            timeout: Seconds to wait before SIGKILL
        """
        if not self._process:
            return
        
        # Stop reader task
        if self._reader_task:
            self._reader_task.cancel()
            try:
                await self._reader_task
            except asyncio.CancelledError:
                pass
        
        # Terminate process
        if self._process.returncode is None:
            self._process.terminate()
            try:
                await asyncio.wait_for(
                    self._process.wait(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                # Force kill
                self._process.kill()
                await self._process.wait()

Stdio Server Lifecycle

# In MCPServerManager._start_stdio_server()

async def _start_stdio_server(self, state: MCPServerState) -> bool:
    """Start a stdio-based MCP server.
    
    Args:
        state: Server state with stdio configuration
        
    Returns:
        True if started successfully
    """
    config = state.config
    
    try:
        # Create stdio server
        server = StdioMCPServer()
        
        # Start subprocess
        await server.start(
            command=[config.command] + (config.args or []),
            env=config.env
        )
        
        # Store in state
        state.process = server
        state.status = ServerStatus.RUNNING
        
        # Verify with initial request
        tools = await server.send_request("tools/list", {})
        state.tools_loaded = len(tools.get("tools", []))
        state.status = ServerStatus.HEALTHY
        
        return True
        
    except Exception as e:
        state.status = ServerStatus.FAILED
        state.error_message = str(e)
        return False

HTTP Transport Implementation

The HTTP transport connects to remote MCP servers over HTTP/HTTPS using a request-response pattern.

HTTPMCPServer Class

"""HTTP-based MCP server implementation.

Communicates with remote MCP servers via HTTP POST requests.
Uses connection pooling for efficiency and supports authentication.

Performance:
    - Connection pooling reduces latency (~50ms per request)
    - Keep-alive connections maintained by aiohttp
    - Automatic retry on transient failures (429, 502, 503, 504)
"""

import asyncio

import aiohttp
from typing import Any, Dict, Optional


class HTTPMCPServer:
    """MCP server using HTTP transport.
    
    Connects to remote MCP server via HTTP, sending JSON-RPC
    requests as POST bodies.
    
    Example:
        >>> server = HTTPMCPServer()
        >>> await server.connect(
        ...     "https://api.example.com/mcp",
        ...     {"Authorization": "Bearer token"}
        ... )
        >>> result = await server.send_request("tools/list", {})
        >>> await server.close()
    """
    
    def __init__(self):
        """Initialize HTTP server (not connected yet)."""
        self._base_url: Optional[str] = None
        self._session: Optional[aiohttp.ClientSession] = None
        self._request_id = 0
    
    async def connect(
        self,
        base_url: str,
        headers: Optional[Dict[str, str]] = None,
        timeout: int = 30
    ) -> None:
        """Establish HTTP connection with connection pooling.
        
        Args:
            base_url: Base URL of MCP server
            headers: Custom HTTP headers (auth, etc.)
            timeout: Default request timeout
        """
        self._base_url = base_url.rstrip('/')
        
        # Create session with connection pooling
        timeout_config = aiohttp.ClientTimeout(total=timeout)
        connector = aiohttp.TCPConnector(
            limit=50,  # Max 50 concurrent connections
            limit_per_host=10,  # Max 10 per host
            ttl_dns_cache=300,  # DNS cache for 5 minutes
            keepalive_timeout=60  # Keep connections alive
        )
        
        self._session = aiohttp.ClientSession(
            headers=headers or {},
            timeout=timeout_config,
            connector=connector
        )
    
    async def send_request(
        self,
        method: str,
        params: Dict[str, Any],
        timeout: Optional[float] = None
    ) -> Dict[str, Any]:
        """Send HTTP POST request with JSON-RPC payload.
        
        Args:
            method: JSON-RPC method name
            params: Method parameters
            timeout: Request timeout (override default)
            
        Returns:
            JSON-RPC response result
            
        Raises:
            ConnectionError: If server unreachable
            TimeoutError: If request times out
            RuntimeError: If JSON-RPC error returned
        """
        if not self._session:
            raise RuntimeError("Not connected - call connect() first")
        
        self._request_id += 1
        request = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
            "id": self._request_id
        }
        
        # Determine endpoint (convention: method "tools/list" → POST {base_url}/tools/list)
        endpoint = f"{self._base_url}/{method}"
        
        # Honour the per-request timeout if given; otherwise use the session default
        request_timeout = (
            aiohttp.ClientTimeout(total=timeout)
            if timeout is not None
            else self._session.timeout
        )
        
        try:
            async with self._session.post(
                endpoint,
                json=request,
                timeout=request_timeout
            ) as response:
                response.raise_for_status()
                result = await response.json()
                
                if "error" in result:
                    raise RuntimeError(
                        f"JSON-RPC error: {result['error']}"
                    )
                
                return result.get("result", {})
                
        except aiohttp.ClientConnectorError as e:
            raise ConnectionError(
                f"Cannot connect to {self._base_url}: {e}"
            ) from e
        except asyncio.TimeoutError as e:
            raise TimeoutError(
                f"Request to {endpoint} timed out"
            ) from e
    
    async def health_check(self) -> bool:
        """GET /health endpoint check.
        
        Returns:
            True if server responds with 200
        """
        if not self._session:
            return False
        
        try:
            async with self._session.get(
                f"{self._base_url}/health",
                timeout=aiohttp.ClientTimeout(total=5)
            ) as response:
                return response.status == 200
        except Exception:
            return False
    
    async def close(self) -> None:
        """Close HTTP session and release connections."""
        if self._session:
            await self._session.close()
            self._session = None
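
The retry-on-transient-failure behaviour mentioned in the docstring above (429, 502, 503, 504) is not implemented by the class itself. One way to layer it on top is a small wrapper like the sketch below; the send_with_retry helper and its backoff parameters are assumptions, not part of the documented API.

import asyncio
import random
from typing import Any, Dict, Optional

import aiohttp


async def send_with_retry(
    server: HTTPMCPServer,
    method: str,
    params: Dict[str, Any],
    max_attempts: int = 3,
    base_delay: float = 1.0,
) -> Dict[str, Any]:
    """Call send_request, retrying transient transport and HTTP failures.
    
    Retries on connection errors, timeouts, and 429/502/503/504 responses
    with exponential backoff plus jitter. JSON-RPC level errors are not
    retried, since they indicate a problem with the request itself.
    """
    last_error: Optional[Exception] = None
    
    for attempt in range(max_attempts):
        try:
            return await server.send_request(method, params)
        except (ConnectionError, TimeoutError) as e:
            last_error = e
        except aiohttp.ClientResponseError as e:
            if e.status not in (429, 502, 503, 504):
                raise
            last_error = e
        
        # Exponential backoff: 1s, 2s, 4s ... plus up to 0.5s of jitter
        await asyncio.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))
    
    raise last_error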

HTTP Server Lifecycle

# In MCPServerManager._connect_http_server()

async def _connect_http_server(self, state: MCPServerState) -> bool:
    """Connect to HTTP-based MCP server.
    
    Args:
        state: Server state with HTTP configuration
        
    Returns:
        True if connected successfully
    """
    config = state.config
    
    try:
        # Create HTTP server
        server = HTTPMCPServer()
        
        # Prepare headers
        headers = dict(config.headers or {})
        if config.auth_token:
            headers["Authorization"] = f"Bearer {config.auth_token}"
        
        # Connect
        await server.connect(
            base_url=config.base_url,
            headers=headers,
            timeout=config.timeout
        )
        
        # Store in state
        state.http_session = server
        state.status = ServerStatus.RUNNING
        
        # Verify with health check
        healthy = await server.health_check()
        if healthy:
            # Load tools
            tools = await server.send_request("tools/list", {})
            state.tools_loaded = len(tools.get("tools", []))
            state.status = ServerStatus.HEALTHY
        else:
            state.status = ServerStatus.UNHEALTHY
            state.error_message = "Health check failed"
        
        return healthy
        
    except Exception as e:
        state.status = ServerStatus.FAILED
        state.error_message = str(e)
        return False

SSE Transport Implementation

The SSE (Server-Sent Events) transport receives streaming updates from MCP servers over a persistent HTTP connection.

SSEMCPServer Class

"""SSE-based MCP server implementation.

Establishes a long-lived HTTP connection to receive server-sent events.
Useful for streaming responses or long-running tool executions.

Characteristics:
    - One-way: Server → Client only
    - Automatic reconnection on disconnect
    - Line-based event stream format
    - Client sends requests via separate HTTP POST

Note:
    SSE is read-only. Requests still sent via HTTP POST,
    but responses/updates streamed back via SSE connection.
"""

import aiohttp
from typing import Any, Callable, Dict, Optional


class SSEMCPServer:
    """MCP server using Server-Sent Events transport.
    
    Maintains persistent connection for streaming server updates
    while sending requests via standard HTTP POST.
    
    Example:
        >>> server = SSEMCPServer()
        >>> await server.connect("https://api.example.com/sse")
        >>> 
        >>> # Start listening for events
        >>> async def handle_event(event):
        ...     print(f"Received: {event}")
        >>> 
        >>> listen_task = asyncio.create_task(
        ...     server.listen_events(handle_event)
        ... )
        >>> 
        >>> # Send request via HTTP
        >>> result = await server.send_request("tools/call", {...})
        >>> 
        >>> await server.close()
    """
    
    def __init__(self):
        """Initialize SSE server (not connected yet)."""
        self._sse_url: Optional[str] = None
        self._http_session: Optional[aiohttp.ClientSession] = None
        self._sse_response: Optional[aiohttp.ClientResponse] = None
        self._request_id = 0
    
    async def connect(
        self,
        sse_url: str,
        http_base_url: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> None:
        """Establish SSE connection for streaming.
        
        Args:
            sse_url: URL for SSE event stream
            http_base_url: Base URL for HTTP requests (default: sse_url)
            headers: Custom headers for both SSE and HTTP
        """
        self._sse_url = sse_url
        self._http_base_url = http_base_url or sse_url.rsplit('/sse', 1)[0]
        
        # Create session
        self._http_session = aiohttp.ClientSession(
            headers=headers or {}
        )
        
        # Establish SSE connection
        self._sse_response = await self._http_session.get(
            sse_url,
            timeout=aiohttp.ClientTimeout(total=None)  # No timeout for SSE
        )
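
The listen_events and close methods referenced in the class docstring are not included in this excerpt. A minimal sketch of how they might look, following the connection pattern shown earlier, is below; it assumes `import json` alongside the imports above, and the handler signature is an assumption.

# In SSEMCPServer (sketch)

async def listen_events(self, handler) -> None:
    """Consume the SSE stream and invoke handler for each event.
    
    Parses `data: ` lines from the stream, mirroring the connection
    pattern shown earlier, and passes the decoded JSON payload to
    the supplied async handler.
    """
    async for line in self._sse_response.content:
        if line.startswith(b"data: "):
            event = json.loads(line[len(b"data: "):])
            await handler(event)


async def close(self) -> None:
    """Close the SSE stream and the underlying HTTP session."""
    if self._sse_response:
        self._sse_response.close()
        self._sse_response = None
    if self._http_session:
        await self._http_session.close()
        self._http_session = None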

Server Discovery and Registration

Server discovery loads MCP server configurations from the database and YAML files, then registers them with the manager.

Discovery Flow

"""Load and register MCP servers from configuration sources.

Discovery Order:
    1. Database configurations (highest priority)
    2. YAML files in registry directories
    3. Environment-based overrides
    
Validation:
    - Schema validation against mcp-server.schema.json
    - Duplicate name detection
    - Transport type verification
"""

from pathlib import Path
from typing import Dict, List

import yaml


async def discover_and_register_servers(
    manager: MCPServerManager,
    db_pool: DatabasePool,
    config_dirs: List[Path]
) -> Dict[str, bool]:
    """Discover MCP servers and register with manager.
    
    Args:
        manager: MCPServerManager instance
        db_pool: Database connection pool
        config_dirs: Directories to scan for YAML configs
        
    Returns:
        Dict mapping server name to registration success
    """
    configs: List[MCPServerConfig] = []
    
    # 1. Load from database
    db_configs = await load_from_database(db_pool)
    configs.extend(db_configs)
    
    # 2. Load from YAML files
    for config_dir in config_dirs:
        yaml_configs = await load_from_yaml_dir(config_dir)
        configs.extend(yaml_configs)
    
    # 3. Apply environment overrides
    configs = apply_env_overrides(configs)
    
    # 4. Register all servers
    return await manager.initialize_servers(configs)


async def load_from_database(
    db_pool: DatabasePool
) -> List[MCPServerConfig]:
    """Load MCP server configs from database.
    
    Query:
        SELECT name, spec FROM mcp_servers WHERE enabled = true
    """
    async with db_pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT name, spec FROM mcp_servers WHERE enabled = true"
        )
        
        configs = []
        for row in rows:
            config = MCPServerConfig(
                name=row['name'],
                transport=TransportType(row['spec']['transport']),
                # ... parse spec fields
            )
            configs.append(config)
        
        return configs


async def load_from_yaml_dir(config_dir: Path) -> List[MCPServerConfig]:
    """Load MCP server configs from YAML directory.
    
    Scans for *.yml and *.yaml files, validates schema.
    """
    configs = []
    
    for yaml_file in config_dir.glob("*.y*ml"):
        with open(yaml_file) as f:
            data = yaml.safe_load(f)
        
        # Validate schema
        if data.get('kind') != 'MCPServer':
            continue
        
        config = MCPServerConfig(
            name=data['metadata']['name'],
            transport=TransportType(data['spec']['provider']),
            # ... parse configuration
        )
        configs.append(config)
    
    return configs
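
The apply_env_overrides step is referenced above but not defined in this excerpt. The sketch below shows one possible convention; the MCP_<NAME>_BASE_URL and MCP_<NAME>_TIMEOUT variable names are assumptions rather than an established contract.

import os


def apply_env_overrides(configs: List[MCPServerConfig]) -> List[MCPServerConfig]:
    """Apply environment-based overrides to discovered configs.
    
    Checks MCP_<NAME>_BASE_URL and MCP_<NAME>_TIMEOUT environment
    variables (an assumed naming convention) and overrides the
    corresponding fields when present.
    """
    for config in configs:
        prefix = f"MCP_{config.name.upper().replace('-', '_')}"
        
        base_url = os.environ.get(f"{prefix}_BASE_URL")
        if base_url and config.transport in (TransportType.HTTP, TransportType.SSE):
            config.base_url = base_url
        
        timeout = os.environ.get(f"{prefix}_TIMEOUT")
        if timeout:
            config.timeout = int(timeout)
    
    return configs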

Health Check Implementation

Continuous health monitoring ensures servers remain responsive and automatically detects failures.

ServerHealthChecker Class

"""Health checking for all MCP server types.

Implements transport-specific health verification with
configurable intervals and failure thresholds.

Failure Detection:
    - Stdio: Process alive + responsive to ping
    - HTTP: GET /health returns 200
    - SSE: Connection active + events flowing
    
Thresholds:
    - 3 consecutive failures → Mark UNHEALTHY
    - 5 consecutive failures → Trigger reconnection
"""

from datetime import datetime, timedelta
from typing import Dict, Optional


class ServerHealthChecker:
    """Health checking for all MCP server types.
    
    Monitors server health via transport-specific checks,
    tracks failure counts, and triggers recovery actions.
    
    Example:
        >>> checker = ServerHealthChecker(manager)
        >>> await checker.check_all_servers()
        >>> health_status = checker.get_health_summary()
    """
    
    def __init__(
        self,
        manager: MCPServerManager,
        check_interval: int = 60,
        failure_threshold: int = 3
    ):
        """Initialize health checker.
        
        Args:
            manager: MCPServerManager to monitor
            check_interval: Seconds between checks
            failure_threshold: Failures before marking unhealthy
        """
        self._manager = manager
        self._check_interval = check_interval
        self._failure_threshold = failure_threshold
    
    async def check_stdio_server(
        self,
        server: StdioMCPServer,
        state: MCPServerState
    ) -> bool:
        """Verify subprocess is alive and responsive.
        
        Checks:
            1. Process returncode is None (still running)
            2. Can send/receive JSON-RPC ping
            
        Args:
            server: Stdio server instance
            state: Server state tracking
            
        Returns:
            True if healthy
        """
        # Check process alive
        if server._process.returncode is not None:
            return False
        
        try:
            # Send ping request
            result = await server.send_request(
                "ping",
                {},
                timeout=5.0
            )
            return True
            
        except Exception as e:
            print(f"Stdio health check failed: {e}")
            return False
    
    async def check_http_server(
        self,
        server: HTTPMCPServer,
        state: MCPServerState
    ) -> bool:
        """HTTP GET /health endpoint check.
        
        Expected Response:
            HTTP 200 OK
            {"status": "healthy"}
            
        Args:
            server: HTTP server instance
            state: Server state tracking
            
        Returns:
            True if returns 200
        """
        return await server.health_check()
    
    async def check_sse_server(
        self,
        server: SSEMCPServer,
        state: MCPServerState
    ) -> bool:
        """Verify SSE connection is active.
        
        Checks:
            1. HTTP session exists
            2. SSE response stream open
            3. Received event in last 2 minutes (heartbeat)
            
        Args:
            server: SSE server instance
            state: Server state tracking
            
        Returns:
            True if connection active
        """
        if not server._sse_response:
            return False
        
        if server._sse_response.closed:
            return False
        
        # Check recent activity
        if state.last_health_check:
            age = datetime.now() - state.last_health_check
            if age > timedelta(minutes=2):
                # No events for 2 minutes, likely stale
                return False
        
        return True
    
    async def check_all_servers(self) -> Dict[str, bool]:
        """Check health of all registered servers.
        
        Updates server state with health status and
        increments failure counters.
        
        Returns:
            Dict mapping server name to health status
        """
        results = {}
        
        async with self._manager._lock:
            for name, state in self._manager._servers.items():
                healthy = await self._check_server(state)
                results[name] = healthy
                
                # Update state
                state.last_health_check = datetime.now()
                
                if healthy:
                    state.consecutive_failures = 0
                    if state.status != ServerStatus.HEALTHY:
                        state.status = ServerStatus.HEALTHY
                else:
                    state.consecutive_failures += 1
                    
                    # Mark unhealthy after threshold
                    if state.consecutive_failures >= self._failure_threshold:
                        state.status = ServerStatus.UNHEALTHY
        
        return results
    
    async def _check_server(self, state: MCPServerState) -> bool:
        """Check single server based on transport type."""
        config = state.config
        
        try:
            if config.transport == TransportType.STDIO:
                return await self.check_stdio_server(
                    state.process,
                    state
                )
            elif config.transport == TransportType.HTTP:
                return await self.check_http_server(
                    state.http_session,
                    state
                )
            elif config.transport == TransportType.SSE:
                return await self.check_sse_server(
                    state.http_session,
                    state
                )
        except Exception as e:
            print(f"Health check error for {config.name}: {e}")
            return False

Health Check Loop

# In MCPServerManager

async def _health_check_loop(self) -> None:
    """Background task for continuous health monitoring.
    
    Runs until shutdown event is set. Checks all servers
    at configured interval and triggers reconnection for
    unhealthy servers.
    """
    checker = ServerHealthChecker(
        self,
        check_interval=self._health_check_interval,
        failure_threshold=3
    )
    
    while not self._shutdown_event.is_set():
        try:
            # Check all servers
            health_status = await checker.check_all_servers()
            
            # Trigger reconnection for failed servers
            for name, healthy in health_status.items():
                if not healthy:
                    state = self._servers[name]
                    if state.consecutive_failures >= 5:
                        # Reconnect after 5 failures
                        await self.reconnect_server(name)
            
            # Wait for next interval
            await asyncio.sleep(self._health_check_interval)
            
        except asyncio.CancelledError:
            break
        except Exception as e:
            print(f"Health check loop error: {e}")
            await asyncio.sleep(5)  # Brief pause on error
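
The reconnect_server call above is not defined elsewhere in this excerpt. A minimal sketch of what it might do, reusing the transport-specific startup helpers shown earlier and backing off exponentially up to max_reconnect_attempts, is shown below; the exact re-initialization path is an assumption.

# In MCPServerManager (sketch)

async def reconnect_server(self, name: str) -> bool:
    """Attempt to re-establish a failed server connection.
    
    Tears down the old transport resources, then retries the
    transport-specific startup with exponential backoff.
    """
    async with self._lock:
        state = self._servers[name]
        state.status = ServerStatus.RECONNECTING
        
        for attempt in range(self._max_reconnect_attempts):
            # Back off 1s, 2s, 4s ... between attempts
            await asyncio.sleep(2 ** attempt)
            
            try:
                if state.config.transport == TransportType.STDIO:
                    if state.process:
                        await state.process.shutdown()
                        state.process = None
                    ok = await self._start_stdio_server(state)
                elif state.config.transport == TransportType.HTTP:
                    if state.http_session:
                        await state.http_session.close()
                        state.http_session = None
                    ok = await self._connect_http_server(state)
                else:  # SSE
                    ok = await self._connect_sse_server(state)
                
                if ok:
                    state.consecutive_failures = 0
                    return True
            except Exception as e:
                state.error_message = str(e)
        
        state.status = ServerStatus.FAILED
        return False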

Connection Pool Management

Connection pooling for HTTP/SSE servers reduces latency and manages resource limits.

MCPConnectionPool Class

"""Connection pool for HTTP/SSE MCP servers.

Maintains a pool of reusable connections with:
    - Maximum connection limits
    - Connection lifetime management
    - Automatic cleanup of stale connections
    - Pool statistics and monitoring

Performance:
    Reduces connection overhead from ~50ms to <5ms for
    subsequent requests by reusing TCP connections.
"""

import asyncio
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import aiohttp


@dataclass
class PooledConnection:
    """A single pooled connection."""
    session: aiohttp.ClientSession
    created_at: datetime
    last_used: datetime
    use_count: int = 0


class MCPConnectionPool:
    """Connection pool for HTTP/SSE MCP servers.
    
    Manages connection lifecycle with configurable limits
    and automatic cleanup of stale connections.
    
    Example:
        >>> pool = MCPConnectionPool(max_per_server=10)
        >>> session = await pool.acquire("weather-api")
        >>> # ... use session ...
        >>> await pool.release("weather-api", session)
    """
    
    def __init__(
        self,
        max_per_server: int = 10,
        max_lifetime: int = 3600,  # 1 hour
        idle_timeout: int = 300     # 5 minutes
    ):
        """Initialize connection pool.
        
        Args:
            max_per_server: Max connections per server
            max_lifetime: Max connection age in seconds
            idle_timeout: Max idle time before cleanup
        """
        self._max_per_server = max_per_server
        self._max_lifetime = max_lifetime
        self._idle_timeout = idle_timeout
        
        # Pool storage: server_name → [PooledConnection]
        self._pools: Dict[str, List[PooledConnection]] = defaultdict(list)
        self._lock = asyncio.Lock()
    
    async def acquire(
        self,
        server_name: str,
        headers: Optional[Dict[str, str]] = None
    ) -> aiohttp.ClientSession:
        """Acquire connection from pool.
        
        Returns existing connection if available, creates new
        if pool not full, raises error if at limit.
        
        Args:
            server_name: Server identifier
            headers: Headers for new connections
            
        Returns:
            Active ClientSession
            
        Raises:
            RuntimeError: If pool exhausted
        """
        async with self._lock:
            pool = self._pools[server_name]
            
            # Clean up stale connections first
            await self._cleanup_stale(server_name)
            
            # Try to reuse existing connection
            for conn in pool:
                age = datetime.now() - conn.last_used
                if age.total_seconds() < self._idle_timeout:
                    conn.last_used = datetime.now()
                    conn.use_count += 1
                    return conn.session
            
            # Create new if under limit
            if len(pool) < self._max_per_server:
                session = aiohttp.ClientSession(headers=headers or {})
                conn = PooledConnection(
                    session=session,
                    created_at=datetime.now(),
                    last_used=datetime.now(),
                    use_count=1
                )
                pool.append(conn)
                return session
            
            # Pool exhausted
            raise RuntimeError(
                f"Connection pool exhausted for {server_name}"
            )
    
    async def release(
        self,
        server_name: str,
        session: aiohttp.ClientSession
    ) -> None:
        """Return a connection to the pool after use.
        
        Updates the last-used timestamp so idle-connection cleanup
        can evict it once the idle timeout elapses.
        
        Args:
            server_name: Server identifier
            session: Session previously acquired from the pool
        """
        async with self._lock:
            for conn in self._pools[server_name]:
                if conn.session is session:
                    conn.last_used = datetime.now()
                    break
Graceful Shutdown

Graceful shutdown ensures all MCP servers and resources are cleanly terminated without data loss or orphaned processes.

Shutdown Sequence

"""Graceful shutdown of all MCP servers.

Shutdown Order (critical for correctness):
    1. Stop accepting new requests
    2. Wait for active requests to complete (timeout: 30s)
    3. Stop health check loop
    4. Close HTTP/SSE connections
    5. Terminate stdio subprocesses (SIGTERM → SIGKILL)
    6. Close connection pools
    7. Update server status in database

Invariants:
    - No orphaned processes after shutdown
    - All network connections closed
    - No resource leaks
    - Database state reflects shutdown
"""

# In MCPServerManager

async def shutdown_all(self, timeout: float = 30.0) -> None:
    """Gracefully shutdown all MCP servers.
    
    Implements multi-phase shutdown to ensure clean termination:
        1. Signal shutdown to prevent new operations
        2. Wait for active operations to complete
        3. Stop background tasks (health checks)
        4. Close all server connections
        5. Release all resources
    
    Args:
        timeout: Maximum seconds to wait for graceful shutdown
        
    Example:
        >>> manager = MCPServerManager(...)
        >>> await manager.initialize_servers(configs)
        >>> # ... use servers ...
        >>> await manager.shutdown_all()
    """
    print("Starting graceful shutdown of MCP servers...")
    
    # Phase 1: Signal shutdown
    self._shutdown_event.set()
    
    # Phase 2: Stop health check loop
    if self._health_check_task:
        self._health_check_task.cancel()
        try:
            await asyncio.wait_for(
                self._health_check_task,
                timeout=5.0
            )
        except (asyncio.CancelledError, asyncio.TimeoutError):
            pass
    
    # Phase 3: Wait for active requests to complete
    deadline = asyncio.get_event_loop().time() + timeout
    while True:
        # Check if any servers have active operations
        has_active = await self._has_active_operations()
        if not has_active:
            break
        
        remaining = deadline - asyncio.get_event_loop().time()
        if remaining <= 0:
            print("Warning: Timeout waiting for active operations")
            break
        
        await asyncio.sleep(0.5)
    
    # Phase 4: Shutdown servers by transport type
    async with self._lock:
        shutdown_tasks = []
        
        for name, state in self._servers.items():
            task = self._shutdown_server(state)
            shutdown_tasks.append(task)
        
        # Execute shutdowns concurrently
        results = await asyncio.gather(
            *shutdown_tasks,
            return_exceptions=True
        )
        
        # Log any shutdown errors
        for name, result in zip(self._servers.keys(), results):
            if isinstance(result, Exception):
                print(f"Error shutting down {name}: {result}")
    
    # Phase 5: Close connection pool
    if hasattr(self, '_connection_pool'):
        await self._connection_pool.close_all()
    
    print("MCP server shutdown complete")


async def _has_active_operations(self) -> bool:
    """Check if any servers have active operations.
    
    Used during shutdown to wait for ongoing requests.
    
    Returns:
        True if any server is processing requests
    """
    # Implementation depends on request tracking
    # For simplicity, assume no tracking needed if health checks stopped
    return False


async def _shutdown_server(self, state: MCPServerState) -> None:
    """Shutdown a single MCP server.
    
    Transport-specific cleanup procedures.
    
    Args:
        state: Server state to shutdown
    """
    config = state.config
    
    try:
        if config.transport == TransportType.STDIO:
            await self._shutdown_stdio_server(state)
        elif config.transport == TransportType.HTTP:
            await self._shutdown_http_server(state)
        elif config.transport == TransportType.SSE:
            await self._shutdown_sse_server(state)
        
        state.status = ServerStatus.STOPPED
        
    except Exception as e:
        print(f"Error shutting down {config.name}: {e}")
        state.status = ServerStatus.FAILED
        state.error_message = str(e)


async def _shutdown_stdio_server(self, state: MCPServerState) -> None:
    """Shutdown stdio server subprocess.
    
    Termination sequence:
        1. Send SIGTERM
        2. Wait up to 5 seconds
        3. Send SIGKILL if still alive
        4. Reap zombie process
    """
    if not state.process:
        return
    
    server = state.process
    
    try:
        await server.shutdown(timeout=5.0)
        print(f"Stdio server {state.config.name} terminated")
    except Exception as e:
        print(f"Error terminating {state.config.name}: {e}")
    finally:
        state.process = None


async def _shutdown_http_server(self, state: MCPServerState) -> None:
    """Shutdown HTTP server connection.
    
    Closes ClientSession gracefully, releasing all connections.
    """
    if not state.http_session:
        return
    
    server = state.http_session
    
    try:
        await server.close()
        print(f"HTTP server {state.config.name} disconnected")
    except Exception as e:
        print(f"Error closing HTTP {state.config.name}: {e}")
    finally:
        state.http_session = None


async def _shutdown_sse_server(self, state: MCPServerState) -> None:
    """Shutdown SSE server connection.
    
    Closes both SSE stream and HTTP session.
    """
    if not state.http_session:
        return
    
    server = state.http_session
    
    try:
        await server.close()
        print(f"SSE server {state.config.name} disconnected")
    except Exception as e:
        print(f"Error closing SSE {state.config.name}: {e}")
    finally:
        state.http_session = None

Shutdown with Signal Handling

"""Handle OS signals for graceful shutdown.

Catches SIGTERM and SIGINT to trigger clean shutdown.
Critical for production deployments where container
orchestrators send termination signals.
"""

import signal


def setup_signal_handlers(manager: MCPServerManager) -> None:
    """Setup OS signal handlers for graceful shutdown.
    
    Registers handlers for SIGTERM and SIGINT that trigger
    manager shutdown.
    
    Args:
        manager: MCPServerManager to shutdown on signal
    """
    loop = asyncio.get_event_loop()
    
    def signal_handler(sig):
        print(f"Received signal {sig}, initiating shutdown...")
        asyncio.create_task(manager.shutdown_all())
    
    # Handle SIGTERM (container orchestrator)
    loop.add_signal_handler(
        signal.SIGTERM,
        lambda: signal_handler(signal.SIGTERM)
    )
    
    # Handle SIGINT (Ctrl+C)
    loop.add_signal_handler(
        signal.SIGINT,
        lambda: signal_handler(signal.SIGINT)
    )


# In main application
async def main():
    manager = MCPServerManager(...)
    
    # Setup signal handlers
    setup_signal_handlers(manager)
    
    # Initialize servers
    await manager.initialize_servers(configs)
    
    # Keep running until shutdown
    await manager._shutdown_event.wait()

Resource Cleanup

Comprehensive resource cleanup prevents leaks and ensures proper termination.

Cleanup Procedures

"""Resource cleanup on server failure or shutdown.

Handles cleanup for:
    - Zombie subprocesses (stdio)
    - Stale HTTP connections
    - Connection pool resources
    - Database connection state
    - File descriptors and sockets
"""

async def cleanup_failed_server(
    state: MCPServerState,
    connection_pool: MCPConnectionPool
) -> None:
    """Clean up resources after server failure.
    
    Comprehensive cleanup ensuring no resource leaks:
        - Terminate processes forcefully if needed
        - Close all network connections
        - Remove from connection pool
        - Update database status
    
    Args:
        state: Failed server state
        connection_pool: Connection pool to clean from
    """
    config = state.config
    
    # 1. Clean up zombie processes (stdio)
    if config.transport == TransportType.STDIO and state.process:
        try:
            if state.process._process.returncode is None:
                # Force kill if still alive
                state.process._process.kill()
                await state.process._process.wait()
                print(f"Force-killed zombie process: {config.name}")
        except Exception as e:
            print(f"Error cleaning zombie: {e}")
        finally:
            state.process = None
    
    # 2. Close stale HTTP connections
    if state.http_session:
        try:
            # Close without waiting for pending requests
            await asyncio.wait_for(
                state.http_session.close(),
                timeout=2.0
            )
        except asyncio.TimeoutError:
            print(f"Force-closed stale connection: {config.name}")
        except Exception as e:
            print(f"Error closing connection: {e}")
        finally:
            state.http_session = None
    
    # 3. Release connection pool resources
    if config.transport in (TransportType.HTTP, TransportType.SSE):
        # Remove server from pool
        if config.name in connection_pool._pools:
            pool = connection_pool._pools[config.name]
            for conn in pool:
                try:
                    await conn.session.close()
                except Exception:
                    pass
            del connection_pool._pools[config.name]
    
    # 4. Update database status
    await update_server_status_in_db(config.name, ServerStatus.FAILED)
    
    print(f"Cleaned up resources for {config.name}")


async def update_server_status_in_db(
    server_name: str,
    status: ServerStatus
) -> None:
    """Update MCP server status in database.
    
    Persists server state for auditing and monitoring.
    
    Args:
        server_name: Server identifier
        status: New status to persist
    """
    # Pseudo-code for database update
    async with db_pool.acquire() as conn:
        await conn.execute(
            """
            UPDATE mcp_servers
            SET status = $1, updated_at = NOW()
            WHERE name = $2
            """,
            status.value,
            server_name
        )

Tool Loading Integration

Tool loading bridges MCP servers and the Agent Framework, registering each server's discovered tools with ChatAgent instances.

### MCPToolLoader Class

```python
"""Load tools from MCP servers into Agent Framework.

Bridges MCP protocol with Microsoft Agent Framework by:
    - Discovering tools via MCP protocol
    - Converting to Agent Framework tool format
    - Registering with ChatAgent instances
    - Handling tool invocation delegation
"""

from azure.ai.projects.models import FunctionTool
from typing import List


class MCPToolLoader:
    """Load tools from MCP servers into Agent Framework.
    
    Discovers tools from MCP servers and registers them with
    ChatAgent instances for agent use.
    
    Example:
        >>> loader = MCPToolLoader(manager)
        >>> tools = await loader.load_tools_from_server("context7")
        >>> await loader.register_tools_with_agent(agent, tools)
    """
    

---

## Configuration Examples

Complete YAML configuration examples for all transport types.

### Stdio Server Configuration

```yaml
# examples/mcp-servers/local-filesystem.yml
apiVersion: mcp-servers.liveai.io/v1alpha1
kind: MCPServer
metadata:
  name: filesystem-tools
  labels:
    type: utility
    environment: development
    transport: stdio

spec:
  displayName: "Filesystem Tools Server"
  description: "Local filesystem access via MCP"
  provider: stdio
  
  configuration:
    command: "npx"
    args: ["-y", "@modelcontextprotocol/server-filesystem", "/workspace"]
    workingDir: "/home/user/projects"
    environment:
      NODE_ENV: "development"
      LOG_LEVEL: "debug"
  
  discovery:
    enabled: true
    refreshInterval: "5m"
    onStartup: true
    fallbackToStatic: false
```

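For orientation, the manifest above maps roughly onto the `MCPServerConfig` objects used in the Python examples later in this guide. The sketch below shows that mapping and is an illustration, not generated code; the `workingDir` and `discovery` fields are handled by the schema/manager layer and have no counterpart in this simplified config.

```python
# Sketch: MCPServerConfig equivalent of the filesystem-tools manifest above.
filesystem_tools = MCPServerConfig(
    name="filesystem-tools",
    transport=TransportType.STDIO,
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem", "/workspace"],
    env={"NODE_ENV": "development", "LOG_LEVEL": "debug"},
)
```
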
### HTTP Server Configuration

```yaml

# examples/mcp-servers/weather-api-http.yml
apiVersion: mcp-servers.liveai.io/v1alpha1
kind: MCPServer
metadata:
  name: weather-api
  labels:
    type: api
    environment: production
    transport: http

spec:
  displayName: "Weather API Server"
  description: "Weather data via HTTP MCP server"
  provider: http
  
  configuration:
    baseUrl: "https://weather-mcp.example.com/api"
    timeout: 60
    headers:
      X-API-Version: "v1"
      X-Client-ID: "liveai-framework"
    retries:
      maxAttempts: 3
      backoff: "exponential"
      initialDelay: 1000
  
  authentication:
    type: "bearer"
    credentials:
      token: "${WEATHER_API_TOKEN}"
  
  discovery:
    enabled: true
    refreshInterval: "1h"
    onStartup: true
    fallbackToStatic: true
  
  tools:
    - name: "get-current-weather"
      description: "Get current weather for a city"
      parameters:
        city:
          type: "string"
          description: "City name"
          required: true
        units:
          type: "string"
          description: "Temperature units"
          required: false
          default: "metric"
          enum: ["metric", "imperial"]
```

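When an agent invokes the `get-current-weather` tool declared above, the manager ultimately sends a JSON-RPC 2.0 `tools/call` request over the configured transport (see `_call_server_method` later in this guide). Roughly, the payload looks like this; the request `id` is illustrative.

```python
# Sketch: JSON-RPC 2.0 payload the HTTP transport would serialize for this tool.
tools_call_request = {
    "jsonrpc": "2.0",
    "id": 42,  # per-connection incrementing request id
    "method": "tools/call",
    "params": {
        "name": "get-current-weather",
        "arguments": {"city": "San Francisco", "units": "metric"},
    },
}
```
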
### SSE Server Configuration

```yaml

# examples/mcp-servers/event-stream-sse.yml
apiVersion: mcp-servers.liveai.io/v1alpha1
kind: MCPServer
metadata:
  name: analytics-stream
  labels:
    type: streaming
    environment: production
    transport: sse

spec:
  displayName: "Analytics Event Stream"
  description: "Real-time analytics via SSE"
  provider: http  # SSE uses HTTP transport
  
  configuration:
    baseUrl: "https://analytics-mcp.example.com"
    transport: "sse"  # Specify SSE explicitly
    timeout: 300  # Longer for streaming
    headers:
      X-Stream-Type: "analytics"
  
  authentication:
    type: "api-key"
    credentials:
      key: "X-API-Key"
      value: "${ANALYTICS_API_KEY}"
  
  discovery:
    enabled: true
    refreshInterval: "30m"
    onStartup: true
    fallbackToStatic: true

```

### Environment Variables File

```bash

# .env - Local development
WEATHER_API_TOKEN=your-weather-token-here
ANALYTICS_API_KEY=your-analytics-key-here
CONTEXT7_API_KEY=your-context7-key-here

# Database
DATABASE_URL=postgresql://localhost:5432/liveai

# Azure AI
AZURE_AI_PROJECT_ENDPOINT=https://your-project.cognitiveservices.azure.com/
```

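References like `${WEATHER_API_TOKEN}` in the manifests above are meant to be resolved from the environment at load time. The exact loading code is framework-specific; the sketch below shows one way it could work with PyYAML (`load_server_config` and its behaviour are assumptions, not part of the documented API).

```python
"""Sketch: resolve ${ENV_VAR} placeholders when loading an MCP server manifest."""

import os
import re
from pathlib import Path

import yaml  # PyYAML, assumed to be installed

_ENV_PATTERN = re.compile(r"\$\{([A-Za-z0-9_]+)\}")


def _expand_env(value):
    """Recursively replace ${VAR} placeholders with environment values."""
    if isinstance(value, str):
        return _ENV_PATTERN.sub(
            lambda m: os.environ.get(m.group(1), m.group(0)), value
        )
    if isinstance(value, dict):
        return {key: _expand_env(val) for key, val in value.items()}
    if isinstance(value, list):
        return [_expand_env(item) for item in value]
    return value


def load_server_config(path: str) -> dict:
    """Load an MCPServer manifest and expand environment placeholders."""
    raw = yaml.safe_load(Path(path).read_text())
    return _expand_env(raw)


# Usage (assumes WEATHER_API_TOKEN is exported or loaded from .env):
# config = load_server_config("examples/mcp-servers/weather-api-http.yml")
# token = config["spec"]["authentication"]["credentials"]["token"]
```
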
---

## Complete Working Example

End-to-end example demonstrating the full MCP server lifecycle.

### Complete Integration

```python
"""Complete MCP server lifecycle example.

Demonstrates:
    - Server initialization with multiple transports
    - Health monitoring
    - Tool loading into agents
    - Graceful shutdown

Run:
    python examples/mcp_lifecycle_demo.py
"""

import asyncio
import os
from pathlib import Path
from typing import List

from azure.ai.projects.aio import AIProjectClient
from azure.identity.aio import DefaultAzureCredential

# Import MCP components (pseudo-imports)
from mcp_server_lifecycle import (
    MCPServerManager,
    MCPServerConfig,
    TransportType,
    MCPToolLoader,
    setup_signal_handlers
)


async def main():
    """Run complete MCP lifecycle demonstration."""
    
    # 1. Initialize Azure AI client
    credential = DefaultAzureCredential()
    ai_client = AIProjectClient(
        endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
        credential=credential
    )
    
    # 2. Create MCP server manager
    manager = MCPServerManager(
        ai_client=ai_client,
        health_check_interval=60,
        max_reconnect_attempts=3
    )
    
    # 3. Setup graceful shutdown
    setup_signal_handlers(manager)
    
    # 4. Define server configurations
    configs = [
        # Stdio server - local development
        MCPServerConfig(
            name="context7",
            transport=TransportType.STDIO,
            command="npx",
            args=["-y", "@upstash/context7-mcp"],
            env={"NODE_ENV": "production"}
        ),
        
        # HTTP server - remote API
        MCPServerConfig(
            name="weather-api",
            transport=TransportType.HTTP,
            base_url="https://weather-mcp.example.com/api",
            timeout=60,
            headers={"X-Client-ID": "liveai"},
            auth_token=os.environ.get("WEATHER_API_TOKEN")
        ),
        
        # SSE server - streaming
        MCPServerConfig(
            name="analytics",
            transport=TransportType.SSE,
            base_url="https://analytics.example.com",
            timeout=300,
            auth_token=os.environ.get("ANALYTICS_API_KEY")
        )
    ]
    
    # 5. Initialize all servers
    print("Initializing MCP servers...")
    init_results = await manager.initialize_servers(configs)
    
    for server_name, success in init_results.items():
        status = "✓" if success else "✗"
        print(f"  {status} {server_name}")
    
    # 6. Verify health
    print("\nChecking server health...")
    health_status = await manager.health_check_all()
    
    for server_name, healthy in health_status.items():
        status = "Healthy" if healthy else "Unhealthy"
        print(f"  {server_name}: {status}")
    
    # 7. Load tools into agent
    print("\nLoading MCP tools into agent...")
    
    agent = await ai_client.agents.create_agent(
        model="gpt-4",
        name="MCP Demo Agent",
        instructions="""
        You are an assistant with access to documentation,
        weather data, and analytics via MCP tools.
        """
    )
    
    tool_loader = MCPToolLoader(manager)
    
    for server_name in ["context7", "weather-api"]:
        tools = await tool_loader.load_tools_from_server(server_name)
        await tool_loader.register_tools_with_agent(agent, tools)
        print(f"  Loaded {len(tools)} tools from {server_name}")
    
    # 8. Use agent with MCP tools
    print("\nRunning agent with MCP tools...")
    
    thread = await ai_client.agents.create_thread()
    
    await ai_client.agents.create_message(
        thread_id=thread.id,
        role="user",
        content="What's the weather in San Francisco and find React documentation on hooks?"
    )
    
    run = await ai_client.agents.create_and_process_run(
        thread_id=thread.id,
        assistant_id=agent.id
    )
    
    messages = await ai_client.agents.list_messages(thread.id)
    print(f"\nAgent response:\n{messages.data[0].content[0].text.value}")
    
    # 9. Monitor for a while
    print("\nMonitoring servers (press Ctrl+C to stop)...")
    
    try:
        # Run until interrupted
        await manager._shutdown_event.wait()
    except KeyboardInterrupt:
        print("\nShutdown signal received...")
    
    # 10. Graceful shutdown
    print("\nShutting down MCP servers...")
    await manager.shutdown_all()
    
    print("✓ Shutdown complete")
    
    await ai_client.close()
    await credential.close()


if __name__ == "__main__":
    asyncio.run(main())

```

### Expected Output

```text

Initializing MCP servers...
  ✓ context7
  ✓ weather-api
  ✓ analytics

Checking server health...
  context7: Healthy
  weather-api: Healthy
  analytics: Healthy

Loading MCP tools into agent...
  Loaded 2 tools from context7
  Loaded 3 tools from weather-api

Running agent with MCP tools...

Agent response:
The current weather in San Francisco is 18°C (64°F) with partly cloudy 
skies. Regarding React hooks, they are functions that let you use state 
and lifecycle features in functional components. The most common hooks 
are useState for state management and useEffect for side effects...

Monitoring servers (press Ctrl+C to stop)...
^C
Shutdown signal received...

Shutting down MCP servers...
Stdio server context7 terminated
HTTP server weather-api disconnected
SSE server analytics disconnected
✓ Shutdown complete

```

---

## Testing

Comprehensive testing strategy for MCP server lifecycle.

### Unit Tests

```python
"""Unit tests for MCP server lifecycle components.

Run:
    pytest tests/test_mcp_lifecycle.py -v
"""

import pytest
from unittest.mock import AsyncMock, MagicMock, patch

from mcp_server_lifecycle import (
    MCPServerManager,
    MCPServerConfig,
    TransportType,
    ServerStatus,
    StdioMCPServer
)


@pytest.fixture
async def mock_ai_client():
    """Mock Azure AI client."""
    client = AsyncMock()
    return client


@pytest.fixture
async def manager(mock_ai_client):
    """Create MCPServerManager for testing."""
    return MCPServerManager(
        ai_client=mock_ai_client,
        health_check_interval=1,
        max_reconnect_attempts=2
    )


@pytest.mark.asyncio
async def test_stdio_server_initialization(manager):
    """Test stdio server starts successfully."""
    config = MCPServerConfig(
        name="test-stdio",
        transport=TransportType.STDIO,
        command="echo",
        args=["test"]
    )
    
    with patch('asyncio.create_subprocess_exec') as mock_subprocess:
        # Mock successful process
        mock_process = AsyncMock()
        mock_process.returncode = None
        mock_process.stdout.readline.return_value = b'{"result": {}}\n'
        mock_subprocess.return_value = mock_process
        
        results = await manager.initialize_servers([config])
        
        assert results["test-stdio"] is True
        assert manager._servers["test-stdio"].status == ServerStatus.HEALTHY


@pytest.mark.asyncio
async def test_http_server_connection(manager):
    """Test HTTP server connects successfully."""
    config = MCPServerConfig(
        name="test-http",
        transport=TransportType.HTTP,
        base_url="https://test.example.com",
        timeout=30
    )
    
    with patch('aiohttp.ClientSession') as mock_session:
        # Mock successful connection
        mock_response = AsyncMock()
        mock_response.status = 200
        mock_session.return_value.get.return_value.__aenter__.return_value = mock_response
        
        results = await manager.initialize_servers([config])
        
        assert results["test-http"] is True


@pytest.mark.asyncio
async def test_health_check_failure_triggers_reconnect(manager):
    """Test automatic reconnection on health check failure."""
    config = MCPServerConfig(
        name="test-server",
        transport=TransportType.HTTP,
        base_url="https://test.example.com"
    )
    
    # Initialize server
    await manager.initialize_servers([config])
    state = manager._servers["test-server"]
    
    # Simulate health check failures
    state.consecutive_failures = 5
    state.status = ServerStatus.UNHEALTHY
    
    with patch.object(manager, 'reconnect_server') as mock_reconnect:
        mock_reconnect.return_value = True
        
        # Trigger health check
        await manager._health_check_loop()
        
        # Should trigger reconnection
        mock_reconnect.assert_called_once_with("test-server")


@pytest.mark.asyncio
async def test_graceful_shutdown_all_servers(manager):
    """Test graceful shutdown terminates all servers."""
    configs = [
        MCPServerConfig(
            name="stdio-server",
            transport=TransportType.STDIO,
            command="echo"
        ),
        MCPServerConfig(
            name="http-server",
            transport=TransportType.HTTP,
            base_url="https://test.example.com"
        )
    ]
    
    await manager.initialize_servers(configs)
    
    # Shutdown
    await manager.shutdown_all()
    
    # Verify all stopped
    for state in manager._servers.values():
        assert state.status == ServerStatus.STOPPED
        assert state.process is None or state.http_session is None


@pytest.mark.asyncio
async def test_exponential_backoff_reconnection():
    """Test reconnection uses exponential backoff."""
    from mcp_server_lifecycle import reconnect_with_backoff
    
    manager = AsyncMock()
    
    with patch('asyncio.sleep') as mock_sleep:
        # Simulate all retries failing
        manager._servers = {
            "test": MagicMock(config=MagicMock(transport=TransportType.HTTP))
        }
        manager._start_stdio_server.return_value = False
        manager._connect_http_server.return_value = False
        
        result = await reconnect_with_backoff(
            "test",
            manager,
            max_retries=3,
            base_delay=1.0
        )
        
        # Verify exponential delays between retries: 1s then 2s (no sleep after the final attempt)
        assert mock_sleep.call_count == 2  # Between retries
        delays = [call.args[0] for call in mock_sleep.call_args_list]
        assert delays == [1.0, 2.0]
        assert result is False


# Run tests
# pytest tests/test_mcp_lifecycle.py -v --cov=mcp_server_lifecycle

```

### Integration Tests

```python

"""Integration tests with real MCP servers.

Requires:
    - npx installed
    - @upstash/context7-mcp package available
"""

import pytest


@pytest.mark.integration
@pytest.mark.asyncio
async def test_real_stdio_server_lifecycle(mock_ai_client):
    """Test with actual stdio MCP server."""
    manager = MCPServerManager(mock_ai_client)
    
    config = MCPServerConfig(
        name="context7-real",
        transport=TransportType.STDIO,
        command="npx",
        args=["-y", "@upstash/context7-mcp"]
    )
    
    # Initialize
    results = await manager.initialize_servers([config])
    assert results["context7-real"] is True
    
    # Verify tools loaded
    state = manager._servers["context7-real"]
    assert state.tools_loaded > 0
    
    # Health check
    health = await manager.health_check_all()
    assert health["context7-real"] is True
    
    # Shutdown
    await manager.shutdown_all()
    assert state.status == ServerStatus.STOPPED


# Run integration tests
# pytest tests/test_mcp_lifecycle.py -v -m integration
```

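These tests rely on `pytest.mark.asyncio` and async fixtures, so they assume `pytest-asyncio` is installed. A minimal `conftest.py` registering the custom `integration` marker might look like this (a sketch; the marker can equally be declared in `pytest.ini`).

```python
# conftest.py (sketch)

def pytest_configure(config):
    """Register the custom 'integration' marker used by the tests above."""
    config.addinivalue_line(
        "markers",
        "integration: tests that require real MCP servers and network access",
    )
```
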
---

## Performance Considerations

### Connection Pool Sizing

```python
"""Optimize connection pool for your workload.

Rules of thumb:
    - max_per_server: 10-20 for typical workloads
    - max_lifetime: 1 hour (3600s) default
    - idle_timeout: 5 minutes (300s) default
    
Adjust based on:
    - Request rate
    - Server capacity
    - Network latency
"""

# Low traffic (< 10 req/min)
pool = MCPConnectionPool(
    max_per_server=5,
    max_lifetime=1800,  # 30 min
    idle_timeout=180     # 3 min
)

# Medium traffic (10-100 req/min)
pool = MCPConnectionPool(
    max_per_server=10,
    max_lifetime=3600,  # 1 hour
    idle_timeout=300     # 5 min
)

# High traffic (> 100 req/min)
pool = MCPConnectionPool(
    max_per_server=20,
    max_lifetime=7200,  # 2 hours
    idle_timeout=600     # 10 min
)

```

### Health Check Frequency

```python

"""Balance between responsiveness and overhead.

Typical values:
    - Development: 10-30 seconds
    - Production: 60-300 seconds
    
Consider:
    - Server reliability (more checks for unreliable servers)
    - Network latency (longer intervals for slow networks)
    - Cost (each check is a request)
"""

# Quick failure detection (development)
manager = MCPServerManager(
    ai_client=client,
    health_check_interval=10  # Every 10 seconds
)

# Balanced (production)
manager = MCPServerManager(
    ai_client=client,
    health_check_interval=60  # Every minute
)

# Conservative (stable servers)
manager = MCPServerManager(
    ai_client=client,
    health_check_interval=300  # Every 5 minutes
)

```

### Resource Limits

```python

"""Prevent resource exhaustion.

Set limits on:
    - Maximum concurrent servers
    - Maximum connections per server
    - Request timeouts
    - Subprocess limits
"""

# Global limits
MAX_MCP_SERVERS = 50
MAX_CONNECTIONS_PER_SERVER = 20
MAX_SUBPROCESS_COUNT = 10

# Enforce in manager
class MCPServerManager:
    def __init__(self, ai_client, max_servers=50):
        self._max_servers = max_servers
        # ...
    
    async def initialize_servers(self, configs):
        if len(configs) > self._max_servers:
            raise ValueError(
                f"Too many servers: {len(configs)} > {self._max_servers}"
            )
        # ...
```

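`MAX_SUBPROCESS_COUNT` above is declared but not enforced by the snippet; a simple capacity check against the manager's registry (a sketch using the `state.process` and `config.transport` fields from earlier sections) could gate new stdio servers:

```python
# Sketch: refuse to start more stdio servers than MAX_SUBPROCESS_COUNT.
MAX_SUBPROCESS_COUNT = 10


def stdio_capacity_available(manager: MCPServerManager) -> bool:
    """True if another stdio subprocess may be started."""
    running = sum(
        1
        for state in manager._servers.values()
        if state.config.transport == TransportType.STDIO and state.process is not None
    )
    return running < MAX_SUBPROCESS_COUNT
```
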
---

## Troubleshooting

### Common Issues and Solutions

#### Issue: Stdio Process Exits Immediately

**Symptom:**

```text
RuntimeError: Process exited immediately: command not found
```

**Causes:**

  • Command not in PATH
  • Missing dependencies
  • Incorrect arguments

**Solutions:**

```bash
# 1. Verify command exists
which npx
command -v npx

# 2. Test command manually
npx -y @upstash/context7-mcp
```

```yaml
# 3. Use full path
spec:
  configuration:
    command: "/usr/local/bin/npx"
```

```bash
# 4. Check environment
env | grep PATH
```

#### Issue: HTTP Connection Timeout

**Symptom:**

```text
TimeoutError: Request to https://api.example.com/tools/list timed out
```

**Causes:**

  • Network issues
  • Server overloaded
  • Firewall blocking

**Solutions:**

```python
# 1. Increase timeout
config = MCPServerConfig(
    name="slow-server",
    transport=TransportType.HTTP,
    base_url="https://api.example.com",
    timeout=120  # 2 minutes
)
```

```bash
# 2. Check connectivity
curl -v https://api.example.com/health

# 3. Verify no proxy issues
export no_proxy=api.example.com
```

#### Issue: Health Check Failing

**Symptom:**

```text
Server marked UNHEALTHY after 3 consecutive failures
```

**Causes:**

  • Server actually down
  • Network intermittent
  • Health endpoint incorrect

**Solutions:**

```bash
# 1. Verify health endpoint
curl https://api.example.com/health
```

```python
# 2. Adjust failure threshold
checker = ServerHealthChecker(
    manager,
    failure_threshold=5  # More tolerant
)

# 3. Check server logs
# Look for actual server issues
```

#### Issue: Reconnection Loop

**Symptom:**

```text
Reconnection attempt 1/5 for weather-api
Reconnection attempt 2/5 for weather-api
...
Reconnection failed for weather-api
```

**Causes:**

  • Server permanently down
  • Authentication failed
  • Network unreachable

**Solutions:**

```bash
# 1. Check server status manually
curl -H "Authorization: Bearer $TOKEN" \
  https://api.example.com/health

# 2. Verify credentials
echo $WEATHER_API_TOKEN
```

```python
# 3. Disable auto-reconnect if needed
manager._max_reconnect_attempts = 0

# 4. Check server logs for auth errors
```

#### Issue: Tool Loading Fails

**Symptom:**

```text
ValueError: Server not healthy: context7 (status: unhealthy)
```

**Causes:**

  • Server not initialized
  • Discovery failed
  • Server crashed

**Solutions:**

```python
# 1. Check server status
health = await manager.health_check_all()
print(health)

# 2. Manually reconnect
await manager.reconnect_server("context7")
```

```yaml
# 3. Use static tool definitions as fallback
spec:
  discovery:
    enabled: true
    fallbackToStatic: true
  tools:
    - name: "resolve-library-id"
      # ... static definition
```

### Debugging Techniques

```python
"""Enable detailed logging for troubleshooting."""

import logging

# Enable debug logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Log all MCP traffic
logger = logging.getLogger('mcp_server_lifecycle')
logger.setLevel(logging.DEBUG)

# Trace server state changes
async def log_state_change(old_state, new_state):
    logger.info(
        f"Server {old_state.config.name}: "
        f"{old_state.status.value}{new_state.status.value}"
    )
```

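The `log_state_change` helper above only fires if status transitions are routed through it. One way to do that, assuming the manager's `_lock` and `_servers` attributes used throughout this guide, is a small setter (a sketch):

```python
# Sketch: route status updates through log_state_change.
import copy


async def set_server_status(
    manager: MCPServerManager,
    server_name: str,
    new_status: ServerStatus,
) -> None:
    """Update a server's status and log the transition."""
    async with manager._lock:
        state = manager._servers[server_name]
        previous = copy.copy(state)  # snapshot of the old status
        state.status = new_status
    await log_state_change(previous, state)
```
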
---

## Related Documentation


---

## Summary

This implementation guide covered:

- **MCP Protocol Fundamentals** - JSON-RPC 2.0 over multiple transports
- **Transport Implementations** - Stdio, HTTP, and SSE with complete code
- **Lifecycle Management** - Initialization, health checks, reconnection
- **Resource Management** - Connection pooling, cleanup, graceful shutdown
- **Agent Integration** - Tool loading into Microsoft Agent Framework
- **Production Ready** - Testing, performance tuning, troubleshooting

### Key Takeaways

  1. Transport Selection: Use stdio for development, HTTP/SSE for production
  2. Health Monitoring: Continuous checks with automatic recovery
  3. Connection Pooling: Essential for HTTP/SSE performance
  4. Graceful Shutdown: Multi-phase shutdown prevents resource leaks
  5. Error Resilience: Exponential backoff and fallback mechanisms

### Next Steps

  1. Implement MCPServerManager following this guide
  2. Add transport-specific handlers for your deployment
  3. Configure health checks and reconnection policies
  4. Integrate with your Agent Framework
  5. Deploy and monitor in production

For questions or contributions:


---

## MCPToolLoader Methods

The `__init__` and tool-handling methods of the `MCPToolLoader` class introduced in the Tool Loading Integration section:

```python
def __init__(self, manager: MCPServerManager):
    """Initialize tool loader.

    Args:
        manager: MCPServerManager with active servers
    """
    self._manager = manager

async def load_tools_from_server(
    self,
    server_name: str
) -> List[FunctionTool]:
    """Query server for available tools and load them.
    
    Sends "tools/list" request to MCP server and converts
    the response to Agent Framework tool definitions.
    
    Args:
        server_name: MCP server to load tools from
        
    Returns:
        List of FunctionTool instances
        
    Raises:
        ValueError: If server not found or not healthy
    """
    async with self._manager._lock:
        state = self._manager._servers.get(server_name)
        if not state:
            raise ValueError(f"Server not found: {server_name}")
        
        if state.status != ServerStatus.HEALTHY:
            raise ValueError(
                f"Server not healthy: {server_name} "
                f"(status: {state.status.value})"
            )
    
    # Query tools from server
    tools_response = await self._call_server_method(
        state,
        "tools/list",
        {}
    )
    
    # Convert MCP tools to Agent Framework format
    agent_tools = []
    for mcp_tool in tools_response.get("tools", []):
        agent_tool = self._convert_mcp_to_agent_tool(
            mcp_tool,
            server_name
        )
        agent_tools.append(agent_tool)
    
    return agent_tools

async def _call_server_method(
    self,
    state: MCPServerState,
    method: str,
    params: Dict[str, Any]
) -> Dict[str, Any]:
    """Call MCP server method based on transport type."""
    config = state.config
    
    if config.transport == TransportType.STDIO:
        return await state.process.send_request(method, params)
    elif config.transport in (TransportType.HTTP, TransportType.SSE):
        return await state.http_session.send_request(method, params)
    else:
        raise ValueError(f"Unsupported transport: {config.transport}")

def _convert_mcp_to_agent_tool(
    self,
    mcp_tool: Dict[str, Any],
    server_name: str
) -> FunctionTool:
    """Convert MCP tool definition to Agent Framework tool.
    
    Args:
        mcp_tool: MCP tool definition from tools/list
        server_name: Source MCP server name
        
    Returns:
        FunctionTool for Agent Framework
    """
    # Create tool function that delegates to MCP server
    async def tool_function(**kwargs):
        """Dynamically created tool that calls MCP server."""
        state = self._manager._servers[server_name]
        result = await self._call_server_method(
            state,
            "tools/call",
            {
                "name": mcp_tool["name"],
                "arguments": kwargs
            }
        )
        return result.get("content", [{}])[0].get("text", "")
    
    # Convert input schema to Agent Framework format
    parameters = mcp_tool.get("inputSchema", {})
    
    return FunctionTool(
        name=f"{server_name}_{mcp_tool['name']}",
        description=mcp_tool.get("description", ""),
        parameters=parameters,
        function=tool_function
    )

async def register_tools_with_agent(
    self,
    agent: ChatAgent,
    tools: List[FunctionTool]
) -> None:
    """Register loaded tools with ChatAgent.
    
    Args:
        agent: ChatAgent instance to register tools with
        tools: Tools to register
    """
    for tool in tools:
        agent.register_tool(tool)
    
    print(f"Registered {len(tools)} tools with agent")

### Agent Framework Integration Example

```python
"""Complete integration showing MCP tools in ChatAgent workflow."""

from azure.ai.projects.aio import AIProjectClient
from azure.ai.projects.models import ChatAgent


async def create_agent_with_mcp_tools(
    ai_client: AIProjectClient,
    mcp_manager: MCPServerManager,
    server_names: List[str]
) -> ChatAgent:
    """Create ChatAgent with tools from MCP servers.
    
    Args:
        ai_client: Azure AI Project client
        mcp_manager: Initialized MCP server manager
        server_names: MCP servers to load tools from
        
    Returns:
        ChatAgent with MCP tools registered
    """
    # Create agent
    agent = await ai_client.agents.create_agent(
        model="gpt-4",
        name="Documentation Assistant",
        instructions="""
        You are a helpful assistant with access to up-to-date
        documentation via MCP tools. Use the tools to fetch
        accurate information.
        """
    )
    
    # Load and register tools from each MCP server
    tool_loader = MCPToolLoader(mcp_manager)
    
    for server_name in server_names:
        tools = await tool_loader.load_tools_from_server(server_name)
        await tool_loader.register_tools_with_agent(agent, tools)
        print(f"Loaded {len(tools)} tools from {server_name}")
    
    return agent


# Usage example
async def main():
    # Initialize MCP manager
    ai_client = AIProjectClient(...)
    mcp_manager = MCPServerManager(ai_client)
    
    configs = [
        MCPServerConfig(
            name="context7",
            transport=TransportType.STDIO,
            command="npx",
            args=["-y", "@upstash/context7-mcp"]
        )
    ]
    
    await mcp_manager.initialize_servers(configs)
    
    # Create agent with MCP tools
    agent = await create_agent_with_mcp_tools(
        ai_client,
        mcp_manager,
        ["context7"]
    )
    
    # Use agent with MCP tools
    thread = await ai_client.agents.create_thread()
    
    await ai_client.agents.create_message(
        thread_id=thread.id,
        role="user",
        content="How do I use React hooks?"
    )
    
    # Agent automatically uses MCP tools
    run = await ai_client.agents.create_and_process_run(
        thread_id=thread.id,
        assistant_id=agent.id
    )
    
    messages = await ai_client.agents.list_messages(thread.id)
    print(messages.data[0].content[0].text.value)
    
    # Cleanup
    await mcp_manager.shutdown_all()
```


---

## MCPConnectionPool Methods

The remaining `MCPConnectionPool` methods used by the HTTP and SSE transports: connection release, stale-connection cleanup, pool statistics, and shutdown.

```python
async def release(
    self,
    server_name: str,
    session: aiohttp.ClientSession
) -> None:
    """Return connection to pool.
    
    Connection remains in pool for reuse unless stale.
    
    Args:
        server_name: Server identifier
        session: Session to release
    """
    async with self._lock:
        # Connection stays in pool, just update timestamp
        for conn in self._pools[server_name]:
            if conn.session == session:
                conn.last_used = datetime.now()
                break

async def _cleanup_stale(self, server_name: str) -> None:
    """Remove stale connections from pool.
    
    Closes and removes connections that exceed:
        - Max lifetime
        - Idle timeout
    """
    pool = self._pools[server_name]
    now = datetime.now()
    
    to_remove = []
    for conn in pool:
        lifetime = (now - conn.created_at).total_seconds()
        idle_time = (now - conn.last_used).total_seconds()
        
        if lifetime > self._max_lifetime or idle_time > self._idle_timeout:
            await conn.session.close()
            to_remove.append(conn)
    
    for conn in to_remove:
        pool.remove(conn)

async def health_check_pool(self) -> Dict[str, Dict[str, int]]:
    """Get pool statistics.
    
    Returns:
        Dict with stats per server:
            - total: Total connections
            - active: Recently used connections
            - stale: Stale connections
    """
    stats = {}
    now = datetime.now()
    
    async with self._lock:
        for server_name, pool in self._pools.items():
            active = sum(
                1 for c in pool
                if (now - c.last_used).total_seconds() < self._idle_timeout
            )
            stats[server_name] = {
                "total": len(pool),
                "active": active,
                "stale": len(pool) - active
            }
    
    return stats

async def close_all(self) -> None:
    """Close all pooled connections.
    
    Used during shutdown to release all resources.
    """
    async with self._lock:
        for pool in self._pools.values():
            for conn in pool:
                await conn.session.close()
        self._pools.clear()

```

---

## Automatic Reconnection

Automatic reconnection with exponential backoff handles transient failures gracefully.

### Reconnection with Exponential Backoff

```python
"""Reconnect to failed MCP server with exponential backoff.

Backoff Strategy:
    - Attempt 1: Immediate
    - Attempt 2: 1 second delay
    - Attempt 3: 2 seconds delay
    - Attempt 4: 4 seconds delay
    - Attempt 5: 8 seconds delay
    - Max delay: 60 seconds

Failure Handling:
    - After max retries, mark server FAILED
    - Log all reconnection attempts
    - Preserve server configuration for manual restart
"""

async def reconnect_with_backoff(
    server_name: str,
    manager: MCPServerManager,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0
) -> bool:
    """Reconnect to failed MCP server with exponential backoff.
    
    Args:
        server_name: Server to reconnect
        manager: MCPServerManager instance
        max_retries: Maximum reconnection attempts
        base_delay: Initial delay in seconds
        max_delay: Maximum delay cap
        
    Returns:
        True if reconnection succeeded
    """
    async with manager._lock:
        state = manager._servers.get(server_name)
        if not state:
            raise ValueError(f"Unknown server: {server_name}")
        
        state.status = ServerStatus.RECONNECTING
    
    for attempt in range(1, max_retries + 1):
        try:
            print(
                f"Reconnection attempt {attempt}/{max_retries} "
                f"for {server_name}"
            )
            
            # Cleanup old connection
            await _cleanup_server_resources(state)
            
            # Attempt reconnection based on transport
            success = False
            if state.config.transport == TransportType.STDIO:
                success = await manager._start_stdio_server(state)
            elif state.config.transport == TransportType.HTTP:
                success = await manager._connect_http_server(state)
            elif state.config.transport == TransportType.SSE:
                success = await manager._connect_sse_server(state)
            
            if success:
                print(f"Reconnected to {server_name}")
                state.consecutive_failures = 0
                return True
            
        except Exception as e:
            print(f"Reconnection attempt {attempt} failed: {e}")
        
        # Exponential backoff
        if attempt < max_retries:
            delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
            print(f"Waiting {delay}s before retry...")
            await asyncio.sleep(delay)
    
    # All retries exhausted
    async with manager._lock:
        state.status = ServerStatus.FAILED
        state.error_message = f"Failed after {max_retries} reconnection attempts"
    
    print(f"Reconnection failed for {server_name}")
    return False


async def _cleanup_server_resources(state: MCPServerState) -> None:
    """Clean up resources before reconnection attempt.
    
    Ensures clean slate by:
        - Terminating stdio processes
        - Closing HTTP sessions
        - Releasing connection pool resources
    """
    if state.process:
        try:
            await state.process.shutdown(timeout=2.0)
        except Exception:
            pass
        state.process = None
    
    if state.http_session:
        try:
            await state.http_session.close()
        except Exception:
            pass
        state.http_session = None
```

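As a quick sanity check, the delay formula used above, `min(base_delay * 2 ** (attempt - 1), max_delay)`, produces the 1s/2s/4s/8s schedule described in the docstring (no sleep follows the final attempt):

```python
# Worked example of the backoff schedule used by reconnect_with_backoff.
base_delay, max_delay, max_retries = 1.0, 60.0, 5

delays = [
    min(base_delay * (2 ** (attempt - 1)), max_delay)
    for attempt in range(1, max_retries)  # no delay after the last attempt
]
print(delays)  # [1.0, 2.0, 4.0, 8.0]
```
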
### Manager Reconnect Method

```python

# In MCPServerManager

async def reconnect_server(
    self,
    server_name: str,
    max_retries: int = 3
) -> bool:
    """Reconnect to a failed server.
    
    Public method for manual or automatic reconnection.
    Uses exponential backoff strategy internally.
    
    Args:
        server_name: Server to reconnect
        max_retries: Maximum reconnection attempts
        
    Returns:
        True if reconnection succeeded
        
    Example:
        >>> success = await manager.reconnect_server("weather-api")
        >>> if success:
        ...     print("Server reconnected")
    """
    return await reconnect_with_backoff(
        server_name,
        self,
        max_retries=max_retries or self._max_reconnect_attempts
    )


async def health_check_all(self) -> Dict[str, bool]:
    """Check health of all registered servers.
    
    Returns:
        Dict mapping server name to health status
    """
    checker = ServerHealthChecker(self)
    return await checker.check_all_servers()
```


---

## SSE Transport Methods

The remaining methods of the SSE transport client (`SSEMCPServer`), picking up where its `connect()` routine verifies that the event stream opened successfully:

```python
    if self._sse_response.status != 200:
        raise ConnectionError(
            f"SSE connection failed: {self._sse_response.status}"
        )

async def listen_events(
    self,
    callback: Callable[[Dict[str, Any]], None]
) -> None:
    """Listen for server-sent events.
    
    Blocks and calls callback for each event received.
    Runs until connection closes or cancelled.
    
    Args:
        callback: Async function called for each event
    """
    if not self._sse_response:
        raise RuntimeError("Not connected")
    
    try:
        async for line in self._sse_response.content:
            line = line.decode().strip()
            
            # SSE format: "data: {json}"
            if line.startswith('data: '):
                try:
                    event_data = json.loads(line[6:])
                    await callback(event_data)
                except json.JSONDecodeError as e:
                    print(f"Invalid SSE data: {e}")
            
            # Heartbeat or comment
            elif line.startswith(':'):
                pass  # Ignore comments
                
    except asyncio.CancelledError:
        raise
    except Exception as e:
        print(f"SSE stream error: {e}")

async def send_request(
    self,
    method: str,
    params: Dict[str, Any]
) -> Dict[str, Any]:
    """Send HTTP request (responses come via SSE).
    
    Args:
        method: JSON-RPC method
        params: Method parameters
        
    Returns:
        Immediate acknowledgment (actual result via SSE)
    """
    if not self._http_session:
        raise RuntimeError("Not connected")
    
    self._request_id += 1
    request = {
        "jsonrpc": "2.0",
        "method": method,
        "params": params,
        "id": self._request_id
    }
    
    endpoint = f"{self._http_base_url}/{method}"
    
    async with self._http_session.post(
        endpoint,
        json=request
    ) as response:
        response.raise_for_status()
        return await response.json()

async def close(self) -> None:
    """Close SSE connection and HTTP session."""
    if self._sse_response:
        self._sse_response.close()
        self._sse_response = None
    
    if self._http_session:
        await self._http_session.close()
        self._http_session = None

```

### SSE Server Lifecycle

```python
# In MCPServerManager._connect_sse_server()

async def _connect_sse_server(self, state: MCPServerState) -> bool:
    """Connect to SSE-based MCP server.
    
    Args:
        state: Server state with SSE configuration
        
    Returns:
        True if connected successfully
    """
    config = state.config
    
    try:
        # Create SSE server
        server = SSEMCPServer()
        
        # Prepare headers
        headers = dict(config.headers or {})
        if config.auth_token:
            headers["Authorization"] = f"Bearer {config.auth_token}"
        
        # Connect to SSE stream
        sse_url = f"{config.base_url}/sse"
        await server.connect(
            sse_url=sse_url,
            http_base_url=config.base_url,
            headers=headers
        )
        
        # Start event listener in background
        async def handle_event(event: Dict[str, Any]) -> None:
            # Process SSE events (tool results, notifications, etc.)
            print(f"SSE event from {config.name}: {event}")
        
        event_task = asyncio.create_task(
            server.listen_events(handle_event)
        )
        
        # Store in state
        state.http_session = server
        state.status = ServerStatus.RUNNING
        
        # Verify with initial request
        tools = await server.send_request("tools/list", {})
        state.tools_loaded = len(tools.get("tools", []))
        state.status = ServerStatus.HEALTHY
        
        return True
        
    except Exception as e:
        state.status = ServerStatus.FAILED
        state.error_message = str(e)
        return False
```



---

## Connection Pooling Strategy

Connection pooling for MCP servers reduces latency and manages resource limits by reusing connections across multiple tool calls.

### MCPConnectionPool Class

```python
"""Connection pool for MCP servers.

Maintains a pool of reusable connections to reduce overhead.
Particularly effective for HTTP/SSE transports where connection
establishment is expensive (~50ms per connection).

Performance Impact:
    - First request: ~50ms (connection establishment)
    - Pooled requests: ~5ms (reuse existing connection)
    - Reduces server load and client latency

Invariants:
    - Pool size never exceeds max_connections per server
    - Stale connections automatically cleaned up
    - Thread-safe via asyncio locks
"""

import asyncio
from typing import Dict, List, Optional
from logging import getLogger
from dataclasses import dataclass

logger = getLogger(__name__)


@dataclass
class MCPServerConfig:
    """Configuration for MCP server connection."""
    name: str
    transport: str  # "stdio" or "http"
    command: Optional[str] = None
    args: Optional[List[str]] = None
    env: Optional[Dict[str, str]] = None
    url: Optional[str] = None
    headers: Optional[Dict[str, str]] = None


class MCPConnectionPool:
    """Pool connections to MCP servers.
    
    Maintains a pool of connections for each MCP server to reduce
    connection overhead and improve performance for frequent tool calls.
    """
    
    def __init__(self, max_connections: int = 5):
        """Initialize connection pool.
        
        Args:
            max_connections: Maximum connections per server
        """
        self._pools: Dict[str, List[MCPClient]] = {}
        self._max_connections = max_connections
        self._locks: Dict[str, asyncio.Lock] = {}
        logger.info(f"Initialized MCP connection pool (max={max_connections})")
    
    async def acquire(self, server_name: str, config: MCPServerConfig) -> MCPClient:
        """Acquire connection from pool.
        
        Returns existing healthy connection if available, otherwise
        creates new connection up to max_connections limit.
        
        Args:
            server_name: Name of MCP server
            config: Server configuration for new connections
            
        Returns:
            Connected MCP client ready for use
            
        Raises:
            ConnectionError: If unable to establish connection
        """
        if server_name not in self._locks:
            self._locks[server_name] = asyncio.Lock()
        
        async with self._locks[server_name]:
            # Try to get existing connection
            if server_name in self._pools and self._pools[server_name]:
                client = self._pools[server_name].pop()
                
                # Verify connection is still healthy
                if await self._check_health(client):
                    logger.debug(f"Reusing connection to '{server_name}'")
                    return client
                else:
                    logger.warning(f"Stale connection to '{server_name}', reconnecting")
                    await client.disconnect()
            
            # Create new connection
            if server_name not in self._pools:
                self._pools[server_name] = []
            
            client = await self._connect(config)
            logger.info(f"Created new connection to '{server_name}'")
            return client
    
    async def release(self, server_name: str, client: MCPClient):
        """Return connection to pool.
        
        Connection is returned to pool for reuse if pool is not full.
        Otherwise, connection is closed.
        
        Args:
            server_name: Name of MCP server
            client: Client connection to return
        """
        async with self._locks[server_name]:
            if len(self._pools.get(server_name, [])) < self._max_connections:
                self._pools[server_name].append(client)
                logger.debug(f"Returned connection to '{server_name}' pool")
            else:
                await client.disconnect()
                logger.debug(f"Pool full, closed connection to '{server_name}'")
    
    async def _connect(self, config: MCPServerConfig) -> MCPClient:
        """Establish new MCP connection.
        
        Args:
            config: Server configuration
            
        Returns:
            Connected MCP client
            
        Raises:
            ConnectionError: If connection fails
        """
        if config.transport == "stdio":
            return await StdioMCPClient.connect(
                command=config.command,
                args=config.args,
                env=config.env
            )
        elif config.transport == "http":
            return HttpMCPClient(
                base_url=config.url,
                headers=config.headers
            )
        else:
            raise ValueError(f"Unsupported transport: {config.transport}")
    
    async def _check_health(self, client: MCPClient) -> bool:
        """Check if connection is healthy.
        
        Args:
            client: MCP client to check
            
        Returns:
            True if connection is healthy, False otherwise
        """
        try:
            await asyncio.wait_for(client.ping(), timeout=2.0)
            return True
        except (asyncio.TimeoutError, Exception):
            return False
    
    async def close_all(self):
        """Close all pooled connections.
        
        Called during shutdown to clean up resources.
        """
        for server_name, clients in self._pools.items():
            for client in clients:
                await client.disconnect()
            logger.info(f"Closed {len(clients)} connections to '{server_name}'")
        self._pools.clear()
```

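Callers are responsible for returning connections; wrapping the tool call in `try`/`finally` keeps the pool from leaking clients when a call raises. A sketch, using the `MCPClient.call_tool` usage shown at the end of this section:

```python
# Sketch: always release a pooled client, even when the tool call fails.
pool = MCPConnectionPool(max_connections=5)


async def call_with_pool(
    server_name: str,
    config: MCPServerConfig,
    tool: str,
    arguments: dict,
):
    client = await pool.acquire(server_name, config)
    try:
        return await client.call_tool(tool, arguments)
    finally:
        await pool.release(server_name, client)
```
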
### Pool Configuration Guidelines

Transport-Specific Recommendations:

  • stdio transport: Pool size 1-2 (process overhead)

    • Each connection spawns a subprocess
    • High memory overhead per connection
    • Use minimal pooling for development
  • HTTP transport: Pool size 3-5 (network connections)

    • Balances connection reuse with memory
    • Suitable for moderate request rates
    • Default for production deployments
  • Adjust based on:

    • Tool call frequency (higher = larger pool)
    • Latency requirements (lower latency = larger pool)
    • Available memory (constrain for resource-limited systems)

**Example Configurations:**

```python
# Development (low traffic)
pool = MCPConnectionPool(max_connections=2)

# Production (moderate traffic)
pool = MCPConnectionPool(max_connections=5)

# High-throughput (many concurrent requests)
pool = MCPConnectionPool(max_connections=10)

```

---

## Health Check Implementation

Comprehensive health monitoring ensures MCP servers remain operational with automatic failure detection and recovery.

### Health Check Components

```python
"""Health checking for MCP servers.

Performs multi-level health verification:
    1. Basic connectivity (ping/heartbeat)
    2. Functional validation (tool listing)
    3. Latency measurement

Thresholds:
    - 2 consecutive failures → Log warning
    - 5 consecutive failures → Trigger reconnection
    - 10 consecutive failures → Mark as FAILED
"""

import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List, Optional


class HealthStatus(Enum):
    """Health check status."""
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    DEGRADED = "degraded"


@dataclass(frozen=True)
class HealthCheckResult:
    """Result of a health check."""
    name: str
    status: HealthStatus
    message: str
    latency_ms: Optional[float] = None
    timestamp: datetime = field(default_factory=datetime.now)


async def check_mcp_server_health(
    server: MCPServer,
    timeout: float = 5.0
) -> HealthCheckResult:
    """Comprehensive MCP server health check.
    
    Performs multiple checks to verify server is operational:
    1. Basic connectivity (ping)
    2. Tool listing (verifies server is functional)
    3. Latency measurement
    
    Args:
        server: MCP server to check
        timeout: Maximum time for health check (seconds)
        
    Returns:
        HealthCheckResult with detailed status information
        
    Example:
        result = await check_mcp_server_health(mcp_server)
        if result.status == HealthStatus.HEALTHY:
            print(f"Server OK: {result.message}")
        else:
            print(f"Server issue: {result.message}")
    """
    start_time = datetime.now()
    
    try:
        # Basic connectivity check
        await asyncio.wait_for(server.client.ping(), timeout=timeout / 2)
        
        # Functional check - list tools
        tools = await asyncio.wait_for(
            server.client.list_tools(),
            timeout=timeout / 2
        )
        
        # Calculate latency
        latency = (datetime.now() - start_time).total_seconds() * 1000
        
        return HealthCheckResult(
            name=f"mcp_{server.name}",
            status=HealthStatus.HEALTHY,
            message=f"Server responsive, {len(tools)} tools available",
            latency_ms=latency
        )
    
    except asyncio.TimeoutError:
        return HealthCheckResult(
            name=f"mcp_{server.name}",
            status=HealthStatus.UNHEALTHY,
            message=f"Health check timeout after {timeout}s"
        )
    
    except ConnectionError as e:
        return HealthCheckResult(
            name=f"mcp_{server.name}",
            status=HealthStatus.UNHEALTHY,
            message=f"Connection error: {str(e)}"
        )
    
    except Exception as e:
        return HealthCheckResult(
            name=f"mcp_{server.name}",
            status=HealthStatus.DEGRADED,
            message=f"Unexpected error: {str(e)}"
        )


async def periodic_health_checks(
    servers: List[MCPServer],
    interval: int = 60
):
    """Run periodic health checks on all MCP servers.
    
    Continuously monitors server health and logs issues.
    
    Args:
        servers: List of MCP servers to monitor
        interval: Seconds between health checks
        
    Example:
        # Start background health monitoring
        asyncio.create_task(periodic_health_checks(mcp_servers, interval=30))
    """
    while True:
        for server in servers:
            result = await check_mcp_server_health(server)
            
            if result.status == HealthStatus.HEALTHY:
                logger.debug(f"{result.name}: {result.message} ({result.latency_ms:.1f}ms)")
            elif result.status == HealthStatus.DEGRADED:
                logger.warning(f"{result.name}: {result.message}")
            else:
                logger.error(f"{result.name}: {result.message}")
        
        await asyncio.sleep(interval)

```

### Health Check Best Practices

1. Timeout Configuration

  • Ping/connectivity: 2-5 seconds

    • Quick check for basic connectivity
    • Fails fast on network issues
  • Functional checks: 5-10 seconds

    • Tool listing or resource queries
    • Verifies server logic is working

2. Check Frequency

  • Development: 10-30 seconds (quick feedback)
  • Production: 30-60 seconds (balanced)
  • Stable systems: 60-300 seconds (reduced overhead)

3. Alert Thresholds (a failure-tracking sketch follows this list)

  • 2 failures: Log warning (transient issue)
  • 5 failures: Trigger reconnection (persistent problem)
  • 10 failures: Mark FAILED (server down)

4. Graceful Degradation

  • Continue operation with degraded servers
  • Disable only on complete failure
  • Provide fallback mechanisms where possible
  • Alert operations team for manual intervention

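`check_mcp_server_health` does not track these thresholds itself; a small per-server counter can translate consecutive failures into the actions listed above (a sketch; the threshold values mirror the list, and acting on "reconnect" or "fail" is left to the caller):

```python
# Sketch: map consecutive health-check failures to escalating actions.
from collections import defaultdict
from typing import Dict


class FailureTracker:
    """Track consecutive failures per server and suggest an action."""

    def __init__(self, warn_at: int = 2, reconnect_at: int = 5, fail_at: int = 10):
        self._counts: Dict[str, int] = defaultdict(int)
        self._warn_at = warn_at
        self._reconnect_at = reconnect_at
        self._fail_at = fail_at

    def record(self, server_name: str, healthy: bool) -> str:
        """Return 'ok', 'warn', 'reconnect', or 'fail' for this result."""
        if healthy:
            self._counts[server_name] = 0
            return "ok"
        self._counts[server_name] += 1
        count = self._counts[server_name]
        if count >= self._fail_at:
            return "fail"
        if count >= self._reconnect_at:
            return "reconnect"
        if count >= self._warn_at:
            return "warn"
        return "ok"
```
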
### Integration with Connection Pool

```python
"""Combine connection pooling with health monitoring."""


class MCPServerManager:
    """Combine connection pooling with health monitoring."""
    
    def __init__(self):
        self.pool = MCPConnectionPool(max_connections=5)
        self.health_status: Dict[str, HealthCheckResult] = {}
    
    async def get_client(
        self,
        server_name: str,
        config: MCPServerConfig
    ) -> Optional[MCPClient]:
        """Get client only if server is healthy.
        
        Args:
            server_name: Server name
            config: Server configuration
            
        Returns:
            Client if healthy, None if unhealthy
        """
        # Check health status
        if server_name in self.health_status:
            status = self.health_status[server_name]
            if status.status == HealthStatus.UNHEALTHY:
                logger.warning(f"Skipping unhealthy server '{server_name}'")
                return None
        
        # Acquire connection
        return await self.pool.acquire(server_name, config)

```

**Usage Example:**

```python
# Initialize manager with pooling and health checks
manager = MCPServerManager()

# Start background health monitoring
asyncio.create_task(
    periodic_health_checks(
        servers=manager.get_all_servers(),
        interval=60  # Check every minute
    )
)

# Get client (automatically checks health)
client = await manager.get_client("context7", config)
if client:
    result = await client.call_tool("resolve-library-id", {"libraryName": "react"})
    await manager.pool.release("context7", client)
else:
    logger.error("Server unavailable, using fallback")
```
