Last active
September 17, 2024 02:12
-
-
Save thegamecracks/898f51cf20e3adae174a6fbbb0a0d3cc to your computer and use it in GitHub Desktop.
Subclassing asyncio event loop to run certain tasks when idle
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # HACK: this proof of concept is not robust, don't use this in production! | |
| # Minimum Python version: 3.11 | |
| import asyncio | |
| from contextvars import Context | |
| from typing import ( | |
| Any, | |
| Callable, | |
| Coroutine, | |
| Generator, | |
| Generic, | |
| Protocol, | |
| TypeVar, | |
| TypeVarTuple, | |
| ) | |
# Result type of the future wrapped by reyield_future.
T = TypeVar("T")
# Positional-argument types of a callback registered via IdleEventLoop.call_idle().
Ts = TypeVarTuple("Ts")
| class IdleEventLoop(asyncio.BaseEventLoop): | |
| """An event loop adding support for idle callbacks that run | |
| when the event loop is about to go idle. | |
| Idling is defined as any iteration where the event loop plans to sleep, | |
| more specifically when there are no handles to run immediately. | |
| In the case of a typical asyncio application, this usually happens | |
| when all running coroutines have to wait on futures. | |
| This class cannot be used by itself and should be subclassed with another | |
| BaseEventLoop implementation mixed in. | |
| """ | |
| def __init__(self, *args, **kwargs) -> None: | |
| super().__init__(*args, **kwargs) | |
| self._selector = IdleSelector(self, self._selector) | |
| self._idle: list[asyncio.Handle] = [] | |
| def call_idle( | |
| self, | |
| callback: Callable[[*Ts], object], | |
| *args: *Ts, | |
| context: Context | None = None, | |
| ) -> asyncio.Handle: | |
| # Simplified from: | |
| # https://github.com/python/cpython/blob/v3.12.6/Lib/asyncio/base_events.py#L785-L802 | |
| handle = asyncio.Handle(callback, args, self, context) | |
| self._idle.append(handle) | |
| return handle | |
| def _move_idle_to_ready(self) -> None: | |
| self._ready.extend(self._idle) | |
| self._idle.clear() | |
| class Selector(Protocol): | |
| def select(self, timeout: float | None = None) -> Any: ... | |
class IdleSelector:
    """A proxy for another selector that queues idle callbacks to run
    once the event loop is about to go idle.

    An arguably better implementation is to override BaseEventLoop._run_once()
    and handle idle callbacks there.
    """

    def __init__(self, loop: IdleEventLoop, selector: Selector) -> None:
        self.loop = loop
        self.selector = selector
        # Number of select() calls seen so far; only used for the trace output.
        self._i = 0

    def __getattr__(self, name: str) -> Any:
        """Forward any attribute not found on the proxy to the wrapped selector."""
        # __getattr__ only runs when normal lookup fails. If ``selector``
        # itself is missing (instance created without __init__), looking it up
        # below would re-enter this method forever, so fail cleanly instead.
        if name == "selector":
            raise AttributeError(name)
        return getattr(self.selector, name)

    def select(self, timeout: float | None = None) -> Any:
        """Poll the wrapped selector, releasing idle callbacks first if needed.

        Args:
            timeout: The timeout the event loop asked for. ``None`` or a
                positive value means the loop is prepared to wait.

        Returns:
            Whatever the wrapped selector's ``select()`` returns.
        """
        print(f"Iteration {self._i}, {timeout = }")
        self._i += 1

        # A non-positive timeout means the loop is not going to wait, so it
        # is not idle: pass the call straight through.
        if timeout is not None and timeout <= 0:
            return self.selector.select(timeout)

        if self.loop._idle:
            print(" Event loop about to idle, scheduling idle callbacks")
            self.loop._move_idle_to_ready()
            # The ready queue is no longer empty, so poll without blocking.
            return self.selector.select(0)

        # No idle callbacks, sleep as usual
        return self.selector.select(timeout)
# NOTE: for non-Windows systems, try this with SelectorEventLoop instead
class IdleProactorEventLoop(IdleEventLoop, asyncio.ProactorEventLoop):
    """Proactor event loop with IdleEventLoop's idle-callback support mixed in."""
class IdleProactorEventLoopPolicy(asyncio.WindowsProactorEventLoopPolicy):
    """Event loop policy whose loop factory is IdleProactorEventLoop."""

    _loop_factory = IdleProactorEventLoop
async def run_when_idle(coro: Coroutine[Any, Any, Any]) -> Any:
    """Wraps a coroutine and runs it only when the event loop is idle.

    The coroutine is stepped manually with ``send()``/``throw()``. Before
    each step, if the loop still has ready handles, this wrapper waits on a
    future that is resolved through ``loop.call_idle()``.

    Args:
        coro: The coroutine object to drive.

    Returns:
        The value returned by *coro*.

    Raises:
        RuntimeError: If the running loop is not an IdleEventLoop.
    """
    loop = asyncio.get_running_loop()
    if not isinstance(loop, IdleEventLoop):
        # The coroutine will never be driven; close it so it does not emit a
        # "coroutine ... was never awaited" warning.
        coro.close()
        raise RuntimeError("Must use idle event loop with run_when_idle()")

    # Derived from https://gist.github.com/thegamecracks/2d92c2f376018fe990f6101c9e94f343
    # See that for more details
    send_val = None
    exc = None
    while True:
        # A pending exception is delivered immediately; otherwise only step
        # the coroutine once nothing else is ready to run.
        if exc is None and loop._ready:
            # Instead of using an event like the original snippet,
            # we create a future and set it when the event loop is idle.
            print(f" {coro.__name__}() waiting for idle")
            fut = loop.create_future()
            handle = loop.call_idle(fut.set_result, None)
            try:
                await fut
            except BaseException as e:
                # The wait was interrupted (e.g. cancellation). Drop the idle
                # callback so it does not later call set_result() on a
                # future that is already cancelled.
                handle.cancel()
                exc = e

        try:
            if exc is None:
                yield_aw = coro.send(send_val)
            else:
                yield_aw = coro.throw(exc)
                exc = None
        except StopIteration as e:
            # The wrapped coroutine finished; hand back its return value.
            return e.value

        if yield_aw is None:
            # A bare yield: give other handles one loop iteration.
            await asyncio.sleep(0)
            continue

        try:
            # Pass the yielded future up to the task driving this wrapper.
            send_val = await reyield_future(yield_aw)
        except BaseException as e:
            exc = e
class reyield_future(Generic[T]):
    """Allows yielding a future that was already awaited."""

    def __init__(self, fut: asyncio.Future[T]) -> None:
        self.fut = fut

    def __await__(self) -> Generator[asyncio.Future[T], None, T]:
        """Yield the wrapped future once, then return its result.

        Raises:
            RuntimeError: If the wrapped object is not flagged as an awaited
                future (``_asyncio_future_blocking`` missing or false).
        """
        # getattr() with a default keeps objects that are not asyncio futures
        # on the same RuntimeError path instead of leaking an AttributeError.
        if not getattr(self.fut, "_asyncio_future_blocking", False):
            raise RuntimeError("Future must already be awaited")
        yield self.fut
        return self.fut.result()
| # Finally, let's run some coroutines | |
async def main() -> None:
    """Run the three demo coroutines concurrently and wait for all of them."""
    print(" main() started")
    # idle_func() is wrapped so it only advances while the loop is idle.
    demo_coros = (do_stuff(), sleep(), run_when_idle(idle_func()))
    async with asyncio.TaskGroup() as group:
        for demo in demo_coros:
            group.create_task(demo)
    print(" main() done")
async def do_stuff():
    """Print three progress lines, yielding to the loop after each one."""
    print(" do_stuff() started")
    for step in range(3):
        print(f" doing stuff {step}")
        # wait until next iteration
        await asyncio.sleep(0)
    print(" do_stuff() done")
async def sleep(delay: float = 3):
    """Announce start, sleep for *delay* seconds, then announce completion.

    Args:
        delay: Seconds passed to ``asyncio.sleep()``. Defaults to 3.
    """
    print(" sleep() started")
    # With the default delay this invokes loop.call_later() which sets a timeout
    await asyncio.sleep(delay)
    print(" sleep() done")
async def idle_func():
    """Print three progress lines, yielding once after each one."""
    print(" idle_func() started")
    for step in range(3):
        print(f" doing idle stuff {step}")
        await asyncio.sleep(0)
    print(" idle_func() done")
# Only run the demo when executed as a script, so that importing this module
# neither replaces the global event loop policy nor starts an event loop.
if __name__ == "__main__":
    # Install the policy first so asyncio.run() creates the idle-aware loop.
    asyncio.set_event_loop_policy(IdleProactorEventLoopPolicy())
    asyncio.run(main())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Iteration 0, timeout = 0 | |
| main() started | |
| Iteration 1, timeout = 0 | |
| do_stuff() started | |
| doing stuff 0 | |
| sleep() started | |
| idle_func() waiting for idle | |
| Iteration 2, timeout = 0 | |
| doing stuff 1 | |
| Iteration 3, timeout = 0 | |
| doing stuff 2 | |
| Iteration 4, timeout = 0 | |
| do_stuff() done | |
| Iteration 5, timeout = 0 | |
| Iteration 6, timeout = 3.0 | |
| Event loop about to idle, scheduling idle callbacks | |
| Iteration 7, timeout = 0 | |
| idle_func() started | |
| doing idle stuff 0 | |
| Iteration 8, timeout = 0 | |
| doing idle stuff 1 | |
| Iteration 9, timeout = 0 | |
| doing idle stuff 2 | |
| Iteration 10, timeout = 0 | |
| idle_func() done | |
| Iteration 11, timeout = 0 | |
| Iteration 12, timeout = 3.0 | |
| Iteration 13, timeout = 0 | |
| sleep() done | |
| Iteration 14, timeout = 0 | |
| Iteration 15, timeout = 0 | |
| main() done | |
| Iteration 16, timeout = 0 | |
| Iteration 17, timeout = 0 | |
| Iteration 18, timeout = 0 | |
| Iteration 19, timeout = 0 | |
| Iteration 20, timeout = 0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment