Skip to main content

Performance

Async Programming

Async/Await Best Practices

# Good - truly async operation
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = await get_user_from_db(user_id)
return user

# Bad - blocking operation in async function
@app.get("/users/{user_id}")
async def get_user_bad(user_id: int):
user = sync_get_user_from_db(user_id) # Blocks event loop
return user

# Good - use sync function for CPU-bound tasks
@app.get("/users/{user_id}")
def get_user_sync(user_id: int):
user = sync_get_user_from_db(user_id)
return user

Async Database Operations

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

engine = create_async_engine("sqlite+aiosqlite:///./test.db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_user_async(db: AsyncSession, user_id: int):
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()

@app.get("/users/{user_id}")
async def read_user(user_id: int, db: AsyncSession = Depends(get_async_db)):
user = await get_user_async(db, user_id)
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return user

Caching

In-Memory Caching

from functools import lru_cache
import time

@lru_cache(maxsize=128)
def get_expensive_data(param: str):
# Simulate expensive operation
time.sleep(1)
return f"Processed data for {param}"

@app.get("/data/{param}")
async def get_data(param: str):
return {"result": get_expensive_data(param)}

Redis Caching

import redis
import json
from typing import Optional

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def cache_key(prefix: str, **kwargs) -> str:
return f"{prefix}:" + ":".join(f"{k}={v}" for k, v in kwargs.items())

async def get_cached_user(user_id: int) -> Optional[dict]:
key = cache_key("user", id=user_id)
cached = redis_client.get(key)
if cached:
return json.loads(cached)
return None

async def cache_user(user_id: int, user_data: dict, ttl: int = 3600):
key = cache_key("user", id=user_id)
redis_client.setex(key, ttl, json.dumps(user_data))

@app.get("/users/{user_id}")
async def get_user_cached(user_id: int, db: Session = Depends(get_db)):
# Try cache first
cached_user = await get_cached_user(user_id)
if cached_user:
return cached_user

# Get from database
user = get_user(db, user_id)
if user:
user_data = UserResponse.from_orm(user).dict()
await cache_user(user_id, user_data)
return user_data

raise HTTPException(status_code=404, detail="User not found")

TTL Cache with asyncio

import asyncio
from typing import Dict, Any, Optional
from datetime import datetime, timedelta

class TTLCache:
def __init__(self):
self._cache: Dict[str, Any] = {}
self._expiry: Dict[str, datetime] = {}

async def get(self, key: str) -> Optional[Any]:
if key in self._cache:
if datetime.utcnow() < self._expiry[key]:
return self._cache[key]
else:
del self._cache[key]
del self._expiry[key]
return None

async def set(self, key: str, value: Any, ttl: int = 3600):
self._cache[key] = value
self._expiry[key] = datetime.utcnow() + timedelta(seconds=ttl)

cache = TTLCache()

@app.get("/expensive-operation/{param}")
async def expensive_operation(param: str):
cached_result = await cache.get(f"operation:{param}")
if cached_result:
return cached_result

# Simulate expensive operation
await asyncio.sleep(1)
result = {"result": f"Processed {param}"}

await cache.set(f"operation:{param}", result, ttl=300)
return result

Database Optimization

Connection Pooling

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=20,
max_overflow=30,
pool_pre_ping=True,
pool_recycle=3600,
echo=False
)

Query Optimization

# Bad - N+1 query problem
def get_users_with_items_bad(db: Session):
users = db.query(User).all()
for user in users:
user.items = db.query(Item).filter(Item.owner_id == user.id).all()
return users

# Good - use joins or eager loading
def get_users_with_items_good(db: Session):
return db.query(User).options(joinedload(User.items)).all()

# Good - use select_related equivalent
def get_users_with_items_optimized(db: Session):
return db.query(User).join(Item).all()

Pagination

from typing import List, Optional

def paginate_users(
db: Session,
page: int = 1,
per_page: int = 10,
search: Optional[str] = None
) -> dict:
query = db.query(User)

if search:
query = query.filter(User.username.contains(search))

total = query.count()
offset = (page - 1) * per_page
users = query.offset(offset).limit(per_page).all()

return {
"users": users,
"total": total,
"page": page,
"per_page": per_page,
"pages": (total + per_page - 1) // per_page
}

@app.get("/users")
async def get_users_paginated(
page: int = 1,
per_page: int = 10,
search: Optional[str] = None,
db: Session = Depends(get_db)
):
return paginate_users(db, page, per_page, search)

Response Optimization

Gzip Compression

from fastapi.middleware.gzip import GZipMiddleware

app.add_middleware(GZipMiddleware, minimum_size=1000)

Response Streaming

from fastapi.responses import StreamingResponse
import io

def generate_large_csv():
yield "id,name,email\n"
for i in range(10000):
yield f"{i},User {i},user{i}@example.com\n"

@app.get("/large-csv")
async def get_large_csv():
return StreamingResponse(
generate_large_csv(),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=users.csv"}
)

Partial Response

from typing import Set, Optional

def serialize_user(user: User, fields: Optional[Set[str]] = None) -> dict:
data = {
"id": user.id,
"username": user.username,
"email": user.email,
"full_name": user.full_name,
"created_at": user.created_at.isoformat()
}

if fields:
return {k: v for k, v in data.items() if k in fields}
return data

@app.get("/users/{user_id}")
async def get_user_partial(
user_id: int,
fields: Optional[str] = None,
db: Session = Depends(get_db)
):
user = get_user(db, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")

field_set = set(fields.split(",")) if fields else None
return serialize_user(user, field_set)

Background Tasks

Celery Integration

from celery import Celery

celery_app = Celery(
"fastapi_app",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)

@celery_app.task
def process_data(data: dict):
# Heavy processing
time.sleep(10)
return {"processed": data}

@app.post("/process")
async def start_processing(data: dict):
task = process_data.delay(data)
return {"task_id": task.id}

@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
task = celery_app.AsyncResult(task_id)
return {
"task_id": task_id,
"status": task.status,
"result": task.result if task.ready() else None
}

Load Testing

Locust Load Testing

# locustfile.py
from locust import HttpUser, task, between

class APIUser(HttpUser):
wait_time = between(1, 3)

def on_start(self):
# Login or setup
response = self.client.post("/token", data={
"username": "testuser",
"password": "testpass"
})
self.token = response.json()["access_token"]
self.headers = {"Authorization": f"Bearer {self.token}"}

@task(3)
def get_users(self):
self.client.get("/users/", headers=self.headers)

@task(1)
def create_user(self):
self.client.post("/users/", json={
"username": "newuser",
"email": "new@example.com",
"password": "password"
})

Performance Monitoring

import time
from fastapi import Request

@app.middleware("http")
async def add_performance_headers(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time

response.headers["X-Process-Time"] = str(process_time)

# Log slow requests
if process_time > 1.0:
logger.warning(f"Slow request: {request.url} took {process_time:.2f}s")

return response

Memory Optimization

Memory Profiling

import tracemalloc
from fastapi import Request

tracemalloc.start()

@app.middleware("http")
async def memory_profiling(request: Request, call_next):
snapshot1 = tracemalloc.take_snapshot()
response = await call_next(request)
snapshot2 = tracemalloc.take_snapshot()

top_stats = snapshot2.compare_to(snapshot1, 'lineno')

# Log memory usage for specific endpoints
if str(request.url).endswith('/memory-intensive'):
for stat in top_stats[:3]:
print(stat)

return response

Object Pooling

from typing import List, Generic, TypeVar
import asyncio

T = TypeVar('T')

class ObjectPool(Generic[T]):
def __init__(self, create_fn, max_size: int = 10):
self._create_fn = create_fn
self._pool: List[T] = []
self._max_size = max_size
self._lock = asyncio.Lock()

async def acquire(self) -> T:
async with self._lock:
if self._pool:
return self._pool.pop()
return self._create_fn()

async def release(self, obj: T):
async with self._lock:
if len(self._pool) < self._max_size:
self._pool.append(obj)

# Usage with database connections
db_pool = ObjectPool(lambda: create_db_connection(), max_size=20)

@app.get("/users/")
async def get_users_pooled():
db = await db_pool.acquire()
try:
users = await get_users_from_db(db)
return users
finally:
await db_pool.release(db)