Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save Eugene-Fed/cebc27744ded924897d41ea337f861c8 to your computer and use it in GitHub Desktop.

Select an option

Save Eugene-Fed/cebc27744ded924897d41ea337f861c8 to your computer and use it in GitHub Desktop.

Проблема асинхронного FastAPI

🔴 Пример блокировки Event Loop

from fastapi import FastAPI
import time  # Синхронная библиотека!

app = FastAPI()

@app.get("/blocking-endpoint")
async def blocking_endpoint():
    """Этот endpoint заблокирует весь event loop на 5 секунд!"""
    time.sleep(5)  # Синхронная блокирующая операция
    return {"message": "Запрос выполнен"}

💡 Почему это проблема?

  1. Когда вы вызываете time.sleep(5) внутри async def, весь event loop приостанавливается.
  2. Даже если другие пользователи отправляют запросы в это время, они не будут обрабатываться, пока sleep не завершится.
  3. Множество популярных библиотек на самом деле не поддерживает асинхронный режима работы, что приводит к неожиданным для разработчика последствиям.
  4. Это сводит на нет все преимущества от написания сложной логики асинхронного кода.

Скрытая многопоточность FastAPI

При выполнении синхронного FastAPI кода используется механизм Starlette (микрофреймворк на котором построен FastAPI), чтобы запускать синхронные обработчики ресурсов (endpoints) в отдельном потоке. Вот как это работает:


🔹 1. Синхронные endpoint-функции (def)

Если вы объявляете маршрут (rout) как обычную (синхронную) функцию (def), FastAPI автоматически запускает её в ThreadPoolExecutor (пуле потоков), чтобы не блокировать event loop.
Это делается прозрачно, без явного указания потоков.

Пример:

from fastapi import FastAPI
import time

app = FastAPI()

@app.get("/sync-route")  # ← Обычная синхронная функция!
def sync_endpoint():
    time.sleep(5)  # Блокирующая операция
    return {"message": "Это выполнится в отдельном потоке!"}

Что происходит:

  1. FastAPI видит, что функция sync_endpoint — синхронная.
  2. Автоматически отправляет её в ThreadPoolExecutor.
  3. Event loop продолжает работать, пока time.sleep(5) выполняется в фоне.
  4. Таким образом, обращение к синхронному ресурсу не блокирует обработку обращений к другим ресурсам

🔹 2. Где FastAPI не создает потоки автоматически?

  • Вне endpoint-функций:
    Если синхронный код вызывается внутри зависимостей (Depends), фоновых задач (BackgroundTasks), middleware, а также при обработке WebSocket автоматическое распределение по потокам не работает.

  • В async def endpoints при работе с синхронными библиотеками:
    Например, requests.get(), pandas.read_csv() или psycopg2 требуют ручного выноса в потоки при вызовах из асинхронных обработчиков.


📌 Сравните:

Ситуация Автоматические потоки? Решение
Синхронные endpoint (def) Да Ничего не делать
Асинхронные endpoint (async def) Нет asyncio.to_thread()
Синхронные зависимости Нет Ручной вынос в потоки
Фоновые задачи Нет BackgroundTasks + потоки

Важно помнить, что, FastAPI не создаёт новый поток при каждом обращении к синхронному маршруту. Вместо этого он использует пул потоков (ThreadPoolExecutor), где количество потоков ограничено. Вот как это работает:


🔹 Механизм работы

В FastAPI размер пула потоков по умолчанию зависит от используемого ASGI-сервера, так как сам FastAPI — это фреймворк, работающий поверх ASGI (например, Uvicorn или Hypercorn).

Если используется Uvicorn (по умолчанию):

  • Uvicorn использует пул потоков для выполнения синхронного (блокирующего) кода, но сам по себе обрабатывает асинхронные запросы в одном потоке (с event loop).
  • Размер пула потоков по умолчанию: 40 (начиная с версии uvicorn>=0.12.0).
  • Это можно изменить через параметр --limit-concurrency (или limit_concurrency в конфигурации).

Пример запуска с изменённым размером пула:

uvicorn app:app --workers 4 --limit-concurrency 100

Если используется Hypercorn:

  • Hypercorn также поддерживает пул потоков для синхронных операций.
  • По умолчанию: зависит от версии, но обычно 100 (можно уточнить в документации Hypercorn).

FastAPI без ASGI-сервера (теоретически):

  • FastAPI сам не управляет пулом потоков — это делает ASGI-сервер.
  • Если вы используете синхронные зависимости (например, def вместо async def), они будут выполняться в пуле потоков сервера.

Как проверить/изменить?

  • В Uvicorn можно задать параметр:
    import uvicorn
    
    uvicorn.run(
        "app:app",
        workers=4,
        limit_concurrency=50  # меняем лимит потоков
    )
  • В Hypercorn:
    from hypercorn.config import Config
    from hypercorn.asyncio import serve
    
    config = Config()
    config.worker_class = "threads"
    config.workers = 4
    config.threads = 50  # пул потоков на worker

Важно:

  • Если ваш код полностью асинхронный (async def), пул потоков не используется (работает в event loop).
  • Пул потоков нужен только для синхронного (блокирующего) кода (например, работа с requests, синхронными БД и т. д.).

