Created
February 28, 2026 06:49
-
-
Save graingert/9dc56c649cef82c4cc21a5c24b8d0dcf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| @app.get("/news-and-weather") | |
| @contextlib.asynccontextmanager | |
| async def news_and_weather() -> AsyncGenerator[MemoryObjectRecieveStream[bytes]]: | |
| async with anyio.create_task_group() as tg: | |
| tx, rx = anyio.create_memory_object_stream[bytes]() | |
| with tx, rx: | |
| tg.start_soon(ws_stream, "ws://example.com/news", tx.clone()) | |
| tg.start_soon(ws_stream, "ws://example.com/weather", tx.clone()) | |
| tx.close() | |
| yield rx |
Just for completeness, in case anyone else comes here checking for this discussion that we finished on chat, here's the version that should work fine.
The trick is that the task group is created in a dependency with yield, that is converted by FastAPI to an async context manager, and the exit code is run after the response is done, so nothing ever "yields out" of the task group:
import random
from collections.abc import AsyncIterator
from typing import Annotated
import anyio
from anyio.abc import TaskGroup
from anyio.streams.memory import MemoryObjectSendStream
from fastapi import Depends, FastAPI
app = FastAPI()
async def fake_stream(url: str, tx: MemoryObjectSendStream[bytes]) -> None:
i = 0
while True:
i += 1
await tx.send(f"{url} message {i}".encode())
await anyio.sleep(1 + 1 * random.random())
async def task_group() -> AsyncIterator[TaskGroup]:
async with anyio.create_task_group() as tg:
yield tg
tg.cancel_scope.cancel()
@app.get("/news-and-weather")
async def news_and_weather(
tg: Annotated[TaskGroup, Depends(task_group, scope="request")],
) -> AsyncIterator[bytes]:
tx, rx = anyio.create_memory_object_stream[bytes]()
async with tx, rx:
tg.start_soon(fake_stream, "ws://example.com/news", tx.clone())
tg.start_soon(fake_stream, "ws://example.com/weather", tx.clone())
tx.close()
async for message in rx:
yield message
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thank you! I'm learning about all this.
Thanks for the Claude link, and the PEP, I just read it all.
I iterated a few times with LLMs, trying to find something that would work (in theory) but still keep the style I wanted to have in FastAPI.
Would this work?