Skip to content

Instantly share code, notes, and snippets.

View beatzxbt's full-sized avatar
🎯
Focusing

beatzxbt

🎯
Focusing
View GitHub Profile
@beatzxbt
beatzxbt / numeric.pxd
Created August 19, 2025 19:33
cython generics POC
cimport numpy as cnp
from libc.stdint cimport uint64_t as u64
# Fused (generic) type spanning NumPy's unsigned integer widths: 8, 16, 32 and 64 bits.
ctypedef fused uint_t:
cnp.uint8_t
cnp.uint16_t
cnp.uint32_t
cnp.uint64_t
ctypedef fused int_t:
@beatzxbt
beatzxbt / utils.pyx
Last active August 9, 2025 12:18
better async iterator for websocket ringbuffer
import asyncio
from libc.stdint cimport (
UINT8_MAX,
uint8_t as u8,
)
cdef class BytesRingBuffer:
"""A fixed-size ring buffer for bytes objects"""
def __cinit__(self, bint only_insert_unique=False, u8 max_size=UINT8_MAX):
"""
@beatzxbt
beatzxbt / ringbuffer.pyx
Created March 6, 2025 20:01
1d f64 ringbuffer.pyx
import numpy as np
cimport numpy as cnp
from libc.stdint cimport uint64_t, int64_t
cdef class RingBufferOneDim:
"""
A 1-dimensional fixed-size circular buffer for floats/doubles.
"""
@beatzxbt
beatzxbt / ratelimiter.pyx
Created March 2, 2025 16:00
basic rate limiter
from libc.stdint cimport uint32_t
from mm_toolbox.time.time cimport time_s
from .engine cimport OrderAction
cdef class RateLimitCounter:
cdef:
uint32_t tokens_per_sec
uint32_t tokens_remaining
@beatzxbt
beatzxbt / tools.pyx
Created December 31, 2024 21:57
uber fast list[list[str]] -> np.ndarray[np.double]
import numpy as np
cimport numpy as np
from libc.stdint cimport uint16_t
from libc.stdlib cimport strtod
from cpython.unicode cimport PyUnicode_AsUTF8
cpdef np.ndarray[np.double_t, ndim=2] parse_list_list_str_to_floats(list[list[str]] data):
"""
Parses a list of lists of strings into a NumPy array of floats.
import xxhash
import asyncio
import platform
from collections import deque
from warnings import warn as warning
from typing import List, Dict, Optional
from mm_toolbox.time import time_ns, time_s
from .conn.raw cimport WsConnection
@beatzxbt
beatzxbt / l2dataingresstools.pyx
Created September 24, 2024 11:30
high performance str -> float converter
import numpy as np
cimport numpy as np
from libc.stdlib cimport strtod
from cpython.unicode cimport PyUnicode_AsUTF8
from cython cimport boundscheck, wraparound
@boundscheck(False)
@wraparound(False)
cdef np.ndarray[np.float64_t, ndim=2] parse_list_strings_to_floats(list[list[str]] data):
"""
@beatzxbt
beatzxbt / wspool.py
Created August 15, 2024 08:43
fast ws pool impl
import orjson
import asyncio
from warnings import warn as warning
from dataclasses import dataclass
from aiohttp import ClientSession, ClientWebSocketResponse, WSMsgType
from typing import List, Dict, Callable, Any, Optional
from mm_toolbox.src.logging import Logger
from mm_toolbox.src.time import time_ms
from mm_toolbox.src.ringbuffer import RingBufferSingleDimFloat
def generate_new_buffer(size: float, strength: float) -> tuple[float, float]:
    """
    Generates a new buffer range based on the size and strength.

    The range is centred on ``size`` and reaches ``size * strength`` to
    either side of it.

    Args:
        size: Centre value of the range.
        strength: Fractional half-width of the range (e.g. ``0.25`` for ±25%).

    Returns:
        The pair ``(size * (1 - strength), size * (1 + strength))``. The first
        element is the smaller one when ``size`` and ``strength`` are both
        non-negative; no ordering is enforced otherwise.
    """
    return (size * (1 - strength), size * (1 + strength))
def process_data(data) -> None:
"""
Processes the data with varying buffer strengths and prints the results.
"""
@beatzxbt
beatzxbt / bybit_orderbook_handler.py
Created May 28, 2024 11:12
orderbook handler w/zmq support
import zmq
import orjson
from time import time_ns
from abc import ABC, abstractmethod
from typing import Dict
from mm.frameworks.exch.base.status import Status
class OrderbookHandler(ABC):