Skip to content

Instantly share code, notes, and snippets.

@beatzxbt
Last active August 9, 2025 12:18
Show Gist options
  • Select an option

  • Save beatzxbt/45ddf15a788cd8624988e423065adb6e to your computer and use it in GitHub Desktop.

Select an option

Save beatzxbt/45ddf15a788cd8624988e423065adb6e to your computer and use it in GitHub Desktop.
better async iterator for websocket ringbuffer
import asyncio
from libc.stdint cimport (
UINT8_MAX,
uint8_t as u8,
)
cdef class BytesRingBuffer:
"""A fixed-size ring buffer for bytes objects"""
def __cinit__(self, bint only_insert_unique=False, u8 max_size=UINT8_MAX):
"""
Initializes the BytesRingBuffer with a specified maximum size.
Args:
only_insert_unique (bint): Whether to only insert unique items.
max_size (u8): The maximum number of items the buffer can hold.
"""
self._only_insert_unique = only_insert_unique
self._max_size: u8 = max_size
if self._max_size == 0:
raise ValueError("max_size must be greater than 0")
self._head: u8 = 0
self._tail: u8 = 0
self._size: u8 = 0
self._buffer: list[bytes] = [b""] * max_size
self._buffer_not_empty_event = asyncio.Event()
cdef inline bint is_empty(self):
return self._size == 0
cdef void insert(self, bytes item):
"""Inserts an item into the buffer."""
# This is inefficient compared to having .insert_if_unique(), but
# makes it far cleaner to integrate into the RawWsConnection class.
if self._only_insert_unique and self.contains(item):
return
self._buffer[self._tail] = item
self._tail = (self._tail + 1) % self._max_size
if self._size < self._max_size:
self._size += 1
else:
self._head = (self._head + 1) % self._max_size
self._buffer_not_empty_event.set()
cdef bint insert_if_unique(self, bytes item):
"""Inserts an item into the buffer if it is unique.
Returns:
True if the item was inserted, False if it was already in the buffer.
"""
if not self.contains(item):
self.insert(item)
return True
return False
cdef bint contains(self, bytes item):
"""Checks if the item exists in the buffer, searching from newest to oldest."""
if self.is_empty():
return False
cdef u8 idx, count = 0
idx = (self._head - 1 + self._max_size) % self._max_size
while count < self._size:
if self._buffer[idx] == item:
return True
idx = (idx - 1 + self._max_size) % self._max_size
count += 1
return False
cdef bytes consume(self):
"""Pulls the oldest item in the buffer."""
if self.is_empty():
return b""
cdef bytes item = self._buffer[self._head]
self._head = (self._head + 1) % self._max_size
self._size -= 1
if self.is_empty():
self._buffer_not_empty_event.clear()
return item
cpdef bytes consume_py(self):
"""
Mirror of the C-level consume method, but as a python function.
For use in the '__anext__' method.
"""
return self.consume()
async def consume_async(self):
"""Waits for a message to be available and returns it."""
await self._buffer_not_empty_event.wait()
return self.consume()
cdef bytes peek_oldest(self):
"""Peeks the oldest item in the buffer."""
if self.is_empty():
return b""
return self._buffer[self._head]
cdef bytes peek_newest(self):
"""Peeks the newest item in the buffer."""
if self.is_empty():
return b""
return self._buffer[(self._tail - 1 + self._max_size) % self._max_size]
cdef void clear(self):
"""Clears the buffer and resets internal state."""
self._head = 0
self._tail = 0
self._size = 0
self._buffer = [b""] * self._max_size
def __aiter__(self) -> "BytesRingBuffer":
"""
Async iterator entry point.
Example usage:
```python
async for msg in ringbuffer:
print(msg)
```
"""
return self
async def __anext__(self) -> bytes:
"""Yields the next message when one becomes available."""
return await self.consume_async()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment