Database Integration
SQLAlchemy Setup
Basic Configuration
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Connection URL for the application database. Swap in the PostgreSQL URL
# below to target a server-based backend instead of the local SQLite file.
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

# "check_same_thread" is understood only by the sqlite3 driver; other DBAPI
# drivers reject unknown connect arguments. Pass it only when the URL
# actually selects SQLite so switching the URL above keeps working.
_connect_args = (
    {"check_same_thread": False}
    if SQLALCHEMY_DATABASE_URL.startswith("sqlite")
    else {}
)

engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args=_connect_args)

# Session factory bound to the engine; commits and flushes are explicit.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Declarative base class that the ORM models inherit from.
Base = declarative_base()
# Dependency
def get_db():
    """Yield a fresh database session and always close it afterwards.

    Written as a generator so it can be plugged into ``Depends(...)``: the
    code after ``yield`` runs once the consumer is done with the session.

    Yields:
        A session created from ``SessionLocal``.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs whether or not the consumer raised.
        session.close()
Database Models
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, DateTime, Float
from sqlalchemy.orm import relationship
from datetime import datetime
class User(Base):
    """ORM model for a row in the ``users`` table."""

    __tablename__ = "users"

    # Integer primary key.
    id = Column(Integer, index=True, primary_key=True)

    # Both identifiers are unique and indexed for lookup.
    username = Column(String, index=True, unique=True)
    email = Column(String, index=True, unique=True)

    hashed_password = Column(String)

    # New users are active unless stated otherwise.
    is_active = Column(Boolean, default=True)

    # The callable itself is passed, so it is evaluated per insert.
    # NOTE(review): datetime.utcnow is deprecated as of Python 3.12; consider
    # a timezone-aware default when the column semantics allow it.
    created_at = Column(DateTime, default=datetime.utcnow)

    # Items owned by this user; mirrored by ``Item.owner``.
    items = relationship("Item", back_populates="owner")
class Item(Base):
    """ORM model for a row in the ``items`` table."""

    __tablename__ = "items"

    # Integer primary key.
    id = Column(Integer, index=True, primary_key=True)

    title = Column(String, index=True)
    description = Column(String)
    price = Column(Float)

    # References the owning row in ``users``.
    owner_id = Column(Integer, ForeignKey("users.id"))

    # The callable itself is passed, so it is evaluated per insert.
    created_at = Column(DateTime, default=datetime.utcnow)

    # The owning user; mirrored by ``User.items``.
    owner = relationship("User", back_populates="items")
CRUD Operations
Create Operations
from sqlalchemy.orm import Session
from fastapi import Depends, HTTPException
def create_user(db: Session, user: UserCreate):
    """Persist a new ``User`` built from *user* and return the stored row.

    Args:
        db: Session used to add and commit the new row.
        user: Incoming data providing ``username``, ``email`` and ``password``.

    Returns:
        The ``User`` instance, refreshed from the database after the commit.
    """
    # NOTE(review): this is a placeholder, not a real hash. The password is
    # stored in recoverable form; replace it before any real use.
    new_user = User(
        username=user.username,
        email=user.email,
        hashed_password=user.password + "notreallyhashed",
    )

    db.add(new_user)
    db.commit()
    # Reload so database-generated values are populated.
    db.refresh(new_user)
    return new_user
@app.post("/users/", response_model=UserResponse)
def create_user_endpoint(user: UserCreate, db: Session = Depends(get_db)):
    """Create a user, rejecting e-mail addresses that are already registered.

    Raises:
        HTTPException: 400 when a user with the same e-mail already exists.
    """
    if get_user_by_email(db, email=user.email):
        raise HTTPException(status_code=400, detail="Email already registered")
    return create_user(db=db, user=user)
Read Operations
def get_user(db: Session, user_id: int):
    """Return the ``User`` whose id equals *user_id*, or ``None`` if absent."""
    by_id = db.query(User).filter(User.id == user_id)
    return by_id.first()
def get_user_by_email(db: Session, email: str):
    """Return the ``User`` whose e-mail equals *email*, or ``None`` if absent."""
    by_email = db.query(User).filter(User.email == email)
    return by_email.first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of users, ordered by primary key.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip from the start of the ordered result.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``User`` instances (possibly empty).
    """
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could overlap or skip rows. Ordering by the
    # primary key makes pagination stable.
    return db.query(User).order_by(User.id).offset(skip).limit(limit).all()
def get_items(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of items, ordered by primary key.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip from the start of the ordered result.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``Item`` instances (possibly empty).
    """
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could overlap or skip rows. Ordering by the
    # primary key makes pagination stable.
    return db.query(Item).order_by(Item.id).offset(skip).limit(limit).all()
@app.get("/users/{user_id}", response_model=UserResponse)
def read_user(user_id: int, db: Session = Depends(get_db)):
    """Return the user identified by *user_id*.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    found = get_user(db, user_id=user_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="User not found")
Update Operations
def update_user(db: Session, user_id: int, user: UserUpdate):
    """Apply the fields set on *user* to the stored user and return it.

    Args:
        db: Session used to look up and commit the row.
        user_id: Primary key of the user to modify.
        user: Update payload; only fields that were explicitly set are applied.

    Returns:
        The refreshed ``User``, or the falsy lookup result when no row matched.
    """
    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        # Nothing to update; hand the lookup result back unchanged.
        return target

    changes = user.dict(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(target, field_name, new_value)

    db.commit()
    db.refresh(target)
    return target
@app.put("/users/{user_id}", response_model=UserResponse)
def update_user_endpoint(
    user_id: int,
    user: UserUpdate,
    db: Session = Depends(get_db),
):
    """Update the user identified by *user_id* and return the result.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    updated = update_user(db, user_id, user)
    if updated is not None:
        return updated
    raise HTTPException(status_code=404, detail="User not found")
Delete Operations
def delete_user(db: Session, user_id: int):
    """Delete the user identified by *user_id*.

    Returns:
        ``True`` when a row was found and deleted, ``False`` otherwise.
    """
    doomed = db.query(User).filter(User.id == user_id).first()
    if not doomed:
        return False

    db.delete(doomed)
    db.commit()
    return True
@app.delete("/users/{user_id}")
def delete_user_endpoint(user_id: int, db: Session = Depends(get_db)):
    """Delete the user identified by *user_id* and confirm with a message.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    if delete_user(db, user_id):
        return {"message": "User deleted successfully"}
    raise HTTPException(status_code=404, detail="User not found")
Database Migrations
Alembic Setup
# Install the Alembic migration tool
pip install alembic
# Scaffold a migration environment in a directory named "alembic"
alembic init alembic
Alembic Configuration
# alembic/env.py
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from app.database import Base
from app.models import User, Item # Import all models
# Metadata of the shared declarative Base; passed to context.configure()
# in run_migrations_online().
target_metadata = Base.metadata
def run_migrations_online():
    """Run migrations over a live connection to the configured database.

    Overrides ``sqlalchemy.url`` in the Alembic config, builds an engine
    from the config section, and runs the migrations inside a transaction.
    """
    # NOTE(review): DATABASE_URL is neither defined nor imported in the
    # visible part of env.py; confirm where it is expected to come from.
    alembic_cfg = context.config
    alembic_cfg.set_main_option("sqlalchemy.url", DATABASE_URL)

    engine_settings = alembic_cfg.get_section(alembic_cfg.config_ini_section)
    connectable = engine_from_config(
        engine_settings,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()
Migration Commands
# Create a new migration, auto-generated from the models (--autogenerate)
alembic revision --autogenerate -m "Add user table"
# Apply all pending migrations up to the latest revision ("head")
alembic upgrade head
# Roll back the most recently applied revision
alembic downgrade -1
# Show the revision the database is currently at
alembic current
# Show the migration history
alembic history
Async Database Operations
Async SQLAlchemy Setup
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
# SQLite URL using the aiosqlite driver.
SQLALCHEMY_DATABASE_URL = "sqlite+aiosqlite:///./sql_app.db"

engine = create_async_engine(SQLALCHEMY_DATABASE_URL)

# Factory producing AsyncSession objects bound to the async engine.
# Objects are not expired on commit.
AsyncSessionLocal = sessionmaker(
    bind=engine,
    expire_on_commit=False,
    class_=AsyncSession,
)
async def get_db():
    """Yield an async database session for the duration of one use.

    The session is opened with ``async with``, so the context manager's
    exit runs once the consumer has finished with it.

    Yields:
        A session created from ``AsyncSessionLocal``.
    """
    async with AsyncSessionLocal() as db:
        yield db
Async CRUD Operations
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
async def get_user_async(db: AsyncSession, user_id: int):
    """Return the ``User`` whose id equals *user_id*, or ``None`` if absent."""
    statement = select(User).where(User.id == user_id)
    outcome = await db.execute(statement)
    return outcome.scalar_one_or_none()
async def get_users_async(db: AsyncSession, skip: int = 0, limit: int = 100):
    """Return one page of users, ordered by primary key.

    Args:
        db: Async session used to run the query.
        skip: Number of rows to skip from the start of the ordered result.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``User`` instances (possibly empty).
    """
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could overlap or skip rows. Ordering by the
    # primary key makes pagination stable.
    statement = select(User).order_by(User.id).offset(skip).limit(limit)
    result = await db.execute(statement)
    return result.scalars().all()
async def create_user_async(db: AsyncSession, user: UserCreate):
    """Persist a new ``User`` built from *user* and return the stored row.

    Args:
        db: Async session used to add and commit the new row.
        user: Incoming data providing ``username``, ``email`` and ``password``.

    Returns:
        The ``User`` instance, refreshed from the database after the commit.
    """
    # NOTE(review): this is a placeholder, not a real hash. The password is
    # stored in recoverable form; replace it before any real use.
    placeholder_hash = user.password + "notreallyhashed"
    new_user = User(
        username=user.username,
        email=user.email,
        hashed_password=placeholder_hash,
    )

    db.add(new_user)
    await db.commit()
    # Reload so database-generated values are populated.
    await db.refresh(new_user)
    return new_user
@app.post("/users/", response_model=UserResponse)
async def create_user_async_endpoint(
    user: UserCreate,
    db: AsyncSession = Depends(get_db),
):
    """Create a user through the async CRUD helper and return the new row."""
    created = await create_user_async(db=db, user=user)
    return created
Database Relationships
One-to-Many Relationships
# "One" side of the relationship: a user has many items.
class User(Base):
    """ORM model for the ``users`` table, owning a collection of items."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String, unique=True)

    # Collection of this user's items; mirrored by ``Item.owner``.
    items = relationship("Item", back_populates="owner")
class Item(Base):
    """ORM model for the ``items`` table; each item points at one owner."""

    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String)

    # Foreign key to the owning row in ``users``.
    owner_id = Column(Integer, ForeignKey("users.id"))

    # The owning user; mirrored by ``User.items``.
    owner = relationship("User", back_populates="items")
Many-to-Many Relationships
from sqlalchemy import Table
# Association table linking items and tags.
# The two foreign keys form a composite primary key, so the same
# (item, tag) pair cannot be stored twice. Without a key, the table
# would accept duplicate link rows.
item_tags = Table(
    "item_tags",
    Base.metadata,
    Column("item_id", Integer, ForeignKey("items.id"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True),
)
class Item(Base):
    """ORM model for the ``items`` table, linked to tags many-to-many."""

    __tablename__ = "items"

    id = Column(Integer, primary_key=True)
    title = Column(String)

    # Tags attached to this item, joined through the ``item_tags`` table;
    # mirrored by ``Tag.items``.
    tags = relationship("Tag", back_populates="items", secondary=item_tags)
class Tag(Base):
    """ORM model for the ``tags`` table, linked to items many-to-many."""

    __tablename__ = "tags"

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)

    # Items carrying this tag, joined through the ``item_tags`` table;
    # mirrored by ``Item.tags``.
    items = relationship("Item", back_populates="tags", secondary=item_tags)
Database Transactions
Transaction Management
from sqlalchemy.orm import Session
from contextlib import contextmanager
@contextmanager
def db_transaction(db: Session):
    """Run the ``with`` body as one unit of work on *db*.

    Commits when the body finishes normally. If the body or the commit
    raises an ``Exception``, rolls back and re-raises. The session is
    closed in every case.

    Args:
        db: Session to commit, roll back and finally close.

    Yields:
        The same session that was passed in.
    """
    # NOTE(review): this closes a session it did not create; confirm that
    # callers do not expect to keep using *db* afterwards.
    try:
        try:
            yield db
            db.commit()
        except Exception:
            db.rollback()
            raise
    finally:
        db.close()
def transfer_money(db: Session, from_user_id: int, to_user_id: int, amount: float):
    """Move *amount* from one user's balance to another's in one transaction.

    Args:
        db: Session the transfer runs on; ``db_transaction`` commits or rolls
            it back.
        from_user_id: Id of the user whose balance is debited.
        to_user_id: Id of the user whose balance is credited.
        amount: Amount to move; must not be negative.

    Raises:
        ValueError: If *amount* is negative, if either user does not exist,
            or if the sender's balance is lower than *amount*.
    """
    # NOTE(review): the User model shown earlier in this file has no
    # ``balance`` column; confirm the model this is meant to run against.
    with db_transaction(db):
        # A negative amount would pass the funds check below and move money
        # in the opposite direction, so reject it explicitly.
        if amount < 0:
            raise ValueError("Transfer amount must not be negative")

        from_user = get_user(db, from_user_id)
        to_user = get_user(db, to_user_id)
        # get_user returns None for unknown ids; fail with a clear error
        # instead of an AttributeError on None.
        if from_user is None or to_user is None:
            raise ValueError("User not found")

        if from_user.balance < amount:
            raise ValueError("Insufficient funds")

        from_user.balance -= amount
        to_user.balance += amount
Database Connection Pooling
Connection Pool Configuration
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
# NOTE(review): DATABASE_URL is not defined in this snippet; confirm where
# it is expected to come from.
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    # Up to 10 pooled connections, plus up to 20 extra under load.
    pool_size=10,
    max_overflow=20,
    # Recycle connections older than 3600 seconds.
    pool_recycle=3600,
    # Test each connection for liveness when it is checked out.
    pool_pre_ping=True,
)
Database Testing
Test Database Setup
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import pytest
# Separate SQLite file for the tests, distinct from the application database.
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"

engine = create_engine(SQLALCHEMY_DATABASE_URL)

# Session factory for tests; commits and flushes are explicit.
TestingSessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
@pytest.fixture
def db_session():
    """Provide a session on a freshly created schema.

    All tables are created before the session is opened. After the test,
    the session is closed and all tables are dropped.

    Yields:
        A session created from ``TestingSessionLocal``.
    """
    Base.metadata.create_all(bind=engine)
    session = TestingSessionLocal()
    try:
        yield session
    finally:
        session.close()
        Base.metadata.drop_all(bind=engine)
def test_create_user(db_session):
    """Creating a user stores the given username and e-mail."""
    payload = UserCreate(
        username="testuser",
        email="test@example.com",
        password="testpass",
    )

    created = create_user(db_session, payload)

    assert created.username == "testuser"
    assert created.email == "test@example.com"