Created
December 1, 2025 05:59
-
-
Save x42005e1f/69db2e057660b88093145520725fb2ed to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # SPDX-FileCopyrightText: 2025 Ilya Egorov <[email protected]> | |
| # SPDX-License-Identifier: 0BSD | |
| from __future__ import annotations | |
| import sys | |
| from contextvars import ContextVar | |
| from typing import Final | |
| from culsans import Queue | |
| if sys.version_info >= (3, 12): # PEP 698 | |
| from typing import override | |
| else: # typing-extensions>=4.5.0 | |
| from typing_extensions import override | |
| INITIAL_CAPACITY: Final[int] = 8 | |
| _itemsize_cvar: ContextVar[int] = ContextVar("_itemsize_cvar", default=-1) | |
| class ByteQueue(Queue[bytes]): | |
| __slots__ = ( | |
| "__data", | |
| "__getpos", | |
| "__putpos", | |
| "__size", | |
| ) | |
| __data: bytearray | |
| __getpos: int | |
| __putpos: int | |
| __size: int | |
| def __reallocate(self, newsize: int) -> None: | |
| capacity = len(self.__data) | |
| if newsize > capacity or capacity >> 2 > newsize: | |
| if newsize > capacity: | |
| newdata = bytearray(2 ** (newsize - 1).bit_length()) | |
| elif capacity == INITIAL_CAPACITY: | |
| return | |
| elif newsize: | |
| newdata = bytearray( | |
| max( | |
| INITIAL_CAPACITY, | |
| 2 ** ((newsize - 1).bit_length() + 1), | |
| ) | |
| ) | |
| else: | |
| newdata = bytearray(INITIAL_CAPACITY) | |
| data = self.__data | |
| getpos = self.__getpos | |
| putpos = self.__putpos | |
| size = self.__size | |
| if getpos < putpos: | |
| newdata[:size] = memoryview(data)[getpos:putpos] | |
| elif getpos > putpos: | |
| newdata[: capacity - getpos] = memoryview(data)[getpos:] | |
| newdata[capacity - getpos : size] = memoryview(data)[:putpos] | |
| elif size: | |
| newdata[:size] = data | |
| self.__data = newdata | |
| self.__getpos = 0 | |
| self.__putpos = size | |
| def sync_read( | |
| self, | |
| count: int = -1, | |
| block: bool = True, | |
| timeout: float | None = None, | |
| ) -> bytes: | |
| token = _itemsize_cvar.set(count) | |
| try: | |
| return self.sync_get(block, timeout) | |
| finally: | |
| _itemsize_cvar.reset(token) | |
| async def async_read(self, count: int = -1) -> bytes: | |
| token = _itemsize_cvar.set(count) | |
| try: | |
| return await self.async_get() | |
| finally: | |
| _itemsize_cvar.reset(token) | |
| def read_nowait(self, count: int = -1) -> bytes: | |
| token = _itemsize_cvar.set(count) | |
| try: | |
| return self.get_nowait() | |
| finally: | |
| _itemsize_cvar.reset(token) | |
| @override | |
| def _init(self, maxsize: int) -> None: | |
| self.__data = bytearray(INITIAL_CAPACITY) | |
| self.__getpos = 0 | |
| self.__putpos = 0 | |
| self.__size = 0 | |
| @override | |
| def _qsize(self) -> int: | |
| return self.__size | |
| @override | |
| def _isize(self, item: bytes) -> int: | |
| return len(item) | |
| @override | |
| def _chain(self) -> bool: | |
| return True | |
| @override | |
| def _put(self, item: bytes) -> None: | |
| itemsize = len(item) | |
| self.__reallocate(self.__size + itemsize) | |
| data = self.__data | |
| putpos = self.__putpos | |
| capacity = len(data) | |
| if putpos + itemsize >= capacity: | |
| offset = capacity - putpos | |
| data[putpos:] = memoryview(item)[:offset] | |
| data[: itemsize - offset] = memoryview(item)[offset:] | |
| self.__putpos = itemsize - offset | |
| else: | |
| data[putpos : putpos + itemsize] = item | |
| self.__putpos += itemsize | |
| self.__size += itemsize | |
| @override | |
| def _get(self) -> bytes: | |
| itemsize = min(_itemsize_cvar.get(), self.__size) | |
| if itemsize < 0: | |
| itemsize = self.__size | |
| data = self.__data | |
| getpos = self.__getpos | |
| capacity = len(data) | |
| if getpos + itemsize >= capacity: | |
| offset = capacity - getpos | |
| item = ( | |
| memoryview(data)[getpos:].tobytes() | |
| + memoryview(data)[: itemsize - offset].tobytes() | |
| ) | |
| self.__getpos = itemsize - offset | |
| else: | |
| item = memoryview(data)[getpos : getpos + itemsize].tobytes() | |
| self.__getpos += itemsize | |
| self.__size -= itemsize | |
| self.__reallocate(self.__size) | |
| return item | |
| @override | |
| def _peek(self) -> bytes: | |
| data = self.__data | |
| getpos = self.__getpos | |
| putpos = self.__putpos | |
| size = self.__size | |
| if getpos < putpos: | |
| return memoryview(data)[getpos:putpos].tobytes() | |
| if getpos > putpos: | |
| return ( | |
| memoryview(data)[getpos:].tobytes() | |
| + memoryview(data)[:putpos].tobytes() | |
| ) | |
| if size: | |
| return bytes(data) | |
| return b"" | |
| @override | |
| def _peekable(self) -> bool: | |
| return True | |
| @override | |
| def _clear(self) -> None: | |
| self.__data.clear() | |
| self.__getpos = 0 | |
| self.__putpos = 0 | |
| self.__size = 0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment