-
-
Save msdrigg/02c7716d6e2a0cb4e5ef08d14f180119 to your computer and use it in GitHub Desktop.
| import asyncio | |
| from contextlib import asynccontextmanager | |
| import functools | |
| from typing import Annotated, Any, AsyncContextManager, Awaitable, Callable | |
| from anyio import create_task_group | |
| from anyio.abc import TaskGroup | |
| from fastapi import Request, FastAPI | |
| from fastapi.params import Depends | |
| # Creating this gist for discussion here https://github.com/tiangolo/fastapi/discussions/8805 | |
| # Note: using these methods will likely discard request body (because they call receive and discard the result) | |
| app = FastAPI() | |
| async def wait_for_disconnect(request: Request) -> True: | |
| _receive = request._receive | |
| while (await _receive())["type"] != "http.disconnect": | |
| pass | |
| return True | |
| async def cancellation( | |
| request: Request, | |
| ): | |
| event = asyncio.Event() | |
| async def set_event_on_disconnect(): | |
| await wait_for_disconnect(request) | |
| event.set() | |
| async with asyncio.TaskGroup() as tg: | |
| disconnect_task = tg.create_task(set_event_on_disconnect()) | |
| yield event | |
| disconnect_task.cancel() | |
| CancellationEvent = Annotated[asyncio.Event, Depends(cancellation)] | |
| @asynccontextmanager | |
| async def create_request_task_group(request: Request): | |
| async def cancel_on_disconnect(): | |
| await wait_for_disconnect(request) | |
| raise asyncio.CancelledError() | |
| async with create_task_group() as outer_tg: | |
| outer_tg.start_soon(cancel_on_disconnect) | |
| async with create_task_group() as tg: | |
| yield tg | |
| outer_tg.cancel_scope.cancel() | |
| async def request_task_group(request: Request): | |
| return functools.partial(create_request_task_group, request) | |
| RequestTaskGroup = Annotated[ | |
| Callable[[], AsyncContextManager[TaskGroup]], Depends(request_task_group) | |
| ] | |
| async def huge_work(): | |
| print("Starting work") | |
| await asyncio.sleep(5) | |
| print("Work completed!") | |
| @app.get("/cancel_tg_dependency") | |
| async def cancel_tg_dependency( | |
| get_task_group: RequestTaskGroup, | |
| ): | |
| # This task group will be cancelled if the client disconnects | |
| # before it exits | |
| async with get_task_group() as tg: | |
| tg.start_soon(huge_work) | |
| return {"message": "Done"} | |
| @app.get("/cancel_tg") | |
| async def cancel_tg(request: Request): | |
| async with create_request_task_group(request) as tg: | |
| tg.start_soon(huge_work) | |
| return {"message": "Done"} | |
| @app.get("/cancel_event") | |
| async def cancel_event(event: CancellationEvent): | |
| async with create_task_group() as tg: | |
| async def cancel_after_completion(func: Awaitable[Any]): | |
| await func() | |
| tg.cancel_scope.cancel() | |
| tg.start_soon(cancel_after_completion, huge_work) | |
| tg.start_soon(cancel_after_completion, event.wait) | |
| return {"message": "Done"} |
I did testing with python 3.11 and latest versions of fastapi. I would also make sure your uvicorn is the latest version.
To test the actual code, I would recommend navigating to one of the test routes in your browser, and then closing the tab before the task completes. You can check the logs to see that the print("Work completed!") never prints
Thanks, I confirm it works with Python 3.11 and latest versions of FastAPI and Uvicorn:
pip freeze
annotated-types==0.5.0
anyio==3.7.1
click==8.1.7
fastapi==0.103.1
h11==0.14.0
idna==3.4
pydantic==2.3.0
pydantic_core==2.6.3
sniffio==1.3.0
starlette==0.27.0
typing_extensions==4.7.1
uvicorn==0.23.2
I've made a Gist with comments and a way to handle input and output for the task : https://gist.github.com/benoit-cty/01a4dd1e81c6ded86395c760490f3b73
Thank you again for this code, it's really great to be able to save computation resources !
Glad it will help you!
This didn't work unfortunately - same code, virtual environment replicated with the above requirements file.
- No errors btw
Thank you very much for this code !
Can you explain how to test it ?
I try :
uvicorn fastapi_cancel_test:app --reloadThe code raise an error:
So I comment all the
cancel_eventmethod, then the code run and I try: