Skip to content

Instantly share code, notes, and snippets.

@A5rocks
Last active August 24, 2020 02:27
Show Gist options
  • Select an option

  • Save A5rocks/1e6f27ea98b11676cc70c2c3ab7aa187 to your computer and use it in GitHub Desktop.

Select an option

Save A5rocks/1e6f27ea98b11676cc70c2c3ab7aa187 to your computer and use it in GitHub Desktop.
Dispatch with Trio
# pretty basic backpressured pubsub
import trio
import typing
class PubSub:
topics_to_channels: typing.MutableMapping[
str,
typing.List[trio.MemorySendChannel]
]
def __init__(self) -> None:
self.topics_to_channels = {}
def _get_topic(self, topic: str) -> typing.List[trio.MemorySendChannel]:
return self.topics_to_channels.get(topic, [])
def subscribe(self, topic: str) -> trio.MemoryReceiveChannel:
if self.topics_to_channels.get(topic) is None:
self.topics_to_channels[topic] = []
send, recv = trio.open_memory_channel(0)
self.topics_to_channels[topic].append(send)
return recv
async def publish(self, topic: str, message: typing.Any) -> float:
chans = self._get_topic(topic)
res = 0.0
for chan in chans:
try:
chan.send_nowait(message)
await trio.lowlevel.checkpoint()
except trio.WouldBlock:
await trio.lowlevel.checkpoint()
# the backpressure thing is full
res += 1 / len(chans)
except trio.ClosedResourceError:
await trio.lowlevel.checkpoint()
# wtf why are we trying to send to a channel that is closed
self.topics_to_channels[topic].remove(chan)
except trio.BrokenResourceError:
# there's no receive streams!
await chan.aclose()
self.topics_to_channels[topic].remove(chan)
# we return the "backpressure rating":tm::tm:
return res
async def consumer(topic: str, identity: int, pubsub: PubSub) -> None:
channel = pubsub.subscribe(topic)
async for message in channel:
print(f'got a new message in consumer #{identity}! {message!r}')
async def main() -> None:
async with trio.open_nursery() as nursery:
pubsub = PubSub()
for i in range(5):
nursery.start_soon(consumer, 'topic_thing', i, pubsub)
# let's let the consumers subscribe
await trio.sleep(1)
await pubsub.publish('topic_thing', 'Hello, world!')
trio.run(main)
@A5rocks
Copy link
Author

A5rocks commented Aug 24, 2020

I really underestimated async's overhead. Here's the results for 2 consumers and pypy with await trio.lowlevel.checkpoint() in PubSub.publish commented out:

13.19s user 0.08s system 91% cpu 14.496 total
12.81s user 0.05s system 92% cpu 13.891 total
12.50s user 0.06s system 92% cpu 13.581 total
12.57s user 0.07s system 92% cpu 13.672 total
12.53s user 0.06s system 92% cpu 13.614 total
12.71s user 0.06s system 92% cpu 13.796 total
12.47s user 0.05s system 92% cpu 13.541 total
12.38s user 0.07s system 92% cpu 13.475 total
12.42s user 0.06s system 92% cpu 13.505 total
12.63s user 0.02s system 92% cpu 13.674 total

And while this makes for a not trio-nic (?) API, we can move the closing of the writing channel into a desubscribe call and then allow BrokenResourceException to propagate.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment