- Начальный уровень
- Продвинутый уровень
- Экспертный уровень
- Найти ошибку в коде -- начальный
- Найти ошибку в коде -- продвинутый
- Найти ошибку в коде -- экспертный
- Задание: Напишите эндпоинт
GET /hello, который возвращает{"message": "Hello, World!"}. - Проверяемые знания: Базовое создание роута, работа с декораторами, возврат JSON.
- Задание: Создайте эндпоинт
GET /items/{item_id}, который принимаетitem_id(путь) иq(query-параметр, опциональный). Возвращает JSON с этими значениями. - Проверяемые знания: Path-параметры, query-параметры, валидация.
- Задание: Сделайте эндпоинт
POST /items/, который принимает JSON с полямиname(обязательный, строка) иprice(обязательный, положительное число). Используйте Pydantic для валидации. - Проверяемые знания: Pydantic модели, валидация, POST-запросы.
- Задание: Создайте кастомный обработчик ошибки для
HTTP 404, который возвращает JSON{"error": "Not Found", "detail": "Item not found"}. - Проверяемые знания: Кастомные исключения, обработчики ошибок.
- Задание: Реализуйте dependency
get_db, который "подключается" к БД (можно имитировать) и возвращает соединение. Используйте его в эндпоинтеGET /users/. - Проверяемые знания: DI (Dependency Injection), работа с зависимостями.
- Задание: Напишите асинхронный эндпоинт
GET /slow/, который ждет 1 секунду (черезasyncio.sleep), затем возвращает{"status": "ok"}. - Проверяемые знания: Асинхронность,
async/await.
- Задание: Сделайте эндпоинт
POST /upload/, который принимает файл и возвращает его размер в байтах. - Проверяемые знания: Работа с файлами (
UploadFile), FormData.
- Задание: Добавьте middleware, который добавляет заголовок
X-Process-Timeс временем обработки запроса в миллисекундах. - Проверяемые знания: Middleware, время выполнения запроса.
- Задание: Реализуйте эндпоинт
/login, который принимает username/password и возвращает JWT-токен. Затем сделайте защищенный эндпоинт/me, требующий этот токен. - Проверяемые знания: JWT, OAuth2, безопасность.
- Задание: Напишите тест (с помощью
pytest) для эндпоинтаGET /hello, проверяющий статус-код и ответ. - Проверяемые знания: Тестирование FastAPI (TestClient), pytest.
- Фоновые задачи: Создайте эндпоинт, который запускает фоновую задачу (например, пишет в лог через 5 секунд).
- WebSockets: Реализуйте простой WebSocket-эндпоинт (
/ws), который возвращает клиенту его же сообщения.
- Задание: Создайте модель
UserCreate, где:- Пароль (
password) должен содержать минимум 8 символов, включая цифру и спецсимвол. - Email проверяется через регулярное выражение.
- Пароль (
- Цель: Реализация кастомных валидаторов с использованием
@validatorв Pydantic.
- Задание:
- Создайте dependency
get_db, который открывает транзакцию. - Реализуйте эндпоинт
GET /users/{user_id}/posts, который возвращает пользователя и его посты в одном запросе к БД (без N+1 проблемы).
- Создайте dependency
- Проверяемые знания: SQLAlchemy (или другой ORM), eager loading, транзакции.
- Задание: Настройте лимит в 5 запросов в минуту для эндпоинта
POST /comments/с помощью:- Redis для хранения счетчика.
- Кастомного middleware или dependency.
- Цель: Работа с внешними сервисами, асинхронные вызовы.
- Задание: Динамически добавьте в OpenAPI схему кастомный параметр
X-API-Versionдля всех эндпоинтов. - Проверяемые знания: Кастомизация OpenAPI, доступ к
app.openapi().
- Задание:
- Создайте эндпоинт
POST /report/, который запускает фоновую задачу в Celery на генерацию отчета. - Эндпоинт
GET /report/{task_id}должен возвращать статус задачи.
- Создайте эндпоинт
- Цель: Интеграция FastAPI с Celery, работа с
AsyncResult.
- Задание: Реализуйте OAuth2 авторизацию, где:
- Токен выдается только пользователям с ролью
admin. - В токене должен быть
user_idиscopes.
- Токен выдается только пользователям с ролью
- Проверяемые знания:
OAuth2PasswordBearer, JWT, кастомные зависимости.
- Задание: Напишите тест для эндпоинта
GET /weather/{city}, который:- Мокает запрос к внешнему API (например, OpenWeatherMap).
- Проверяет обработку ошибки 404.
- Цель:
pytest,pytest-mock,httpx.AsyncClient.
- Задание: Создайте WebSocket (
/chat/{room_id}), где:- Сообщения от одного пользователя рассылаются всем в комнате.
- История сообщений хранится в Redis.
- Проверяемые знания:
WebSocket, бродкастинг, работа с Redis.
- Задание: Интегрируйте GraphQL через Strawberry:
- Создайте тип
Bookс полямиid,title,author. - Реализуйте query
getBooksи mutationaddBook.
- Создайте тип
- Цель: GraphQL, интеграция сторонних библиотек.
- Задание: Для эндпоинта
GET /products/добавьте заголовокETagна основе хеша данных.- Если
ETagсовпадает сIf-None-Match, верните304 Not Modified.
- Если
- Проверяемые знания: HTTP-кеширование, кастомные заголовки.
- gRPC + FastAPI: Создайте gRPC сервис для проверки орфографии, который вызывается из FastAPI.
- Server-Sent Events (SSE): Реализуйте эндпоинт
/stream, который отправляет события каждые 5 секунд.
Задача:
- Реализуйте GraphQL API (через Strawberry или Ariadne) с типами
UserиPost. - Используйте
DataLoaderдля решения проблемы N+1 при запросе списка пользователей с их постами.
Что проверяем: - Понимание проблем N+1 в GraphQL.
- Умение работать с асинхронными DataLoader.
Задача:
- Создайте middleware, который:
- Блокирует запросы с подозрительными User-Agent (например,
curlилиPostman). - Добавляет кастомный заголовок
X-Request-Source.
Что проверяем:
- Блокирует запросы с подозрительными User-Agent (например,
- Глубокое знание middleware и работы с заголовками.
- Безопасность и защиту от скрейпинга.
Задача:
- Реализуйте административный эндпоинт
POST /admin/toggle_route, который:- Динамически включает/выключает другие эндпоинты (например,
/api/status).
Что проверяем:
- Динамически включает/выключает другие эндпоинты (например,
- Работу с
app.routes, динамическое изменение API. - Паттерн Feature Toggle.
Задача:
- Создайте эндпоинт
POST /events/, который отправляет событие в Kafka. - Реализуйте consumer в фоне (через
asyncio), который обрабатывает эти события и пишет их в БД.
Что проверяем: - Интеграцию с брокерами сообщений (Kafka/RabbitMQ).
- Асинхронную обработку событий.
Задача:
- Реализуйте систему, где каждый клиент (tenant) использует разные БД.
- Автоматически определяйте tenant из заголовка
X-Tenant-IDи подключайте соответствующую БД.
Что проверяем: - Dependency Injection для мультитенантности.
- Динамическое подключение к разным БД (SQLAlchemy/asyncpg).
Задача:
- Генерируйте OpenAPI-схему так, чтобы:
- Эндпоинты группировались по кастомным тегам (например,
LegacyиV2). - Добавлялось описание для каждого тега.
Что проверяем:
- Эндпоинты группировались по кастомным тегам (например,
- Кастомизацию OpenAPI, работу с
app.openapi_schema.
Задача:
- Реализуйте эндпоинт
POST /orders/, который:- Создает заказ в БД.
- Резервирует товар на складе (через HTTP-вызов).
- Если склад недоступен — откатывает заказ.
Что проверяем:
- Паттерн Saga для распределённых систем.
- Обработку ошибок и компенсирующие транзакции.
Задача:
- Создайте эндпоинт
GET /notifications/, который:- Подключается к Redis Pub/Sub.
- Отправляет клиенту уведомления в реальном времени через SSE.
Что проверяем:
- Работу с Server-Sent Events.
- Интеграцию с Redis для real-time уведомлений.
Задача:
- Создайте gRPC-сервис для проверки текста на спам.
- Интегрируйте его в FastAPI через
grpclib. - Добавьте кеширование результатов в Redis.
Что проверяем: - Работу с gRPC и protobuf.
- Паттерн API Gateway.
Задача:
- Вынесите CPU-bound логику (например, хеширование паролей) в Rust (через
pyo3) или Cython. - Интегрируйте модуль в FastAPI.
Что проверяем: - Умение оптимизировать критические участки кода.
- Работу с FFI (Foreign Function Interface).
- Реализация JIT-компиляции эндпоинтов: Динамически генерируйте эндпоинты на основе конфигурации в runtime.
- Своя система плагинов: FastAPI-приложение, которое подгружает эндпоинты из внешних Python-модулей.
from fastapi import FastAPI
app = FastAPI()
@app.get("/items/{item_id}")
def read_item(item_id):
return {"item_id": item_id}from fastapi import FastAPI
app = FastAPI()
@app.get("/search")
def search(query):
return {"query": query}from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
@app.post("/items/")
def create_item(item: Item):
return itemfrom fastapi import FastAPI
import asyncio
app = FastAPI()
@app.get("/async")
async def async_task():
await asyncio.sleep(2)
return {"message": "Task completed"}from fastapi import FastAPI, File, UploadFile
app = FastAPI()
@app.post("/upload")
def upload_file(file: UploadFile = File(...)):
return {"filename": file.filename}from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def write_log(message: str):
with open("log.txt", "a") as f:
f.write(message + "\n")
@app.post("/send-email")
def send_email(background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, "Email sent")
return {"message": "Email sent in the background"}from fastapi import FastAPI, Request
app = FastAPI()
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
response = await call_next(request)
response.headers["X-Process-Time"] = "10ms"
return responsefrom fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
data = await websocket.receive_text()
await websocket.send_text(f"Message received: {data}")from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET"],
)from fastapi import FastAPI, Query
app = FastAPI()
@app.get("/items")
def get_items(page: int = Query(1), limit: int = Query(10)):
items = [{"id": i} for i in range((page - 1) * limit, page * limit)]
return {"items": items}from fastapi import FastAPI
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
app = FastAPI()
DATABASE_URL = "postgresql://user:password@localhost/dbname"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@app.get("/users/{user_id}")
def get_user(user_id: int):
db = SessionLocal()
user = db.execute("SELECT * FROM users WHERE id = :id", {"id": user_id}).fetchone()
return userfrom fastapi import FastAPI, BackgroundTasks
import time
app = FastAPI()
def long_task():
time.sleep(10)
@app.post("/task")
def run_task(background_tasks: BackgroundTasks):
background_tasks.add_task(long_task)
return {"message": "Task started"}from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time
app = FastAPI()
@app.get("/stream")
def stream_data():
def generate():
for i in range(10):
time.sleep(1)
yield f"data: {i}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")from fastapi import FastAPI
import redis
app = FastAPI()
redis_client = redis.Redis(host="localhost", port=6379, db=0)
@app.get("/cache/{key}")
def get_cache(key: str):
value = redis_client.get(key)
return {"key": key, "value": value}from fastapi import FastAPI, WebSocket
import redis
app = FastAPI()
redis_client = redis.Redis(host="localhost", port=6379, db=0)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
pubsub = redis_client.pubsub()
pubsub.subscribe("channel")
for message in pubsub.listen():
await websocket.send_text(message["data"])from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = "mysecret"
@app.get("/protected")
def protected_route(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return {"user_id": payload["user_id"]}
except:
raise HTTPException(status_code=401, detail="Invalid token")from fastapi import FastAPI
from strawberry.fastapi import GraphQLRouter
import strawberry
@strawberry.type
class Query:
@strawberry.field
def hello(self) -> str:
return "Hello, world!"
schema = strawberry.Schema(query=Query)
graphql_app = GraphQLRouter(schema)
app = FastAPI()
app.include_router(graphql_app, prefix="/graphql")from fastapi import FastAPI
app = FastAPI()
@app.get("/items")
def get_items():
# Логика получения данных
return {"items": []}from fastapi import FastAPI
import requests
app = FastAPI()
@app.get("/weather")
def get_weather(city: str):
response = requests.get(f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid=your_api_key")
return response.json()from fastapi import FastAPI
from kafka import KafkaProducer
app = FastAPI()
producer = KafkaProducer(bootstrap_servers="localhost:9092")
@app.post("/events")
def send_event(event: dict):
producer.send("topic", value=event)
return {"message": "Event sent"}from fastapi import FastAPI, UploadFile
import pandas as pd
app = FastAPI()
@app.post("/process-file")
async def process_file(file: UploadFile):
df = pd.read_csv(file.file)
result = df.groupby("category").sum()
return {"summary": result.to_dict()}from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = "mysecret"
@app.get("/protected")
def protected_route(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return {"user_id": payload["user_id"]}
except:
raise HTTPException(status_code=401, detail="Invalid token")from fastapi import FastAPI
from kafka import KafkaProducer
app = FastAPI()
producer = KafkaProducer(bootstrap_servers="localhost:9092")
@app.post("/events")
def send_event(event: dict):
producer.send("topic", value=event)
return {"message": "Event sent"}from fastapi import FastAPI, WebSocket
import redis
app = FastAPI()
redis_client = redis.Redis(host="localhost", port=6379, db=0)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
pubsub = redis_client.pubsub()
pubsub.subscribe("channel")
for message in pubsub.listen():
await websocket.send_text(message["data"])from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
@app.get("/stream")
async def stream_data():
async def generate():
for i in range(10):
await asyncio.sleep(1)
yield f"data: {i}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")from fastapi import FastAPI
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
app = FastAPI()
DATABASE_URL = "postgresql://user:password@localhost/dbname"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@app.get("/users/{user_id}")
def get_user(user_id: int):
db = SessionLocal()
user = db.execute("SELECT * FROM users WHERE id = :id", {"id": user_id}).fetchone()
return user
@app.post("/users")
def create_user(name: str, email: str):
db = SessionLocal()
db.execute("INSERT INTO users (name, email) VALUES (:name, :email)", {"name": name, "email": email})
db.commit()
return {"message": "User created"}# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: fastapi-app
spec:
replicas: 3
selector:
matchLabels:
app: fastapi
template:
metadata:
labels:
app: fastapi
spec:
containers:
- name: fastapi
image: my-fastapi-app:latest
ports:
- containerPort: 8000from fastapi import FastAPI
from strawberry.fastapi import GraphQLRouter
import strawberry
@strawberry.type
class Query:
@strawberry.field
def hello(self) -> str:
return "Hello, world!"
schema = strawberry.Schema(query=Query)
graphql_app = GraphQLRouter(schema)
app = FastAPI()
app.include_router(graphql_app, prefix="/graphql")from fastapi import FastAPI
from httpx import AsyncClient
app = FastAPI()
@app.get("/users/{user_id}/orders")
async def get_user_orders(user_id: int):
async with AsyncClient() as client:
response = await client.get(f"http://order-service/orders?user_id={user_id}")
return response.json()from fastapi import FastAPI
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get("/limited")
@limiter.limit("5/minute")
def limited_route():
return {"message": "This route is rate-limited"}