Created
May 28, 2024 11:12
-
-
Save beatzxbt/cbb3bb993ffe4f96bfe4e6e09d571f7e to your computer and use it in GitHub Desktop.
orderbook handler w/zmq support
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import zmq | |
| import orjson | |
| from time import time_ns | |
| from abc import ABC, abstractmethod | |
| from typing import Dict | |
| from mm.frameworks.exch.base.status import Status | |
| class OrderbookHandler(ABC): | |
| stream = "-orderbook___".encode() # NOTE: Must have exactly '___' at end as filler | |
| def __init__(self, socket: zmq.Socket, exchange: int) -> None: | |
| self.socket = socket | |
| self.exchange = f"{exchange}-".encode() | |
| self.payload = { | |
| "status": Status.WARMUP, | |
| "ts": 0, | |
| "local_ts": 0, | |
| "bids": [[0.0, 0.0]], | |
| "asks": [[0.0, 0.0]] | |
| } | |
| def add_required_keys(self, required_keys: Dict) -> None: | |
| self.required_keys = required_keys | |
| def validate_payload(self, recv: Dict) -> bool: | |
| return self.required_keys <= recv.keys() | |
| def construct_topic(self, symbol: str) -> bytes: | |
| """ | |
| Final: exchange-symbol-stream___ | |
| """ | |
| return self.exchange + symbol.encode() + self.stream | |
| def send(self, status: int, symbol: str) -> None: | |
| self.payload["status"] = status | |
| self.payload["local_ts"] = time_ns() | |
| msg = self.construct_topic(symbol) + orjson.dumps(self.payload) | |
| self.socket.send(msg, copy=False) | |
| return None | |
| @abstractmethod | |
| def resync(self, recv: Dict) -> None: | |
| pass | |
| @abstractmethod | |
| def process(self, recv: Dict) -> None: | |
| pass | |
| # -----------------# | |
| import zmq | |
| from typing import Dict | |
| from mm.frameworks.exch.base.handlers.orderbook import OrderbookHandler | |
| from mm.frameworks.constants.enums.enums import Exchange | |
| from mm.frameworks.exch.base.status import Status | |
| class BybitOrderbookHandler(OrderbookHandler): | |
| def __init__(self, socket: zmq.Socket) -> None: | |
| super().__init__(socket, Exchange.BYBIT) | |
| self.add_required_keys({"ts", "data"}) | |
| self.update_ids = {} | |
| def resync(self, symbol: str, recv: Dict) -> None: | |
| try: | |
| self.payload["ts"] = int(recv["cts"]) | |
| self.payload["bids"] = [list(map(float, level)) for level in recv["data"]["b"]] | |
| self.payload["asks"] = [list(map(float, level)) for level in recv["data"]["a"]] | |
| self.send(Status.RESYNC, symbol) | |
| except KeyError as ke: | |
| pass # TODO: Add logging here, many bad payloads == restart | |
| except Exception as e: | |
| raise Exception(f"Bybit Orderbook Resync: {e}") | |
| def process(self, recv: Dict) -> None: | |
| try: | |
| if self.validate_payload(recv): | |
| update_id = int(recv["u"]) | |
| symbol = recv["data"]["s"] | |
| if symbol in self.update_ids: | |
| # Duplicate/old update, ignore... | |
| if update_id <= self.update_ids[symbol]: | |
| return None | |
| self.update_ids[symbol] = update_id | |
| self.payload["ts"] = int(recv["cts"]) | |
| self.payload["bids"] = [list(map(float, level)) for level in recv["data"]["b"]] | |
| self.payload["asks"] = [list(map(float, level)) for level in recv["data"]["a"]] | |
| self.send(Status.NORMAL, symbol) | |
| else: | |
| self.update_ids[symbol] = update_id | |
| except Exception as e: | |
| self.send(Status.ERROR, symbol) | |
| raise Exception(f"Bybit Orderbook Process: {e}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment