Skip to main content

Authentication

Flask provides flexible authentication mechanisms through various extensions and patterns. This guide covers user authentication, session management, and security best practices.

Setup and Configuration

Installation

pip install Flask-Login
pip install Flask-Bcrypt
pip install Flask-JWT-Extended # For JWT authentication

Basic Configuration

from flask import Flask
from flask_login import LoginManager
from flask_bcrypt import Bcrypt

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'

# Initialize extensions
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
login_manager.login_message = 'Please log in to access this page.'

bcrypt = Bcrypt(app)

User Model

Flask-Login User Model

from flask_login import UserMixin
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy(app)

class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(120), nullable=False)
active = db.Column(db.Boolean, default=True)

def set_password(self, password):
self.password_hash = bcrypt.generate_password_hash(password).decode('utf-8')

def check_password(self, password):
return bcrypt.check_password_hash(self.password_hash, password)

def is_active(self):
return self.active

def __repr__(self):
return f'<User {self.username}>'

User Loader Function

from flask_login import LoginManager

@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))

Password Handling

Password Hashing

from flask_bcrypt import Bcrypt

bcrypt = Bcrypt(app)

# Hash password
password_hash = bcrypt.generate_password_hash('mypassword').decode('utf-8')

# Check password
is_valid = bcrypt.check_password_hash(password_hash, 'mypassword')

Alternative Hashing with Werkzeug

from werkzeug.security import generate_password_hash, check_password_hash

class User(db.Model):
# ... other fields ...

def set_password(self, password):
self.password_hash = generate_password_hash(password)

def check_password(self, password):
return check_password_hash(self.password_hash, password)

Password Strength Validation

import re

def validate_password_strength(password):
"""Validate password strength"""
if len(password) < 8:
return False, "Password must be at least 8 characters long"

if not re.search(r"[A-Z]", password):
return False, "Password must contain at least one uppercase letter"

if not re.search(r"[a-z]", password):
return False, "Password must contain at least one lowercase letter"

if not re.search(r"\d", password):
return False, "Password must contain at least one digit"

if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password):
return False, "Password must contain at least one special character"

return True, "Password is strong"

Session-Based Authentication

Login and Logout

from flask import request, render_template, redirect, url_for, flash
from flask_login import login_user, logout_user, login_required, current_user

@app.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('dashboard'))

if request.method == 'POST':
username = request.form['username']
password = request.form['password']
remember_me = bool(request.form.get('remember_me'))

user = User.query.filter_by(username=username).first()

if user and user.check_password(password):
login_user(user, remember=remember_me)

# Redirect to next page or dashboard
next_page = request.args.get('next')
return redirect(next_page) if next_page else redirect(url_for('dashboard'))
else:
flash('Invalid username or password', 'error')

return render_template('login.html')

@app.route('/logout')
@login_required
def logout():
logout_user()
flash('You have been logged out', 'info')
return redirect(url_for('index'))

Registration

@app.route('/register', methods=['GET', 'POST'])
def register():
if current_user.is_authenticated:
return redirect(url_for('dashboard'))

if request.method == 'POST':
username = request.form['username']
email = request.form['email']
password = request.form['password']

# Check if user already exists
if User.query.filter_by(username=username).first():
flash('Username already exists', 'error')
return render_template('register.html')

if User.query.filter_by(email=email).first():
flash('Email already registered', 'error')
return render_template('register.html')

# Validate password strength
is_strong, message = validate_password_strength(password)
if not is_strong:
flash(message, 'error')
return render_template('register.html')

# Create new user
user = User(username=username, email=email)
user.set_password(password)

db.session.add(user)
db.session.commit()

flash('Registration successful! Please log in.', 'success')
return redirect(url_for('login'))

return render_template('register.html')

Protected Routes

from flask_login import login_required, current_user

@app.route('/dashboard')
@login_required
def dashboard():
return render_template('dashboard.html', user=current_user)

@app.route('/profile')
@login_required
def profile():
return render_template('profile.html', user=current_user)

Role-Based Access Control

Role Model

from enum import Enum

class Role(Enum):
USER = "user"
ADMIN = "admin"
MODERATOR = "moderator"

class User(UserMixin, db.Model):
# ... existing fields ...
role = db.Column(db.Enum(Role), default=Role.USER)

def has_role(self, role):
return self.role == role

def is_admin(self):
return self.role == Role.ADMIN

Permission Decorators

from functools import wraps
from flask import abort
from flask_login import current_user

def require_role(role):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated or not current_user.has_role(role):
abort(403) # Forbidden
return f(*args, **kwargs)
return decorated_function
return decorator

def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated or not current_user.is_admin():
abort(403)
return f(*args, **kwargs)
return decorated_function

# Usage
@app.route('/admin')
@login_required
@admin_required
def admin_panel():
return render_template('admin.html')

@app.route('/moderate')
@login_required
@require_role(Role.MODERATOR)
def moderate():
return render_template('moderate.html')

JWT Authentication

JWT Setup

from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity

app.config['JWT_SECRET_KEY'] = 'jwt-secret-string'
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=1)

jwt = JWTManager(app)

JWT Login

from flask_jwt_extended import create_access_token, create_refresh_token

@app.route('/api/login', methods=['POST'])
def api_login():
data = request.get_json()
username = data.get('username')
password = data.get('password')

user = User.query.filter_by(username=username).first()

if user and user.check_password(password):
access_token = create_access_token(identity=user.id)
refresh_token = create_refresh_token(identity=user.id)

return {
'access_token': access_token,
'refresh_token': refresh_token,
'user': {
'id': user.id,
'username': user.username,
'email': user.email
}
}

return {'message': 'Invalid credentials'}, 401

JWT Protected Routes

from flask_jwt_extended import jwt_required, get_jwt_identity

@app.route('/api/profile', methods=['GET'])
@jwt_required()
def api_profile():
current_user_id = get_jwt_identity()
user = User.query.get(current_user_id)

return {
'id': user.id,
'username': user.username,
'email': user.email
}

@app.route('/api/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
current_user_id = get_jwt_identity()
new_token = create_access_token(identity=current_user_id)
return {'access_token': new_token}

OAuth Integration

Flask-Dance Setup

pip install Flask-Dance

Google OAuth

from flask_dance.contrib.google import make_google_blueprint, google

# Google OAuth blueprint
google_bp = make_google_blueprint(
client_id="your-google-client-id",
client_secret="your-google-client-secret",
scope=["openid", "email", "profile"]
)
app.register_blueprint(google_bp, url_prefix="/auth")

@app.route('/auth/google')
def google_login():
if not google.authorized:
return redirect(url_for("google.login"))

resp = google.get("/oauth2/v1/userinfo")
if not resp.ok:
return "Failed to fetch user info from Google", 400

google_info = resp.json()

# Create or get user
user = User.query.filter_by(email=google_info['email']).first()
if not user:
user = User(
username=google_info['email'],
email=google_info['email']
)
db.session.add(user)
db.session.commit()

login_user(user)
return redirect(url_for('dashboard'))

Two-Factor Authentication

TOTP Setup

pip install pyotp qrcode[pil]

2FA Implementation

import pyotp
import qrcode
from io import BytesIO
import base64

class User(UserMixin, db.Model):
# ... existing fields ...
totp_secret = db.Column(db.String(32))
two_factor_enabled = db.Column(db.Boolean, default=False)

def generate_totp_secret(self):
self.totp_secret = pyotp.random_base32()
return self.totp_secret

def get_totp_uri(self):
return pyotp.totp.TOTP(self.totp_secret).provisioning_uri(
name=self.email,
issuer_name="Your App Name"
)

def verify_totp(self, token):
totp = pyotp.TOTP(self.totp_secret)
return totp.verify(token)

@app.route('/setup-2fa')
@login_required
def setup_2fa():
if not current_user.totp_secret:
current_user.generate_totp_secret()
db.session.commit()

# Generate QR code
qr_uri = current_user.get_totp_uri()
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(qr_uri)
qr.make(fit=True)

img = qr.make_image(fill_color="black", back_color="white")
buffered = BytesIO()
img.save(buffered)
img_str = base64.b64encode(buffered.getvalue()).decode()

return render_template('setup_2fa.html', qr_code=img_str)

@app.route('/verify-2fa', methods=['POST'])
@login_required
def verify_2fa():
token = request.form['token']

if current_user.verify_totp(token):
current_user.two_factor_enabled = True
db.session.commit()
flash('Two-factor authentication enabled!', 'success')
return redirect(url_for('dashboard'))
else:
flash('Invalid token', 'error')
return redirect(url_for('setup_2fa'))

Password Reset

Password Reset Token

from itsdangerous import URLSafeTimedSerializer

def generate_reset_token(user):
serializer = URLSafeTimedSerializer(app.config['SECRET_KEY'])
return serializer.dumps(user.email, salt='password-reset-salt')

def verify_reset_token(token, expiration=3600):
serializer = URLSafeTimedSerializer(app.config['SECRET_KEY'])
try:
email = serializer.loads(
token,
salt='password-reset-salt',
max_age=expiration
)
return User.query.filter_by(email=email).first()
except:
return None

@app.route('/forgot-password', methods=['GET', 'POST'])
def forgot_password():
if request.method == 'POST':
email = request.form['email']
user = User.query.filter_by(email=email).first()

if user:
token = generate_reset_token(user)
reset_url = url_for('reset_password', token=token, _external=True)

# Send email (implement send_email function)
send_email(
to=user.email,
subject='Password Reset Request',
template='email/reset_password.html',
user=user,
reset_url=reset_url
)

flash('If your email is registered, you will receive a password reset link.', 'info')
return redirect(url_for('login'))

return render_template('forgot_password.html')

@app.route('/reset-password/<token>', methods=['GET', 'POST'])
def reset_password(token):
user = verify_reset_token(token)
if not user:
flash('Invalid or expired token', 'error')
return redirect(url_for('forgot_password'))

if request.method == 'POST':
password = request.form['password']
confirm_password = request.form['confirm_password']

if password != confirm_password:
flash('Passwords do not match', 'error')
return render_template('reset_password.html', token=token)

is_strong, message = validate_password_strength(password)
if not is_strong:
flash(message, 'error')
return render_template('reset_password.html', token=token)

user.set_password(password)
db.session.commit()

flash('Your password has been reset!', 'success')
return redirect(url_for('login'))

return render_template('reset_password.html', token=token)

Security Middleware

Rate Limiting

pip install Flask-Limiter
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
)

@app.route('/login', methods=['POST'])
@limiter.limit("5 per minute")
def login():
# Login logic
pass

CSRF Protection

from flask_wtf.csrf import CSRFProtect

csrf = CSRFProtect(app)

# Exempt specific routes
@app.route('/api/webhook', methods=['POST'])
@csrf.exempt
def webhook():
return "OK"

Session Security

Session Configuration

from datetime import timedelta

app.config.update(
SESSION_COOKIE_SECURE=True, # HTTPS only
SESSION_COOKIE_HTTPONLY=True, # No JavaScript access
SESSION_COOKIE_SAMESITE='Lax',
PERMANENT_SESSION_LIFETIME=timedelta(hours=1)
)

Session Management

from flask import session
from datetime import datetime

@app.before_request
def make_session_permanent():
session.permanent = True

@app.before_request
def check_session_timeout():
if 'user_id' in session:
last_activity = session.get('last_activity')
if last_activity:
if datetime.now() - last_activity > app.permanent_session_lifetime:
session.clear()
flash('Session expired. Please log in again.', 'warning')
return redirect(url_for('login'))

session['last_activity'] = datetime.now()

Testing Authentication

Authentication Tests

import pytest
from app import app, db, User

@pytest.fixture
def client():
app.config['TESTING'] = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'

with app.test_client() as client:
with app.app_context():
db.create_all()
yield client
db.drop_all()

@pytest.fixture
def user():
user = User(username='testuser', email='test@example.com')
user.set_password('testpassword')
db.session.add(user)
db.session.commit()
return user

def test_login(client, user):
response = client.post('/login', data={
'username': 'testuser',
'password': 'testpassword'
})
assert response.status_code == 302 # Redirect after login

def test_invalid_login(client, user):
response = client.post('/login', data={
'username': 'testuser',
'password': 'wrongpassword'
})
assert b'Invalid username or password' in response.data

def test_protected_route(client, user):
# Test without login
response = client.get('/dashboard')
assert response.status_code == 302 # Redirect to login

# Test with login
client.post('/login', data={
'username': 'testuser',
'password': 'testpassword'
})
response = client.get('/dashboard')
assert response.status_code == 200

Best Practices

Security Checklist

# 1. Always hash passwords
user.set_password(password) # Never store plain text

# 2. Use HTTPS in production
app.config['SESSION_COOKIE_SECURE'] = True

# 3. Implement rate limiting
@limiter.limit("5 per minute")
def login():
pass

# 4. Validate input
@login_required
def update_profile():
username = request.form.get('username', '').strip()
if not username or len(username) < 3:
flash('Invalid username', 'error')
return redirect(url_for('profile'))

# 5. Use CSRF protection
csrf = CSRFProtect(app)

# 6. Implement proper session management
session.permanent = True
app.permanent_session_lifetime = timedelta(hours=1)

# 7. Log security events
import logging
logger = logging.getLogger(__name__)

@app.route('/login', methods=['POST'])
def login():
username = request.form['username']
if not user or not user.check_password(password):
logger.warning(f'Failed login attempt for username: {username}')
else:
logger.info(f'Successful login for username: {username}')