Skip to content

Instantly share code, notes, and snippets.

@beatzxbt
Created May 28, 2024 11:12
Show Gist options
  • Select an option

  • Save beatzxbt/cbb3bb993ffe4f96bfe4e6e09d571f7e to your computer and use it in GitHub Desktop.

Select an option

Save beatzxbt/cbb3bb993ffe4f96bfe4e6e09d571f7e to your computer and use it in GitHub Desktop.
orderbook handler w/zmq support
import zmq
import orjson
from time import time_ns
from abc import ABC, abstractmethod
from typing import Dict
from mm.frameworks.exch.base.status import Status
class OrderbookHandler(ABC):
    """
    Abstract base for exchange orderbook handlers that publish over a zmq socket.

    A handler keeps one reusable ``payload`` dict (status, timestamps, bids,
    asks). Subclasses fill it in from exchange messages inside ``resync`` /
    ``process`` and call ``send`` to publish it as a single frame made of the
    topic bytes immediately followed by the orjson-encoded payload.
    """

    stream = "-orderbook___".encode()  # NOTE: Must have exactly '___' at end as filler

    # Class-level default so validate_payload() is usable even when
    # add_required_keys() has not been called yet (nothing required -> any
    # mapping validates) instead of failing with AttributeError.
    required_keys = frozenset()

    def __init__(self, socket: zmq.Socket, exchange: int) -> None:
        """
        Parameters
        ----------
        socket : zmq.Socket
            Socket that ``send`` publishes on.

        exchange : int
            Exchange identifier; formatted as the ``"<exchange>-"`` topic prefix.
        """
        self.socket = socket
        self.exchange = f"{exchange}-".encode()
        self.payload = {
            "status": Status.WARMUP,
            "ts": 0,
            "local_ts": 0,
            "bids": [[0.0, 0.0]],
            "asks": [[0.0, 0.0]],
        }

    def add_required_keys(self, required_keys: set) -> None:
        """
        Register the top-level keys a received message must contain.

        Any iterable of hashable keys is accepted; it is copied into a
        frozenset so the subset check in ``validate_payload`` always works and
        later mutation of the caller's object has no effect here.
        """
        self.required_keys = frozenset(required_keys)

    def validate_payload(self, recv: Dict) -> bool:
        """Return True when every required key is present in ``recv``."""
        return recv.keys() >= self.required_keys

    def construct_topic(self, symbol: str) -> bytes:
        """
        Final: exchange-symbol-stream___
        """
        return self.exchange + symbol.encode() + self.stream

    def send(self, status: int, symbol: str) -> None:
        """
        Stamp the payload with ``status`` and the current ``time_ns()`` value,
        then publish ``topic + orjson(payload)`` on the socket.
        """
        self.payload["status"] = status
        self.payload["local_ts"] = time_ns()
        msg = self.construct_topic(symbol) + orjson.dumps(self.payload)
        self.socket.send(msg, copy=False)

    @abstractmethod
    def resync(self, recv: Dict) -> None:
        """
        Rebuild the published book from a received message.

        NOTE(review): at least one subclass (BybitOrderbookHandler) overrides
        this with an extra leading ``symbol`` argument -- the signatures should
        be reconciled once it is clear which one callers rely on.
        """

    @abstractmethod
    def process(self, recv: Dict) -> None:
        """Handle one received orderbook message."""
# -----------------#
import zmq
from typing import Dict
from mm.frameworks.exch.base.handlers.orderbook import OrderbookHandler
from mm.frameworks.constants.enums.enums import Exchange
from mm.frameworks.exch.base.status import Status
class BybitOrderbookHandler(OrderbookHandler):
    """
    Orderbook handler for Bybit messages.

    Reads ``cts`` plus ``data.s`` / ``data.b`` / ``data.a`` from each message,
    converts every price level to floats and publishes the result through the
    base class ``send``. The last accepted update id is tracked per symbol so
    duplicate or older updates are dropped.
    """

    def __init__(self, socket: zmq.Socket) -> None:
        """
        Parameters
        ----------
        socket : zmq.Socket
            Socket handed to the base handler for publishing.
        """
        super().__init__(socket, Exchange.BYBIT)
        self.add_required_keys({"ts", "data"})
        # symbol -> last update id that was accepted by process()
        self.update_ids = {}

    @staticmethod
    def _parse_book(recv: Dict):
        """Return ``(ts, bids, asks)`` parsed from ``recv`` without touching handler state."""
        data = recv["data"]
        ts = int(recv["cts"])
        bids = [list(map(float, level)) for level in data["b"]]
        asks = [list(map(float, level)) for level in data["a"]]
        return ts, bids, asks

    @staticmethod
    def _parse_update_id(recv: Dict) -> int:
        """Return the message's update id as an int."""
        # Bybit's documented orderbook message nests "u" under "data"; a
        # top-level "u" is still honoured first so callers that already
        # supply it there keep working.
        raw = recv["u"] if "u" in recv else recv["data"]["u"]
        return int(raw)

    def _publish(self, status: int, symbol: str, book) -> None:
        """Copy a fully parsed ``(ts, bids, asks)`` book into the payload and send it."""
        self.payload["ts"], self.payload["bids"], self.payload["asks"] = book
        self.send(status, symbol)

    def resync(self, symbol: str, recv: Dict) -> None:
        """
        Replace the published book for ``symbol`` with the one in ``recv`` and
        send it with ``Status.RESYNC``.

        Messages with missing keys are skipped (best effort) and logged; the
        payload is only modified once the whole message has parsed, so a bad
        message never leaves a half-updated book behind.

        Raises
        ------
        Exception
            Any non-KeyError failure, wrapped with a "Bybit Orderbook Resync" prefix.
        """
        try:
            self._publish(Status.RESYNC, symbol, self._parse_book(recv))

        except KeyError as ke:
            # TODO: many bad payloads == restart
            import logging

            logging.getLogger(__name__).warning(
                "Bybit Orderbook Resync: missing key %s for %s", ke, symbol
            )

        except Exception as e:
            raise Exception(f"Bybit Orderbook Resync: {e}") from e

    def process(self, recv: Dict) -> None:
        """
        Handle one orderbook message.

        Messages failing ``validate_payload`` are ignored. The first message
        seen for a symbol only records its update id; later messages are
        published with ``Status.NORMAL`` when their update id is strictly
        greater than the last accepted one.

        Raises
        ------
        Exception
            Any failure, wrapped with a "Bybit Orderbook Process" prefix. When
            the symbol could be determined, ``Status.ERROR`` is sent for it
            first.
        """
        # Stays None until the symbol has been read, so the error path below
        # never touches an unbound name.
        symbol = None

        try:
            if not self.validate_payload(recv):
                return None

            symbol = recv["data"]["s"]
            update_id = self._parse_update_id(recv)
            last_id = self.update_ids.get(symbol)

            if last_id is None:
                # First sighting of this symbol: remember the id, publish nothing.
                self.update_ids[symbol] = update_id
                return None

            # Duplicate/old update, ignore...
            if update_id <= last_id:
                return None

            # Parse everything before committing any state.
            book = self._parse_book(recv)
            self.update_ids[symbol] = update_id
            self._publish(Status.NORMAL, symbol, book)

        except Exception as e:
            if symbol is not None:
                self.send(Status.ERROR, symbol)
            raise Exception(f"Bybit Orderbook Process: {e}") from e
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment