Skip to main content

Database Integration

SQLAlchemy Setup

Basic Configuration

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# Connection URL of the application database (a SQLite file in the working directory).
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

# "check_same_thread" is an option of the SQLite driver only. Passing it to a
# different driver (for example after switching to the PostgreSQL URL above)
# fails at connect time, so it is applied only when the URL targets SQLite.
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args=(
        {"check_same_thread": False}
        if SQLALCHEMY_DATABASE_URL.startswith("sqlite")
        else {}
    ),
)

# Factory for sessions bound to the engine; autoflush is turned off.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base class that the ORM models inherit from.
Base = declarative_base()

# Dependency
def get_db():
    """Yield a database session and close it once the consumer is done.

    Written as a generator so it can be passed to ``Depends``; the ``finally``
    clause runs when the generator is resumed or closed.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Reached on success and on error alike, so the session never leaks.
        session.close()

Database Models

from datetime import datetime, timezone

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, DateTime, Float
from sqlalchemy.orm import relationship

class User(Base):
    """ORM model for the ``users`` table; a user owns zero or more items."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)
    # datetime.utcnow() is deprecated since Python 3.12. This callable yields
    # the same naive UTC timestamp and is evaluated for each inserted row.
    created_at = Column(
        DateTime,
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
    )

    # Counterpart of Item.owner (linked through back_populates).
    items = relationship("Item", back_populates="owner")

class Item(Base):
    """ORM model for the ``items`` table; each item references its owning user."""

    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String)
    price = Column(Float)
    owner_id = Column(Integer, ForeignKey("users.id"))
    # datetime.utcnow() is deprecated since Python 3.12. This callable yields
    # the same naive UTC timestamp and is evaluated for each inserted row.
    created_at = Column(
        DateTime,
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
    )

    # Counterpart of User.items (linked through back_populates).
    owner = relationship("User", back_populates="items")

CRUD Operations

Create Operations

from sqlalchemy.orm import Session
from fastapi import Depends, HTTPException

def create_user(db: Session, user: UserCreate):
    """Insert a new user row and return the persisted ``User``.

    The password is not stored in recoverable form. It is hashed with salted
    PBKDF2-HMAC-SHA256 and saved as
    ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``, so a verifier can
    recompute the digest from the stored parameters.

    Args:
        db: Open session used for the insert; it is committed here.
        user: Payload providing ``username``, ``email`` and ``password``.

    Returns:
        The new ``User``, refreshed from the database after the commit.
    """
    import hashlib
    import secrets

    iterations = 600000
    salt = secrets.token_bytes(16)  # fresh random salt per user
    digest = hashlib.pbkdf2_hmac(
        "sha256", user.password.encode("utf-8"), salt, iterations
    )
    hashed_password = "$".join(
        ["pbkdf2_sha256", str(iterations), salt.hex(), digest.hex()]
    )

    db_user = User(
        username=user.username,
        email=user.email,
        hashed_password=hashed_password,
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

@app.post("/users/", response_model=UserResponse)
def create_user_endpoint(user: UserCreate, db: Session = Depends(get_db)):
    """Create a user, answering 400 when the email or username is taken.

    The email is checked up front. The insert is additionally guarded because
    ``username`` is unique as well, and because another request may insert the
    same email between the check and the commit.

    Raises:
        HTTPException: 400 if a unique constraint would be violated.
    """
    from sqlalchemy.exc import IntegrityError

    if get_user_by_email(db, email=user.email):
        raise HTTPException(status_code=400, detail="Email already registered")
    try:
        return create_user(db=db, user=user)
    except IntegrityError as exc:
        # The failed flush leaves the session unusable until it is rolled back.
        db.rollback()
        raise HTTPException(
            status_code=400, detail="Username or email already registered"
        ) from exc

Read Operations

def get_user(db: Session, user_id: int):
    """Return the user whose primary key is ``user_id``, or ``None`` if absent."""
    matching = db.query(User).filter(User.id == user_id)
    return matching.first()

def get_user_by_email(db: Session, email: str):
    """Return the user registered under ``email``, or ``None`` if there is none."""
    matching = db.query(User).filter(User.email == email)
    return matching.first()

def get_users(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of users, ordered by primary key.

    Args:
        db: Session to query with.
        skip: Number of rows to skip from the start of the ordering.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``User`` objects (empty when the page is past the end).
    """
    # Without ORDER BY the database may return rows in any order, so
    # consecutive OFFSET/LIMIT pages could overlap or miss rows.
    return db.query(User).order_by(User.id).offset(skip).limit(limit).all()

def get_items(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of items, ordered by primary key.

    Args:
        db: Session to query with.
        skip: Number of rows to skip from the start of the ordering.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``Item`` objects (empty when the page is past the end).
    """
    # Without ORDER BY the database may return rows in any order, so
    # consecutive OFFSET/LIMIT pages could overlap or miss rows.
    return db.query(Item).order_by(Item.id).offset(skip).limit(limit).all()

@app.get("/users/{user_id}", response_model=UserResponse)
def read_user(user_id: int, db: Session = Depends(get_db)):
    """Return the requested user, or respond with 404 when it does not exist."""
    found = get_user(db, user_id=user_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="User not found")

Update Operations

def update_user(db: Session, user_id: int, user: UserUpdate):
    """Copy the explicitly-set fields of ``user`` onto the stored row.

    Returns the refreshed ``User`` after committing, or the falsy lookup
    result (nothing is committed) when no row has that ``user_id``.
    """
    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        return target

    # exclude_unset=True keeps fields the client did not send out of the update.
    # NOTE(review): .dict() presumably comes from a Pydantic model -- confirm.
    changes = user.dict(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])
    db.commit()
    db.refresh(target)
    return target

@app.put("/users/{user_id}", response_model=UserResponse)
def update_user_endpoint(
    user_id: int,
    user: UserUpdate,
    db: Session = Depends(get_db),
):
    """Update a user and return it, or respond with 404 when it does not exist."""
    updated = update_user(db, user_id, user)
    if updated is not None:
        return updated
    raise HTTPException(status_code=404, detail="User not found")

Delete Operations

def delete_user(db: Session, user_id: int):
    """Delete the user with ``user_id``.

    Returns ``True`` after the delete was committed, ``False`` when no such
    user was found (in which case nothing is committed).
    """
    doomed = db.query(User).filter(User.id == user_id).first()
    if not doomed:
        return False
    db.delete(doomed)
    db.commit()
    return True

@app.delete("/users/{user_id}")
def delete_user_endpoint(user_id: int, db: Session = Depends(get_db)):
    """Delete a user; respond with 404 when there is no user with that id."""
    if delete_user(db, user_id):
        return {"message": "User deleted successfully"}
    raise HTTPException(status_code=404, detail="User not found")

Database Migrations

Alembic Setup

# Install Alembic into the active Python environment
pip install alembic
# Scaffold a migration environment in ./alembic (this also writes alembic.ini)
alembic init alembic

Alembic Configuration

# alembic/env.py
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from app.database import Base
from app.models import User, Item # Import all models

# Metadata handed to context.configure(); the model modules are imported above
# so that their tables are registered on it.
target_metadata = Base.metadata

def run_migrations_online():
    """Run the migrations against a live database connection.

    The application's database URL replaces ``sqlalchemy.url`` from
    ``alembic.ini``. An engine is then built from that configuration and the
    migrations run inside a transaction on a single connection.
    """
    # Imported locally so the function does not rely on an undefined global.
    # NOTE(review): assumes the URL constant lives next to ``Base`` in
    # app/database.py -- adjust the import if it is defined elsewhere.
    from app.database import SQLALCHEMY_DATABASE_URL

    configuration = context.config
    # Values set here go through configparser interpolation, so a literal "%"
    # (e.g. in a URL-encoded password) has to be doubled.
    configuration.set_main_option(
        "sqlalchemy.url", SQLALCHEMY_DATABASE_URL.replace("%", "%%")
    )

    connectable = engine_from_config(
        configuration.get_section(configuration.config_ini_section),
        prefix="sqlalchemy.",
        # NullPool: connections are not kept around after the migration run.
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
        )

        with context.begin_transaction():
            context.run_migrations()

Migration Commands

# Create a migration: autogenerate a revision by comparing the models with the database
alembic revision --autogenerate -m "Add user table"

# Apply all pending migrations up to the latest revision
alembic upgrade head

# Downgrade: revert the most recently applied revision
alembic downgrade -1

# Show the revision the database is currently at
alembic current

# Show the migration history
alembic history

Async Database Operations

Async SQLAlchemy Setup

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

# Same SQLite file as the synchronous setup, accessed through the aiosqlite driver.
SQLALCHEMY_DATABASE_URL = "sqlite+aiosqlite:///./sql_app.db"

engine = create_async_engine(SQLALCHEMY_DATABASE_URL)
# Factory producing AsyncSession objects bound to the async engine.
# expire_on_commit=False keeps loaded attributes readable after a commit
# instead of expiring them.
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_db():
    """Yield an async session; leaving the ``async with`` block closes it."""
    async with AsyncSessionLocal() as db:
        yield db

Async CRUD Operations

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

async def get_user_async(db: AsyncSession, user_id: int):
    """Return the user with primary key ``user_id``, or ``None`` if absent."""
    stmt = select(User).where(User.id == user_id)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()

async def get_users_async(db: AsyncSession, skip: int = 0, limit: int = 100):
    """Return one page of users, ordered by primary key.

    Args:
        db: Async session to query with.
        skip: Number of rows to skip from the start of the ordering.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``User`` objects (empty when the page is past the end).
    """
    # Without ORDER BY the database may return rows in any order, so
    # consecutive OFFSET/LIMIT pages could overlap or miss rows.
    stmt = select(User).order_by(User.id).offset(skip).limit(limit)
    result = await db.execute(stmt)
    return result.scalars().all()

async def create_user_async(db: AsyncSession, user: UserCreate):
    """Insert a new user row and return the persisted ``User``.

    The password is not stored in recoverable form. It is hashed with salted
    PBKDF2-HMAC-SHA256 and saved as
    ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``, so a verifier can
    recompute the digest from the stored parameters.

    Args:
        db: Open async session used for the insert; it is committed here.
        user: Payload providing ``username``, ``email`` and ``password``.

    Returns:
        The new ``User``, refreshed from the database after the commit.
    """
    import asyncio
    import hashlib
    import secrets

    iterations = 600000
    salt = secrets.token_bytes(16)  # fresh random salt per user
    # Key derivation is CPU-bound; running it in the default executor keeps
    # the event loop free to serve other requests meanwhile.
    loop = asyncio.get_running_loop()
    digest = await loop.run_in_executor(
        None,
        hashlib.pbkdf2_hmac,
        "sha256",
        user.password.encode("utf-8"),
        salt,
        iterations,
    )
    hashed_password = "$".join(
        ["pbkdf2_sha256", str(iterations), salt.hex(), digest.hex()]
    )

    db_user = User(
        username=user.username,
        email=user.email,
        hashed_password=hashed_password,
    )
    db.add(db_user)
    await db.commit()
    await db.refresh(db_user)
    return db_user

@app.post("/users/", response_model=UserResponse)
async def create_user_async_endpoint(
    user: UserCreate,
    db: AsyncSession = Depends(get_db),
):
    """Create a user, answering 400 when the email or username is taken.

    Raises:
        HTTPException: 400 if the insert violates a unique constraint.
    """
    from sqlalchemy.exc import IntegrityError

    try:
        return await create_user_async(db=db, user=user)
    except IntegrityError as exc:
        # The failed flush leaves the session unusable until it is rolled back.
        await db.rollback()
        raise HTTPException(
            status_code=400, detail="Username or email already registered"
        ) from exc

Database Relationships

One-to-Many Relationships

# User has many items
class User(Base):
    """The "one" side of the one-to-many link: a user holds a collection of items."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String, unique=True)

    # Paired with Item.owner through back_populates.
    items = relationship("Item", back_populates="owner")

class Item(Base):
    """The "many" side of the one-to-many link: each item points at one user."""

    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String)
    # Foreign key to users.id; this column carries the link to the owner.
    owner_id = Column(Integer, ForeignKey("users.id"))

    # Paired with User.items through back_populates.
    owner = relationship("User", back_populates="items")

Many-to-Many Relationships

from sqlalchemy import Table

# Association table
# The (item_id, tag_id) pair forms a composite primary key, so the same tag
# cannot be attached to the same item more than once.
item_tags = Table(
    'item_tags',
    Base.metadata,
    Column('item_id', Integer, ForeignKey('items.id'), primary_key=True),
    Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True),
)

class Item(Base):
    """Item side of the many-to-many link between items and tags."""

    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String)

    # Linked to Tag through the item_tags association table; mirrors Tag.items.
    tags = relationship("Tag", secondary=item_tags, back_populates="items")

class Tag(Base):
    """Tag side of the many-to-many link between items and tags."""

    __tablename__ = "tags"

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)

    # Linked to Item through the item_tags association table; mirrors Item.tags.
    items = relationship("Item", secondary=item_tags, back_populates="tags")

Database Transactions

Transaction Management

from sqlalchemy.orm import Session
from contextlib import contextmanager

@contextmanager
def db_transaction(db: Session):
    """Run the ``with`` body as one unit of work on ``db``.

    Commits when the body finishes normally, rolls back and re-raises on any
    ``Exception``, and closes the session in every case -- including a
    session that was handed in by the caller.
    """
    try:
        try:
            yield db
            # Kept inside the guarded region so a failing commit is rolled back too.
            db.commit()
        except Exception:
            db.rollback()
            raise
    finally:
        db.close()

def transfer_money(db: Session, from_user_id: int, to_user_id: int, amount: float):
    """Move ``amount`` from one user's balance to another's in one transaction.

    All checks run inside the transaction, so any failure rolls back and the
    session is released by ``db_transaction`` exactly as on success.

    Args:
        db: Session the transfer runs on.
        from_user_id: Id of the user whose balance is debited.
        to_user_id: Id of the user whose balance is credited.
        amount: Amount to move; must be greater than zero.

    Raises:
        ValueError: If ``amount`` is not positive, either user does not
            exist, or the sender's balance is lower than ``amount``.
    """
    with db_transaction(db):
        # "not amount > 0" also rejects NaN, which "amount <= 0" would let through.
        # A negative amount would otherwise move money in the opposite direction.
        if not amount > 0:
            raise ValueError("Transfer amount must be positive")

        from_user = get_user(db, from_user_id)
        to_user = get_user(db, to_user_id)
        if from_user is None or to_user is None:
            raise ValueError("User not found")

        if from_user.balance < amount:
            raise ValueError("Insufficient funds")

        from_user.balance -= amount
        to_user.balance += amount

Database Connection Pooling

Connection Pool Configuration

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Engine with an explicitly configured connection pool.
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    # Number of connections kept open in the pool.
    pool_size=10,
    # Extra connections allowed beyond pool_size under load.
    max_overflow=20,
    # Check that a connection is alive each time it is checked out.
    pool_pre_ping=True,
    # Replace connections that are older than this many seconds.
    pool_recycle=3600
)

Database Testing

Test Database Setup

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import pytest

# Separate SQLite file so the tests do not touch the application database.
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"

engine = create_engine(SQLALCHEMY_DATABASE_URL)
# Session factory bound to the test engine.
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

@pytest.fixture
def db_session():
    """Provide a session on freshly created tables; drop them again afterwards."""
    Base.metadata.create_all(bind=engine)
    session = TestingSessionLocal()
    try:
        yield session
    finally:
        # Teardown: close the session first, then remove every table.
        session.close()
        Base.metadata.drop_all(bind=engine)

def test_create_user(db_session):
    """create_user() returns a user carrying the submitted username and email."""
    payload = UserCreate(username="testuser", email="test@example.com", password="testpass")
    created = create_user(db_session, payload)
    assert (created.username, created.email) == ("testuser", "test@example.com")