Skip to main content

Database Integration

Learn how to integrate databases with FastAPI using SQLAlchemy, including setup, models, queries, migrations, and best practices for production applications.

SQLAlchemy Setup

Installation

pip install sqlalchemy
pip install psycopg2-binary # For PostgreSQL
# or
pip install pymysql # For MySQL
# or
pip install aiosqlite # For async SQLite

Database Configuration

database.py:

from sqlalchemy import create_engine
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4; the old
# sqlalchemy.ext.declarative location is deprecated.
from sqlalchemy.orm import declarative_base, sessionmaker

# Database URL format: "<dialect>[+<driver>]://user:password@host/dbname"
# PostgreSQL: "postgresql://user:password@localhost/dbname"
# MySQL (PyMySQL driver): "mysql+pymysql://user:password@localhost/dbname"
# SQLite: "sqlite:///./app.db"

SQLALCHEMY_DATABASE_URL = "postgresql://user:password@localhost/dbname"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    pool_pre_ping=True,  # Check connection before using
    pool_size=10,  # Connection pool size
    max_overflow=20,  # Extra connections when pool is full
)

# Session factory; each call to SessionLocal() returns a new Session
# bound to the engine above.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base class that all ORM models inherit from.
Base = declarative_base()

Database Dependency

def get_db():
    """Yield a database session and close it when the caller is done.

    Written as a generator so it can be used as a FastAPI dependency
    (``Depends(get_db)``).
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        # Runs even when the request handler raises, so the session is
        # always closed.
        db.close()

Database Models

Basic Model

from sqlalchemy import Boolean, Column, Integer, String, Float, DateTime
from sqlalchemy.sql import func
from database import Base

class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    username = Column(String, unique=True, index=True)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    # Filled in by the database when the row is inserted.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # Only written on UPDATE; there is no insert-time default, so the
    # value stays NULL until the row is first modified.
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

Relationships

from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship

class User(Base):
    """User model owning many items and at most one profile."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)

    # One-to-many relationship
    items = relationship("Item", back_populates="owner")
    # One-to-one relationship: uselist=False exposes a single object
    # instead of a list.
    profile = relationship("Profile", back_populates="user", uselist=False)

class Item(Base):
    """Item model; each item references its owning user via ``owner_id``."""

    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String)
    owner_id = Column(Integer, ForeignKey("users.id"))

    # Many-to-one relationship
    owner = relationship("User", back_populates="items")

class Profile(Base):
    """Profile model attached to exactly one user."""

    __tablename__ = "profiles"

    id = Column(Integer, primary_key=True, index=True)
    bio = Column(String)
    # unique=True enforces at most one profile row per user.
    user_id = Column(Integer, ForeignKey("users.id"), unique=True)

    # One-to-one relationship
    user = relationship("User", back_populates="profile")

Many-to-Many Relationships

from sqlalchemy import Table

# Association table for the Item <-> Tag many-to-many relationship.
# Both columns together form the primary key, so the same (item, tag)
# pair cannot be stored twice.
item_tags = Table(
    'item_tags',
    Base.metadata,
    Column('item_id', Integer, ForeignKey('items.id'), primary_key=True),
    Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True),
)

class Item(Base):
    """Item model that can carry any number of tags."""

    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String)

    # Many-to-many relationship, linked through the item_tags table
    tags = relationship("Tag", secondary=item_tags, back_populates="items")

class Tag(Base):
    """Tag model; the reverse side of the Item <-> Tag relationship."""

    __tablename__ = "tags"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, unique=True)

    # Many-to-many relationship, linked through the item_tags table
    items = relationship("Item", secondary=item_tags, back_populates="tags")

Pydantic Schemas

Separating Models and Schemas

schemas.py:

from pydantic import BaseModel, EmailStr
from datetime import datetime
from typing import List, Optional

# Base schemas
class ItemBase(BaseModel):
    """Fields shared by every item schema."""

    title: str
    description: Optional[str] = None


class ItemCreate(ItemBase):
    """Request body for creating an item (same fields as ItemBase)."""


class ItemUpdate(BaseModel):
    """Request body for updating an item; every field is optional."""

    title: Optional[str] = None
    description: Optional[str] = None


class Item(ItemBase):
    """Item as returned by the API."""

    id: int
    owner_id: int
    created_at: datetime

    class Config:
        # Allows building the schema from ORM object attributes.
        from_attributes = True  # Pydantic v2 (was orm_mode in v1)

# User schemas
class UserBase(BaseModel):
    """Fields shared by every user schema."""

    email: EmailStr
    username: str


class UserCreate(UserBase):
    """Request body for creating a user; carries the plain-text password."""

    password: str


class UserUpdate(BaseModel):
    """Request body for updating a user; every field is optional.

    Field names match the ORM column names so an update can copy the
    provided values onto the model with ``setattr``.
    """

    email: Optional[EmailStr] = None
    username: Optional[str] = None
    is_active: Optional[bool] = None


class User(UserBase):
    """User as returned by the API (never includes the password)."""

    id: int
    is_active: bool
    items: List[Item] = []

    class Config:
        # Allows building the schema from ORM object attributes.
        from_attributes = True

CRUD Operations

Create Operations

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from fastapi import Depends, HTTPException

@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """Create a user and return it.

    Raises:
        HTTPException: 400 if the email is already registered, or if the
            insert violates a unique constraint (email or username).
    """
    # Check if user exists
    existing = db.query(models.User).filter(
        models.User.email == user.email
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail="Email already registered")

    # Create new user; only the hash of the password is stored
    db_user = models.User(
        email=user.email,
        username=user.username,
        hashed_password=get_password_hash(user.password),
    )
    db.add(db_user)

    try:
        db.commit()
    except IntegrityError:
        # The pre-check above cannot catch a concurrent insert or a
        # duplicate username; the unique constraints can.
        db.rollback()
        raise HTTPException(
            status_code=400,
            detail="Email or username already registered",
        )

    db.refresh(db_user)  # Get the created object with ID
    return db_user

Read Operations

@app.get("/users/", response_model=List[schemas.User])
def read_users(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db)
):
    """Return up to ``limit`` users, skipping the first ``skip``."""
    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
    # could overlap or skip rows; order by primary key for stable paging.
    return (
        db.query(models.User)
        .order_by(models.User.id)
        .offset(skip)
        .limit(limit)
        .all()
    )

@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    """Return the user with the given id.

    Raises:
        HTTPException: 404 if no such user exists.
    """
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user

Update Operations

@app.put("/users/{user_id}", response_model=schemas.User)
def update_user(
    user_id: int,
    user: schemas.UserUpdate,
    db: Session = Depends(get_db)
):
    """Apply the fields present in the request body to an existing user.

    Raises:
        HTTPException: 404 if no such user exists.
    """
    db_user = db.query(models.User).filter(models.User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    # Update only provided fields. model_dump() is the Pydantic v2
    # replacement for the deprecated .dict().
    update_data = user.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(db_user, field, value)

    db.commit()
    db.refresh(db_user)
    return db_user

@app.patch("/users/{user_id}", response_model=schemas.User)
def partial_update_user(
    user_id: int,
    user: schemas.UserUpdate,
    db: Session = Depends(get_db)
):
    """Partially update a user; same behavior as the PUT endpoint.

    Raises:
        HTTPException: 404 if no such user exists.
    """
    # update_user already applies only the fields that were sent, so
    # delegate instead of returning None (which would fail validation
    # against response_model).
    return update_user(user_id=user_id, user=user, db=db)

Delete Operations

@app.delete("/users/{user_id}", status_code=204)
def delete_user(user_id: int, db: Session = Depends(get_db)):
    """Delete the user with the given id and respond with 204 No Content.

    Raises:
        HTTPException: 404 if no such user exists.
    """
    db_user = db.query(models.User).filter(models.User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    db.delete(db_user)
    db.commit()
    return None

Advanced Queries

Filtering and Searching

@app.get("/items/search/", response_model=List[schemas.Item])
def search_items(
    q: str | None = None,
    min_price: float | None = None,
    max_price: float | None = None,
    db: Session = Depends(get_db)
):
    """Search items by text and/or price range.

    Args:
        q: Substring to look for in the title or description.
        min_price: Inclusive lower price bound.
        max_price: Inclusive upper price bound.

    Every filter is optional; omitted filters are not applied.
    """
    query = db.query(models.Item)

    if q:
        # autoescape=True escapes "%" and "_" in the user's text so they
        # are matched literally instead of acting as LIKE wildcards.
        query = query.filter(
            models.Item.title.contains(q, autoescape=True)
            | models.Item.description.contains(q, autoescape=True)
        )

    if min_price is not None:
        query = query.filter(models.Item.price >= min_price)

    if max_price is not None:
        query = query.filter(models.Item.price <= max_price)

    return query.all()

Sorting

@app.get("/items/", response_model=List[schemas.Item])
def read_items(
    sort_by: str = "created_at",
    order: str = "desc",
    db: Session = Depends(get_db)
):
    """Return all items sorted by a caller-chosen column.

    Args:
        sort_by: Name of an Item table column to sort on.
        order: "desc" for descending; any other value sorts ascending.

    Raises:
        HTTPException: 400 if ``sort_by`` is not a column of the table.
    """
    # sort_by comes from the client: only accept real table columns so
    # it cannot be used to reach arbitrary attributes of the model class
    # or trigger an AttributeError (500).
    if sort_by not in models.Item.__table__.columns.keys():
        raise HTTPException(
            status_code=400,
            detail=f"Cannot sort by '{sort_by}'",
        )

    # Dynamic sorting
    column = getattr(models.Item, sort_by)
    ordering = column.desc() if order == "desc" else column.asc()

    return db.query(models.Item).order_by(ordering).all()

Pagination

from typing import List

class PaginatedResponse(BaseModel):
    """One page of items plus the metadata needed to navigate pages."""

    items: List[schemas.Item]  # Items on the current page
    total: int  # Total number of items across all pages
    page: int  # Current page number
    pages: int  # Total number of pages

@app.get("/items/", response_model=PaginatedResponse)
def read_items(
    page: int = 1,
    page_size: int = 10,
    db: Session = Depends(get_db)
):
    """Return one page of items together with pagination metadata.

    Args:
        page: 1-based page number.
        page_size: Number of items per page.

    Raises:
        HTTPException: 400 if ``page`` or ``page_size`` is less than 1.
    """
    # page_size == 0 would divide by zero below and page < 1 would give
    # a negative offset, so reject both up front.
    if page < 1 or page_size < 1:
        raise HTTPException(
            status_code=400,
            detail="page and page_size must be at least 1",
        )

    # Get total count
    total = db.query(models.Item).count()

    # Calculate pagination (ceiling division for the page count)
    skip = (page - 1) * page_size
    pages = (total + page_size - 1) // page_size

    # Get items; order by primary key so consecutive pages are stable
    items = (
        db.query(models.Item)
        .order_by(models.Item.id)
        .offset(skip)
        .limit(page_size)
        .all()
    )

    return PaginatedResponse(
        items=items,
        total=total,
        page=page,
        pages=pages,
    )

Joins and Relationships

@app.get("/users/{user_id}/items/", response_model=List[schemas.Item])
def read_user_items(user_id: int, db: Session = Depends(get_db)):
    """Return the items of the given user.

    Raises:
        HTTPException: 404 if no such user exists.
    """
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Using relationship
    return user.items

@app.get("/items-with-owners/")
def read_items_with_owners(db: Session = Depends(get_db)):
    """Return the items selected by joining Item against User."""
    # Explicit join
    # NOTE(review): the query still selects only Item entities; the join
    # by itself does not attach owner data to the result. Confirm whether
    # eager loading of Item.owner is wanted here.
    items = db.query(models.Item).join(models.User).all()
    return items

Aggregations

from sqlalchemy import func

@app.get("/stats/")
def get_stats(db: Session = Depends(get_db)):
    """Return the item count and the average, maximum and minimum price."""
    # All four aggregates are computed in a single query.
    stats = db.query(
        func.count(models.Item.id).label("total_items"),
        func.avg(models.Item.price).label("avg_price"),
        func.max(models.Item.price).label("max_price"),
        func.min(models.Item.price).label("min_price")
    ).first()

    # A falsy aggregate (None or zero) is reported as 0.
    return {
        "total_items": stats.total_items,
        "avg_price": float(stats.avg_price) if stats.avg_price else 0,
        "max_price": float(stats.max_price) if stats.max_price else 0,
        "min_price": float(stats.min_price) if stats.min_price else 0
    }

Async Database Operations

Async SQLAlchemy Setup

pip install "sqlalchemy[asyncio]" # Quotes keep shells such as zsh from globbing the brackets
pip install asyncpg # For PostgreSQL

database.py:

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# The "+asyncpg" part of the URL selects the asyncpg driver.
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

# echo=True makes the engine log the SQL it emits.
engine = create_async_engine(DATABASE_URL, echo=True)

# Session factory that produces AsyncSession objects bound to the engine.
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    # Keep attributes loaded after commit so objects can be read without
    # triggering another (implicit) database round trip.
    expire_on_commit=False
)

async def get_async_db():
    """Yield an AsyncSession for use as a FastAPI dependency.

    The ``async with`` block closes the session when the caller is done.
    """
    async with AsyncSessionLocal() as session:
        yield session

Async CRUD Operations

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import selectinload

# schemas.User includes ``items``. Under AsyncSession a relationship
# cannot be lazy-loaded while the response is being serialized, so every
# query below loads ``items`` eagerly with selectinload().

@app.get("/users/", response_model=List[schemas.User])
async def read_users(db: AsyncSession = Depends(get_async_db)):
    """Return all users with their items."""
    result = await db.execute(
        select(models.User).options(selectinload(models.User.items))
    )
    return result.scalars().all()

@app.post("/users/", response_model=schemas.User)
async def create_user(
    user: schemas.UserCreate,
    db: AsyncSession = Depends(get_async_db)
):
    """Create a user and return it."""
    # UserCreate carries a plain-text ``password`` while the model stores
    # ``hashed_password``, so the fields are mapped explicitly instead of
    # unpacking the schema into the model.
    db_user = models.User(
        email=user.email,
        username=user.username,
        hashed_password=get_password_hash(user.password),
    )
    db.add(db_user)
    await db.commit()

    # Re-read the row so that database-generated values and the ``items``
    # collection are loaded before the response is serialized.
    # db_user.id is still readable here because the session factory uses
    # expire_on_commit=False.
    result = await db.execute(
        select(models.User)
        .options(selectinload(models.User.items))
        .filter(models.User.id == db_user.id)
        .execution_options(populate_existing=True)
    )
    return result.scalar_one()

@app.get("/users/{user_id}", response_model=schemas.User)
async def read_user(user_id: int, db: AsyncSession = Depends(get_async_db)):
    """Return the user with the given id.

    Raises:
        HTTPException: 404 if no such user exists.
    """
    result = await db.execute(
        select(models.User)
        .options(selectinload(models.User.items))
        .filter(models.User.id == user_id)
    )
    user = result.scalar_one_or_none()

    if user is None:
        raise HTTPException(status_code=404, detail="User not found")

    return user

Database Migrations

Alembic Setup

pip install alembic
alembic init alembic

alembic/env.py:

from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
from app.database import Base
from app.models import * # Import all models

# this is the Alembic Config object
config = context.config

# Set up Python logging from the config file, if one is in use.
# (fileConfig is imported above but was otherwise never called.)
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Set database URL
config.set_main_option("sqlalchemy.url", "postgresql://user:pass@localhost/db")

# Import Base metadata; autogenerate compares it against the database
target_metadata = Base.metadata

def run_migrations_online():
    """Run migrations against a live database connection."""
    # Build an engine from the "sqlalchemy."-prefixed options of the
    # Alembic config; NullPool means connections are not pooled.
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()

run_migrations_online()

Creating Migrations

# Auto-generate migration
alembic revision --autogenerate -m "Create users table"

# Apply migrations
alembic upgrade head

# Rollback migration
alembic downgrade -1

# Show current revision
alembic current

# Show migration history
alembic history

Repository Pattern

Creating a Repository Layer

repositories/user_repository.py:

from sqlalchemy.orm import Session
from typing import List, Optional
from models import User
from schemas import UserCreate, UserUpdate

class UserRepository:
    """Data-access layer for User rows, keeping queries out of the routes."""

    def __init__(self, db: Session):
        self.db = db

    def get(self, user_id: int) -> Optional[User]:
        """Return the user with the given id, or None."""
        return self.db.query(User).filter(User.id == user_id).first()

    def get_by_email(self, email: str) -> Optional[User]:
        """Return the user with the given email, or None."""
        return self.db.query(User).filter(User.email == email).first()

    def get_all(self, skip: int = 0, limit: int = 100) -> List[User]:
        """Return up to ``limit`` users, skipping the first ``skip``."""
        return self.db.query(User).offset(skip).limit(limit).all()

    def create(self, user: UserCreate) -> User:
        """Insert a new user and return the persisted object."""
        data = user.model_dump()
        # UserCreate has a plain-text ``password`` but the model column is
        # ``hashed_password``; passing ``password`` to User() would raise
        # TypeError. get_password_hash is the hashing helper used by the
        # create endpoint and must be imported from wherever the
        # application defines it.
        data["hashed_password"] = get_password_hash(data.pop("password"))

        db_user = User(**data)
        self.db.add(db_user)
        self.db.commit()
        self.db.refresh(db_user)
        return db_user

    def update(self, user_id: int, user: UserUpdate) -> Optional[User]:
        """Apply the provided fields to a user; return None if not found."""
        db_user = self.get(user_id)
        if not db_user:
            return None

        # Only fields explicitly sent by the client are applied.
        update_data = user.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(db_user, field, value)

        self.db.commit()
        self.db.refresh(db_user)
        return db_user

    def delete(self, user_id: int) -> bool:
        """Delete a user; return False if it does not exist, else True."""
        db_user = self.get(user_id)
        if not db_user:
            return False

        self.db.delete(db_user)
        self.db.commit()
        return True

# Dependency
def get_user_repository(db: Session = Depends(get_db)):
    """Build a UserRepository around the request's database session."""
    return UserRepository(db)

# Usage in endpoint
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(
    user_id: int,
    repo: UserRepository = Depends(get_user_repository)
):
    """Return the user with the given id, looked up via the repository.

    Raises:
        HTTPException: 404 if no such user exists.
    """
    user = repo.get(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

Best Practices

  1. Use Connection Pooling: Configure appropriate pool size for your application
  2. Session Management: Always close database sessions (use dependencies)
  3. Migrations: Use Alembic for schema changes in production
  4. Indexes: Add indexes on frequently queried columns
  5. Eager Loading: Use joinedload() to avoid N+1 queries
  6. Transactions: Use transactions for operations that must complete together
  7. Error Handling: Handle database errors gracefully
  8. Validation: Validate data before database operations
  9. Repository Pattern: Separate database logic from route handlers
  10. Environment Variables: Store database credentials securely

Transaction Management

from sqlalchemy.exc import SQLAlchemyError

@app.post("/transfer/")
def transfer_funds(
    from_account: int,
    to_account: int,
    amount: float,
    db: Session = Depends(get_db)
):
    """Move ``amount`` from one account to another in one transaction.

    Both balance changes are committed together or not at all.

    Raises:
        HTTPException: 400 if ``amount`` is not positive or both account
            ids are the same; 404 if either account does not exist;
            500 if the database operation fails.
    """
    # A negative amount would silently reverse the direction of the
    # transfer, so only positive amounts are accepted.
    if amount <= 0:
        raise HTTPException(status_code=400, detail="Amount must be positive")
    if from_account == to_account:
        raise HTTPException(
            status_code=400,
            detail="Cannot transfer to the same account",
        )

    try:
        # Lock both rows (SELECT ... FOR UPDATE) so concurrent transfers
        # cannot overwrite each other's balance; ordering by id makes
        # every transfer take the locks in the same order.
        locked = (
            db.query(Account)
            .filter(Account.id.in_([from_account, to_account]))
            .order_by(Account.id)
            .with_for_update()
            .all()
        )
        accounts = {account.id: account for account in locked}
        from_acc = accounts.get(from_account)
        to_acc = accounts.get(to_account)

        if from_acc is None or to_acc is None:
            db.rollback()  # Release the row locks before responding
            raise HTTPException(status_code=404, detail="Account not found")

        # NOTE(review): no insufficient-funds check is made here; confirm
        # whether overdrafts are allowed.

        # Debit from account
        from_acc.balance -= amount

        # Credit to account
        to_acc.balance += amount

        db.commit()
        return {"status": "success"}

    except SQLAlchemyError:
        db.rollback()
        # Do not return the raw database error text to the client.
        raise HTTPException(status_code=500, detail="Transfer failed")

Database integration with FastAPI is straightforward and powerful. Following these patterns will help you build scalable, maintainable database-backed APIs.