Last active
August 9, 2025 12:18
-
-
Save beatzxbt/45ddf15a788cd8624988e423065adb6e to your computer and use it in GitHub Desktop.
better async iterator for websocket ringbuffer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| from libc.stdint cimport ( | |
| UINT8_MAX, | |
| uint8_t as u8, | |
| ) | |
| cdef class BytesRingBuffer: | |
| """A fixed-size ring buffer for bytes objects""" | |
| def __cinit__(self, bint only_insert_unique=False, u8 max_size=UINT8_MAX): | |
| """ | |
| Initializes the BytesRingBuffer with a specified maximum size. | |
| Args: | |
| only_insert_unique (bint): Whether to only insert unique items. | |
| max_size (u8): The maximum number of items the buffer can hold. | |
| """ | |
| self._only_insert_unique = only_insert_unique | |
| self._max_size: u8 = max_size | |
| if self._max_size == 0: | |
| raise ValueError("max_size must be greater than 0") | |
| self._head: u8 = 0 | |
| self._tail: u8 = 0 | |
| self._size: u8 = 0 | |
| self._buffer: list[bytes] = [b""] * max_size | |
| self._buffer_not_empty_event = asyncio.Event() | |
| cdef inline bint is_empty(self): | |
| return self._size == 0 | |
| cdef void insert(self, bytes item): | |
| """Inserts an item into the buffer.""" | |
| # This is inefficient compared to having .insert_if_unique(), but | |
| # makes it far cleaner to integrate into the RawWsConnection class. | |
| if self._only_insert_unique and self.contains(item): | |
| return | |
| self._buffer[self._tail] = item | |
| self._tail = (self._tail + 1) % self._max_size | |
| if self._size < self._max_size: | |
| self._size += 1 | |
| else: | |
| self._head = (self._head + 1) % self._max_size | |
| self._buffer_not_empty_event.set() | |
| cdef bint insert_if_unique(self, bytes item): | |
| """Inserts an item into the buffer if it is unique. | |
| Returns: | |
| True if the item was inserted, False if it was already in the buffer. | |
| """ | |
| if not self.contains(item): | |
| self.insert(item) | |
| return True | |
| return False | |
| cdef bint contains(self, bytes item): | |
| """Checks if the item exists in the buffer, searching from newest to oldest.""" | |
| if self.is_empty(): | |
| return False | |
| cdef u8 idx, count = 0 | |
| idx = (self._head - 1 + self._max_size) % self._max_size | |
| while count < self._size: | |
| if self._buffer[idx] == item: | |
| return True | |
| idx = (idx - 1 + self._max_size) % self._max_size | |
| count += 1 | |
| return False | |
| cdef bytes consume(self): | |
| """Pulls the oldest item in the buffer.""" | |
| if self.is_empty(): | |
| return b"" | |
| cdef bytes item = self._buffer[self._head] | |
| self._head = (self._head + 1) % self._max_size | |
| self._size -= 1 | |
| if self.is_empty(): | |
| self._buffer_not_empty_event.clear() | |
| return item | |
| cpdef bytes consume_py(self): | |
| """ | |
| Mirror of the C-level consume method, but as a python function. | |
| For use in the '__anext__' method. | |
| """ | |
| return self.consume() | |
| async def consume_async(self): | |
| """Waits for a message to be available and returns it.""" | |
| await self._buffer_not_empty_event.wait() | |
| return self.consume() | |
| cdef bytes peek_oldest(self): | |
| """Peeks the oldest item in the buffer.""" | |
| if self.is_empty(): | |
| return b"" | |
| return self._buffer[self._head] | |
| cdef bytes peek_newest(self): | |
| """Peeks the newest item in the buffer.""" | |
| if self.is_empty(): | |
| return b"" | |
| return self._buffer[(self._tail - 1 + self._max_size) % self._max_size] | |
| cdef void clear(self): | |
| """Clears the buffer and resets internal state.""" | |
| self._head = 0 | |
| self._tail = 0 | |
| self._size = 0 | |
| self._buffer = [b""] * self._max_size | |
| def __aiter__(self) -> "BytesRingBuffer": | |
| """ | |
| Async iterator entry point. | |
| Example usage: | |
| ```python | |
| async for msg in ringbuffer: | |
| print(msg) | |
| ``` | |
| """ | |
| return self | |
| async def __anext__(self) -> bytes: | |
| """Yields the next message when one becomes available.""" | |
| return await self.consume_async() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment