Skip to content

Instantly share code, notes, and snippets.

@monkut
Last active March 10, 2026 07:24
Show Gist options
  • Select an option

  • Save monkut/0d6213fd6e92ea4d411b47b002a5c508 to your computer and use it in GitHub Desktop.

Select an option

Save monkut/0d6213fd6e92ea4d411b47b002a5c508 to your computer and use it in GitHub Desktop.
Real-Time WebSockets on AWS Lambda with Zappa 0.62.0

Real-Time WebSockets on AWS Lambda with Zappa 0.62.0

Zappa 0.62.0 ships with native WebSocket support via API Gateway WebSocket APIs. You can now handle persistent WebSocket connections directly in your Lambda functions — no extra infrastructure, no configuration, no new dependencies.

Zappa provides two handler patterns: a decorator-based approach for simple use cases and a class-based approach when you want structure and shared state. Both work with the same underlying API Gateway WebSocket API and can coexist with your existing WSGI/ASGI app.

How It Works

When you zappa deploy or zappa update, Zappa scans your project for from zappa.websocket import ... statements. When found, it:

  1. Provisions a WebSocket API Gateway alongside your REST/HTTP API via CloudFormation
  2. Creates $connect, $disconnect, and $default routes pointing to your Lambda
  3. Adds execute-api:ManageConnections IAM permissions so your handlers can push messages to clients
  4. Prints the wss:// URL after deployment

Incoming WebSocket events (CONNECT, DISCONNECT, MESSAGE) are routed to your registered handlers automatically. zappa undeploy cleans up all WebSocket resources.


Pattern 1: Decorators

The decorator pattern is the fastest way to add WebSocket support. Import the decorators, apply them to plain functions, and you're done.

# app.py
import json
import logging

from flask import Flask
from zappa.websocket import on_connect, on_disconnect, on_message, send_message

logger = logging.getLogger(__name__)
app = Flask(__name__)

# Keep track of connections (use DynamoDB in production)
connections = set()


@app.route("/")
def index():
    return {"status": "ok"}


@on_connect
def handle_connect(event, context):
    """Called when a client opens a WebSocket connection."""
    connection_id = event["requestContext"]["connectionId"]
    connections.add(connection_id)
    logger.info("Connected: %s", connection_id)
    return {"statusCode": 200}


@on_disconnect
def handle_disconnect(event, context):
    """Called when a client closes the connection."""
    connection_id = event["requestContext"]["connectionId"]
    connections.discard(connection_id)
    logger.info("Disconnected: %s", connection_id)
    return {"statusCode": 200}


@on_message
def handle_message(event, context):
    """Called for every message sent by the client."""
    connection_id = event["requestContext"]["connectionId"]
    body = json.loads(event.get("body", "{}"))
    action = body.get("action")

    if action == "echo":
        send_message(connection_id, {"type": "echo", "data": body.get("data")})
    elif action == "ping":
        send_message(connection_id, {"type": "pong"})
    else:
        send_message(connection_id, {"type": "error", "message": f"Unknown action: {action}"})

    return {"statusCode": 200}

Key points:

  • @on_connect and @on_message are required — Zappa validates this at startup
  • @on_disconnect is optional
  • Handlers receive the raw Lambda (event, context) signature
  • Must return {"statusCode": 200} (or another valid status) for API Gateway

Pattern 2: Base Class

The class-based pattern suits applications that need shared state, helper methods, or a more structured approach. Subclass ZappaWebSocketServer and override the methods you need.

# app.py
import json
import logging

import boto3
from flask import Flask
from zappa.websocket import ZappaWebSocketServer, send_message

logger = logging.getLogger(__name__)
app = Flask(__name__)


class ChatRoom(ZappaWebSocketServer):
    """A simple chat room backed by DynamoDB for connection tracking."""

    TABLE_NAME = "websocket-connections"

    def _get_table(self):
        return boto3.resource("dynamodb").Table(self.TABLE_NAME)

    def on_connect(self, event, context):
        connection_id = event["requestContext"]["connectionId"]
        self._get_table().put_item(Item={"connectionId": connection_id})
        logger.info("Client joined: %s", connection_id)
        return {"statusCode": 200}

    def on_disconnect(self, event, context):
        connection_id = event["requestContext"]["connectionId"]
        self._get_table().delete_item(Key={"connectionId": connection_id})
        logger.info("Client left: %s", connection_id)
        return {"statusCode": 200}

    def on_message(self, event, context):
        data = json.loads(event.get("body", "{}"))
        sender_id = event["requestContext"]["connectionId"]
        message = data.get("message", "")

        table = self._get_table()
        response = table.scan(ProjectionExpression="connectionId")

        for item in response["Items"]:
            cid = item["connectionId"]
            try:
                send_message(cid, {"sender": sender_id, "message": message})
            except Exception:
                table.delete_item(Key={"connectionId": cid})

        return {"statusCode": 200}

Key points:

  • Only override the methods you need — unoverridden methods return {"statusCode": 200} by default
  • The class is lazily instantiated as a singleton, so instance state persists across invocations within the same Lambda container
  • Registration happens at class definition time via __init_subclass__, so simply defining the class is enough

Sending Messages to Clients

send_message(connection_id, data) sends a message to any connected client by its connection ID. The Zappa handler automatically sets the WEBSOCKET_API_GATEWAY_ENDPOINT environment variable on each WebSocket event, so the function discovers the API Gateway endpoint without any manual configuration.

from zappa.websocket import send_message

@on_message
def handle_message(event, context):
    connection_id = event["requestContext"]["connectionId"]
    send_message(connection_id, {"type": "echo", "data": event.get("body")})
    return {"statusCode": 200}

The data argument accepts three types:

  • dict — JSON-encoded to a UTF-8 byte string
  • str — encoded as UTF-8 bytes
  • bytes — sent as-is (raw binary)
send_message(connection_id, {"type": "notification", "text": "Hello!"})
send_message(connection_id, "plain text payload")
send_message(connection_id, b"\x00\x01\x02")

Because send_message takes a connection_id (not the event), you can send to any connected client — not just the sender. This is what the ChatRoom broadcast example does: it scans DynamoDB for all stored connection IDs and calls send_message for each one.


Configuration

Zero Configuration (Auto-Detection)

In most cases, no settings changes are needed. Zappa scans your project for files that import from zappa.websocket and wires everything up automatically during deploy.

Explicit Module Path

If auto-detection doesn't find your handlers (dynamic imports, non-standard layouts), set the module path in zappa_settings.json:

{
    "dev": {
        "app_function": "app.app",
        "websocket_handler_module": "app"
    }
}

For handlers in a separate file:

{
    "dev": {
        "app_function": "app.app",
        "websocket_handler_module": "myproject.ws_handlers"
    }
}

Deploying and Testing

# Deploy
$ zappa deploy dev
...
Your WebSocket URL is: wss://abc123.execute-api.us-east-1.amazonaws.com/dev

# Test with wscat (npm install -g wscat)
$ wscat -c wss://abc123.execute-api.us-east-1.amazonaws.com/dev
Connected (press CTRL+C to quit)
> {"action": "echo", "data": "hello"}
< {"type": "echo", "data": "hello"}
> {"action": "ping"}
< {"type": "pong"}

# Update after code changes
$ zappa update dev

Which Pattern Should You Use?

Decorators Base Class
Best for Simple handlers, quick prototypes Shared state, helper methods, complex logic
Structure Flat functions Single class with methods
State Module-level variables Instance attributes (singleton per container)
Registration Explicit via @decorator Automatic via method override
Minimum code 3 decorated functions 1 class with 2 methods

Both patterns can live in the same file as your Flask/Django/FastAPI app, or in a separate module. They produce identical API Gateway resources and behave identically at runtime.


Limitations

  • No ASGI WebSocket protocol — this uses API Gateway's native WebSocket API, not ASGI WebSocket scope. ASGI support (also new in 0.62.0) handles HTTP only.
  • Lambda concurrency — each WebSocket message invokes Lambda independently. There's no persistent server process.
  • Connection tracking — Lambda doesn't share memory across invocations. Use DynamoDB or ElastiCache for tracking connected clients (the in-memory set() in the decorator example is illustrative only).
  • 30-minute idle timeout — API Gateway disconnects idle WebSocket connections after 10 minutes (configurable up to 30 minutes).
  • 128 KB message size — API Gateway WebSocket API has a 128 KB payload limit per frame.

Zappa 0.62.0 is available now. Upgrade with pip install zappa --upgrade and start building real-time features on Lambda.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment