Authentication & Security
Comprehensive guide to implementing authentication and security in FastAPI, including OAuth2, JWT tokens, API keys, and security best practices.
Password Hashing
Using passlib
pip install "passlib[bcrypt]"
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
OAuth2 with Password Flow
Basic OAuth2 Setup
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta
from jose import JWTError, jwt
from typing import Optional
# Configuration
SECRET_KEY = "your-secret-key-keep-it-secret"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
Token Creation
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
User Authentication
def authenticate_user(db, username: str, password: str):
user = get_user(db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
Login Endpoint
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
JWT Token with Refresh Tokens
Enhanced Token System
from datetime import datetime, timedelta
from typing import Optional
# Configuration
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7
class TokenPair(BaseModel):
access_token: str
refresh_token: str
token_type: str
def create_refresh_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({"exp": expire, "type": "refresh"})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
@app.post("/token", response_model=TokenPair)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password"
)
access_token = create_access_token(data={"sub": user.username})
refresh_token = create_refresh_token(data={"sub": user.username})
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}
@app.post("/token/refresh", response_model=Token)
async def refresh_token(refresh_token: str):
try:
payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") != "refresh":
raise HTTPException(
status_code=401,
detail="Invalid token type"
)
username = payload.get("sub")
new_access_token = create_access_token(data={"sub": username})
return {
"access_token": new_access_token,
"token_type": "bearer"
}
except JWTError:
raise HTTPException(
status_code=401,
detail="Invalid refresh token"
)
API Key Authentication
Simple API Key
from fastapi.security import APIKeyHeader
API_KEY = "your-api-key"
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def get_api_key(api_key: str = Depends(api_key_header)):
if api_key != API_KEY:
raise HTTPException(
status_code=403,
detail="Could not validate API key"
)
return api_key
@app.get("/protected/")
async def protected_route(api_key: str = Depends(get_api_key)):
return {"message": "Access granted"}
API Key with Database
from sqlalchemy.orm import Session
class APIKey(Base):
__tablename__ = "api_keys"
id = Column(Integer, primary_key=True, index=True)
key = Column(String, unique=True, index=True)
name = Column(String)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
user_id = Column(Integer, ForeignKey("users.id"))
async def validate_api_key(
api_key: str = Depends(api_key_header),
db: Session = Depends(get_db)
):
if not api_key:
raise HTTPException(
status_code=403,
detail="API key required"
)
db_key = db.query(APIKey).filter(
APIKey.key == api_key,
APIKey.is_active == True
).first()
if not db_key:
raise HTTPException(
status_code=403,
detail="Invalid API key"
)
return db_key
@app.get("/protected/")
async def protected_route(api_key: APIKey = Depends(validate_api_key)):
return {"message": "Access granted", "key_name": api_key.name}
Role-Based Access Control (RBAC)
User Roles
from enum import Enum
class Role(str, Enum):
ADMIN = "admin"
USER = "user"
MODERATOR = "moderator"
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String, unique=True)
email = Column(String, unique=True)
hashed_password = Column(String)
role = Column(String, default=Role.USER)
def require_role(required_role: Role):
def role_checker(current_user: User = Depends(get_current_user)):
if current_user.role != required_role:
raise HTTPException(
status_code=403,
detail=f"Access denied. Required role: {required_role}"
)
return current_user
return role_checker
@app.delete("/users/{user_id}")
async def delete_user(
user_id: int,
current_user: User = Depends(require_role(Role.ADMIN))
):
# Only admins can delete users
return {"message": "User deleted"}
@app.post("/moderate/")
async def moderate_content(
current_user: User = Depends(require_role(Role.MODERATOR))
):
return {"message": "Content moderated"}
Permission-Based Access
from typing import List
class Permission(str, Enum):
READ_USERS = "read:users"
WRITE_USERS = "write:users"
DELETE_USERS = "delete:users"
class User(Base):
__tablename__ = "users"
# ... other fields
permissions = Column(JSON) # List of permissions
def has_permission(required_permission: Permission):
def permission_checker(current_user: User = Depends(get_current_user)):
if required_permission not in current_user.permissions:
raise HTTPException(
status_code=403,
detail="Insufficient permissions"
)
return current_user
return permission_checker
@app.delete("/users/{user_id}")
async def delete_user(
user_id: int,
current_user: User = Depends(has_permission(Permission.DELETE_USERS))
):
return {"message": "User deleted"}
OAuth2 with Scopes
Implementing Scopes
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={
"read:items": "Read items",
"write:items": "Write items",
"admin": "Admin access"
}
)
def create_access_token(data: dict, scopes: List[str]):
to_encode = data.copy()
to_encode.update({
"scopes": scopes,
"exp": datetime.utcnow() + timedelta(minutes=30)
})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(
security_scopes: SecurityScopes,
token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
token_scopes = payload.get("scopes", [])
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
# Check if token has required scopes
for scope in security_scopes.scopes:
if scope not in token_scopes:
raise HTTPException(
status_code=403,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
user = get_user(username)
return user
@app.get("/items/")
async def read_items(
current_user: User = Security(get_current_user, scopes=["read:items"])
):
return {"items": []}
@app.post("/items/")
async def create_item(
item: Item,
current_user: User = Security(get_current_user, scopes=["write:items"])
):
return {"item": item}
Security Best Practices
CORS Configuration
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["https://yourdomain.com"], # Specific origins
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
max_age=3600,
)
Rate Limiting
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get("/limited/")
@limiter.limit("5/minute")
async def limited_endpoint(request: Request):
return {"message": "This endpoint is rate limited"}
HTTPS Enforcement
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
# Redirect all HTTP to HTTPS
app.add_middleware(HTTPSRedirectMiddleware)
Security Headers
from fastapi.middleware.trustedhost import TrustedHostMiddleware
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["yourdomain.com", "*.yourdomain.com"]
)
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000"
return response
Input Validation and Sanitization
SQL Injection Prevention
# GOOD - Using ORM with parameterized queries
@app.get("/users/{username}")
def get_user(username: str, db: Session = Depends(get_db)):
user = db.query(User).filter(User.username == username).first()
return user
# BAD - String interpolation (vulnerable to SQL injection)
# NEVER DO THIS:
# query = f"SELECT * FROM users WHERE username = '{username}'"
XSS Prevention
from pydantic import validator
import bleach
class Comment(BaseModel):
content: str
@validator('content')
def sanitize_content(cls, v):
# Remove potentially dangerous HTML
return bleach.clean(v, tags=[], strip=True)
@app.post("/comments/")
def create_comment(comment: Comment):
# Content is now sanitized
return comment
Token Blacklisting
Implementing Token Revocation
from redis import Redis
redis_client = Redis(host='localhost', port=6379, db=0)
def blacklist_token(token: str, expires_in: int):
redis_client.setex(f"blacklist:{token}", expires_in, "true")
def is_token_blacklisted(token: str) -> bool:
return redis_client.exists(f"blacklist:{token}") > 0
async def get_current_user(token: str = Depends(oauth2_scheme)):
if is_token_blacklisted(token):
raise HTTPException(
status_code=401,
detail="Token has been revoked"
)
# ... rest of authentication logic
@app.post("/logout/")
async def logout(current_user: User = Depends(get_current_user)):
# Blacklist the current token
blacklist_token(token, expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60)
return {"message": "Successfully logged out"}
Two-Factor Authentication (2FA)
TOTP Implementation
pip install pyotp qrcode
import pyotp
import qrcode
from io import BytesIO
from fastapi.responses import StreamingResponse
@app.post("/2fa/setup/")
async def setup_2fa(current_user: User = Depends(get_current_user)):
# Generate secret
secret = pyotp.random_base32()
# Save secret to user
current_user.totp_secret = secret
db.commit()
# Generate QR code
totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
name=current_user.email,
issuer_name="Your App"
)
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(totp_uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buf = BytesIO()
img.save(buf)
buf.seek(0)
return StreamingResponse(buf, media_type="image/png")
@app.post("/2fa/verify/")
async def verify_2fa(
code: str,
current_user: User = Depends(get_current_user)
):
totp = pyotp.TOTP(current_user.totp_secret)
if totp.verify(code):
return {"message": "2FA verified successfully"}
raise HTTPException(status_code=400, detail="Invalid 2FA code")
Best Practices
- Never Store Plain Passwords: Always hash passwords with bcrypt or argon2
- Use HTTPS: Always use HTTPS in production
- Validate Input: Validate and sanitize all user input
- Rate Limiting: Implement rate limiting to prevent abuse
- Token Expiration: Use short-lived access tokens with refresh tokens
- Secure Headers: Add security headers to all responses
- CORS Configuration: Configure CORS properly for your use case
- Principle of Least Privilege: Grant minimal necessary permissions
- Audit Logging: Log authentication and authorization events
- Regular Updates: Keep dependencies updated for security patches
Security in FastAPI is comprehensive and flexible. Following these patterns will help you build secure, production-ready APIs.