Если вы используете стандартный uvicorn без настроек, то пул потоков = 40.

  1. Переиспользование потоков
    При каждом запросе к синхронному маршруту (def):

    • FastAPI берёт свободный поток из пула.
    • После завершения запроса поток возвращается в пул для повторного использования.
  2. Что происходит при перегрузке
    Если все потоки заняты, новые запросы ждут освобождения (не блокируя event loop).


🔹 Пример: Запросы к синхронному маршруту

from fastapi import FastAPI
import time, threading

app = FastAPI()

@app.get("/sync-route")
def sync_route():
    print(f"Текущий поток: {threading.current_thread().name}")
    time.sleep(2)
    return {"message": "Этот код выполняется в потоке из пула!"}

Результат при 10 запросах одновременно:

Текущий поток: ThreadPoolExecutor-0_0
Текущий поток: ThreadPoolExecutor-0_1
... (но не более чем _max_workers потоков!)

🔹 Ключевые особенности

  1. Нет создания потока на каждый запрос
    Потоки заранее созданы и переиспользуются — это экономит ресурсы.

  2. Ограничение на одновременные запросы
    Если все потоки заняты, запросы начинают конкурировать за ресурсы.
    Пример: При 5 потоках и 10 запросах — 5 выполнятся сразу, остальные 5 ждут.

  3. Глобальный пул для всех синхронных маршрутов
    Один и тот же пул используется для всех def-endpoints.


🔹 Как управлять пулом потоков?

  1. Увеличить число потоков (если нужно больше параллелизма):

    from concurrent.futures import ThreadPoolExecutor
    import asyncio
    
    custom_pool = ThreadPoolExecutor(max_workers=50)
    
    @app.get("/heavy-sync-route")
    async def heavy_route():
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(custom_pool, time.sleep, 2)
        return {"message": "Выполнено в кастомном пуле"}
  2. Отказаться от потоков для CPU-задач
    Для тяжелых вычислений лучше использовать ProcessPoolExecutor:

    from concurrent.futures import ProcessPoolExecutor
    
    with ProcessPoolExecutor() as process_pool:
        await loop.run_in_executor(process_pool, cpu_intensive_task)

📊 Сравнение с новым потоком на запрос

Подход Плюсы Минусы
Пул потоков (FastAPI) Экономит ресурсы, минимизирует накладные расходы Ограниченный параллелизм
Новый поток на запрос Неограниченная масштабируемость Риск исчерпания памяти, высокий overhead

⚠️ Когда быть осторожным?

  • Синхронные вызовы к БД (например, psycopg2):
    Даже с пулом потоков может стать узким местом. Лучше использовать асинхронные драйверы (asyncpg).

  • Долгие CPU-задачи:
    Потоки не помогут из-за GIL. Используйте процессы (ProcessPoolExecutor).


Итог: FastAPI использует пул потоков для синхронных маршрутов, а не создаёт поток на каждый запрос. Это баланс между производительностью и потреблением ресурсов. Для оптимизации можно настраивать размер пула или переходить на асинхронные альтернативы.

⚠️ Важные нюансы

  1. GIL (Global Interpreter Lock)
    Даже в потоках CPU-интенсивные операции (например, pandas.read_csv()) будут блокировать друг друга из-за GIL в Python. Для таких задач лучше использовать ProcessPoolExecutor.

  2. Лимит потоков
    Если все потоки заняты (например, при 100+ одновременных запросах к медленному API через requests), новые запросы будут ждать освобождения потока.

  3. Асинхронные аналоги эффективнее
    Для высоконагруженных проектов предпочтительны асинхронные библиотеки:

    • httpx.AsyncClient() вместо requests
    • asyncpg вместо psycopg2
    • aiofiles вместо open()

🔧 Как оптимизировать?

  1. Увеличить пул потоков (если нужно больше параллелизма):

    from concurrent.futures import ThreadPoolExecutor
    import asyncio
    
    custom_pool = ThreadPoolExecutor(max_workers=100)
    
    @app.get("/custom-thread-pool")
    async def custom_pool_route():
        await asyncio.get_event_loop().run_in_executor(
            custom_pool, 
            requests.get, 
            "https://example.com"
        )
        return {"message": "Выполнено в кастомном пуле"}
  2. Заменить синхронные библиотеки на асинхронные (настоятельно рекомендуется):

    import httpx
    
    @app.get("/async-request")
    async def async_request():
        async with httpx.AsyncClient() as client:
            response = await client.get("https://example.com")  # Не блокирует loop!
        return {"status": response.status_code}

📊 Итоговая таблица

Ситуация Поток создаётся? Риски Решение
Синхронная библиотека в def-endpoint ✅ Да (автоматически) Исчерпание пула потоков Увеличить пул или перейти на async
Синхронная библиотека в async def-endpoint ❌ Нет Блокировка event loop asyncio.to_thread() или async-аналоги
CPU-интенсивные операции ✅ Да (но GIL мешает) Низкая производительность ProcessPoolExecutor

Вывод: FastAPI действительно создаёт потоки для синхронных библиотек в def-маршрутах, но это не серебряная пуля — для сложных сценариев лучше использовать асинхронные решения.

Рекомендация:
Для CPU-задач лучше использовать ProcessPoolExecutor, а для I/O — асинхронные библиотеки (httpx вместо requests, asyncpg вместо psycopg2).
Сверьтесь со списком популярных синхронных библиотек и подберите асинхронные аналоги, и не забудьте про вынос в потоки или отдельные процессы, если альтернативы нет.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment