Skip to content

Instantly share code, notes, and snippets.

@thegamecracks
Last active September 17, 2024 02:12
Show Gist options
  • Select an option

  • Save thegamecracks/898f51cf20e3adae174a6fbbb0a0d3cc to your computer and use it in GitHub Desktop.

Select an option

Save thegamecracks/898f51cf20e3adae174a6fbbb0a0d3cc to your computer and use it in GitHub Desktop.
Subclassing asyncio event loop to run certain tasks when idle
# HACK: this proof of concept is not robust, don't use this in production!
# Minimum Python version: 3.11
import asyncio
from contextvars import Context
from typing import (
Any,
Callable,
Coroutine,
Generator,
Generic,
Protocol,
TypeVar,
TypeVarTuple,
)
T = TypeVar("T")
Ts = TypeVarTuple("Ts")
class IdleEventLoop(asyncio.BaseEventLoop):
"""An event loop adding support for idle callbacks that run
when the event loop is about to go idle.
Idling is defined as any iteration where the event loop plans to sleep,
more specifically when there are no handles to run immediately.
In the case of a typical asyncio application, this usually happens
when all running coroutines have to wait on futures.
This class cannot be used by itself and should be subclassed with another
BaseEventLoop implementation mixed in.
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._selector = IdleSelector(self, self._selector)
self._idle: list[asyncio.Handle] = []
def call_idle(
self,
callback: Callable[[*Ts], object],
*args: *Ts,
context: Context | None = None,
) -> asyncio.Handle:
# Simplified from:
# https://github.com/python/cpython/blob/v3.12.6/Lib/asyncio/base_events.py#L785-L802
handle = asyncio.Handle(callback, args, self, context)
self._idle.append(handle)
return handle
def _move_idle_to_ready(self) -> None:
self._ready.extend(self._idle)
self._idle.clear()
class Selector(Protocol):
def select(self, timeout: float | None = None) -> Any: ...
class IdleSelector:
"""A proxy for another selector that queues idle callbacks to run
once the event loop is about to go idle.
An arguably better implementation is to override BaseEventLoop._run_once()
and handle idle callbacks there.
"""
def __init__(self, loop: IdleEventLoop, selector: Selector) -> None:
self.loop = loop
self.selector = selector
self._i = 0
def __getattr__(self, name: str) -> Any:
return getattr(self.selector, name)
def select(self, timeout: float | None = None) -> Any:
print(f"Iteration {self._i}, {timeout = }")
self._i += 1
if timeout is not None and timeout <= 0:
return self.selector.select(timeout)
if len(self.loop._idle) > 0:
print(" Event loop about to idle, scheduling idle callbacks")
self.loop._move_idle_to_ready()
return self.selector.select(0)
# No idle callbacks, sleep as usual
return self.selector.select(timeout)
# NOTE: for non-Windows systems, try this with SelectorEventLoop instead
class IdleProactorEventLoop(IdleEventLoop, asyncio.ProactorEventLoop): ...
class IdleProactorEventLoopPolicy(asyncio.WindowsProactorEventLoopPolicy):
_loop_factory = IdleProactorEventLoop
async def run_when_idle(coro: Coroutine[Any, Any, Any]) -> Coroutine[Any, Any, Any]:
"""Wraps a coroutine and runs it only when the event loop is idle."""
loop = asyncio.get_running_loop()
if not isinstance(loop, IdleEventLoop):
raise RuntimeError("Must use idle event loop with create_idle_task()")
# Derived from https://gist.github.com/thegamecracks/2d92c2f376018fe990f6101c9e94f343
# See that for more details
send_val = None
exc = None
while True:
if exc is None and len(loop._ready) > 0:
# Instead of using an event like the original snippet,
# we create a future and set it when the event loop is idle.
print(f" {coro.__name__}() waiting for idle")
fut = loop.create_future()
loop.call_idle(fut.set_result, None)
try:
await fut
except BaseException as e:
exc = e
try:
if exc is None:
yield_aw = coro.send(send_val)
else:
yield_aw = coro.throw(exc)
exc = None
except StopIteration as e:
return e.value
if yield_aw is None:
await asyncio.sleep(0)
continue
try:
send_val = await reyield_future(yield_aw)
except BaseException as e:
exc = e
class reyield_future(Generic[T]):
"""Allows yielding a future that was already awaited."""
def __init__(self, fut: asyncio.Future[T]) -> None:
self.fut = fut
def __await__(self) -> Generator[asyncio.Future[T], None, T]:
if not self.fut._asyncio_future_blocking:
raise RuntimeError("Future must already be awaited")
yield self.fut
return self.fut.result()
# Finally, let's run some coroutines
async def main() -> None:
print(" main() started")
async with asyncio.TaskGroup() as tg:
tg.create_task(do_stuff())
tg.create_task(sleep())
tg.create_task(run_when_idle(idle_func()))
print(" main() done")
async def do_stuff():
print(" do_stuff() started")
for i in range(3):
print(" doing stuff", i)
await asyncio.sleep(0) # wait until next iteration
print(" do_stuff() done")
async def sleep():
print(" sleep() started")
await asyncio.sleep(3) # invokes loop.call_later() which sets a timeout
print(" sleep() done")
async def idle_func():
print(" idle_func() started")
for i in range(3):
print(" doing idle stuff", i)
await asyncio.sleep(0)
print(" idle_func() done")
asyncio.set_event_loop_policy(IdleProactorEventLoopPolicy())
asyncio.run(main())
Iteration 0, timeout = 0
main() started
Iteration 1, timeout = 0
do_stuff() started
doing stuff 0
sleep() started
idle_func() waiting for idle
Iteration 2, timeout = 0
doing stuff 1
Iteration 3, timeout = 0
doing stuff 2
Iteration 4, timeout = 0
do_stuff() done
Iteration 5, timeout = 0
Iteration 6, timeout = 3.0
Event loop about to idle, scheduling idle callbacks
Iteration 7, timeout = 0
idle_func() started
doing idle stuff 0
Iteration 8, timeout = 0
doing idle stuff 1
Iteration 9, timeout = 0
doing idle stuff 2
Iteration 10, timeout = 0
idle_func() done
Iteration 11, timeout = 0
Iteration 12, timeout = 3.0
Iteration 13, timeout = 0
sleep() done
Iteration 14, timeout = 0
Iteration 15, timeout = 0
main() done
Iteration 16, timeout = 0
Iteration 17, timeout = 0
Iteration 18, timeout = 0
Iteration 19, timeout = 0
Iteration 20, timeout = 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment