Last active
August 24, 2020 02:27
-
-
Save A5rocks/1e6f27ea98b11676cc70c2c3ab7aa187 to your computer and use it in GitHub Desktop.
Dispatch with Trio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # pretty basic backpressured pubsub | |
| import trio | |
| import typing | |
| class PubSub: | |
| topics_to_channels: typing.MutableMapping[ | |
| str, | |
| typing.List[trio.MemorySendChannel] | |
| ] | |
| def __init__(self) -> None: | |
| self.topics_to_channels = {} | |
| def _get_topic(self, topic: str) -> typing.List[trio.MemorySendChannel]: | |
| return self.topics_to_channels.get(topic, []) | |
| def subscribe(self, topic: str) -> trio.MemoryReceiveChannel: | |
| if self.topics_to_channels.get(topic) is None: | |
| self.topics_to_channels[topic] = [] | |
| send, recv = trio.open_memory_channel(0) | |
| self.topics_to_channels[topic].append(send) | |
| return recv | |
| async def publish(self, topic: str, message: typing.Any) -> float: | |
| chans = self._get_topic(topic) | |
| res = 0.0 | |
| for chan in chans: | |
| try: | |
| chan.send_nowait(message) | |
| await trio.lowlevel.checkpoint() | |
| except trio.WouldBlock: | |
| await trio.lowlevel.checkpoint() | |
| # the backpressure thing is full | |
| res += 1 / len(chans) | |
| except trio.ClosedResourceError: | |
| await trio.lowlevel.checkpoint() | |
| # wtf why are we trying to send to a channel that is closed | |
| self.topics_to_channels[topic].remove(chan) | |
| except trio.BrokenResourceError: | |
| # there's no receive streams! | |
| await chan.aclose() | |
| self.topics_to_channels[topic].remove(chan) | |
| # we return the "backpressure rating":tm::tm: | |
| return res | |
| async def consumer(topic: str, identity: int, pubsub: PubSub) -> None: | |
| channel = pubsub.subscribe(topic) | |
| async for message in channel: | |
| print(f'got a new message in consumer #{identity}! {message!r}') | |
| async def main() -> None: | |
| async with trio.open_nursery() as nursery: | |
| pubsub = PubSub() | |
| for i in range(5): | |
| nursery.start_soon(consumer, 'topic_thing', i, pubsub) | |
| # let's let the consumers subscribe | |
| await trio.sleep(1) | |
| await pubsub.publish('topic_thing', 'Hello, world!') | |
| trio.run(main) |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I really underestimated async's overhead. Here's the results for 2 consumers and
pypywithawait trio.lowlevel.checkpoint()inPubSub.publishcommented out:And while this makes for a not trio-nic (?) API, we can move the closing of the writing channel into a
desubscribecall and then allowBrokenResourceExceptionto propagate.