Last active
September 13, 2023 17:09
-
-
Save Mastermind-U/55a2f522e0bbb7bd67f60f8f105164b0 to your computer and use it in GitHub Desktop.
Example of asynchronous python code with deque and generators
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from collections import deque | |
| def arange(stop): | |
| for i in range(1, stop + 1, 1): | |
| print(i, "/", stop) | |
| yield | |
| else: | |
| return i | |
| def nested_aranges(first, second): | |
| a = yield from arange(first) | |
| b = yield from arange(second) | |
| return a + b | |
| def run_until_complete(*tasks): | |
| return_values = {task: None for task in tasks} | |
| events = deque(tasks) | |
| while events: | |
| try: | |
| task = events.pop() | |
| next(task) | |
| events.appendleft(task) | |
| except StopIteration as exc: | |
| return_values[task] = exc.value | |
| return tuple(return_values.values()) | |
| r1, r2, r3 = run_until_complete(arange(3), arange(5), nested_aranges(2, 3)) | |
| print(r1, r2, r3) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment