Created
January 22, 2026 22:51
-
-
Save gnachman/68028b40a5e5c750b03be05bc9f58ecf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class BusyList: | |
| def __init__(self): | |
| self._producers = [] | |
| def remove(self, producer): | |
| self._producers.remove(producer) | |
| def add(self, producer): | |
| if producer not in self._producers: | |
| self._producers.append(producer) | |
| def take(self): | |
| if not self._producers: | |
| return None | |
| producer = self._producers[0] | |
| del self._producers[0] | |
| return producer | |
| class WorkQueue: | |
| """Methods in this class must all be called on the same thread. That ensures mark_ready() executes atomically from the POV of a producer.""" | |
| def __init__(self, token_executor): | |
| self._tokens = [] | |
| self._busy_list = BusyList() | |
| self._outstanding = 0 | |
| self._maximum = 1000 | |
| self._token_executor = token_executor | |
| def mark_ready(self, producer): | |
| """Adds a producer to the busy list if it isn't already on it. It is added to the end so that it cannot starve others.""" | |
| self._busy_list.add(producer) | |
| self._dequeue() | |
| def _dequeue(self): | |
| while self._outstanding < self._maximum: | |
| # Admit next producer | |
| producer = self._busy_list.take() | |
| if not producer: | |
| return | |
| tokens = producer.dequeue() | |
| if producer.has_work(): | |
| # They have more to do but they go to the back of the line. | |
| self._busy_list.add(producer) | |
| self._outstanding += len(tokens) | |
| self._token_executor.execute_in_background(producer, tokens) | |
| def execution_complete(self, producer, count): | |
| """Called by token executor when work completes.""" | |
| self._outstanding -= count | |
| producer.consume(count) | |
| self._dequeue() | |
| class Producer: | |
| def __init__(self, dispatch_source, work_queue): | |
| self._dispatch_source = dispatch_source | |
| self._work_queue = work_queue | |
| self._bytes = [] | |
| self._outstanding = 0 | |
| self._maximum = 1000 | |
| dispatch_source.read_callback = self._read | |
| def _read(self, bytes): | |
| """Called when there are bytes to read. Assume no EOF for simplicity during prototyping""" | |
| self._outstanding += len(bytes) | |
| if self._outstanding >= _maximum: | |
| # Stop reading until this block of input is processed by token executor. | |
| self._dispatch_source.suspend() | |
| self._bytes.append([bytes]) | |
| self._work_queue.mark_ready(self) | |
| def dequeue(self): | |
| """Called by work queue""" | |
| bytes = self._bytes[0] | |
| del self._bytes[0] | |
| return bytes | |
| def consume(self, count): | |
| self._outstanding -= count | |
| if self._outstanding < self._maximum and self._outstanding + count >= self._maximum: | |
| # We can start reading again | |
| self._dispatch_source.resume() | |
| def has_work(self): | |
| return len(self._bytes) > 0 | |
| class TokenExecutor: | |
| def __init__(self, work_queue): | |
| self._work_queue = work_queue | |
| def execute_in_background(self, producer, tokens): | |
| # Not shown. Calls execution_complete on this thread when done. | |
| def execution_complete(self, producer, tokens): | |
| """Called when work completed in other thread""" | |
| self._work_queue.execution_complete(producer, tokens) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment