This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| cimport numpy as cnp | |
| from libc.stdint cimport uint64_t as u64 | |
| # Fused type covering NumPy's unsigned integer widths: 8, 16, 32 and 64 bits. | |
| ctypedef fused uint_t: | |
| cnp.uint8_t | |
| cnp.uint16_t | |
| cnp.uint32_t | |
| cnp.uint64_t | |
| ctypedef fused int_t: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| from libc.stdint cimport ( | |
| UINT8_MAX, | |
| uint8_t as u8, | |
| ) | |
| cdef class BytesRingBuffer: | |
| """A fixed-size ring buffer for bytes objects""" | |
| def __cinit__(self, bint only_insert_unique=False, u8 max_size=UINT8_MAX): | |
| """ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
| cimport numpy as cnp | |
| from libc.stdint cimport uint64_t, int64_t | |
| cdef class RingBufferOneDim: | |
| """ | |
| A 1-dimensional fixed-size circular buffer for floats/doubles. | |
| """ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from libc.stdint cimport uint32_t | |
| from mm_toolbox.time.time cimport time_s | |
| from .engine cimport OrderAction | |
| cdef class RateLimitCounter: | |
| cdef: | |
| uint32_t tokens_per_sec | |
| uint32_t tokens_remaining |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
| cimport numpy as np | |
| from libc.stdint cimport uint16_t | |
| from libc.stdlib cimport strtod | |
| from cpython.unicode cimport PyUnicode_AsUTF8 | |
| cpdef np.ndarray[np.double_t, ndim=2] parse_list_list_str_to_floats(list[list[str]] data): | |
| """ | |
| Parses a list of lists of strings into a NumPy array of floats. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import xxhash | |
| import asyncio | |
| import platform | |
| from collections import deque | |
| from warnings import warn as warning | |
| from typing import List, Dict, Optional | |
| from mm_toolbox.time import time_ns, time_s | |
| from .conn.raw cimport WsConnection |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
| cimport numpy as np | |
| from libc.stdlib cimport strtod | |
| from cpython.unicode cimport PyUnicode_AsUTF8 | |
| from cython cimport boundscheck, wraparound | |
| @boundscheck(False) | |
| @wraparound(False) | |
| cdef np.ndarray[np.float64_t, ndim=2] parse_list_strings_to_floats(list[list[str]] data): | |
| """ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import orjson | |
| import asyncio | |
| from warnings import warn as warning | |
| from dataclasses import dataclass | |
| from aiohttp import ClientSession, ClientWebSocketResponse, WSMsgType | |
| from typing import List, Dict, Callable, Any, Optional | |
| from mm_toolbox.src.logging import Logger | |
| from mm_toolbox.src.time import time_ms | |
| from mm_toolbox.src.ringbuffer import RingBufferSingleDimFloat |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def generate_new_buffer(size: float, strength: float) -> tuple[float, float]:
    """
    Return the buffer range obtained by scaling ``size`` down and up.

    The first element is ``size * (1 - strength)`` and the second is
    ``size * (1 + strength)``.
    """
    scaled_down = size * (1 - strength)
    scaled_up = size * (1 + strength)
    return scaled_down, scaled_up
| def process_data(data) -> None: | |
| """ | |
| Processes the data with varying buffer strengths and prints the results. | |
| """ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import zmq | |
| import orjson | |
| from time import time_ns | |
| from abc import ABC, abstractmethod | |
| from typing import Dict | |
| from mm.frameworks.exch.base.status import Status | |
| class OrderbookHandler(ABC): |
Newer Older