Zappa 0.62.0 ships with native WebSocket support via API Gateway WebSocket APIs. You can now handle persistent WebSocket connections directly in your Lambda functions — no extra infrastructure, no configuration, no new dependencies.
Zappa provides two handler patterns: a decorator-based approach for simple use cases and a class-based approach when you want structure and shared state. Both work with the same underlying API Gateway WebSocket API and can coexist with your existing WSGI/ASGI app.
When you zappa deploy or zappa update, Zappa scans your project for from zappa.websocket import ... statements. When found, it:
- Provisions a WebSocket API Gateway alongside your REST/HTTP API via CloudFormation
- Creates $connect, $disconnect, and $default routes pointing to your Lambda
- Adds execute-api:ManageConnections IAM permissions so your handlers can push messages to clients
- Prints the wss:// URL after deployment
Incoming WebSocket events (CONNECT, DISCONNECT, MESSAGE) are routed to your registered handlers automatically. zappa undeploy cleans up all WebSocket resources.
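The routing step can be pictured as a lookup on the eventType field that API Gateway places in requestContext. The sketch below is illustrative only: HANDLERS, register, and dispatch are hypothetical names chosen for this example, not Zappa's internals.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any], Any], Dict[str, Any]]

# Hypothetical registry keyed by API Gateway's eventType values
# (CONNECT, DISCONNECT, MESSAGE).
HANDLERS: Dict[str, Handler] = {}


def register(event_type: str) -> Callable[[Handler], Handler]:
    """Return a decorator that stores a handler under the given event type."""
    def decorator(func: Handler) -> Handler:
        HANDLERS[event_type] = func
        return func
    return decorator


def dispatch(event: Dict[str, Any], context: Any = None) -> Dict[str, Any]:
    """Route a WebSocket event to its handler based on requestContext.eventType."""
    event_type = event.get("requestContext", {}).get("eventType")
    handler = HANDLERS.get(event_type)
    if handler is None:
        # Nothing registered for this event type: acknowledge and move on.
        return {"statusCode": 200}
    return handler(event, context)


@register("MESSAGE")
def echo(event, context):
    return {"statusCode": 200, "body": event.get("body")}
```

Keeping the registry as a plain dictionary means an unregistered event type falls through to a default acknowledgement instead of raising.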
The decorator pattern is the fastest way to add WebSocket support. Import the decorators, apply them to plain functions, and you're done.

    # app.py
    import json
    import logging

    from flask import Flask
    from zappa.websocket import on_connect, on_disconnect, on_message, send_message

    logger = logging.getLogger(__name__)
    app = Flask(__name__)

    # Keep track of connections (use DynamoDB in production)
    connections = set()


    @app.route("/")
    def index():
        return {"status": "ok"}


    @on_connect
    def handle_connect(event, context):
        """Called when a client opens a WebSocket connection."""
        connection_id = event["requestContext"]["connectionId"]
        connections.add(connection_id)
        logger.info("Connected: %s", connection_id)
        return {"statusCode": 200}


    @on_disconnect
    def handle_disconnect(event, context):
        """Called when a client closes the connection."""
        connection_id = event["requestContext"]["connectionId"]
        connections.discard(connection_id)
        logger.info("Disconnected: %s", connection_id)
        return {"statusCode": 200}


    @on_message
    def handle_message(event, context):
        """Called for every message sent by the client."""
        connection_id = event["requestContext"]["connectionId"]
        # body can be missing or None, so fall back to an empty JSON object
        body = json.loads(event.get("body") or "{}")
        action = body.get("action")
        if action == "echo":
            send_message(connection_id, {"type": "echo", "data": body.get("data")})
        elif action == "ping":
            send_message(connection_id, {"type": "pong"})
        else:
            send_message(connection_id, {"type": "error", "message": f"Unknown action: {action}"})
        return {"statusCode": 200}

Key points:
- @on_connect and @on_message are required; Zappa validates this at startup
- @on_disconnect is optional
- Handlers receive the raw Lambda (event, context) signature
- Handlers must return {"statusCode": 200} (or another valid status) for API Gateway
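Because handlers take the raw (event, context) pair, they can be exercised locally with a hand-built event. The sketch below assumes only the fields the example above reads (requestContext.connectionId and body) plus eventType. make_ws_event is a hypothetical helper, and real API Gateway events carry many more fields. The handler is left undecorated so the snippet runs without Zappa installed.

```python
import json
from typing import Any, Dict, Optional


def make_ws_event(connection_id: str, event_type: str = "MESSAGE",
                  body: Optional[dict] = None) -> Dict[str, Any]:
    """Build a minimal stand-in for an API Gateway WebSocket event (hypothetical helper)."""
    event: Dict[str, Any] = {
        "requestContext": {"connectionId": connection_id, "eventType": event_type},
    }
    if body is not None:
        event["body"] = json.dumps(body)
    return event


def handle_message(event, context):
    """Same shape as a decorated handler, minus the decorator."""
    body = json.loads(event.get("body") or "{}")
    # Reject messages that carry no action; accept everything else.
    return {"statusCode": 200 if body.get("action") else 400}


ok = handle_message(make_ws_event("abc123", body={"action": "ping"}), None)
bad = handle_message(make_ws_event("abc123"), None)
```

Passing None for context is enough here because the handler never reads it.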
The class-based pattern suits applications that need shared state, helper methods, or a more structured approach. Subclass ZappaWebSocketServer and override the methods you need.

    # app.py
    import json
    import logging

    import boto3
    from flask import Flask
    from zappa.websocket import ZappaWebSocketServer, send_message

    logger = logging.getLogger(__name__)
    app = Flask(__name__)


    class ChatRoom(ZappaWebSocketServer):
        """A simple chat room backed by DynamoDB for connection tracking."""

        TABLE_NAME = "websocket-connections"

        def _get_table(self):
            return boto3.resource("dynamodb").Table(self.TABLE_NAME)

        def on_connect(self, event, context):
            connection_id = event["requestContext"]["connectionId"]
            self._get_table().put_item(Item={"connectionId": connection_id})
            logger.info("Client joined: %s", connection_id)
            return {"statusCode": 200}

        def on_disconnect(self, event, context):
            connection_id = event["requestContext"]["connectionId"]
            self._get_table().delete_item(Key={"connectionId": connection_id})
            logger.info("Client left: %s", connection_id)
            return {"statusCode": 200}

        def on_message(self, event, context):
            # body can be missing or None, so fall back to an empty JSON object
            data = json.loads(event.get("body") or "{}")
            sender_id = event["requestContext"]["connectionId"]
            message = data.get("message", "")
            table = self._get_table()
            # A single scan returns at most 1 MB of results; paginate for large tables
            response = table.scan(ProjectionExpression="connectionId")
            for item in response["Items"]:
                cid = item["connectionId"]
                try:
                    send_message(cid, {"sender": sender_id, "message": message})
                except Exception:
                    # Treat any send failure as a stale connection and remove it
                    table.delete_item(Key={"connectionId": cid})
            return {"statusCode": 200}

Key points:
- Only override the methods you need; unoverridden methods return {"statusCode": 200} by default
- The class is lazily instantiated as a singleton, so instance state persists across invocations within the same Lambda container
- Registration happens at class definition time via __init_subclass__, so simply defining the class is enough
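The registration and lazy-singleton behavior described above can be sketched with plain Python. This is a minimal illustration of the mechanism, not Zappa's source: WebSocketServerBase, get_instance, and the Counter subclass are hypothetical names.

```python
from typing import Any, Dict, Optional, Type


class WebSocketServerBase:
    """Hypothetical stand-in for a base class that self-registers subclasses."""

    _registered: Optional[Type["WebSocketServerBase"]] = None
    _instance: Optional["WebSocketServerBase"] = None

    def __init_subclass__(cls, **kwargs: Any) -> None:
        # Runs as soon as a subclass body finishes executing (class definition time).
        super().__init_subclass__(**kwargs)
        WebSocketServerBase._registered = cls
        WebSocketServerBase._instance = None  # discard any instance of an older class

    @classmethod
    def get_instance(cls) -> "WebSocketServerBase":
        """Create the registered subclass on first use, then reuse it."""
        if WebSocketServerBase._registered is None:
            raise RuntimeError("no subclass has been defined")
        if WebSocketServerBase._instance is None:
            WebSocketServerBase._instance = WebSocketServerBase._registered()
        return WebSocketServerBase._instance

    # Defaults used when a subclass does not override a method.
    def on_connect(self, event: Dict[str, Any], context: Any) -> Dict[str, Any]:
        return {"statusCode": 200}

    def on_message(self, event: Dict[str, Any], context: Any) -> Dict[str, Any]:
        return {"statusCode": 200}


class Counter(WebSocketServerBase):
    def __init__(self) -> None:
        self.seen = 0  # instance state lives as long as the process stays warm

    def on_message(self, event, context):
        self.seen += 1
        return {"statusCode": 200}
```

Calling WebSocketServerBase.get_instance() twice returns the same Counter object, which is why counters or cached clients stored on self survive between invocations in a warm container.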
send_message(connection_id, data) sends a message to any connected client by its connection ID. The Zappa handler automatically sets the WEBSOCKET_API_GATEWAY_ENDPOINT environment variable on each WebSocket event, so the function discovers the API Gateway endpoint without any manual configuration.

    from zappa.websocket import on_message, send_message


    @on_message
    def handle_message(event, context):
        connection_id = event["requestContext"]["connectionId"]
        send_message(connection_id, {"type": "echo", "data": event.get("body")})
        return {"statusCode": 200}

The data argument accepts three types:
- dict: JSON-encoded to a UTF-8 byte string
- str: encoded as UTF-8 bytes
- bytes: sent as-is (raw binary)
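
The three conversions listed above can be written out as a small function. This is a sketch of the described behavior under the assumption that anything else is rejected; encode_payload is a hypothetical name, not part of zappa.websocket.

```python
import json
from typing import Union


def encode_payload(data: Union[dict, str, bytes]) -> bytes:
    """Convert a payload to bytes following the three rules listed above."""
    if isinstance(data, bytes):
        return data                              # raw binary, unchanged
    if isinstance(data, str):
        return data.encode("utf-8")              # text -> UTF-8 bytes
    if isinstance(data, dict):
        return json.dumps(data).encode("utf-8")  # dict -> JSON text -> UTF-8 bytes
    raise TypeError(f"unsupported payload type: {type(data).__name__}")
```

Checking bytes first keeps binary payloads untouched, and raising on unknown types surfaces mistakes such as passing a list before anything reaches the network.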

For example, all three forms are valid calls:

    send_message(connection_id, {"type": "notification", "text": "Hello!"})
    send_message(connection_id, "plain text payload")
    send_message(connection_id, b"\x00\x01\x02")

Because send_message takes a connection_id (not the event), you can send to any connected client, not just the sender. This is what the ChatRoom broadcast example does: it scans DynamoDB for all stored connection IDs and calls send_message for each one.
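
A broadcast helper can be written against any sender callable, which keeps it testable without AWS. The sketch below is an illustration under stated assumptions: broadcast, its send parameter, and the StaleConnection exception are hypothetical names. In a real deployment the sender would be send_message, and the error to catch would be whatever it raises for a closed connection (boto3's apigatewaymanagementapi client, for instance, raises GoneException from post_to_connection).

```python
from typing import Callable, Iterable, List, Union

Payload = Union[dict, str, bytes]


class StaleConnection(Exception):
    """Hypothetical error meaning the client has already disconnected."""


def broadcast(connection_ids: Iterable[str], data: Payload,
              send: Callable[[str, Payload], None]) -> List[str]:
    """Send data to every connection and return the IDs that turned out to be stale."""
    stale: List[str] = []
    for cid in connection_ids:
        try:
            send(cid, data)
        except StaleConnection:
            # Only prune on a "gone" signal; other errors propagate to the caller.
            stale.append(cid)
    return stale


# Demo with a fake sender that treats "dead" as a closed connection.
delivered: List[str] = []


def fake_send(cid: str, data: Payload) -> None:
    if cid == "dead":
        raise StaleConnection(cid)
    delivered.append(cid)


stale_ids = broadcast(["a", "dead", "b"], {"message": "hi"}, fake_send)
```

Returning the stale IDs, rather than deleting them inside the loop, lets the caller decide how to clean up its connection store.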
In most cases, no settings changes are needed. Zappa scans your project for files that import from zappa.websocket and wires everything up automatically during deploy.
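
One way such a scan could work is to parse each file and look for a from zappa.websocket import statement. The sketch below uses Python's ast module and is only a guess at the general approach: how Zappa actually performs detection is not described here, and imports_zappa_websocket and find_handler_files are hypothetical names.

```python
import ast
from pathlib import Path
from typing import List


def imports_zappa_websocket(source: str) -> bool:
    """Return True if the source contains `from zappa.websocket import ...`."""
    try:
        tree = ast.parse(source)
    except SyntaxError:
        return False  # skip files that do not parse
    return any(
        isinstance(node, ast.ImportFrom)
        and node.module == "zappa.websocket"
        and node.level == 0  # absolute import only
        for node in ast.walk(tree)
    )


def find_handler_files(project_root: str) -> List[Path]:
    """List .py files under project_root that import from zappa.websocket."""
    return sorted(
        path for path in Path(project_root).rglob("*.py")
        if imports_zappa_websocket(path.read_text(encoding="utf-8", errors="ignore"))
    )
```

A static scan like this cannot see imports performed at runtime (for example through importlib), which is the kind of case where an explicit setting becomes necessary.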
If auto-detection doesn't find your handlers (dynamic imports, non-standard layouts), set the module path in zappa_settings.json:

    {
      "dev": {
        "app_function": "app.app",
        "websocket_handler_module": "app"
      }
    }

For handlers in a separate file:

    {
      "dev": {
        "app_function": "app.app",
        "websocket_handler_module": "myproject.ws_handlers"
      }
    }

Deploy, test, and update from the command line:

    # Deploy
    $ zappa deploy dev
    ...
    Your WebSocket URL is: wss://abc123.execute-api.us-east-1.amazonaws.com/dev

    # Test with wscat (npm install -g wscat)
    $ wscat -c wss://abc123.execute-api.us-east-1.amazonaws.com/dev
    Connected (press CTRL+C to quit)
    > {"action": "echo", "data": "hello"}
    < {"type": "echo", "data": "hello"}
    > {"action": "ping"}
    < {"type": "pong"}

    # Update after code changes
    $ zappa update dev

| | Decorators | Base Class |
|---|---|---|
| Best for | Simple handlers, quick prototypes | Shared state, helper methods, complex logic |
| Structure | Flat functions | Single class with methods |
| State | Module-level variables | Instance attributes (singleton per container) |
| Registration | Explicit via @decorator | Automatic via method override |
| Minimum code | 2 decorated functions (@on_connect and @on_message) | 1 class with 2 methods |
Both patterns can live in the same file as your Flask/Django/FastAPI app, or in a separate module. They produce identical API Gateway resources and behave identically at runtime.
- No ASGI WebSocket protocol: this uses API Gateway's native WebSocket API, not the ASGI WebSocket scope. ASGI support (also new in 0.62.0) handles HTTP only.
- Lambda concurrency: each WebSocket message invokes Lambda independently. There's no persistent server process.
- Connection tracking: Lambda doesn't reliably share memory across invocations. State persists only within a warm container, and concurrent invocations can run in separate containers. Use DynamoDB or ElastiCache for tracking connected clients (the in-memory set() in the decorator example is illustrative only).
- 10-minute idle timeout: API Gateway closes WebSocket connections that have been idle for 10 minutes, and no connection can stay open longer than 2 hours.
- 128 KB message size: API Gateway WebSocket APIs cap each message payload at 128 KB, delivered in frames of up to 32 KB.
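
A handler can guard against the size limit by measuring the encoded payload before sending. The sketch below assumes the 128 KB figure from the list above applies to the encoded bytes; MAX_MESSAGE_BYTES and fits_in_one_message are hypothetical names, not part of Zappa.

```python
import json

MAX_MESSAGE_BYTES = 128 * 1024  # message payload limit cited in the list above


def fits_in_one_message(data) -> bool:
    """Check whether a payload, once encoded, stays within the 128 KB limit."""
    if isinstance(data, bytes):
        raw = data
    elif isinstance(data, str):
        raw = data.encode("utf-8")
    else:
        raw = json.dumps(data).encode("utf-8")
    # Compare byte length, not character count: non-ASCII text grows when encoded.
    return len(raw) <= MAX_MESSAGE_BYTES
```

When the check fails, the application has to split the payload itself or send a reference (such as an object key) instead of the data.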
Zappa 0.62.0 is available now. Upgrade with pip install zappa --upgrade and start building real-time features on Lambda.