Skip to main content

Best Practices

Comprehensive guide to FastAPI best practices covering project structure, code organization, performance, security, deployment, and production-ready patterns.

Project Structure

myapp/
├── app/
│   ├── __init__.py
│   ├── main.py                  # Application entry point
│   ├── config.py                # Configuration management
│   ├── dependencies.py          # Shared dependencies
│   │
│   ├── api/                     # API routes
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── endpoints/
│   │   │   │   ├── __init__.py
│   │   │   │   ├── users.py
│   │   │   │   └── items.py
│   │   │   └── api.py           # API router
│   │   └── deps.py              # API dependencies
│   │
│   ├── core/                    # Core functionality
│   │   ├── __init__.py
│   │   ├── config.py
│   │   ├── security.py
│   │   └── exceptions.py
│   │
│   ├── models/                  # Database models
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── item.py
│   │
│   ├── schemas/                 # Pydantic schemas
│   │   ├── __init__.py
│   │   ├── user.py
│   │   └── item.py
│   │
│   ├── services/                # Business logic
│   │   ├── __init__.py
│   │   ├── user_service.py
│   │   └── item_service.py
│   │
│   ├── repositories/            # Data access layer
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── user_repo.py
│   │   └── item_repo.py
│   │
│   └── utils/                   # Utility functions
│       ├── __init__.py
│       └── helpers.py
│
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_api/
│   ├── test_services/
│   └── test_repositories/
│
├── alembic/                     # Database migrations
├── .env                         # Environment variables
├── .env.example                 # Example environment file
├── requirements.txt
├── pyproject.toml
└── README.md

Configuration Management

Using Pydantic Settings

from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    """Application settings read from the environment and an optional ``.env`` file.

    ``database_url``, ``secret_key`` and ``smtp_host`` have no default and must
    therefore be provided externally.
    """

    # pydantic-settings builds on Pydantic v2, where the nested ``class Config``
    # is deprecated in favour of the ``model_config`` attribute.
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)

    # Application
    app_name: str = "My FastAPI App"
    version: str = "1.0.0"
    debug: bool = False

    # Database
    database_url: str

    # Security
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # External Services
    redis_url: str = "redis://localhost:6379"
    smtp_host: str
    smtp_port: int = 587

    # API
    api_v1_prefix: str = "/api/v1"
    allowed_hosts: list[str] = ["*"]

@lru_cache(maxsize=128)
def get_settings():
    """Return the cached ``Settings`` object, constructing it on the first call."""
    settings = Settings()
    return settings

Environment-Specific Configuration

from enum import Enum

class Environment(str, Enum):
    """Deployment environments.

    Mixing in ``str`` makes each member compare equal to its plain string value.
    """

    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"

class Settings(BaseSettings):
    """Settings that pick the database URL according to the active environment."""

    environment: Environment = Environment.DEVELOPMENT

    # Declared explicitly because ``database_url`` reads both of them; without
    # these fields the property fails with AttributeError on first access.
    dev_database_url: str
    prod_database_url: str

    @property
    def is_production(self) -> bool:
        """True when ``environment`` is ``Environment.PRODUCTION``."""
        return self.environment == Environment.PRODUCTION

    @property
    def database_url(self) -> str:
        """Production URL in production, development URL everywhere else."""
        if self.is_production:
            return self.prod_database_url
        return self.dev_database_url

Error Handling

Standardized Error Responses

from fastapi import HTTPException
from fastapi.responses import JSONResponse

class APIException(Exception):
def __init__(
self,
status_code: int,
detail: str,
error_code: str = None
):
self.status_code = status_code
self.detail = detail
self.error_code = error_code

@app.exception_handler(APIException)
async def api_exception_handler(request: Request, exc: APIException):
    """Render an ``APIException`` as the JSON error envelope.

    The response uses the exception's own status code and wraps its code,
    message and a UTC timestamp under the ``"error"`` key.
    """
    # Imported here so the handler does not rely on a module-level import.
    from datetime import datetime, timezone

    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": {
                "code": exc.error_code,
                "message": exc.detail,
                # Timezone-aware UTC: datetime.utcnow() is deprecated since
                # Python 3.12 and produces a naive value without an offset.
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }
        },
    )

# Usage: raise the exception where the error is detected; the handler
# registered for APIException turns it into the JSON error response.
raise APIException(
    status_code=404,
    detail="User not found",
    error_code="USER_NOT_FOUND"
)

Global Exception Handler

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log an otherwise unhandled exception and answer with a generic 500.

    The client only receives a fixed code and message; the exception text and
    traceback go to the log.
    """
    # Hand the exception object to exc_info so the traceback is recorded even
    # if no exception is "current" where the handler runs; %-style arguments
    # leave string formatting to the logging framework.
    logger.error("Unhandled exception: %s", exc, exc_info=exc)

    return JSONResponse(
        status_code=500,
        content={
            "error": {
                "code": "INTERNAL_SERVER_ERROR",
                "message": "An unexpected error occurred",
            }
        },
    )

Logging

Structured Logging

import logging
from logging.handlers import RotatingFileHandler
import json

class JSONFormatter(logging.Formatter):
    """Format each log record as a single JSON object.

    The object always contains ``timestamp``, ``level``, ``message``,
    ``module`` and ``function``; ``exception`` is added when the record has
    exception info. Attributes attached via ``logger.info(..., extra={...})``
    are included as additional keys.
    """

    # Attribute names present on every LogRecord. Anything else found on a
    # record was supplied by the caller through ``extra``.
    _STANDARD_ATTRS = frozenset(
        vars(logging.LogRecord("", 0, "", 0, "", (), None))
    ) | {"message", "asctime"}

    def format(self, record):
        """Return the JSON text for ``record``."""
        log_data = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }

        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        for key, value in vars(record).items():
            if key not in self._STANDARD_ATTRS:
                # setdefault: an extra must never overwrite a core field.
                log_data.setdefault(key, value)

        # default=str is deliberate: a non-serialisable extra should degrade
        # to its string form rather than make the logging call raise.
        return json.dumps(log_data, default=str)

def setup_logging():
    """Configure and return the ``app`` logger with a rotating JSON file handler.

    Calling the function again returns the same logger without attaching a
    second handler.
    """
    logger = logging.getLogger("app")
    logger.setLevel(logging.INFO)

    # logging.getLogger returns the same object for the same name, so adding a
    # handler unconditionally would write every record once per setup call.
    already_configured = any(
        isinstance(existing, RotatingFileHandler) for existing in logger.handlers
    )
    if not already_configured:
        handler = RotatingFileHandler(
            "app.log",
            maxBytes=10 * 1024 * 1024,  # 10MB
            backupCount=5,
        )
        handler.setFormatter(JSONFormatter())
        logger.addHandler(handler)

    return logger


logger = setup_logging()

@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log one structured entry when a request starts and one when it completes."""
    # request.client can be None (no peer address available), so guard the
    # attribute access instead of letting the middleware fail the request.
    client_host = request.client.host if request.client else None
    method = request.method
    url = str(request.url)

    logger.info(
        "request_started",
        extra={
            "method": method,
            "url": url,
            "client": client_host,
        },
    )

    response = await call_next(request)

    logger.info(
        "request_completed",
        extra={
            "status_code": response.status_code,
            "method": method,
            "url": url,
        },
    )

    return response

API Versioning

URL Path Versioning

from fastapi import APIRouter

# One router per API version; the version lives in the URL prefix.
api_v1 = APIRouter(prefix="/api/v1")
api_v2 = APIRouter(prefix="/api/v2")


@api_v1.get("/items/")
def read_items_v1():
    """Return the version 1 item listing."""
    payload = {"version": "1.0", "items": []}
    return payload


@api_v2.get("/items/")
def read_items_v2():
    """Return the version 2 item listing, which adds a ``metadata`` object."""
    payload = {"version": "2.0", "items": [], "metadata": {}}
    return payload


app.include_router(api_v1)
app.include_router(api_v2)

Header Versioning

from fastapi import Header

@app.get("/items/")
def read_items(api_version: str = Header(default="1.0", alias="API-Version")):
    """Return items in the shape selected by the ``API-Version`` header.

    Only the exact value ``"2.0"`` selects the version 2 payload; every other
    value receives the version 1 payload.
    """
    wants_v2 = api_version == "2.0"
    if not wants_v2:
        return {"version": "1.0", "items": []}
    return {"version": "2.0", "items": [], "metadata": {}}

Database Best Practices

Repository Pattern

from typing import Generic, TypeVar, Type, List, Optional
from sqlalchemy.orm import Session
from pydantic import BaseModel

ModelType = TypeVar("ModelType")
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)


class BaseRepository(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
    """Generic CRUD access to one model class through a SQLAlchemy session.

    The model is expected to expose an ``id`` attribute, which ``get`` and
    ``delete`` filter on. Every mutating method commits immediately.
    """

    def __init__(self, model: Type[ModelType], db: Session):
        self.model = model
        self.db = db

    def get(self, id: int) -> Optional[ModelType]:
        """Return the row whose ``id`` matches, or ``None``."""
        return self.db.query(self.model).filter(self.model.id == id).first()

    def get_multi(self, skip: int = 0, limit: int = 100) -> List[ModelType]:
        """Return up to ``limit`` rows after skipping ``skip`` rows."""
        return self.db.query(self.model).offset(skip).limit(limit).all()

    def create(self, obj_in: CreateSchemaType) -> ModelType:
        """Insert a row built from ``obj_in`` and return the refreshed object."""
        # model_dump() is the Pydantic v2 replacement for the deprecated .dict().
        obj_data = obj_in.model_dump()
        db_obj = self.model(**obj_data)
        self.db.add(db_obj)
        self.db.commit()
        self.db.refresh(db_obj)
        return db_obj

    def update(self, db_obj: ModelType, obj_in: UpdateSchemaType) -> ModelType:
        """Apply the fields explicitly set on ``obj_in`` to ``db_obj`` and commit."""
        # exclude_unset keeps fields the client did not send from being
        # overwritten with schema defaults.
        update_data = obj_in.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(db_obj, field, value)
        self.db.commit()
        self.db.refresh(db_obj)
        return db_obj

    def delete(self, id: int) -> bool:
        """Delete the row with the given ``id``; return whether one existed."""
        # Reuse get() so lookup rules live in one place and the legacy
        # Query.get() API is no longer needed.
        obj = self.get(id)
        if obj is None:
            return False
        self.db.delete(obj)
        self.db.commit()
        return True

Transaction Management

from contextlib import contextmanager

@contextmanager
def transaction(db: Session):
    """Run the ``with`` body as one unit of work on ``db``.

    Commits when the body finishes normally, rolls back and re-raises when it
    raises an ``Exception``, and closes the session in either case.

    Yields:
        The same session object that was passed in.
    """
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        # Bare raise re-raises the active exception unchanged.
        raise
    finally:
        db.close()

# Usage: both writes are committed together when the block exits normally and
# rolled back together if either call raises (provided the helpers do not
# commit on their own).
with transaction(db) as session:
    user = create_user(session, user_data)
    create_profile(session, user.id, profile_data)

Pagination

Standardized Pagination

from typing import Generic, TypeVar, List
from pydantic import BaseModel

T = TypeVar("T")


class PaginatedResponse(BaseModel, Generic[T]):
    """One page of ``T`` items together with the counters describing the full result."""

    # The rows belonging to this page.
    items: List[T]
    # Number of rows across all pages.
    total: int
    # Page number of this response.
    page: int
    # Maximum number of rows per page.
    page_size: int
    # Number of pages needed to hold ``total`` rows.
    total_pages: int

def paginate(
    query,
    page: int = 1,
    page_size: int = 10,
    schema: type[T] | None = None,
) -> PaginatedResponse[T]:
    """Run ``query`` for a single page and wrap the rows in a ``PaginatedResponse``.

    Args:
        query: Query object supporting ``count()``, ``offset()``, ``limit()``
            and ``all()``.
        page: 1-based page number.
        page_size: Maximum number of rows per page.
        schema: Optional Pydantic model used to convert each row; rows are
            returned untouched when omitted.

    Raises:
        ValueError: If ``page`` or ``page_size`` is smaller than 1.
    """
    # Reject bad input before touching the database: page < 1 would yield a
    # negative offset and page_size == 0 a ZeroDivisionError further down.
    if page < 1:
        raise ValueError("page must be >= 1")
    if page_size < 1:
        raise ValueError("page_size must be >= 1")

    total = query.count()
    rows = query.offset((page - 1) * page_size).limit(page_size).all()

    if schema is not None:
        # model_validate(..., from_attributes=True) is the Pydantic v2
        # replacement for the deprecated from_orm().
        rows = [schema.model_validate(row, from_attributes=True) for row in rows]

    return PaginatedResponse(
        items=rows,
        total=total,
        page=page,
        page_size=page_size,
        # Ceiling division without floats.
        total_pages=(total + page_size - 1) // page_size,
    )

@app.get("/items/", response_model=PaginatedResponse[ItemSchema])
def read_items(
    page: int = 1,
    page_size: int = 10,
    db: Session = Depends(get_db)
):
    """Return one page of items serialised with ``ItemSchema``."""
    item_query = db.query(Item)
    return paginate(
        query=item_query,
        page=page,
        page_size=page_size,
        schema=ItemSchema,
    )

Validation

Custom Validators

from pydantic import BaseModel, validator, root_validator

class UserCreate(BaseModel):
    """Payload for registering a user.

    Validation rules: the username must be alphanumeric; the password needs at
    least 8 characters, one digit and one uppercase letter; and
    ``password_confirm`` must equal ``password``.
    """

    username: str
    email: str
    password: str
    password_confirm: str

    @validator('username')
    def username_alphanumeric(cls, v):
        """Reject usernames containing anything but letters and digits."""
        if not v.isalnum():
            raise ValueError('Username must be alphanumeric')
        return v

    @validator('password')
    def password_strength(cls, v):
        """Enforce minimum length, a digit and an uppercase letter."""
        if len(v) < 8:
            raise ValueError('Password must be at least 8 characters')
        if not any(char.isdigit() for char in v):
            raise ValueError('Password must contain a number')
        if not any(char.isupper() for char in v):
            raise ValueError('Password must contain an uppercase letter')
        return v

    # skip_on_failure=True is mandatory for a post root validator under
    # Pydantic v2 (omitting it raises when the class is defined). It also
    # stops a failed password check from additionally being reported as
    # "Passwords do not match".
    @root_validator(skip_on_failure=True)
    def passwords_match(cls, values):
        """Require ``password`` and ``password_confirm`` to be identical."""
        password = values.get('password')
        password_confirm = values.get('password_confirm')

        if password != password_confirm:
            raise ValueError('Passwords do not match')

        return values

Caching

Redis Caching Decorator

import redis
import json
from functools import wraps

redis_client = redis.Redis(host='localhost', port=6379, db=0)


def cache(expire: int = 300):
    """Cache the JSON-serialisable result of an async function in Redis.

    Args:
        expire: Lifetime of each cache entry in seconds.

    Returns:
        A decorator. The wrapped coroutine returns the cached value when one
        exists; otherwise it awaits the original function and stores the
        result. Values are round-tripped through JSON, so a cache hit returns
        the JSON-decoded form of the original result.
    """
    import asyncio

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Sort keyword arguments so f(a=1, b=2) and f(b=2, a=1) map to
            # the same entry.
            cache_key = f"{func.__name__}:{args!r}:{sorted(kwargs.items())!r}"

            # redis_client is synchronous; run its calls in a worker thread so
            # the event loop keeps serving other requests meanwhile.
            cached = await asyncio.to_thread(redis_client.get, cache_key)
            if cached is not None:
                return json.loads(cached)

            result = await func(*args, **kwargs)

            await asyncio.to_thread(
                redis_client.setex, cache_key, expire, json.dumps(result)
            )

            return result
        return wrapper
    return decorator

@app.get("/expensive-operation/")
@cache(expire=3600)
async def expensive_operation(param: str):
    """Return the result of the expensive calculation for ``param``.

    The ``cache`` decorator is applied with ``expire=3600``.
    """
    return perform_expensive_calculation(param)

Monitoring and Health Checks

Health Check Endpoint

from sqlalchemy import text

@app.get("/health")
async def health_check(db: Session = Depends(get_db)):
    """Probe the database and Redis and report overall health.

    Returns a ``healthy`` payload when both probes succeed, otherwise a 503
    JSON response describing the failure.
    """
    # Imported here so the endpoint does not rely on a module-level import.
    from datetime import datetime, timezone

    try:
        # Check database
        db.execute(text("SELECT 1"))

        # Check Redis
        redis_client.ping()
    except Exception as e:
        # logger.exception records the traceback along with the message.
        logger.exception("Health check failed: %s", e)
        # NOTE(review): str(e) may reveal connection details to callers;
        # consider a generic message if this endpoint is publicly reachable.
        return JSONResponse(
            status_code=503,
            content={
                "status": "unhealthy",
                "error": str(e),
            },
        )

    return {
        "status": "healthy",
        "database": "ok",
        "cache": "ok",
        # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12.
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

Metrics Endpoint

from prometheus_client import Counter, Histogram, generate_latest

# Request counter, labelled by method, endpoint and response status.
request_count = Counter(
    name='http_requests_total',
    documentation='Total HTTP requests',
    labelnames=('method', 'endpoint', 'status'),
)

# Request duration histogram, labelled by method and endpoint.
request_duration = Histogram(
    name='http_request_duration_seconds',
    documentation='HTTP request duration',
    labelnames=('method', 'endpoint'),
)

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    """Record duration and count for every HTTP request.

    Requests whose handling raises are still measured and are counted with
    status 500 before the exception propagates.
    """
    import time

    started = time.perf_counter()
    # Assume failure until a response exists, so a raised exception is still
    # counted.
    status = 500
    try:
        response = await call_next(request)
        status = response.status_code
        return response
    finally:
        elapsed = time.perf_counter() - started
        # Prefer the matched route's path template (e.g. "/users/{user_id}")
        # over the concrete URL: labelling by raw path creates one time series
        # per distinct URL. Falls back to the raw path when no route matched.
        route = request.scope.get("route")
        endpoint = getattr(route, "path", None) or request.url.path

        request_duration.labels(
            method=request.method,
            endpoint=endpoint,
        ).observe(elapsed)

        request_count.labels(
            method=request.method,
            endpoint=endpoint,
            status=status,
        ).inc()

@app.get("/metrics")
def metrics():
    """Expose the collected metrics in the Prometheus text exposition format."""
    from prometheus_client import CONTENT_TYPE_LATEST

    # CONTENT_TYPE_LATEST carries the exposition-format version and charset;
    # a bare "text/plain" omits both.
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

Deployment

Production Configuration

# production.py
import multiprocessing

# Gunicorn configuration

# Worker processes: two per CPU core plus one, each running the Uvicorn worker.
worker_class = "uvicorn.workers.UvicornWorker"
workers = 2 * multiprocessing.cpu_count() + 1

# Network
bind = "0.0.0.0:8000"
keepalive = 120

# Logging
accesslog = "-"
errorlog = "-"
loglevel = "info"

Docker Configuration

FROM python:3.11-slim

WORKDIR /app

# Install dependencies first, in their own layer, before the application
# code is copied in
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the contents of ./app directly into /app, so main.py sits at
# /app/main.py and the "main:app" target in CMD resolves from WORKDIR.
# NOTE(review): imports written as "from app.xyz import ..." would not
# resolve with this layout — confirm against the application code.
COPY ./app /app

# Create non-root user (uid 1000), give it ownership of /app and switch to it
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Expose port (matches the --port passed to uvicorn below)
EXPOSE 8000

# Run application, listening on all interfaces
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose

version: '3.8'

services:
  # FastAPI application, built from the Dockerfile in this directory.
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      # Host names "db" and "redis" are the service names defined below.
      # NOTE(review): credentials are inline here; supply real ones through
      # an env file or a secrets mechanism rather than committing them.
      - DATABASE_URL=postgresql://user:pass@db:5432/mydb
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis

  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=mydb
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

# Named volumes referenced by the db and redis services above.
volumes:
  postgres_data:
  redis_data:

Security Checklist

  • Use HTTPS in production
  • Implement rate limiting
  • Validate all user input
  • Hash passwords with bcrypt
  • Use environment variables for secrets
  • Enable CORS properly
  • Add security headers
  • Implement proper authentication
  • Use parameterized queries
  • Keep dependencies updated
  • Implement request size limits
  • Add CSRF protection where needed
  • Use secure session management
  • Implement proper authorization
  • Log security events

Performance Optimization

Query Optimization

from sqlalchemy.orm import joinedload

# Bad: N+1 queries — one query for the users, then one more per user when
# user.items is accessed inside the loop
users = db.query(User).all()
for user in users:
    print(user.items) # Separate query for each user

# Good: Eager loading — joinedload(User.items) asks for the items as part of
# the users query
users = db.query(User).options(joinedload(User.items)).all()
for user in users:
    print(user.items) # No additional queries

Connection Pooling

# Engine with explicit connection-pool settings.
# NOTE(review): parameter meanings below are as understood from SQLAlchemy's
# pooling options — confirm against the SQLAlchemy version in use.
engine = create_engine(
    DATABASE_URL,
    pool_size=20,
    max_overflow=0,  # presumably: no connections beyond pool_size
    pool_pre_ping=True,  # presumably: test a connection before handing it out
    pool_recycle=3600,  # presumably seconds, i.e. one hour
    echo=False
)

Response Compression

from fastapi.middleware.gzip import GZipMiddleware

# Register gzip compression; minimum_size=1000 is passed through to
# GZipMiddleware (presumably the response size, in bytes, below which nothing
# is compressed — confirm).
app.add_middleware(GZipMiddleware, minimum_size=1000)

Testing Best Practices

# Use fixtures for common setup
@pytest.fixture
def test_user(db):
    """Add a sample ``User`` through the ``db`` fixture, commit, and return it."""
    sample = User(username="test", email="test@example.com")
    db.add(sample)
    db.commit()
    return sample

# Test one thing per test
def test_user_creation(client):
    """POST a complete, valid registration payload and expect 201 Created."""
    # UserCreate requires all four fields and enforces the password rules; a
    # body containing only a username is rejected during validation and never
    # reaches the handler.
    payload = {
        "username": "test",
        "email": "test@example.com",
        "password": "Str0ngPassw0rd",
        "password_confirm": "Str0ngPassw0rd",
    }
    response = client.post("/users/", json=payload)
    assert response.status_code == 201

# Use parametrize for multiple scenarios
@pytest.mark.parametrize("status_code,expected", [
    (200, "success"),
    (404, "not found"),
])
def test_responses(client, status_code, expected):
    """Template for a table-driven test: one run per (status_code, expected) pair."""
    # Test logic
    pass

Documentation

Comprehensive Docstrings

# status_code=201: a successful POST that creates a resource reports
# "201 Created" instead of the default 200.
@app.post("/users/", response_model=User, status_code=201, tags=["users"])
async def create_user(
    user: UserCreate,
    db: Session = Depends(get_db)
):
    """
    Create a new user.

    - **username**: unique username (3-50 characters)
    - **email**: valid email address
    - **password**: minimum 8 characters

    Returns the created user object.
    """
    return create_user_in_db(db, user)

Best Practices Summary

  1. Structure: Use clear, organized project structure
  2. Configuration: Use Pydantic Settings for configuration
  3. Error Handling: Implement standardized error responses
  4. Logging: Use structured logging
  5. Security: Follow security best practices
  6. Testing: Write comprehensive tests
  7. Documentation: Document your API thoroughly
  8. Performance: Optimize database queries and use caching
  9. Monitoring: Implement health checks and metrics
  10. Deployment: Use proper production configuration

Following these best practices will help you build maintainable, scalable, and production-ready FastAPI applications.