Advanced Features
Complete guide to WebSockets, authentication, security, and deployment patterns in Tornado. Production-ready examples for advanced use cases.
WebSocket Support
Basic WebSocket Handler
import tornado.websocket
import tornado.web
import json
import asyncio
class WebSocketHandler(tornado.websocket.WebSocketHandler):
clients = set()
def open(self):
"""Called when WebSocket connection is opened"""
self.clients.add(self)
print(f"WebSocket opened. Total clients: {len(self.clients)}")
# Send welcome message
self.write_message({
"type": "welcome",
"message": "Connected to WebSocket server"
})
def on_message(self, message):
"""Handle incoming messages"""
try:
data = json.loads(message)
msg_type = data.get("type", "unknown")
if msg_type == "echo":
self.write_message({
"type": "echo",
"message": data.get("message", "")
})
elif msg_type == "broadcast":
self.broadcast_message(data.get("message", ""))
elif msg_type == "ping":
self.write_message({"type": "pong"})
except json.JSONDecodeError:
self.write_message({
"type": "error",
"message": "Invalid JSON format"
})
def on_close(self):
"""Called when WebSocket connection is closed"""
self.clients.discard(self)
print(f"WebSocket closed. Total clients: {len(self.clients)}")
def broadcast_message(self, message):
"""Broadcast message to all connected clients"""
for client in self.clients:
if client != self:
client.write_message({
"type": "broadcast",
"message": message
})
def check_origin(self, origin):
"""Override to allow connections from specific origins"""
# In production, be more restrictive
return True
Chat Application WebSocket
import tornado.websocket
import tornado.web
import json
import uuid
from datetime import datetime
class ChatWebSocketHandler(tornado.websocket.WebSocketHandler):
rooms = {} # room_id -> set of clients
users = {} # client -> user_info
def open(self):
self.room_id = None
self.user_id = None
print("Client connected")
def on_message(self, message):
try:
data = json.loads(message)
action = data.get("action")
if action == "join":
self.handle_join(data)
elif action == "message":
self.handle_message(data)
elif action == "leave":
self.handle_leave()
elif action == "typing":
self.handle_typing(data)
except json.JSONDecodeError:
self.send_error("Invalid JSON format")
def handle_join(self, data):
self.room_id = data.get("room_id")
self.user_id = data.get("user_id")
username = data.get("username", f"User{uuid.uuid4().hex[:8]}")
if not self.room_id:
self.send_error("Room ID is required")
return
# Add to room
if self.room_id not in self.rooms:
self.rooms[self.room_id] = set()
self.rooms[self.room_id].add(self)
self.users[self] = {
"user_id": self.user_id,
"username": username,
"joined_at": datetime.now().isoformat()
}
# Notify user
self.write_message({
"type": "joined",
"room_id": self.room_id,
"username": username
})
# Notify other users in room
self.broadcast_to_room({
"type": "user_joined",
"username": username,
"user_count": len(self.rooms[self.room_id])
})
def handle_message(self, data):
if not self.room_id or self not in self.users:
self.send_error("Must join a room first")
return
message = data.get("message", "").strip()
if not message:
return
user_info = self.users[self]
# Broadcast message to room
self.broadcast_to_room({
"type": "message",
"user_id": user_info["user_id"],
"username": user_info["username"],
"message": message,
"timestamp": datetime.now().isoformat()
})
def handle_typing(self, data):
if not self.room_id or self not in self.users:
return
user_info = self.users[self]
is_typing = data.get("typing", False)
# Broadcast typing status to room (excluding sender)
self.broadcast_to_room({
"type": "typing",
"user_id": user_info["user_id"],
"username": user_info["username"],
"typing": is_typing
}, exclude_self=True)
def handle_leave(self):
if self.room_id and self in self.rooms.get(self.room_id, set()):
self.leave_room()
def leave_room(self):
if self.room_id and self in self.rooms.get(self.room_id, set()):
self.rooms[self.room_id].discard(self)
if self in self.users:
username = self.users[self]["username"]
del self.users[self]
# Notify room
self.broadcast_to_room({
"type": "user_left",
"username": username,
"user_count": len(self.rooms[self.room_id])
})
# Clean up empty room
if not self.rooms[self.room_id]:
del self.rooms[self.room_id]
def broadcast_to_room(self, message, exclude_self=False):
if not self.room_id:
return
for client in self.rooms.get(self.room_id, set()):
if exclude_self and client == self:
continue
client.write_message(message)
def send_error(self, message):
self.write_message({
"type": "error",
"message": message
})
def on_close(self):
self.leave_room()
print("Client disconnected")
Real-time Data Stream WebSocket
import tornado.websocket
import asyncio
import json
import random
from datetime import datetime
class DataStreamWebSocketHandler(tornado.websocket.WebSocketHandler):
def open(self):
self.stream_task = None
self.is_streaming = False
print("Data stream client connected")
def on_message(self, message):
try:
data = json.loads(message)
command = data.get("command")
if command == "start":
self.start_streaming(data)
elif command == "stop":
self.stop_streaming()
elif command == "config":
self.update_config(data)
except json.JSONDecodeError:
self.write_message({
"type": "error",
"message": "Invalid JSON format"
})
def start_streaming(self, config):
if self.is_streaming:
return
self.is_streaming = True
self.stream_config = {
"interval": config.get("interval", 1.0),
"data_type": config.get("data_type", "random"),
"count": config.get("count", 100)
}
# Start streaming task
self.stream_task = asyncio.create_task(self.stream_data())
self.write_message({
"type": "streaming_started",
"config": self.stream_config
})
def stop_streaming(self):
if not self.is_streaming:
return
self.is_streaming = False
if self.stream_task:
self.stream_task.cancel()
self.write_message({
"type": "streaming_stopped"
})
def update_config(self, config):
if hasattr(self, 'stream_config'):
self.stream_config.update(config.get("config", {}))
self.write_message({
"type": "config_updated",
"config": self.stream_config
})
async def stream_data(self):
count = 0
max_count = self.stream_config.get("count", 100)
try:
while self.is_streaming and count < max_count:
# Generate data based on type
data = self.generate_data(self.stream_config["data_type"])
# Send data
self.write_message({
"type": "data",
"payload": data,
"sequence": count,
"timestamp": datetime.now().isoformat()
})
count += 1
await asyncio.sleep(self.stream_config["interval"])
except asyncio.CancelledError:
pass
finally:
self.is_streaming = False
def generate_data(self, data_type):
if data_type == "random":
return {
"value": random.uniform(0, 100),
"category": random.choice(["A", "B", "C"])
}
elif data_type == "stock":
return {
"symbol": "AAPL",
"price": random.uniform(150, 200),
"volume": random.randint(1000, 10000)
}
elif data_type == "sensor":
return {
"temperature": random.uniform(20, 30),
"humidity": random.uniform(40, 60),
"pressure": random.uniform(1000, 1020)
}
return {"value": random.random()}
def on_close(self):
self.stop_streaming()
print("Data stream client disconnected")
Authentication & Security
Basic Authentication
import tornado.web
import hashlib
import hmac
import base64
class BaseAuthHandler(tornado.web.RequestHandler):
def get_current_user(self):
"""Override to return current user"""
user_id = self.get_secure_cookie("user_id")
if user_id:
return self.get_user_by_id(user_id.decode('utf-8'))
return None
def get_user_by_id(self, user_id):
# In real app, query from database
users = {
"1": {"id": "1", "username": "admin", "email": "admin@example.com"},
"2": {"id": "2", "username": "user", "email": "user@example.com"}
}
return users.get(user_id)
class LoginHandler(BaseAuthHandler):
def get(self):
self.render("login.html")
def post(self):
username = self.get_body_argument("username")
password = self.get_body_argument("password")
# Validate credentials
user = self.authenticate_user(username, password)
if user:
self.set_secure_cookie("user_id", user["id"])
self.redirect("/dashboard")
else:
self.render("login.html", error="Invalid credentials")
def authenticate_user(self, username, password):
# In real app, check against database with hashed passwords
users = {
"admin": {"id": "1", "username": "admin", "password": "admin_hash"},
"user": {"id": "2", "username": "user", "password": "user_hash"}
}
user = users.get(username)
if user and self.verify_password(password, user["password"]):
return user
return None
def verify_password(self, password, hashed):
# In real app, use proper password hashing (bcrypt, scrypt, etc.)
return password == "password" # Simplified for example
class LogoutHandler(BaseAuthHandler):
def post(self):
self.clear_cookie("user_id")
self.redirect("/")
class AuthRequiredHandler(BaseAuthHandler):
@tornado.web.authenticated
def get(self):
user = self.current_user
self.write(f"Hello, {user['username']}!")
JWT Authentication
import tornado.web
import jwt
import datetime
import json
class JWTAuthHandler(tornado.web.RequestHandler):
SECRET_KEY = "your-secret-key-here"
def generate_token(self, user_data):
"""Generate JWT token"""
payload = {
"user_id": user_data["id"],
"username": user_data["username"],
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=24),
"iat": datetime.datetime.utcnow()
}
return jwt.encode(payload, self.SECRET_KEY, algorithm="HS256")
def verify_token(self, token):
"""Verify JWT token"""
try:
payload = jwt.decode(token, self.SECRET_KEY, algorithms=["HS256"])
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def get_current_user(self):
"""Get current user from JWT token"""
auth_header = self.request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return None
token = auth_header.split(" ")[1]
payload = self.verify_token(token)
if payload:
return {
"id": payload["user_id"],
"username": payload["username"]
}
return None
class JWTLoginHandler(JWTAuthHandler):
def post(self):
data = json.loads(self.request.body)
username = data.get("username")
password = data.get("password")
# Authenticate user
user = self.authenticate_user(username, password)
if user:
token = self.generate_token(user)
self.write({
"token": token,
"user": {
"id": user["id"],
"username": user["username"]
}
})
else:
self.set_status(401)
self.write({"error": "Invalid credentials"})
def authenticate_user(self, username, password):
# Simplified authentication
if username == "admin" and password == "password":
return {"id": "1", "username": "admin"}
return None
class JWTProtectedHandler(JWTAuthHandler):
def prepare(self):
"""Check authentication before handling request"""
if not self.current_user:
self.set_status(401)
self.write({"error": "Authentication required"})
self.finish()
def get(self):
user = self.current_user
self.write({"message": f"Hello, {user['username']}!"})
API Key Authentication
import tornado.web
import uuid
import hashlib
class APIKeyAuthHandler(tornado.web.RequestHandler):
# In production, store in database
API_KEYS = {
"api_key_1": {"user_id": "1", "permissions": ["read", "write"]},
"api_key_2": {"user_id": "2", "permissions": ["read"]}
}
def get_api_key_info(self, api_key):
"""Get API key information"""
return self.API_KEYS.get(api_key)
def require_api_key(self, required_permissions=None):
"""Decorator to require API key authentication"""
def decorator(method):
def wrapper(self, *args, **kwargs):
api_key = self.get_api_key()
if not api_key:
self.set_status(401)
self.write({"error": "API key required"})
return
key_info = self.get_api_key_info(api_key)
if not key_info:
self.set_status(401)
self.write({"error": "Invalid API key"})
return
# Check permissions
if required_permissions:
user_permissions = key_info.get("permissions", [])
if not all(perm in user_permissions for perm in required_permissions):
self.set_status(403)
self.write({"error": "Insufficient permissions"})
return
# Store key info for use in handler
self.api_key_info = key_info
return method(self, *args, **kwargs)
return wrapper
return decorator
def get_api_key(self):
"""Extract API key from request"""
# Check header
api_key = self.request.headers.get("X-API-Key")
if api_key:
return api_key
# Check query parameter
api_key = self.get_argument("api_key", default=None)
if api_key:
return api_key
return None
class APIKeyProtectedHandler(APIKeyAuthHandler):
@APIKeyAuthHandler.require_api_key(required_permissions=["read"])
def get(self):
user_id = self.api_key_info["user_id"]
self.write({"message": f"Data for user {user_id}"})
@APIKeyAuthHandler.require_api_key(required_permissions=["write"])
def post(self):
user_id = self.api_key_info["user_id"]
data = json.loads(self.request.body)
self.write({"message": f"Data saved for user {user_id}", "data": data})
CORS Support
import tornado.web
class CORSHandler(tornado.web.RequestHandler):
def set_default_headers(self):
"""Set CORS headers for all requests"""
self.set_header("Access-Control-Allow-Origin", "*")
self.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization, X-API-Key")
self.set_header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
self.set_header("Access-Control-Max-Age", "86400") # 24 hours
def options(self, *args):
"""Handle preflight requests"""
self.set_status(204)
self.finish()
class RestrictedCORSHandler(tornado.web.RequestHandler):
ALLOWED_ORIGINS = [
"https://yourdomain.com",
"https://api.yourdomain.com",
"http://localhost:3000" # For development
]
def set_default_headers(self):
origin = self.request.headers.get("Origin")
if origin in self.ALLOWED_ORIGINS:
self.set_header("Access-Control-Allow-Origin", origin)
self.set_header("Access-Control-Allow-Credentials", "true")
self.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization")
self.set_header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE")
def options(self, *args):
self.set_status(204)
self.finish()
Production Deployment
Application Configuration
import tornado.web
import tornado.options
import os
import logging
# Configuration class
class Config:
def __init__(self):
self.DEBUG = os.environ.get("DEBUG", "False").lower() == "true"
self.PORT = int(os.environ.get("PORT", "8888"))
self.HOST = os.environ.get("HOST", "0.0.0.0")
self.SECRET_KEY = os.environ.get("SECRET_KEY", self.generate_secret_key())
self.DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:///app.db")
self.REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")
self.LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
self.WORKERS = int(os.environ.get("WORKERS", "1"))
# SSL Configuration
self.SSL_CERT = os.environ.get("SSL_CERT")
self.SSL_KEY = os.environ.get("SSL_KEY")
# CORS
self.CORS_ORIGINS = os.environ.get("CORS_ORIGINS", "*").split(",")
# Rate limiting
self.RATE_LIMIT_REQUESTS = int(os.environ.get("RATE_LIMIT_REQUESTS", "100"))
self.RATE_LIMIT_WINDOW = int(os.environ.get("RATE_LIMIT_WINDOW", "60"))
def generate_secret_key(self):
import secrets
return secrets.token_urlsafe(32)
def setup_logging(self):
logging.basicConfig(
level=getattr(logging, self.LOG_LEVEL.upper()),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Production application
class ProductionApp:
def __init__(self, config):
self.config = config
self.app = None
self.setup_logging()
def setup_logging(self):
self.config.setup_logging()
# Add request logging
tornado.log.enable_pretty_logging()
def create_app(self):
settings = {
"debug": self.config.DEBUG,
"cookie_secret": self.config.SECRET_KEY,
"xsrf_cookies": True,
"static_path": "static",
"template_path": "templates",
"gzip": True,
"compress_response": True,
}
# Production optimizations
if not self.config.DEBUG:
settings.update({
"compiled_template_cache": True,
"static_hash_cache": True,
"serve_traceback": False,
})
handlers = [
(r"/", MainHandler),
(r"/api/.*", APIHandler),
(r"/health", HealthCheckHandler),
]
self.app = tornado.web.Application(handlers, **settings)
return self.app
def run(self):
app = self.create_app()
if self.config.SSL_CERT and self.config.SSL_KEY:
# HTTPS
ssl_options = {
"certfile": self.config.SSL_CERT,
"keyfile": self.config.SSL_KEY,
}
app.listen(self.config.PORT, address=self.config.HOST, ssl_options=ssl_options)
logging.info(f"HTTPS server started on {self.config.HOST}:{self.config.PORT}")
else:
# HTTP
app.listen(self.config.PORT, address=self.config.HOST)
logging.info(f"HTTP server started on {self.config.HOST}:{self.config.PORT}")
tornado.ioloop.IOLoop.current().start()
# Health check handler
class HealthCheckHandler(tornado.web.RequestHandler):
def get(self):
self.write({
"status": "healthy",
"timestamp": datetime.datetime.now().isoformat()
})
# Main entry point
if __name__ == "__main__":
config = Config()
app = ProductionApp(config)
app.run()
Docker Deployment
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN useradd --create-home --shell /bin/bash app \
&& chown -R app:app /app
USER app
# Expose port
EXPOSE 8888
# Health check
HEALTHCHECK \
CMD curl -f http://localhost:8888/health || exit 1
# Run application
CMD ["python", "app.py"]
# docker-compose.yml
version: '3.8'
services:
app:
build: .
ports:
- '8888:8888'
environment:
- DEBUG=false
- PORT=8888
- HOST=0.0.0.0
- SECRET_KEY=your-production-secret-key
- DATABASE_URL=postgresql://user:pass@db:5432/myapp
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
restart: unless-stopped
healthcheck:
test: ['CMD', 'curl', '-f', 'http://localhost:8888/health']
interval: 30s
timeout: 10s
retries: 3
db:
image: postgres:15
environment:
- POSTGRES_DB=myapp
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
volumes:
- postgres_data:/var/lib/postgresql/data
restart: unless-stopped
redis:
image: redis:7-alpine
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- '80:80'
- '443:443'
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- app
restart: unless-stopped
volumes:
postgres_data:
Nginx Configuration
# nginx.conf
events {
worker_connections 1024;
}
http {
upstream tornado_app {
server app:8888;
}
# Rate limiting
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
# Gzip compression
gzip on;
gzip_vary on;
gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;
server {
listen 80;
server_name yourdomain.com;
# Redirect HTTP to HTTPS
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name yourdomain.com;
# SSL configuration
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
# Security headers
add_header X-Frame-Options DENY;
add_header X-Content-Type-Options nosniff;
add_header X-XSS-Protection "1; mode=block";
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains";
# Static files
location /static/ {
alias /app/static/;
expires 30d;
add_header Cache-Control "public, immutable";
}
# API endpoints with rate limiting
location /api/ {
limit_req zone=api burst=20 nodelay;
proxy_pass http://tornado_app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# WebSocket upgrade
location /ws/ {
proxy_pass http://tornado_app;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# All other requests
location / {
proxy_pass http://tornado_app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
}
Process Management with Supervisor
# /etc/supervisor/conf.d/tornado.conf
[program:tornado]
command=python /app/app.py
directory=/app
user=app
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/tornado.log
environment=
DEBUG=false,
PORT=8888,
SECRET_KEY=your-production-secret-key,
DATABASE_URL=postgresql://user:pass@localhost:5432/myapp
This comprehensive guide covers WebSockets, authentication, security, and production deployment patterns for Tornado applications. These patterns are production-ready and follow security best practices.