Authentication
Flask provides flexible authentication mechanisms through various extensions and patterns. This guide covers user authentication, session management, and security best practices.
Setup and Configuration
Installation
pip install Flask-Login
pip install Flask-Bcrypt
pip install Flask-JWT-Extended # For JWT authentication
Basic Configuration
from flask import Flask
from flask_login import LoginManager
from flask_bcrypt import Bcrypt
import os

app = Flask(__name__)
# The secret key signs security-sensitive data (e.g. the password-reset tokens
# built from app.config['SECRET_KEY']), so prefer a value injected through the
# environment. The literal is only a placeholder fallback for local
# experiments and must be replaced before deployment.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-secret-key')

# Initialize extensions
login_manager = LoginManager()
login_manager.init_app(app)
# Endpoint name that unauthenticated users are sent to.
login_manager.login_view = 'login'
login_manager.login_message = 'Please log in to access this page.'

bcrypt = Bcrypt(app)
User Model
Flask-Login User Model
from flask_login import UserMixin
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app)
class User(UserMixin, db.Model):
    """Database-backed account that can be used with Flask-Login."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # Holds the bcrypt hash written by set_password(), never the plain text.
    password_hash = db.Column(db.String(120), nullable=False)
    # Backing column for the is_active property below.
    active = db.Column(db.Boolean, default=True)

    def set_password(self, password):
        """Hash *password* with bcrypt and store the result on this user."""
        self.password_hash = bcrypt.generate_password_hash(password).decode('utf-8')

    def check_password(self, password):
        """Return True when *password* matches the stored bcrypt hash."""
        return bcrypt.check_password_hash(self.password_hash, password)

    @property
    def is_active(self):
        """Whether the account may log in.

        Flask-Login reads ``is_active`` as an attribute. Declaring it as a
        plain method would make the attribute a bound method, which is always
        truthy, so deactivated accounts would still pass the check.
        """
        return self.active

    def __repr__(self):
        return f'<User {self.username}>'
User Loader Function
from flask_login import LoginManager
@login_manager.user_loader
def load_user(user_id):
    """Return the User for the id stored in the session, or None.

    Args:
        user_id: The identifier Flask-Login read back from the session.

    Returns:
        The matching User, or None when the id is malformed or unknown.
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        # A tampered or stale session value must not surface as a 500 error;
        # returning None makes the request anonymous instead.
        return None
    return User.query.get(primary_key)
Password Handling
Password Hashing
from flask_bcrypt import Bcrypt
bcrypt = Bcrypt(app)

# Hash password: the generated hash is decoded to text before it is kept.
hashed_bytes = bcrypt.generate_password_hash('mypassword')
password_hash = hashed_bytes.decode('utf-8')

# Check password against the stored hash.
is_valid = bcrypt.check_password_hash(password_hash, 'mypassword')
Alternative Hashing with Werkzeug
from werkzeug.security import generate_password_hash, check_password_hash
class User(db.Model):
    """User model variant that hashes passwords with Werkzeug's helpers."""
    # ... other fields ...

    def set_password(self, password):
        """Store a Werkzeug-generated hash of *password* on this user."""
        hashed = generate_password_hash(password)
        self.password_hash = hashed

    def check_password(self, password):
        """Return whether *password* matches the stored hash."""
        stored_hash = self.password_hash
        return check_password_hash(stored_hash, password)
Password Strength Validation
import re
def validate_password_strength(password):
    """Validate password strength.

    The length check runs first, then the character-class rules in order;
    the first rule that fails determines the message.

    Returns:
        A ``(ok, message)`` tuple. ``ok`` is False with the reason for the
        first failed rule, or True with a confirmation message.
    """
    if len(password) < 8:
        return False, "Password must be at least 8 characters long"

    # (pattern that must match somewhere, message when it does not)
    character_rules = (
        (r"[A-Z]", "Password must contain at least one uppercase letter"),
        (r"[a-z]", "Password must contain at least one lowercase letter"),
        (r"\d", "Password must contain at least one digit"),
        (r"[!@#$%^&*(),.?\":{}|<>]", "Password must contain at least one special character"),
    )
    for pattern, failure_message in character_rules:
        if re.search(pattern, password) is None:
            return False, failure_message

    return True, "Password is strong"
Session-Based Authentication
Login and Logout
from flask import request, render_template, redirect, url_for, flash
from flask_login import login_user, logout_user, login_required, current_user
def _is_safe_redirect_target(target):
    """Return True only for same-site relative paths such as ``/dashboard``."""
    from urllib.parse import urlsplit

    if not target:
        return False
    # Browsers drop tabs/newlines and treat backslashes like slashes, so
    # reject or normalise those before inspecting the URL.
    if any(ch in target for ch in '\t\r\n'):
        return False
    normalized = target.replace('\\', '/')
    # Must be an absolute path on this site; "//host" is scheme-relative.
    if not normalized.startswith('/') or normalized.startswith('//'):
        return False
    parts = urlsplit(normalized)
    return not parts.scheme and not parts.netloc


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate submitted credentials.

    Already-authenticated users go straight to the dashboard. After a
    successful POST the user is redirected to the ``next`` query parameter
    only when it is a local path; anything else falls back to the dashboard
    so the login page cannot be abused as an open redirect.
    """
    if current_user.is_authenticated:
        return redirect(url_for('dashboard'))

    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        remember_me = bool(request.form.get('remember_me'))

        user = User.query.filter_by(username=username).first()
        if user and user.check_password(password):
            login_user(user, remember=remember_me)

            # Redirect to next page or dashboard
            next_page = request.args.get('next')
            if _is_safe_redirect_target(next_page):
                return redirect(next_page)
            return redirect(url_for('dashboard'))

        # Same message for unknown user and wrong password.
        flash('Invalid username or password', 'error')

    return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
    """Log the current user out, flash a notice and go to the index page."""
    logout_user()
    flash('You have been logged out', 'info')
    index_url = url_for('index')
    return redirect(index_url)
Registration
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form and create an account on a valid POST.

    A submission is rejected (form re-rendered with a flashed error) when the
    username or email is already taken or the password fails
    validate_password_strength().
    """
    if current_user.is_authenticated:
        return redirect(url_for('dashboard'))

    if request.method != 'POST':
        return render_template('register.html')

    def reject(reason):
        # Flash the problem and show the form again.
        flash(reason, 'error')
        return render_template('register.html')

    form = request.form
    username = form['username']
    email = form['email']
    password = form['password']

    # Check if user already exists
    if User.query.filter_by(username=username).first():
        return reject('Username already exists')
    if User.query.filter_by(email=email).first():
        return reject('Email already registered')

    # Validate password strength
    is_strong, message = validate_password_strength(password)
    if not is_strong:
        return reject(message)

    # Create new user
    new_user = User(username=username, email=email)
    new_user.set_password(password)
    db.session.add(new_user)
    db.session.commit()

    flash('Registration successful! Please log in.', 'success')
    return redirect(url_for('login'))
Protected Routes
from flask_login import login_required, current_user
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the dashboard template for the logged-in user."""
    template_context = {'user': current_user}
    return render_template('dashboard.html', **template_context)
@app.route('/profile')
@login_required
def profile():
    """Render the profile template for the logged-in user."""
    template_context = {'user': current_user}
    return render_template('profile.html', **template_context)
Role-Based Access Control
Role Model
from enum import Enum
class Role(Enum):
    """Closed set of roles a user can hold; values are the stored strings."""

    USER = "user"
    ADMIN = "admin"
    MODERATOR = "moderator"
class User(UserMixin, db.Model):
    """User model extended with a single role column."""
    # ... existing fields ...

    # New users get Role.USER unless another role is assigned.
    role = db.Column(db.Enum(Role), default=Role.USER)

    def has_role(self, role):
        """Return True when this user's role equals *role*."""
        return self.role == role

    def is_admin(self):
        """Return True when this user's role is Role.ADMIN."""
        return self.role == Role.ADMIN
Permission Decorators
from functools import wraps
from flask import abort
from flask_login import current_user
def require_role(role):
    """Build a view decorator that aborts with 403 unless the user has *role*.

    The wrapped view runs only when the current user is authenticated and
    ``current_user.has_role(role)`` is true.
    """
    def decorator(f):
        @wraps(f)
        def guarded_view(*args, **kwargs):
            # has_role() is only consulted for authenticated users.
            permitted = current_user.is_authenticated and current_user.has_role(role)
            if not permitted:
                abort(403)  # Forbidden
            return f(*args, **kwargs)
        return guarded_view
    return decorator
def admin_required(f):
    """View decorator that aborts with 403 unless the current user is an admin."""
    @wraps(f)
    def guarded_view(*args, **kwargs):
        # is_admin() is only consulted for authenticated users.
        permitted = current_user.is_authenticated and current_user.is_admin()
        if not permitted:
            abort(403)
        return f(*args, **kwargs)
    return guarded_view
# Usage
@app.route('/admin')
@login_required
@admin_required
def admin_panel():
    """Render the admin page; reachable only by logged-in admins."""
    page = render_template('admin.html')
    return page
@app.route('/moderate')
@login_required
@require_role(Role.MODERATOR)
def moderate():
    """Render the moderation page; requires the MODERATOR role."""
    page = render_template('moderate.html')
    return page
JWT Authentication
JWT Setup
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity
# timedelta is needed for the token lifetime below and was not imported by
# this snippet.
from datetime import timedelta

# Placeholder value: replace with a secret that is not committed to source.
app.config['JWT_SECRET_KEY'] = 'jwt-secret-string'
# Access tokens stop being valid one hour after they are issued.
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=1)

jwt = JWTManager(app)
JWT Login
from flask_jwt_extended import create_access_token, create_refresh_token
@app.route('/api/login', methods=['POST'])
def api_login():
    """Exchange JSON credentials for an access/refresh token pair.

    Expects a JSON object with ``username`` and ``password``.

    Returns:
        A dict with ``access_token``, ``refresh_token`` and basic ``user``
        data on success, or ``({'message': 'Invalid credentials'}, 401)``.
    """
    # A missing, malformed or non-object body is treated as empty credentials
    # instead of raising AttributeError on ``None.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    username = data.get('username')
    password = data.get('password')
    if not username or not password:
        return {'message': 'Invalid credentials'}, 401

    user = User.query.filter_by(username=username).first()

    if user and user.check_password(password):
        # NOTE(review): newer Flask-JWT-Extended/PyJWT releases appear to
        # require a string identity — confirm against the pinned versions.
        identity = str(user.id)
        access_token = create_access_token(identity=identity)
        refresh_token = create_refresh_token(identity=identity)

        return {
            'access_token': access_token,
            'refresh_token': refresh_token,
            'user': {
                'id': user.id,
                'username': user.username,
                'email': user.email
            }
        }

    return {'message': 'Invalid credentials'}, 401
JWT Protected Routes
from flask_jwt_extended import jwt_required, get_jwt_identity
@app.route('/api/profile', methods=['GET'])
@jwt_required()
def api_profile():
    """Return basic profile data for the user identified by the JWT.

    Returns:
        A dict with ``id``, ``username`` and ``email``, or a 404 response
        when the identity in the token no longer maps to a user.
    """
    current_user_id = get_jwt_identity()
    try:
        # Accept both integer and string identities.
        user_pk = int(current_user_id)
    except (TypeError, ValueError):
        return {'message': 'User not found'}, 404

    user = User.query.get(user_pk)
    if user is None:
        # The account may have been deleted after the token was issued.
        return {'message': 'User not found'}, 404

    return {
        'id': user.id,
        'username': user.username,
        'email': user.email
    }
@app.route('/api/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    """Issue a new access token for the identity in the refresh token."""
    identity = get_jwt_identity()
    return {'access_token': create_access_token(identity=identity)}
OAuth Integration
Flask-Dance Setup
pip install Flask-Dance
Google OAuth
from flask_dance.contrib.google import make_google_blueprint, google
# Google OAuth blueprint
# Blueprint that carries the Google OAuth flow; the credentials are
# placeholders to be replaced with the real client id and secret.
google_bp = make_google_blueprint(
    scope=["openid", "email", "profile"],
    client_id="your-google-client-id",
    client_secret="your-google-client-secret",
)
# Mount the blueprint's routes under /auth.
app.register_blueprint(google_bp, url_prefix="/auth")
@app.route('/auth/google')
def google_login():
    """Log the visitor in through Google, creating a local user if needed.

    Unauthorized visitors are sent to the Google OAuth flow. A failed or
    incomplete user-info response yields a 400; otherwise the matching (or
    newly created) user is logged in and redirected to the dashboard.
    """
    import secrets

    if not google.authorized:
        return redirect(url_for("google.login"))

    resp = google.get("/oauth2/v1/userinfo")
    if not resp.ok:
        return "Failed to fetch user info from Google", 400

    google_info = resp.json()
    email = google_info.get('email')
    if not email:
        # Without an email there is nothing to key the local account on.
        return "Failed to fetch user info from Google", 400

    # Create or get user
    user = User.query.filter_by(email=email).first()
    if not user:
        user = User(username=email, email=email)
        # password_hash is declared nullable=False, so an OAuth-only account
        # needs a value; a random throwaway password satisfies the column
        # without giving anyone a usable credential.
        user.set_password(secrets.token_urlsafe(32))
        db.session.add(user)
        db.session.commit()

    login_user(user)
    return redirect(url_for('dashboard'))
Two-Factor Authentication
TOTP Setup
pip install pyotp "qrcode[pil]"
2FA Implementation
import pyotp
import qrcode
from io import BytesIO
import base64
class User(UserMixin, db.Model):
    """User model extended with TOTP-based two-factor authentication."""
    # ... existing fields ...

    # Base32 secret shared with the authenticator app; empty until enrolled.
    totp_secret = db.Column(db.String(32))
    two_factor_enabled = db.Column(db.Boolean, default=False)

    def generate_totp_secret(self):
        """Create, store and return a fresh random base32 TOTP secret."""
        self.totp_secret = pyotp.random_base32()
        return self.totp_secret

    def get_totp_uri(self):
        """Return the provisioning URI built from the secret and the email."""
        return pyotp.TOTP(self.totp_secret).provisioning_uri(
            name=self.email,
            issuer_name="Your App Name"
        )

    def verify_totp(self, token):
        """Return True when *token* is valid for the stored secret.

        A user without a secret, or an empty token, never verifies; this
        avoids constructing a TOTP object from ``None``.
        """
        if not self.totp_secret or not token:
            return False
        return pyotp.TOTP(self.totp_secret).verify(token)
@app.route('/setup-2fa')
@login_required
def setup_2fa():
    """Render the 2FA setup page with a base64-encoded QR code.

    A TOTP secret is generated and committed first when the current user
    does not have one yet.
    """
    if not current_user.totp_secret:
        current_user.generate_totp_secret()
        db.session.commit()

    # Generate QR code from the provisioning URI.
    code = qrcode.QRCode(version=1, box_size=10, border=5)
    code.add_data(current_user.get_totp_uri())
    code.make(fit=True)

    # Serialise the image in memory so it can be embedded in the template.
    image_buffer = BytesIO()
    code.make_image(fill_color="black", back_color="white").save(image_buffer)
    encoded_image = base64.b64encode(image_buffer.getvalue()).decode()

    return render_template('setup_2fa.html', qr_code=encoded_image)
@app.route('/verify-2fa', methods=['POST'])
@login_required
def verify_2fa():
    """Enable 2FA for the current user when the submitted token verifies."""
    submitted_token = request.form['token']

    if not current_user.verify_totp(submitted_token):
        flash('Invalid token', 'error')
        return redirect(url_for('setup_2fa'))

    current_user.two_factor_enabled = True
    db.session.commit()
    flash('Two-factor authentication enabled!', 'success')
    return redirect(url_for('dashboard'))
Password Reset
Password Reset Token
from itsdangerous import URLSafeTimedSerializer
def generate_reset_token(user):
    """Return a signed, timestamped token that encodes the user's email."""
    signing_key = app.config['SECRET_KEY']
    return URLSafeTimedSerializer(signing_key).dumps(
        user.email, salt='password-reset-salt'
    )
def verify_reset_token(token, expiration=3600):
    """Resolve a password-reset token to its user.

    Args:
        token: Token produced with the 'password-reset-salt' salt.
        expiration: Maximum accepted token age in seconds.

    Returns:
        The User whose email is encoded in the token, or None when the token
        is invalid, tampered with or expired, or no such user exists.
    """
    from itsdangerous import BadData

    serializer = URLSafeTimedSerializer(app.config['SECRET_KEY'])
    try:
        email = serializer.loads(
            token,
            salt='password-reset-salt',
            max_age=expiration
        )
    except BadData:
        # Only token problems mean "invalid"; database and programming errors
        # are no longer swallowed.
        return None
    return User.query.filter_by(email=email).first()
@app.route('/forgot-password', methods=['GET', 'POST'])
def forgot_password():
    """Show the forgot-password form and email a reset link on POST.

    The flashed message is the same whether or not the email is registered.
    """
    if request.method != 'POST':
        return render_template('forgot_password.html')

    submitted_email = request.form['email']
    user = User.query.filter_by(email=submitted_email).first()

    if user:
        reset_url = url_for(
            'reset_password',
            token=generate_reset_token(user),
            _external=True,
        )
        # Send email (implement send_email function)
        send_email(
            to=user.email,
            subject='Password Reset Request',
            template='email/reset_password.html',
            user=user,
            reset_url=reset_url
        )

    flash('If your email is registered, you will receive a password reset link.', 'info')
    return redirect(url_for('login'))
@app.route('/reset-password/<token>', methods=['GET', 'POST'])
def reset_password(token):
    """Let the holder of a valid reset token choose a new password.

    An invalid or expired token redirects to the forgot-password page. On
    POST the two password fields must match and pass
    validate_password_strength() before the new password is saved.
    """
    user = verify_reset_token(token)
    if not user:
        flash('Invalid or expired token', 'error')
        return redirect(url_for('forgot_password'))

    def show_form():
        # The template receives the token again.
        return render_template('reset_password.html', token=token)

    if request.method != 'POST':
        return show_form()

    password = request.form['password']
    confirm_password = request.form['confirm_password']

    if password != confirm_password:
        flash('Passwords do not match', 'error')
        return show_form()

    is_strong, message = validate_password_strength(password)
    if not is_strong:
        flash(message, 'error')
        return show_form()

    user.set_password(password)
    db.session.commit()

    flash('Your password has been reset!', 'success')
    return redirect(url_for('login'))
Security Middleware
Rate Limiting
pip install Flask-Limiter
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# Everything is passed by keyword: Flask-Limiter 3.x takes key_func as its
# first positional argument, so passing the app positionally breaks there,
# while keywords work on both old and new releases.
limiter = Limiter(
    key_func=get_remote_address,  # limits are tracked per client address
    app=app,
    default_limits=["200 per day", "50 per hour"]
)
@app.route('/login', methods=['POST'])
@limiter.limit("5 per minute")
def login():
    """Placeholder login view showing a per-route limit of 5 per minute."""
    # Login logic
    pass
CSRF Protection
from flask_wtf.csrf import CSRFProtect
# Enable CSRF protection for the application.
csrf = CSRFProtect(app)


# Exempt specific routes
@app.route('/api/webhook', methods=['POST'])
@csrf.exempt
def webhook():
    """Webhook endpoint excluded from CSRF checks; replies with "OK"."""
    reply = "OK"
    return reply
Session Security
Session Configuration
from datetime import timedelta
# Cookie hardening and session lifetime, applied in one update.
app.config.update({
    'SESSION_COOKIE_SECURE': True,  # HTTPS only
    'SESSION_COOKIE_HTTPONLY': True,  # No JavaScript access
    'SESSION_COOKIE_SAMESITE': 'Lax',
    'PERMANENT_SESSION_LIFETIME': timedelta(hours=1),
})
Session Management
from flask import session
from datetime import datetime
@app.before_request
def make_session_permanent():
    """Mark the session as permanent before every request."""
    setattr(session, 'permanent', True)
@app.before_request
def check_session_timeout():
    """Expire logged-in sessions idle longer than the session lifetime.

    Returns:
        A redirect to the login page when the session has timed out,
        otherwise None so the request continues normally.
    """
    import time

    # Flask-Login keeps the logged-in id under '_user_id'; 'user_id' is still
    # honoured for applications that set it themselves.
    if '_user_id' not in session and 'user_id' not in session:
        return None

    now = time.time()
    last_activity = session.get('last_activity')
    if isinstance(last_activity, datetime):
        # Older cookies stored a datetime; convert it to epoch seconds.
        last_activity = last_activity.timestamp()

    if isinstance(last_activity, (int, float)):
        idle_limit = app.permanent_session_lifetime.total_seconds()
        if now - last_activity > idle_limit:
            session.clear()
            flash('Session expired. Please log in again.', 'warning')
            return redirect(url_for('login'))

    # Epoch seconds avoid the naive/aware datetime mismatch that a stored
    # datetime causes when it is read back from the session.
    session['last_activity'] = now
    return None
Testing Authentication
Authentication Tests
import pytest
from app import app, db, User
@pytest.fixture
def client():
    """Yield a test client with tables created, dropping them afterwards."""
    app.config.update(
        TESTING=True,
        SQLALCHEMY_DATABASE_URI='sqlite:///:memory:',
    )

    # The app context stays pushed while the test runs.
    with app.test_client() as test_client, app.app_context():
        db.create_all()
        yield test_client
        db.drop_all()
@pytest.fixture
def user():
    """Create, persist and return the account used by the login tests."""
    account = User(username='testuser', email='test@example.com')
    account.set_password('testpassword')
    db.session.add(account)
    db.session.commit()
    return account
def test_login(client, user):
    """Valid credentials should produce a redirect."""
    credentials = {
        'username': 'testuser',
        'password': 'testpassword'
    }
    response = client.post('/login', data=credentials)
    assert response.status_code == 302  # Redirect after login
def test_invalid_login(client, user):
    """A wrong password should show the generic error message."""
    credentials = {
        'username': 'testuser',
        'password': 'wrongpassword'
    }
    response = client.post('/login', data=credentials)
    assert b'Invalid username or password' in response.data
def test_protected_route(client, user):
    """The dashboard redirects anonymous users and serves logged-in ones."""
    # Test without login
    anonymous_response = client.get('/dashboard')
    assert anonymous_response.status_code == 302  # Redirect to login

    # Test with login
    credentials = {
        'username': 'testuser',
        'password': 'testpassword'
    }
    client.post('/login', data=credentials)
    authenticated_response = client.get('/dashboard')
    assert authenticated_response.status_code == 200
Best Practices
Security Checklist
# 1. Always hash passwords
user.set_password(password) # Never store plain text
# 2. Use HTTPS in production
app.config['SESSION_COOKIE_SECURE'] = True
# 3. Implement rate limiting
@limiter.limit("5 per minute")
def login():
    """Placeholder showing a 5-per-minute limit on the login view."""
    pass
# 4. Validate input
@login_required
def update_profile():
    """Reject profile updates whose trimmed username is under 3 characters."""
    cleaned_username = request.form.get('username', '').strip()
    # An empty string has length 0, so one length check covers both cases.
    if len(cleaned_username) < 3:
        flash('Invalid username', 'error')
        return redirect(url_for('profile'))
# 5. Use CSRF protection
csrf = CSRFProtect(app)
# 6. Implement proper session management
session.permanent = True
app.permanent_session_lifetime = timedelta(hours=1)
# 7. Log security events
import logging
logger = logging.getLogger(__name__)
@app.route('/login', methods=['POST'])
def login():
    """Illustrative fragment: log the outcome of a login attempt.

    Only the credential check and the logging are shown; the rest of the
    login flow (session handling, response) is omitted here.
    """
    username = request.form['username']
    # The comparison below needs the submitted password and the stored user.
    password = request.form['password']
    user = User.query.filter_by(username=username).first()

    # %r escapes control characters, so a crafted username cannot inject
    # extra lines into the log; lazy %-args also defer formatting.
    if not user or not user.check_password(password):
        logger.warning('Failed login attempt for username: %r', username)
    else:
        logger.info('Successful login for username: %r', username)