Performance
Async Programming
Async/Await Best Practices
# Good - truly async operation
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = await get_user_from_db(user_id)
return user
# Bad - blocking operation in async function
@app.get("/users/{user_id}")
async def get_user_bad(user_id: int):
user = sync_get_user_from_db(user_id) # Blocks event loop
return user
# Good - use sync function for CPU-bound tasks
@app.get("/users/{user_id}")
def get_user_sync(user_id: int):
user = sync_get_user_from_db(user_id)
return user
Async Database Operations
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
engine = create_async_engine("sqlite+aiosqlite:///./test.db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_user_async(db: AsyncSession, user_id: int):
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
@app.get("/users/{user_id}")
async def read_user(user_id: int, db: AsyncSession = Depends(get_async_db)):
user = await get_user_async(db, user_id)
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return user
Caching
In-Memory Caching
from functools import lru_cache
import time
@lru_cache(maxsize=128)
def get_expensive_data(param: str):
# Simulate expensive operation
time.sleep(1)
return f"Processed data for {param}"
@app.get("/data/{param}")
async def get_data(param: str):
return {"result": get_expensive_data(param)}
Redis Caching
import redis
import json
from typing import Optional
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def cache_key(prefix: str, **kwargs) -> str:
return f"{prefix}:" + ":".join(f"{k}={v}" for k, v in kwargs.items())
async def get_cached_user(user_id: int) -> Optional[dict]:
key = cache_key("user", id=user_id)
cached = redis_client.get(key)
if cached:
return json.loads(cached)
return None
async def cache_user(user_id: int, user_data: dict, ttl: int = 3600):
key = cache_key("user", id=user_id)
redis_client.setex(key, ttl, json.dumps(user_data))
@app.get("/users/{user_id}")
async def get_user_cached(user_id: int, db: Session = Depends(get_db)):
# Try cache first
cached_user = await get_cached_user(user_id)
if cached_user:
return cached_user
# Get from database
user = get_user(db, user_id)
if user:
user_data = UserResponse.from_orm(user).dict()
await cache_user(user_id, user_data)
return user_data
raise HTTPException(status_code=404, detail="User not found")
TTL Cache with asyncio
import asyncio
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
class TTLCache:
def __init__(self):
self._cache: Dict[str, Any] = {}
self._expiry: Dict[str, datetime] = {}
async def get(self, key: str) -> Optional[Any]:
if key in self._cache:
if datetime.utcnow() < self._expiry[key]:
return self._cache[key]
else:
del self._cache[key]
del self._expiry[key]
return None
async def set(self, key: str, value: Any, ttl: int = 3600):
self._cache[key] = value
self._expiry[key] = datetime.utcnow() + timedelta(seconds=ttl)
cache = TTLCache()
@app.get("/expensive-operation/{param}")
async def expensive_operation(param: str):
cached_result = await cache.get(f"operation:{param}")
if cached_result:
return cached_result
# Simulate expensive operation
await asyncio.sleep(1)
result = {"result": f"Processed {param}"}
await cache.set(f"operation:{param}", result, ttl=300)
return result
Database Optimization
Connection Pooling
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=20,
max_overflow=30,
pool_pre_ping=True,
pool_recycle=3600,
echo=False
)
Query Optimization
# Bad - N+1 query problem
def get_users_with_items_bad(db: Session):
users = db.query(User).all()
for user in users:
user.items = db.query(Item).filter(Item.owner_id == user.id).all()
return users
# Good - use joins or eager loading
def get_users_with_items_good(db: Session):
return db.query(User).options(joinedload(User.items)).all()
# Good - use select_related equivalent
def get_users_with_items_optimized(db: Session):
return db.query(User).join(Item).all()
Pagination
from typing import List, Optional
def paginate_users(
db: Session,
page: int = 1,
per_page: int = 10,
search: Optional[str] = None
) -> dict:
query = db.query(User)
if search:
query = query.filter(User.username.contains(search))
total = query.count()
offset = (page - 1) * per_page
users = query.offset(offset).limit(per_page).all()
return {
"users": users,
"total": total,
"page": page,
"per_page": per_page,
"pages": (total + per_page - 1) // per_page
}
@app.get("/users")
async def get_users_paginated(
page: int = 1,
per_page: int = 10,
search: Optional[str] = None,
db: Session = Depends(get_db)
):
return paginate_users(db, page, per_page, search)
Response Optimization
Gzip Compression
from fastapi.middleware.gzip import GZipMiddleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
Response Streaming
from fastapi.responses import StreamingResponse
import io
def generate_large_csv():
yield "id,name,email\n"
for i in range(10000):
yield f"{i},User {i},user{i}@example.com\n"
@app.get("/large-csv")
async def get_large_csv():
return StreamingResponse(
generate_large_csv(),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=users.csv"}
)
Partial Response
from typing import Set, Optional
def serialize_user(user: User, fields: Optional[Set[str]] = None) -> dict:
data = {
"id": user.id,
"username": user.username,
"email": user.email,
"full_name": user.full_name,
"created_at": user.created_at.isoformat()
}
if fields:
return {k: v for k, v in data.items() if k in fields}
return data
@app.get("/users/{user_id}")
async def get_user_partial(
user_id: int,
fields: Optional[str] = None,
db: Session = Depends(get_db)
):
user = get_user(db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
field_set = set(fields.split(",")) if fields else None
return serialize_user(user, field_set)
Background Tasks
Celery Integration
from celery import Celery
celery_app = Celery(
"fastapi_app",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
@celery_app.task
def process_data(data: dict):
# Heavy processing
time.sleep(10)
return {"processed": data}
@app.post("/process")
async def start_processing(data: dict):
task = process_data.delay(data)
return {"task_id": task.id}
@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
task = celery_app.AsyncResult(task_id)
return {
"task_id": task_id,
"status": task.status,
"result": task.result if task.ready() else None
}
Load Testing
Locust Load Testing
# locustfile.py
from locust import HttpUser, task, between
class APIUser(HttpUser):
wait_time = between(1, 3)
def on_start(self):
# Login or setup
response = self.client.post("/token", data={
"username": "testuser",
"password": "testpass"
})
self.token = response.json()["access_token"]
self.headers = {"Authorization": f"Bearer {self.token}"}
@task(3)
def get_users(self):
self.client.get("/users/", headers=self.headers)
@task(1)
def create_user(self):
self.client.post("/users/", json={
"username": "newuser",
"email": "new@example.com",
"password": "password"
})
Performance Monitoring
import time
from fastapi import Request
@app.middleware("http")
async def add_performance_headers(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
# Log slow requests
if process_time > 1.0:
logger.warning(f"Slow request: {request.url} took {process_time:.2f}s")
return response
Memory Optimization
Memory Profiling
import tracemalloc
from fastapi import Request
tracemalloc.start()
@app.middleware("http")
async def memory_profiling(request: Request, call_next):
snapshot1 = tracemalloc.take_snapshot()
response = await call_next(request)
snapshot2 = tracemalloc.take_snapshot()
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
# Log memory usage for specific endpoints
if str(request.url).endswith('/memory-intensive'):
for stat in top_stats[:3]:
print(stat)
return response
Object Pooling
from typing import List, Generic, TypeVar
import asyncio
T = TypeVar('T')
class ObjectPool(Generic[T]):
def __init__(self, create_fn, max_size: int = 10):
self._create_fn = create_fn
self._pool: List[T] = []
self._max_size = max_size
self._lock = asyncio.Lock()
async def acquire(self) -> T:
async with self._lock:
if self._pool:
return self._pool.pop()
return self._create_fn()
async def release(self, obj: T):
async with self._lock:
if len(self._pool) < self._max_size:
self._pool.append(obj)
# Usage with database connections
db_pool = ObjectPool(lambda: create_db_connection(), max_size=20)
@app.get("/users/")
async def get_users_pooled():
db = await db_pool.acquire()
try:
users = await get_users_from_db(db)
return users
finally:
await db_pool.release(db)