Best Practices
Comprehensive guide to FastAPI best practices covering project structure, code organization, performance, security, deployment, and production-ready patterns.
Project Structure
Recommended Structure
myapp/
├── app/
│ ├── __init__.py
│ ├── main.py # Application entry point
│ ├── config.py # Configuration management
│ ├── dependencies.py # Shared dependencies
│ │
│ ├── api/ # API routes
│ │ ├── __init__.py
│ │ ├── v1/
│ │ │ ├── __init__.py
│ │ │ ├── endpoints/
│ │ │ │ ├── __init__.py
│ │ │ │ ├── users.py
│ │ │ │ └── items.py
│ │ │ └── api.py # API router
│ │ └── deps.py # API dependencies
│ │
│ ├── core/ # Core functionality
│ │ ├── __init__.py
│ │ ├── config.py
│ │ ├── security.py
│ │ └── exceptions.py
│ │
│ ├── models/ # Database models
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── item.py
│ │
│ ├── schemas/ # Pydantic schemas
│ │ ├── __init__.py
│ │ ├── user.py
│ │ └── item.py
│ │
│ ├── services/ # Business logic
│ │ ├── __init__.py
│ │ ├── user_service.py
│ │ └── item_service.py
│ │
│ ├── repositories/ # Data access layer
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── user_repo.py
│ │ └── item_repo.py
│ │
│ └── utils/ # Utility functions
│ ├── __init__.py
│ └── helpers.py
│
├── tests/
│ ├── __init__.py
│ ├── conftest.py
│ ├── test_api/
│ ├── test_services/
│ └── test_repositories/
│
├── alembic/ # Database migrations
├── .env # Environment variables
├── .env.example # Example environment file
├── requirements.txt
├── pyproject.toml
└── README.md
Configuration Management
Using Pydantic Settings
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
    """Application settings loaded from environment variables and ``.env``.

    Fields without a default (``database_url``, ``secret_key``, ``smtp_host``)
    are required: instantiation fails if they are not provided.
    """

    # Pydantic v2 / pydantic-settings configuration. This replaces the
    # deprecated v1-style inner ``class Config``.
    model_config = {"env_file": ".env", "case_sensitive": False}

    # Application
    app_name: str = "My FastAPI App"
    version: str = "1.0.0"
    debug: bool = False

    # Database
    database_url: str

    # Security
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # External Services
    redis_url: str = "redis://localhost:6379"
    smtp_host: str
    smtp_port: int = 587

    # API
    api_v1_prefix: str = "/api/v1"
    allowed_hosts: list[str] = ["*"]
@lru_cache(maxsize=128)
def get_settings():
    """Return the shared ``Settings`` instance.

    The first call builds the object; later calls are served from the
    ``lru_cache`` so the environment is only parsed once per process.
    """
    settings = Settings()
    return settings
Environment-Specific Configuration
from enum import Enum
class Environment(str, Enum):
    """Deployment environments the application can be configured for.

    The ``str`` mix-in makes every member compare equal to its string value.
    """
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"
class Settings(BaseSettings):
    """Settings that pick the database URL from the active environment."""

    environment: Environment = Environment.DEVELOPMENT

    # Connection strings read by ``database_url`` below. They must be declared
    # as fields, otherwise the property raises AttributeError on access.
    dev_database_url: str
    prod_database_url: str

    @property
    def is_production(self) -> bool:
        """True when ``environment`` is ``Environment.PRODUCTION``."""
        return self.environment == Environment.PRODUCTION

    @property
    def database_url(self) -> str:
        """Return the production URL in production, the development URL otherwise."""
        if self.is_production:
            return self.prod_database_url
        return self.dev_database_url
Error Handling
Standardized Error Responses
from fastapi import HTTPException
from fastapi.responses import JSONResponse
class APIException(Exception):
    """Application error carrying an HTTP status and a machine-readable code.

    Args:
        status_code: HTTP status code to send to the client.
        detail: Human-readable error message.
        error_code: Optional stable identifier such as ``"USER_NOT_FOUND"``.
    """

    def __init__(
        self,
        status_code: int,
        detail: str,
        error_code: "str | None" = None
    ):
        # Initialise the base Exception so str(exc), logging and tracebacks
        # show the message instead of an empty string or a tuple repr.
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail
        self.error_code = error_code
@app.exception_handler(APIException)
async def api_exception_handler(request: Request, exc: APIException):
    """Convert an ``APIException`` into the standard JSON error envelope.

    The response uses the exception's status code and wraps its code and
    message, plus a UTC timestamp, under the ``"error"`` key.
    """
    # Local import: this snippet does not import datetime at module level.
    from datetime import datetime, timezone

    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": {
                "code": exc.error_code,
                "message": exc.detail,
                # Timezone-aware replacement for the deprecated utcnow();
                # the ISO string now carries an explicit +00:00 offset.
                "timestamp": datetime.now(timezone.utc).isoformat()
            }
        }
    )
# Usage: raise APIException from request-handling code; the handler registered
# with @app.exception_handler(APIException) turns it into the JSON error body.
raise APIException(
    status_code=404,
    detail="User not found",
    error_code="USER_NOT_FOUND"
)
Global Exception Handler
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Catch-all handler: log the failure and return a generic 500 body.

    The client only receives a fixed message; exception details stay in the log.
    """
    # Pass the exception object explicitly so the traceback is logged even if
    # this coroutine is not executing inside an active ``except`` block.
    logger.error("Unhandled exception: %s", exc, exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={
            "error": {
                "code": "INTERNAL_SERVER_ERROR",
                "message": "An unexpected error occurred"
            }
        }
    )
Logging
Structured Logging
import logging
from logging.handlers import RotatingFileHandler
import json
class JSONFormatter(logging.Formatter):
    """Format log records as one JSON object per line.

    Besides the fixed keys (timestamp, level, message, module, function),
    any attribute attached through ``logger.info(..., extra={...})`` is
    included, so structured context is not silently dropped.
    """

    # Attribute names every LogRecord carries; anything else came from `extra=`.
    _STANDARD_ATTRS = frozenset(vars(logging.makeLogRecord({}))) | {"message", "asctime"}

    def format(self, record):
        """Return ``record`` serialised as a JSON string."""
        log_data = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        # Copy caller-supplied context without overwriting the fixed keys.
        for key, value in vars(record).items():
            if key not in self._STANDARD_ATTRS and key not in log_data:
                log_data[key] = value
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        # default=str is deliberate: a logger must not raise because an extra
        # value (e.g. a UUID or datetime) is not JSON-serialisable.
        return json.dumps(log_data, default=str)
def setup_logging():
    """Configure and return the ``"app"`` logger with a rotating JSON file handler.

    Safe to call more than once: the file handler is attached only if the
    logger does not already have one, so repeated calls (reloads, tests) do
    not duplicate every log line.
    """
    logger = logging.getLogger("app")
    logger.setLevel(logging.INFO)
    has_file_handler = any(
        isinstance(existing, RotatingFileHandler) for existing in logger.handlers
    )
    if not has_file_handler:
        handler = RotatingFileHandler(
            "app.log",
            maxBytes=10485760,  # 10MB
            backupCount=5
        )
        handler.setFormatter(JSONFormatter())
        logger.addHandler(handler)
    return logger
logger = setup_logging()
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log one event when a request starts and one when it completes."""
    # request.client can be None (e.g. some test clients or transports), so
    # guard the attribute access instead of crashing the request.
    client_host = request.client.host if request.client else None
    logger.info(
        "request_started",
        extra={
            "method": request.method,
            "url": str(request.url),
            "client": client_host
        }
    )
    response = await call_next(request)
    logger.info(
        "request_completed",
        extra={
            "status_code": response.status_code,
            "method": request.method,
            "url": str(request.url)
        }
    )
    return response
API Versioning
URL Path Versioning
from fastapi import APIRouter
# One router per API version; the version lives in the URL prefix.
api_v1 = APIRouter(prefix="/api/v1")
api_v2 = APIRouter(prefix="/api/v2")


@api_v1.get("/items/")
def read_items_v1():
    """Version 1 item listing."""
    payload = {"version": "1.0", "items": []}
    return payload


@api_v2.get("/items/")
def read_items_v2():
    """Version 2 item listing; adds a ``metadata`` object to the v1 shape."""
    payload = {"version": "2.0", "items": [], "metadata": {}}
    return payload


# Register both versions on the application, v1 first.
for versioned_router in (api_v1, api_v2):
    app.include_router(versioned_router)
Header Versioning
from fastapi import Header
@app.get("/items/")
def read_items(api_version: str = Header(default="1.0", alias="API-Version")):
    """Serve items in the shape selected by the ``API-Version`` header.

    Only the exact value ``"2.0"`` selects the v2 shape; any other value
    (including the default ``"1.0"``) gets the v1 shape.
    """
    wants_v2 = api_version == "2.0"
    if not wants_v2:
        return {"version": "1.0", "items": []}
    return {"version": "2.0", "items": [], "metadata": {}}
Database Best Practices
Repository Pattern
from typing import Generic, TypeVar, Type, List, Optional
from sqlalchemy.orm import Session
from pydantic import BaseModel
ModelType = TypeVar("ModelType")
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class BaseRepository(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
    """Generic CRUD repository over a SQLAlchemy model.

    The model is expected to expose an ``id`` column, which ``get`` and
    ``delete`` filter on. Every write commits immediately.
    """

    def __init__(self, model: Type[ModelType], db: Session):
        self.model = model
        self.db = db

    def _commit(self) -> None:
        """Commit, rolling back first if the commit fails so the session stays usable."""
        try:
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise

    def get(self, id: int) -> Optional[ModelType]:
        """Return the row whose ``id`` matches, or ``None``."""
        return self.db.query(self.model).filter(self.model.id == id).first()

    def get_multi(self, skip: int = 0, limit: int = 100) -> List[ModelType]:
        """Return up to ``limit`` rows after skipping ``skip`` rows."""
        return self.db.query(self.model).offset(skip).limit(limit).all()

    def create(self, obj_in: CreateSchemaType) -> ModelType:
        """Persist a new row built from the schema's fields and return it refreshed."""
        # model_dump() is the Pydantic v2 replacement for the deprecated dict().
        obj_data = obj_in.model_dump()
        db_obj = self.model(**obj_data)
        self.db.add(db_obj)
        self._commit()
        self.db.refresh(db_obj)
        return db_obj

    def update(self, db_obj: ModelType, obj_in: UpdateSchemaType) -> ModelType:
        """Apply only the fields explicitly set on ``obj_in`` and return the refreshed row."""
        update_data = obj_in.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(db_obj, field, value)
        self._commit()
        self.db.refresh(db_obj)
        return db_obj

    def delete(self, id: int) -> bool:
        """Delete the row with the given id; return ``False`` if it does not exist."""
        # Reuse get() so lookup is consistent and the legacy Query.get() is avoided.
        obj = self.get(id)
        if obj is None:
            return False
        self.db.delete(obj)
        self._commit()
        return True
Transaction Management
from contextlib import contextmanager
@contextmanager
def transaction(db: Session):
    """Run the ``with`` body as one unit of work on ``db``.

    Commits when the body finishes normally, rolls back and re-raises when it
    raises an ``Exception``, and closes the session in every case.

    Yields:
        The same session object that was passed in.
    """
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        # Bare raise keeps the original traceback untouched.
        raise
    finally:
        # NOTE: the session is closed even though the caller created it.
        db.close()
# Usage: both writes are committed together when the block exits normally;
# if either call raises, transaction() rolls back before re-raising.
# (db, user_data, profile_data, create_user and create_profile are assumed
# to be defined by the surrounding application.)
with transaction(db) as session:
    user = create_user(session, user_data)
    create_profile(session, user.id, profile_data)
Pagination
Standardized Pagination
from typing import Generic, List, Optional, Type, TypeVar

from fastapi import Depends, Query
from pydantic import BaseModel
T = TypeVar('T')
class PaginatedResponse(BaseModel, Generic[T]):
    """Envelope returned by paginated endpoints, generic over the item type."""
    # The items on the requested page.
    items: List[T]
    # Number of rows matching the query across all pages.
    total: int
    # Page number that was requested.
    page: int
    # Maximum number of items per page.
    page_size: int
    # Number of pages needed to hold ``total`` items at ``page_size`` per page.
    total_pages: int
def paginate(
    query,
    page: int = 1,
    page_size: int = 10,
    schema: Optional[Type[T]] = None
) -> PaginatedResponse[T]:
    """Return one page of ``query`` wrapped in a ``PaginatedResponse``.

    Args:
        query: Query object providing ``count()``, ``offset()``, ``limit()`` and ``all()``.
        page: 1-based page number.
        page_size: Maximum number of items per page.
        schema: Optional Pydantic model used to convert each row; rows are
            returned unchanged when omitted.

    Raises:
        ValueError: If ``page`` or ``page_size`` is smaller than 1. Without
            this check the call would produce a negative OFFSET or a
            ZeroDivisionError after the queries had already run.
    """
    if page < 1:
        raise ValueError("page must be >= 1")
    if page_size < 1:
        raise ValueError("page_size must be >= 1")

    total = query.count()
    rows = query.offset((page - 1) * page_size).limit(page_size).all()
    if schema is not None:
        # Pydantic v2 replacement for the deprecated from_orm().
        rows = [schema.model_validate(row, from_attributes=True) for row in rows]
    return PaginatedResponse(
        items=rows,
        total=total,
        page=page,
        page_size=page_size,
        # Ceiling division without floats.
        total_pages=(total + page_size - 1) // page_size
    )
@app.get("/items/", response_model=PaginatedResponse[ItemSchema])
def read_items(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
    db: Session = Depends(get_db)
):
    """Return one page of items.

    ``page`` and ``page_size`` are validated by FastAPI before the handler
    runs: out-of-range values produce a 422 response instead of a server
    error, and ``page_size`` is capped so a client cannot request an
    unbounded result set.
    """
    query = db.query(Item)
    return paginate(query, page, page_size, ItemSchema)
Validation
Custom Validators
from pydantic import BaseModel, validator, root_validator
class UserCreate(BaseModel):
    """Payload for creating a user, with field-level and cross-field validation."""

    username: str
    email: str
    password: str
    password_confirm: str

    @validator('username')
    def username_alphanumeric(cls, v):
        """Reject usernames containing anything other than letters and digits."""
        if not v.isalnum():
            raise ValueError('Username must be alphanumeric')
        return v

    @validator('password')
    def password_strength(cls, v):
        """Require at least 8 characters, one digit and one uppercase letter."""
        if len(v) < 8:
            raise ValueError('Password must be at least 8 characters')
        if not any(char.isdigit() for char in v):
            raise ValueError('Password must contain a number')
        if not any(char.isupper() for char in v):
            raise ValueError('Password must contain an uppercase letter')
        return v

    # skip_on_failure=True: Pydantic v2 refuses to build the class without it,
    # and it stops a misleading "Passwords do not match" error from being
    # reported when the password field itself already failed validation.
    @root_validator(skip_on_failure=True)
    def passwords_match(cls, values):
        """Ensure ``password`` and ``password_confirm`` are identical."""
        password = values.get('password')
        password_confirm = values.get('password_confirm')
        if password != password_confirm:
            raise ValueError('Passwords do not match')
        return values
Caching
Redis Caching Decorator
import redis
import json
from functools import wraps
# Build the client from the configured URL instead of hard-coding localhost,
# so REDIS_URL (e.g. redis://redis:6379 under Docker Compose) is honoured.
redis_client = redis.Redis.from_url(get_settings().redis_url)
def cache(expire: int = 300):
    """Cache an async function's JSON-serialisable result in Redis.

    Args:
        expire: Time-to-live of each cache entry, in seconds.

    The key is derived from the function's module-qualified name and the
    ``repr`` of its arguments, so arguments must have a stable ``repr``;
    objects such as database sessions will never produce a cache hit.

    NOTE: ``redis_client`` is used through its synchronous API, so each
    lookup blocks the event loop for the duration of the Redis round trip.
    """
    def decorator(func):
        # Qualify by module so same-named functions in different modules
        # do not share cache entries.
        key_prefix = f"{func.__module__}.{func.__qualname__}"

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Sort kwargs so f(a=1, b=2) and f(b=2, a=1) share one entry.
            cache_key = f"{key_prefix}:{args!r}:{sorted(kwargs.items())!r}"

            cached = redis_client.get(cache_key)
            if cached is not None:
                return json.loads(cached)

            result = await func(*args, **kwargs)
            redis_client.setex(cache_key, expire, json.dumps(result))
            return result
        return wrapper
    return decorator
@app.get("/expensive-operation/")
@cache(expire=3600)
async def expensive_operation(param: str):
    """Run the expensive calculation for ``param``; results are cached for 3600 seconds."""
    return perform_expensive_calculation(param)
Monitoring and Health Checks
Health Check Endpoint
from sqlalchemy import text
@app.get("/health")
async def health_check(db: Session = Depends(get_db)):
    """Report whether the database and Redis are reachable.

    Returns a 200 body with per-dependency status when both checks pass and a
    503 response otherwise. Failure details are written to the log only: the
    raw exception text may contain hostnames or credentials and is not
    returned to the caller.
    """
    # Local import: this snippet does not import datetime at module level.
    from datetime import datetime, timezone

    try:
        # Check database
        db.execute(text("SELECT 1"))
        # Check Redis
        redis_client.ping()
    except Exception:
        logger.exception("Health check failed")
        return JSONResponse(
            status_code=503,
            content={
                "status": "unhealthy",
                "error": "dependency check failed"
            }
        )
    return {
        "status": "healthy",
        "database": "ok",
        "cache": "ok",
        # Timezone-aware replacement for the deprecated utcnow().
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
Metrics Endpoint
from prometheus_client import Counter, Histogram, generate_latest
# Counter of handled requests, labelled by method, endpoint and status.
request_count = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)
# Histogram of request durations, labelled by method and endpoint.
request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    """Record a duration observation and a request count for every request.

    The ``endpoint`` label is the matched route template (e.g.
    ``/users/{user_id}``) rather than the raw URL path. Labelling by raw path
    creates one time series per distinct URL, which grows without bound.
    Requests that match no route are grouped under ``"unmatched"``.
    Requests whose handler raises are counted with status 500.
    """
    import time

    method = request.method
    started = time.perf_counter()
    status = 500  # reported if call_next raises before a response exists
    try:
        response = await call_next(request)
        status = response.status_code
        return response
    finally:
        # The route is only known after routing has run, i.e. after call_next.
        route = request.scope.get("route")
        endpoint = getattr(route, "path", None) or "unmatched"
        request_duration.labels(
            method=method,
            endpoint=endpoint
        ).observe(time.perf_counter() - started)
        request_count.labels(
            method=method,
            endpoint=endpoint,
            status=status
        ).inc()
@app.get("/metrics")
def metrics():
    """Expose all registered metrics in the Prometheus text exposition format."""
    from prometheus_client import CONTENT_TYPE_LATEST

    # Use the library's content type (includes format version and charset)
    # instead of a bare "text/plain".
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
Deployment
Production Configuration
# production.py
import multiprocessing
# Gunicorn configuration
# Worker count: (2 x CPU cores) + 1.
workers = multiprocessing.cpu_count() * 2 + 1
# Run each worker with Uvicorn's worker class so it can serve the ASGI app.
worker_class = "uvicorn.workers.UvicornWorker"
# Listen on all interfaces, port 8000.
bind = "0.0.0.0:8000"
# NOTE(review): presumably seconds to keep idle connections open — confirm against Gunicorn docs.
keepalive = 120
# NOTE(review): "-" presumably routes these logs to the process's standard streams — confirm.
errorlog = "-"
accesslog = "-"
loglevel = "info"
Docker Configuration
FROM python:3.11-slim
WORKDIR /app
# Install dependencies first so this layer stays cached until requirements.txt changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application as the `app` package (/app/app) so that absolute
# imports such as `app.core.config` resolve from the working directory
COPY ./app /app/app
# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
# Expose port
EXPOSE 8000
# Run application
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Docker Compose
version: '3.8'
services:
  api:
    build: .
    ports:
      - "8000:8000"
    # Load the remaining required settings (e.g. SECRET_KEY, SMTP_HOST) from
    # .env; values under `environment` below take precedence over the file.
    env_file:
      - .env
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/mydb
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
  db:
    image: postgres:15
    # Example credentials only — replace them outside local development.
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=mydb
    volumes:
      - postgres_data:/var/lib/postgresql/data
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
volumes:
  postgres_data:
  redis_data:
Security Checklist
- Use HTTPS in production
- Implement rate limiting
- Validate all user input
- Hash passwords with bcrypt
- Use environment variables for secrets
- Configure CORS with an explicit allow-list of origins (avoid `*`, especially when credentials are allowed)
- Add security headers
- Implement proper authentication
- Use parameterized queries
- Keep dependencies updated
- Implement request size limits
- Add CSRF protection where needed
- Use secure session management
- Implement proper authorization
- Log security events
Performance Optimization
Query Optimization
from sqlalchemy.orm import joinedload
# Bad: N+1 queries — one query loads the users, then each loop iteration
# triggers another query for that user's items
users = db.query(User).all()
for user in users:
    print(user.items) # Separate query for each user
# Good: Eager loading — joinedload(User.items) fetches the items together
# with the users, so the loop below issues no further queries
users = db.query(User).options(joinedload(User.items)).all()
for user in users:
    print(user.items) # No additional queries
Connection Pooling
# Engine with an explicitly sized connection pool.
# NOTE(review): the meanings below follow the SQLAlchemy pooling docs — confirm for your version.
engine = create_engine(
    DATABASE_URL,
    pool_size=20,  # connections kept in the pool
    max_overflow=0,  # presumably: no extra connections beyond pool_size
    pool_pre_ping=True,  # presumably: test a connection before handing it out
    pool_recycle=3600,  # presumably seconds before a connection is replaced
    echo=False  # presumably: do not log emitted SQL
)
Response Compression
from fastapi.middleware.gzip import GZipMiddleware
# Compress responses; minimum_size=1000 presumably skips bodies smaller than
# 1000 bytes — confirm against the GZipMiddleware documentation.
app.add_middleware(GZipMiddleware, minimum_size=1000)
Testing Best Practices
# Use fixtures for common setup
@pytest.fixture
def test_user(db):
    """Insert a user row through the ``db`` fixture, commit it, and return it."""
    persisted = User(username="test", email="test@example.com")
    db.add(persisted)
    db.commit()
    return persisted
# Test one thing per test
def test_user_creation(client):
    """Creating a user with a valid payload returns 201."""
    # UserCreate requires all four fields, an alphanumeric username, and a
    # password with 8+ characters, a digit and an uppercase letter; a payload
    # with only "username" would be rejected with 422 before reaching the handler.
    payload = {
        "username": "test",
        "email": "test@example.com",
        "password": "Passw0rd1",
        "password_confirm": "Passw0rd1",
    }
    response = client.post("/users/", json=payload)
    assert response.status_code == 201
# Use parametrize for multiple scenarios
# Each tuple below becomes one test run with its own (status_code, expected) pair.
@pytest.mark.parametrize("status_code,expected", [
    (200, "success"),
    (404, "not found"),
])
def test_responses(client, status_code, expected):
    """Placeholder showing the parametrized signature; the body is intentionally empty."""
    # Test logic
    pass
Documentation
Comprehensive Docstrings
# NOTE(review): response_model should be a Pydantic schema that excludes the
# password fields — confirm that `User` here is not the ORM model.
@app.post("/users/", response_model=User, status_code=201, tags=["users"])
async def create_user(
    user: UserCreate,
    db: Session = Depends(get_db)
):
    """
    Create a new user.
    - **username**: unique, alphanumeric username
    - **email**: valid email address
    - **password**: minimum 8 characters, with at least one digit and one uppercase letter
    - **password_confirm**: must match **password**
    Returns the created user object with status 201 Created.
    """
    return create_user_in_db(db, user)
Best Practices Summary
- Structure: Use clear, organized project structure
- Configuration: Use Pydantic Settings for configuration
- Error Handling: Implement standardized error responses
- Logging: Use structured logging
- Security: Follow security best practices
- Testing: Write comprehensive tests
- Documentation: Document your API thoroughly
- Performance: Optimize database queries and use caching
- Monitoring: Implement health checks and metrics
- Deployment: Use proper production configuration
Following these best practices will help you build maintainable, scalable, and production-ready FastAPI applications.