Skip to content

Instantly share code, notes, and snippets.

@gnachman
Created January 22, 2026 22:51
Show Gist options
  • Select an option

  • Save gnachman/68028b40a5e5c750b03be05bc9f58ecf to your computer and use it in GitHub Desktop.

Select an option

Save gnachman/68028b40a5e5c750b03be05bc9f58ecf to your computer and use it in GitHub Desktop.
class BusyList:
    """FIFO of producers waiting to run; each producer appears at most once."""

    def __init__(self):
        # Ordered queue; uniqueness is enforced on add(), not here.
        self._producers = []

    def remove(self, producer):
        """Drop *producer* from the queue (raises ValueError if absent)."""
        self._producers.remove(producer)

    def add(self, producer):
        """Append *producer* to the back of the queue unless already queued."""
        if producer in self._producers:
            return
        self._producers.append(producer)

    def take(self):
        """Pop and return the oldest queued producer, or None when empty."""
        return self._producers.pop(0) if self._producers else None
class WorkQueue:
    """Admits producers' pending work to a token executor, bounded by a cap
    on outstanding tokens.

    Methods in this class must all be called on the same thread. That
    ensures mark_ready() executes atomically from the POV of a producer.
    """

    def __init__(self, token_executor):
        self._tokens = []
        self._busy_list = BusyList()
        self._outstanding = 0   # tokens handed to the executor, not yet done
        self._maximum = 1000    # admission cap on outstanding tokens
        self._token_executor = token_executor

    def mark_ready(self, producer):
        """Adds a producer to the busy list if it isn't already on it. It is
        added to the end so that it cannot starve others."""
        self._busy_list.add(producer)
        self._dequeue()

    def _dequeue(self):
        """Round-robin: admit queued producers while under the outstanding cap."""
        while self._outstanding < self._maximum:
            # Admit next producer.
            candidate = self._busy_list.take()
            if not candidate:
                return
            tokens = candidate.dequeue()
            if candidate.has_work():
                # They have more to do but they go to the back of the line.
                self._busy_list.add(candidate)
            self._outstanding += len(tokens)
            self._token_executor.execute_in_background(candidate, tokens)

    def execution_complete(self, producer, count):
        """Called by token executor when work completes; frees capacity and
        re-runs admission."""
        self._outstanding -= count
        producer.consume(count)
        self._dequeue()
class Producer:
    """Reads byte chunks from a dispatch source and feeds them to a work queue.

    Applies backpressure: once ``_maximum`` or more bytes are outstanding
    (read but not yet consumed downstream), the dispatch source is suspended
    until enough bytes have been consumed to drop back under the threshold.
    """

    def __init__(self, dispatch_source, work_queue):
        self._dispatch_source = dispatch_source
        self._work_queue = work_queue
        self._bytes = []        # FIFO of unprocessed chunks awaiting dequeue()
        self._outstanding = 0   # bytes read but not yet consumed downstream
        self._maximum = 1000    # backpressure threshold, in bytes
        dispatch_source.read_callback = self._read

    def _read(self, data):
        """Called when there are bytes to read. Assume no EOF for simplicity
        during prototyping."""
        self._outstanding += len(data)
        # BUG FIX: was bare `_maximum` (a NameError at runtime); must be the
        # instance attribute.
        if self._outstanding >= self._maximum:
            # Stop reading until this block of input is processed by token
            # executor.
            self._dispatch_source.suspend()
        # BUG FIX: was append([data]) — wrapping the chunk in a list made
        # dequeue() return a one-element list, so len(tokens) in the work
        # queue was always 1 and never matched the byte counts tracked here.
        self._bytes.append(data)
        self._work_queue.mark_ready(self)

    def dequeue(self):
        """Called by work queue; returns the oldest unprocessed chunk."""
        return self._bytes.pop(0)

    def consume(self, count):
        """Called when *count* bytes have been fully processed downstream."""
        self._outstanding -= count
        if self._outstanding < self._maximum and self._outstanding + count >= self._maximum:
            # We just crossed back under the threshold: we can start reading
            # again.
            self._dispatch_source.resume()

    def has_work(self):
        """True while there are chunks waiting to be dequeued."""
        return len(self._bytes) > 0
class TokenExecutor:
    """Runs token work off-thread and reports completion back to the work
    queue."""

    def __init__(self, work_queue):
        self._work_queue = work_queue

    def execute_in_background(self, producer, tokens):
        """Process *tokens* for *producer* in the background.

        Not shown. Calls execution_complete on this thread when done.
        """
        # BUG FIX: the original method had no body (only a comment), which is
        # a SyntaxError in Python — the class could not even be parsed. Keep
        # it as an explicit stub while prototyping.
        raise NotImplementedError("background token execution not implemented")

    def execution_complete(self, producer, tokens):
        """Called when work completed in other thread.

        NOTE(review): despite the name, *tokens* is forwarded where
        WorkQueue.execution_complete expects a numeric count (it subtracts it
        from its outstanding total) — confirm callers pass a count here.
        """
        self._work_queue.execution_complete(producer, tokens)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment