Skip to main content

Authentication & Security

Comprehensive guide to implementing authentication and security in FastAPI, including OAuth2, JWT tokens, API keys, and security best practices.

Password Hashing

Using passlib

pip install "passlib[bcrypt]"
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

OAuth2 with Password Flow

Basic OAuth2 Setup

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta
from jose import JWTError, jwt
from typing import Optional

# Configuration
SECRET_KEY = "your-secret-key-keep-it-secret"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class Token(BaseModel):
access_token: str
token_type: str

class TokenData(BaseModel):
username: Optional[str] = None

class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None

class UserInDB(User):
hashed_password: str

Token Creation

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()

if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)

to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

return encoded_jwt

User Authentication

def authenticate_user(db, username: str, password: str):
user = get_user(db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user

async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)

try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception

user = get_user(db, username=token_data.username)
if user is None:
raise credentials_exception

return user

async def get_current_active_user(
current_user: User = Depends(get_current_user)
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user

Login Endpoint

@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(db, form_data.username, form_data.password)

if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)

access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)

return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user

JWT Token with Refresh Tokens

Enhanced Token System

from datetime import datetime, timedelta
from typing import Optional

# Configuration
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7

class TokenPair(BaseModel):
access_token: str
refresh_token: str
token_type: str

def create_refresh_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({"exp": expire, "type": "refresh"})

return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

@app.post("/token", response_model=TokenPair)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(db, form_data.username, form_data.password)

if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password"
)

access_token = create_access_token(data={"sub": user.username})
refresh_token = create_refresh_token(data={"sub": user.username})

return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}

@app.post("/token/refresh", response_model=Token)
async def refresh_token(refresh_token: str):
try:
payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])

if payload.get("type") != "refresh":
raise HTTPException(
status_code=401,
detail="Invalid token type"
)

username = payload.get("sub")
new_access_token = create_access_token(data={"sub": username})

return {
"access_token": new_access_token,
"token_type": "bearer"
}

except JWTError:
raise HTTPException(
status_code=401,
detail="Invalid refresh token"
)

API Key Authentication

Simple API Key

from fastapi.security import APIKeyHeader

API_KEY = "your-api-key"
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

async def get_api_key(api_key: str = Depends(api_key_header)):
if api_key != API_KEY:
raise HTTPException(
status_code=403,
detail="Could not validate API key"
)
return api_key

@app.get("/protected/")
async def protected_route(api_key: str = Depends(get_api_key)):
return {"message": "Access granted"}

API Key with Database

from sqlalchemy.orm import Session

class APIKey(Base):
__tablename__ = "api_keys"

id = Column(Integer, primary_key=True, index=True)
key = Column(String, unique=True, index=True)
name = Column(String)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
user_id = Column(Integer, ForeignKey("users.id"))

async def validate_api_key(
api_key: str = Depends(api_key_header),
db: Session = Depends(get_db)
):
if not api_key:
raise HTTPException(
status_code=403,
detail="API key required"
)

db_key = db.query(APIKey).filter(
APIKey.key == api_key,
APIKey.is_active == True
).first()

if not db_key:
raise HTTPException(
status_code=403,
detail="Invalid API key"
)

return db_key

@app.get("/protected/")
async def protected_route(api_key: APIKey = Depends(validate_api_key)):
return {"message": "Access granted", "key_name": api_key.name}

Role-Based Access Control (RBAC)

User Roles

from enum import Enum

class Role(str, Enum):
ADMIN = "admin"
USER = "user"
MODERATOR = "moderator"

class User(Base):
__tablename__ = "users"

id = Column(Integer, primary_key=True)
username = Column(String, unique=True)
email = Column(String, unique=True)
hashed_password = Column(String)
role = Column(String, default=Role.USER)

def require_role(required_role: Role):
def role_checker(current_user: User = Depends(get_current_user)):
if current_user.role != required_role:
raise HTTPException(
status_code=403,
detail=f"Access denied. Required role: {required_role}"
)
return current_user
return role_checker

@app.delete("/users/{user_id}")
async def delete_user(
user_id: int,
current_user: User = Depends(require_role(Role.ADMIN))
):
# Only admins can delete users
return {"message": "User deleted"}

@app.post("/moderate/")
async def moderate_content(
current_user: User = Depends(require_role(Role.MODERATOR))
):
return {"message": "Content moderated"}

Permission-Based Access

from typing import List

class Permission(str, Enum):
READ_USERS = "read:users"
WRITE_USERS = "write:users"
DELETE_USERS = "delete:users"

class User(Base):
__tablename__ = "users"
# ... other fields
permissions = Column(JSON) # List of permissions

def has_permission(required_permission: Permission):
def permission_checker(current_user: User = Depends(get_current_user)):
if required_permission not in current_user.permissions:
raise HTTPException(
status_code=403,
detail="Insufficient permissions"
)
return current_user
return permission_checker

@app.delete("/users/{user_id}")
async def delete_user(
user_id: int,
current_user: User = Depends(has_permission(Permission.DELETE_USERS))
):
return {"message": "User deleted"}

OAuth2 with Scopes

Implementing Scopes

from fastapi.security import OAuth2PasswordBearer, SecurityScopes

oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={
"read:items": "Read items",
"write:items": "Write items",
"admin": "Admin access"
}
)

def create_access_token(data: dict, scopes: List[str]):
to_encode = data.copy()
to_encode.update({
"scopes": scopes,
"exp": datetime.utcnow() + timedelta(minutes=30)
})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

async def get_current_user(
security_scopes: SecurityScopes,
token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"

credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)

try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
token_scopes = payload.get("scopes", [])

if username is None:
raise credentials_exception

except JWTError:
raise credentials_exception

# Check if token has required scopes
for scope in security_scopes.scopes:
if scope not in token_scopes:
raise HTTPException(
status_code=403,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)

user = get_user(username)
return user

@app.get("/items/")
async def read_items(
current_user: User = Security(get_current_user, scopes=["read:items"])
):
return {"items": []}

@app.post("/items/")
async def create_item(
item: Item,
current_user: User = Security(get_current_user, scopes=["write:items"])
):
return {"item": item}

Security Best Practices

CORS Configuration

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
CORSMiddleware,
allow_origins=["https://yourdomain.com"], # Specific origins
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
max_age=3600,
)

Rate Limiting

from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/limited/")
@limiter.limit("5/minute")
async def limited_endpoint(request: Request):
return {"message": "This endpoint is rate limited"}

HTTPS Enforcement

from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware

# Redirect all HTTP to HTTPS
app.add_middleware(HTTPSRedirectMiddleware)

Security Headers

from fastapi.middleware.trustedhost import TrustedHostMiddleware

app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["yourdomain.com", "*.yourdomain.com"]
)

@app.middleware("http")
async def add_security_headers(request: Request, call_next):
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000"
return response

Input Validation and Sanitization

SQL Injection Prevention

# GOOD - Using ORM with parameterized queries
@app.get("/users/{username}")
def get_user(username: str, db: Session = Depends(get_db)):
user = db.query(User).filter(User.username == username).first()
return user

# BAD - String interpolation (vulnerable to SQL injection)
# NEVER DO THIS:
# query = f"SELECT * FROM users WHERE username = '{username}'"

XSS Prevention

from pydantic import validator
import bleach

class Comment(BaseModel):
content: str

@validator('content')
def sanitize_content(cls, v):
# Remove potentially dangerous HTML
return bleach.clean(v, tags=[], strip=True)

@app.post("/comments/")
def create_comment(comment: Comment):
# Content is now sanitized
return comment

Token Blacklisting

Implementing Token Revocation

from redis import Redis

redis_client = Redis(host='localhost', port=6379, db=0)

def blacklist_token(token: str, expires_in: int):
redis_client.setex(f"blacklist:{token}", expires_in, "true")

def is_token_blacklisted(token: str) -> bool:
return redis_client.exists(f"blacklist:{token}") > 0

async def get_current_user(token: str = Depends(oauth2_scheme)):
if is_token_blacklisted(token):
raise HTTPException(
status_code=401,
detail="Token has been revoked"
)

# ... rest of authentication logic

@app.post("/logout/")
async def logout(current_user: User = Depends(get_current_user)):
# Blacklist the current token
blacklist_token(token, expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60)
return {"message": "Successfully logged out"}

Two-Factor Authentication (2FA)

TOTP Implementation

pip install pyotp qrcode
import pyotp
import qrcode
from io import BytesIO
from fastapi.responses import StreamingResponse

@app.post("/2fa/setup/")
async def setup_2fa(current_user: User = Depends(get_current_user)):
# Generate secret
secret = pyotp.random_base32()

# Save secret to user
current_user.totp_secret = secret
db.commit()

# Generate QR code
totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
name=current_user.email,
issuer_name="Your App"
)

qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(totp_uri)
qr.make(fit=True)

img = qr.make_image(fill_color="black", back_color="white")
buf = BytesIO()
img.save(buf)
buf.seek(0)

return StreamingResponse(buf, media_type="image/png")

@app.post("/2fa/verify/")
async def verify_2fa(
code: str,
current_user: User = Depends(get_current_user)
):
totp = pyotp.TOTP(current_user.totp_secret)

if totp.verify(code):
return {"message": "2FA verified successfully"}

raise HTTPException(status_code=400, detail="Invalid 2FA code")

Best Practices

  1. Never Store Plain Passwords: Always hash passwords with bcrypt or argon2
  2. Use HTTPS: Always use HTTPS in production
  3. Validate Input: Validate and sanitize all user input
  4. Rate Limiting: Implement rate limiting to prevent abuse
  5. Token Expiration: Use short-lived access tokens with refresh tokens
  6. Secure Headers: Add security headers to all responses
  7. CORS Configuration: Configure CORS properly for your use case
  8. Principle of Least Privilege: Grant minimal necessary permissions
  9. Audit Logging: Log authentication and authorization events
  10. Regular Updates: Keep dependencies updated for security patches

Security in FastAPI is comprehensive and flexible. Following these patterns will help you build secure, production-ready APIs.