Skip to content

Instantly share code, notes, and snippets.

@x42005e1f
Created December 1, 2025 05:59
Show Gist options
  • Select an option

  • Save x42005e1f/69db2e057660b88093145520725fb2ed to your computer and use it in GitHub Desktop.

Select an option

Save x42005e1f/69db2e057660b88093145520725fb2ed to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2025 Ilya Egorov <[email protected]>
# SPDX-License-Identifier: 0BSD
from __future__ import annotations
import sys
from contextvars import ContextVar
from typing import Final
from culsans import Queue
if sys.version_info >= (3, 12): # PEP 698
from typing import override
else: # typing-extensions>=4.5.0
from typing_extensions import override
INITIAL_CAPACITY: Final[int] = 8
_itemsize_cvar: ContextVar[int] = ContextVar("_itemsize_cvar", default=-1)
class ByteQueue(Queue[bytes]):
__slots__ = (
"__data",
"__getpos",
"__putpos",
"__size",
)
__data: bytearray
__getpos: int
__putpos: int
__size: int
def __reallocate(self, newsize: int) -> None:
capacity = len(self.__data)
if newsize > capacity or capacity >> 2 > newsize:
if newsize > capacity:
newdata = bytearray(2 ** (newsize - 1).bit_length())
elif capacity == INITIAL_CAPACITY:
return
elif newsize:
newdata = bytearray(
max(
INITIAL_CAPACITY,
2 ** ((newsize - 1).bit_length() + 1),
)
)
else:
newdata = bytearray(INITIAL_CAPACITY)
data = self.__data
getpos = self.__getpos
putpos = self.__putpos
size = self.__size
if getpos < putpos:
newdata[:size] = memoryview(data)[getpos:putpos]
elif getpos > putpos:
newdata[: capacity - getpos] = memoryview(data)[getpos:]
newdata[capacity - getpos : size] = memoryview(data)[:putpos]
elif size:
newdata[:size] = data
self.__data = newdata
self.__getpos = 0
self.__putpos = size
def sync_read(
self,
count: int = -1,
block: bool = True,
timeout: float | None = None,
) -> bytes:
token = _itemsize_cvar.set(count)
try:
return self.sync_get(block, timeout)
finally:
_itemsize_cvar.reset(token)
async def async_read(self, count: int = -1) -> bytes:
token = _itemsize_cvar.set(count)
try:
return await self.async_get()
finally:
_itemsize_cvar.reset(token)
def read_nowait(self, count: int = -1) -> bytes:
token = _itemsize_cvar.set(count)
try:
return self.get_nowait()
finally:
_itemsize_cvar.reset(token)
@override
def _init(self, maxsize: int) -> None:
self.__data = bytearray(INITIAL_CAPACITY)
self.__getpos = 0
self.__putpos = 0
self.__size = 0
@override
def _qsize(self) -> int:
return self.__size
@override
def _isize(self, item: bytes) -> int:
return len(item)
@override
def _chain(self) -> bool:
return True
@override
def _put(self, item: bytes) -> None:
itemsize = len(item)
self.__reallocate(self.__size + itemsize)
data = self.__data
putpos = self.__putpos
capacity = len(data)
if putpos + itemsize >= capacity:
offset = capacity - putpos
data[putpos:] = memoryview(item)[:offset]
data[: itemsize - offset] = memoryview(item)[offset:]
self.__putpos = itemsize - offset
else:
data[putpos : putpos + itemsize] = item
self.__putpos += itemsize
self.__size += itemsize
@override
def _get(self) -> bytes:
itemsize = min(_itemsize_cvar.get(), self.__size)
if itemsize < 0:
itemsize = self.__size
data = self.__data
getpos = self.__getpos
capacity = len(data)
if getpos + itemsize >= capacity:
offset = capacity - getpos
item = (
memoryview(data)[getpos:].tobytes()
+ memoryview(data)[: itemsize - offset].tobytes()
)
self.__getpos = itemsize - offset
else:
item = memoryview(data)[getpos : getpos + itemsize].tobytes()
self.__getpos += itemsize
self.__size -= itemsize
self.__reallocate(self.__size)
return item
@override
def _peek(self) -> bytes:
data = self.__data
getpos = self.__getpos
putpos = self.__putpos
size = self.__size
if getpos < putpos:
return memoryview(data)[getpos:putpos].tobytes()
if getpos > putpos:
return (
memoryview(data)[getpos:].tobytes()
+ memoryview(data)[:putpos].tobytes()
)
if size:
return bytes(data)
return b""
@override
def _peekable(self) -> bool:
return True
@override
def _clear(self) -> None:
self.__data.clear()
self.__getpos = 0
self.__putpos = 0
self.__size = 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment