Deployment Patterns
Production deployment strategies for WSGI and ASGI applications with various servers and configurations.
WSGI Deployment
Gunicorn (Recommended for WSGI)
# Basic deployment: serve the `application` object from module `app` with
# Gunicorn's default settings.
gunicorn app:application
# Production configuration: listens on port 8000 on all interfaces with 4 sync
# workers; --max-requests/--max-requests-jitter recycle workers after roughly
# 1000 requests each; access and error logs go to /var/log/gunicorn, a PID file
# is written to /var/run, and --daemon detaches the server from the terminal.
# NOTE(review): --worker-connections and --keepalive are passed together with
# --worker-class sync; confirm in the Gunicorn settings reference whether the
# sync worker honours them.
gunicorn \
--bind 0.0.0.0:8000 \
--workers 4 \
--worker-class sync \
--worker-connections 1000 \
--max-requests 1000 \
--max-requests-jitter 100 \
--timeout 30 \
--keepalive 2 \
--preload-app \
--access-logfile /var/log/gunicorn/access.log \
--error-logfile /var/log/gunicorn/error.log \
--log-level info \
--capture-output \
--pid /var/run/gunicorn.pid \
--daemon \
app:application
Gunicorn Configuration File
# gunicorn.conf.py
import multiprocessing
import os
# Server socket
bind = "0.0.0.0:8000"
backlog = 2048

# Worker processes
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "sync"
worker_connections = 1000
timeout = 30
keepalive = 2

# Worker recycling
# Restart a worker after it has handled this many requests
max_requests = 1000
# Random number of extra requests (0..max_requests_jitter) added per worker to
# max_requests, so that the workers do not all restart at the same moment
max_requests_jitter = 100

# Load application code before worker processes are forked
preload_app = True

# Logging
accesslog = "/var/log/gunicorn/access.log"
errorlog = "/var/log/gunicorn/error.log"
loglevel = "info"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'

# Process naming
proc_name = "my_app"

# Server mechanics
pidfile = "/var/run/gunicorn.pid"
user = "www-data"
group = "www-data"
tmp_upload_dir = "/tmp"

# SSL (placeholder paths; point these at the real key and certificate)
keyfile = "/path/to/keyfile"
certfile = "/path/to/certfile"
uWSGI Configuration
# uwsgi.ini
[uwsgi]
# WSGI entry point, written as module:callable
module = app:application
# Master process supervising 4 worker processes
master = true
processes = 4
# Serve over a Unix domain socket rather than a TCP port
socket = /tmp/uwsgi.sock
# NOTE(review): mode 666 lets every local user read and write the socket;
# consider a tighter mode (e.g. 660) if the web server shares a group with
# the uid/gid configured below.
chmod-socket = 666
vacuum = true
die-on-term = true
logto = /var/log/uwsgi.log
# Unprivileged account the workers run as
uid = www-data
gid = www-data
# Application directory and the virtualenv that provides its dependencies
chdir = /path/to/app
virtualenv = /path/to/venv
Mod_WSGI (Apache)
# httpd.conf or virtual host
# Load mod_wsgi so that the WSGI* directives below are available
LoadModule wsgi_module modules/mod_wsgi.so
<VirtualHost *:80>
ServerName example.com
DocumentRoot /var/www/html
# Process group "myapp": the application directory is put on the Python path
# and the interpreter is taken from the virtualenv
WSGIDaemonProcess myapp python-path=/path/to/app python-home=/path/to/venv
WSGIProcessGroup myapp
# Mount the WSGI script at the site root
WSGIScriptAlias / /path/to/app/app.wsgi
<Directory /path/to/app>
WSGIApplicationGroup %{GLOBAL}
Require all granted
</Directory>
# Static files
# /static is mapped straight to the filesystem directory below
Alias /static /path/to/app/static
<Directory /path/to/app/static>
Require all granted
</Directory>
</VirtualHost>
ASGI Deployment
Uvicorn (Recommended for ASGI)
# Basic deployment
uvicorn app:application
# Production configuration
# Two options from typical copy-pasted examples are deliberately NOT passed:
#   --worker-class  is a Gunicorn option; the uvicorn CLI rejects it.
#   --reload        is a development aid (file watching) and is not meant to
#                   be combined with --workers.
# To run Uvicorn workers under a process manager, use Gunicorn with
# --worker-class uvicorn.workers.UvicornWorker instead.
uvicorn \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--access-log \
--log-level info \
--loop auto \
--http auto \
--ws auto \
--lifespan auto \
--interface auto \
app:application
Uvicorn with Gunicorn
# Use Gunicorn as process manager with Uvicorn workers
# Gunicorn starts 4 worker processes of class uvicorn.workers.UvicornWorker,
# listens on port 8000 on all interfaces and writes its logs under
# /var/log/gunicorn.
# NOTE(review): confirm the worker class import path against the deployment
# docs of the installed Uvicorn version.
gunicorn \
--bind 0.0.0.0:8000 \
--workers 4 \
--worker-class uvicorn.workers.UvicornWorker \
--access-logfile /var/log/gunicorn/access.log \
--error-logfile /var/log/gunicorn/error.log \
--log-level info \
app:application
Hypercorn
# Basic deployment: serve the `application` object from module `app` with
# Hypercorn's default settings.
hypercorn app:application
# Production configuration: port 8000 on all interfaces, 4 workers of class
# "asyncio", access and error logs written under /var/log/hypercorn.
hypercorn \
--bind 0.0.0.0:8000 \
--workers 4 \
--worker-class asyncio \
--access-logfile /var/log/hypercorn/access.log \
--error-logfile /var/log/hypercorn/error.log \
--log-level info \
app:application
Daphne (Django Channels)
# Basic deployment: serve the `application` object from module `app` with
# Daphne's default settings.
daphne app:application
# Production configuration: bind address and port are given separately
# (--bind / --port); access log goes to /var/log/daphne/access.log.
# No worker-count option is passed in this command.
daphne \
--bind 0.0.0.0 \
--port 8000 \
--access-log /var/log/daphne/access.log \
--verbosity 2 \
app:application
Reverse Proxy Configuration
Nginx Configuration
# nginx.conf
# Backend pool that the proxy_pass directives in the server block refer to.
# NOTE(review): four local ports (8000-8003) are listed, while the server
# commands in this guide bind a single port (8000); make sure an application
# instance is actually listening on every port listed here.
upstream app_backend {
server 127.0.0.1:8000;
server 127.0.0.1:8001;
server 127.0.0.1:8002;
server 127.0.0.1:8003;
}
server {
    listen 80;
    server_name example.com;

    # Security headers
    # nginx inherits add_header directives from this level only into
    # locations that declare no add_header of their own. The /static/ and
    # /media/ locations set Cache-Control, so they repeat these headers.
    add_header X-Frame-Options "SAMEORIGIN" always;
    add_header X-Content-Type-Options "nosniff" always;
    add_header X-XSS-Protection "1; mode=block" always;
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;

    # Gzip compression
    gzip on;
    gzip_vary on;
    gzip_types text/plain text/css text/xml text/javascript application/javascript application/xml+rss application/json;

    # Static files
    location /static/ {
        alias /path/to/app/static/;
        expires 30d;
        add_header Cache-Control "public, immutable";
        # Repeated here because this location declares its own add_header
        add_header X-Frame-Options "SAMEORIGIN" always;
        add_header X-Content-Type-Options "nosniff" always;
        add_header X-XSS-Protection "1; mode=block" always;
        add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
    }

    location /media/ {
        alias /path/to/app/media/;
        expires 30d;
        add_header Cache-Control "public, immutable";
        # Repeated here because this location declares its own add_header
        add_header X-Frame-Options "SAMEORIGIN" always;
        add_header X-Content-Type-Options "nosniff" always;
        add_header X-XSS-Protection "1; mode=block" always;
        add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
    }

    # Proxy to application
    location / {
        proxy_pass http://app_backend;
        # Pass the original host, client address and scheme to the application
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_redirect off;

        # Timeouts
        proxy_connect_timeout 30s;
        proxy_send_timeout 30s;
        proxy_read_timeout 30s;

        # Buffer settings
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;
        proxy_busy_buffers_size 8k;
    }

    # WebSocket support (for ASGI)
    location /ws/ {
        proxy_pass http://app_backend;
        # The Upgrade handshake needs HTTP/1.1 and explicit hop-by-hop headers
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        # Long read timeout so idle WebSocket connections are not cut off
        proxy_read_timeout 86400;
    }
}
# SSL configuration
server {
# HTTPS listener with HTTP/2 enabled
listen 443 ssl http2;
server_name example.com;
# Placeholder paths; point these at the real certificate chain and key
ssl_certificate /path/to/certificate.pem;
ssl_certificate_key /path/to/private.key;
# SSL settings
# Only TLS 1.2 and TLS 1.3 are accepted
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384;
ssl_prefer_server_ciphers off;
# Include main server block content here
# NOTE(review): app_locations.conf is not shown in this guide; presumably it
# holds the location blocks of the port-80 server. Verify that it exists and
# that it is not also loaded by a conf.d/*.conf wildcard include.
include /etc/nginx/conf.d/app_locations.conf;
}
Docker Deployment
Dockerfile for WSGI
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*
# Copy requirements and install Python dependencies
# (done before copying the source so this layer is reused when only code changes)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN adduser --disabled-password --gecos '' appuser
RUN chown -R appuser:appuser /app
USER appuser
# Expose port
EXPOSE 8000
# Health check
# The probe uses the Python interpreter that is already in the image: the slim
# base image does not ship curl, so a curl-based probe could never succeed.
# urlopen() raises on connection errors and on 4xx/5xx responses, which makes
# the command exit non-zero and marks the container unhealthy.
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=3)" || exit 1
# Run application
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "app:application"]
Docker Compose
version: '3.8'

services:
  # Application container built from the Dockerfile in this directory
  app:
    build: .
    ports:
      - '8000:8000'
    environment:
      # NOTE(review): credentials are inlined for illustration; supply real
      # values through an env file or a secrets mechanism.
      - DATABASE_URL=postgresql://user:pass@db:5432/dbname
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped

  # Reverse proxy in front of the app
  nginx:
    image: nginx:alpine
    ports:
      - '80:80'
      - '443:443'
    volumes:
      # Mounted as a conf.d snippet, not as /etc/nginx/nginx.conf: a file that
      # consists of bare `upstream`/`server` blocks is only valid inside the
      # `http` context, which the image's stock nginx.conf provides through
      # its conf.d/*.conf include.
      # NOTE(review): inside this container the backend is reachable as
      # app:8000, not 127.0.0.1; check the upstream addresses in nginx.conf.
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
      - ./ssl:/etc/ssl
      - ./static:/app/static
    depends_on:
      - app
    restart: unless-stopped

  db:
    image: postgres:14
    environment:
      - POSTGRES_DB=dbname
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped

  redis:
    image: redis:alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped

# Named volumes that keep database and Redis data across container restarts
volumes:
  postgres_data:
  redis_data:
Kubernetes Deployment
Deployment YAML
# Deployment: 3 replicas of the application container
apiVersion: apps/v1
kind: Deployment
metadata:
  name: app-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp
  template:
    metadata:
      labels:
        # Must match spec.selector.matchLabels above
        app: myapp
    spec:
      containers:
        - name: app
          image: myapp:latest
          ports:
            - containerPort: 8000
          env:
            # Connection strings are read from the "app-secrets" Secret
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: database-url
            - name: REDIS_URL
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: redis-url
          # Liveness: polls /health every 10 s after a 30 s initial delay
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          # Readiness: polls /ready every 5 s after a 5 s initial delay
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
          resources:
            requests:
              memory: '256Mi'
              cpu: '250m'
            limits:
              memory: '512Mi'
              cpu: '500m'
---
# Service: exposes pods labelled app=myapp on port 80, forwarding to
# container port 8000
apiVersion: v1
kind: Service
metadata:
  name: app-service
spec:
  selector:
    app: myapp
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
Environment Configuration
Environment Variables
# config.py
import os
from functools import lru_cache
def _split_csv(raw):
    """Split a comma-separated string into stripped, non-empty items."""
    return [item.strip() for item in raw.split(',') if item.strip()]


class Config:
    """Application settings, read from environment variables.

    Every attribute is evaluated once, when the class body executes (i.e. at
    import time); later changes to the environment are not picked up.
    Integer settings raise ``ValueError`` at import if the variable is set to
    a non-numeric value.
    """

    # Server settings
    HOST = os.getenv('HOST', '0.0.0.0')
    PORT = int(os.getenv('PORT', '8000'))
    WORKERS = int(os.getenv('WORKERS', '4'))

    # Application settings
    # DEBUG is enabled only by the (case-insensitive) value "true"
    DEBUG = os.getenv('DEBUG', 'False').lower() == 'true'
    # NOTE: the fallback key is for local development only; always set
    # SECRET_KEY explicitly in production.
    SECRET_KEY = os.getenv('SECRET_KEY', 'dev-secret-key')

    # Database settings (None when the variable is unset)
    DATABASE_URL = os.getenv('DATABASE_URL')

    # Redis settings (None when the variable is unset)
    REDIS_URL = os.getenv('REDIS_URL')

    # Logging settings
    LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
    LOG_FORMAT = os.getenv('LOG_FORMAT', 'json')

    # Security settings
    # Comma-separated lists; whitespace around entries is ignored and empty
    # entries are dropped, so "a.com, b.com" yields ['a.com', 'b.com'].
    ALLOWED_HOSTS = _split_csv(os.getenv('ALLOWED_HOSTS', '*'))
    CORS_ORIGINS = _split_csv(os.getenv('CORS_ORIGINS', '*'))

    # Performance settings
    MAX_REQUESTS = int(os.getenv('MAX_REQUESTS', '1000'))
    TIMEOUT = int(os.getenv('TIMEOUT', '30'))
@lru_cache()
def get_config():
    """Return the shared Config instance.

    The first call constructs it; the cache hands the same object back on
    every later call.
    """
    settings = Config()
    return settings
.env File
# .env
# Values for the environment variables that config.py reads with os.getenv.
# NOTE(review): SECRET_KEY and the database credentials below are
# placeholders; replace them before deploying.
# Server settings
HOST=0.0.0.0
PORT=8000
WORKERS=4
# Application settings
DEBUG=false
SECRET_KEY=your-secret-key
# Backing services
DATABASE_URL=postgresql://user:pass@localhost:5432/dbname
REDIS_URL=redis://localhost:6379/0
# Logging
LOG_LEVEL=INFO
# Security: comma-separated lists
ALLOWED_HOSTS=example.com,www.example.com
CORS_ORIGINS=https://example.com,https://www.example.com
# Performance settings
MAX_REQUESTS=1000
TIMEOUT=30
Monitoring and Logging
Structured Logging
import logging
import json
from datetime import datetime
class JSONFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object.

    Always emitted: ``timestamp``, ``level``, ``message``, ``module``,
    ``function`` and ``line``. ``request_id`` and ``user_id`` are added when
    present on the record (e.g. passed via ``extra=``), and ``exception``
    carries the formatted traceback when the record has exception info.
    """

    # Optional record attributes copied to the output when they are set.
    _OPTIONAL_FIELDS = ('request_id', 'user_id')

    def format(self, record):
        """Return *record* serialised as a JSON string."""
        # Function-scope import: the module header only imports `datetime`.
        from datetime import timezone

        log_entry = {
            # Time the event was logged (record.created), not the time it is
            # formatted, expressed as timezone-aware UTC.
            'timestamp': datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }
        for field in self._OPTIONAL_FIELDS:
            if hasattr(record, field):
                log_entry[field] = getattr(record, field)
        # Without this, logger.exception() output would lose its traceback.
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)
        # default=str: an extra value that is not JSON-serialisable (UUID,
        # datetime, ...) is stringified rather than making the log call raise.
        return json.dumps(log_entry, default=str)
# Install a stream handler on the root logger at INFO level
# (basicConfig leaves the root logger untouched if it already has handlers)
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format='%(message)s',
    level=logging.INFO,
)

# Switch every handler currently on the root logger to JSON output
for root_handler in logging.root.handlers:
    root_handler.setFormatter(JSONFormatter())
Health Check Endpoints
import time
import psutil
from datetime import datetime
# Reference point for the reported uptime, captured when this module is imported.
start_time = time.time()


async def health_check(scope, receive, send):
    """ASGI handler that reports service health as a JSON document.

    Responds 200 when both the database and Redis checks pass, and 503 when
    either fails or when any check raises. The response is assembled fully
    before anything is sent, so exactly one ``http.response.start`` message
    is emitted on every path.

    Args:
        scope: ASGI connection scope (not inspected).
        receive: ASGI receive callable (not used).
        send: ASGI send callable used to emit the response.
    """
    # Function-scope imports: the snippet header imports neither name.
    import json
    from datetime import timezone

    try:
        # Check database connection
        db_healthy = await check_database()
        # Check Redis connection
        redis_healthy = await check_redis()
        # Check system resources
        memory_usage = psutil.virtual_memory().percent
        # interval=None returns immediately (usage since the previous call).
        # A positive interval would sleep inside this coroutine and stall
        # every other request on the event loop for that long.
        cpu_usage = psutil.cpu_percent(interval=None)
        health_data = {
            'status': 'healthy' if db_healthy and redis_healthy else 'unhealthy',
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'checks': {
                'database': db_healthy,
                'redis': redis_healthy,
                'memory_usage': memory_usage,
                'cpu_usage': cpu_usage,
            },
            'uptime': time.time() - start_time,
        }
        status_code = 200 if health_data['status'] == 'healthy' else 503
        body = json.dumps(health_data).encode('utf-8')
    except Exception as e:
        # Top-level boundary: a failing check must yield a 503 response
        # rather than an unhandled error in the server.
        status_code = 503
        body = json.dumps({'status': 'error', 'message': str(e)}).encode('utf-8')

    await send({
        'type': 'http.response.start',
        'status': status_code,
        'headers': [[b'content-type', b'application/json']],
    })
    await send({
        'type': 'http.response.body',
        'body': body,
    })
Performance Tuning
Connection Pooling
import asyncio
import asyncpg
from contextlib import asynccontextmanager
class DatabasePool:
    """Lifecycle wrapper around an asyncpg connection pool.

    The pool is not opened by the constructor: await ``initialize()`` once at
    startup, use ``acquire()`` to borrow connections, and await ``close()``
    at shutdown.
    """

    def __init__(self, database_url, min_size=10, max_size=20):
        """Store the connection settings; no connection is made here."""
        self.database_url = database_url
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None

    async def initialize(self):
        """Create the underlying pool.

        Does nothing if a pool already exists, so a repeated call cannot
        replace (and thereby leak) an open pool.
        """
        if self.pool is not None:
            return
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=60,
        )

    @asynccontextmanager
    async def acquire(self):
        """Yield a pooled connection, returning it to the pool on exit.

        Raises:
            RuntimeError: if the pool has not been initialized or has
                already been closed.
        """
        if self.pool is None:
            raise RuntimeError(
                'DatabasePool is not initialized; await initialize() before acquire()'
            )
        async with self.pool.acquire() as connection:
            yield connection

    async def close(self):
        """Close the pool if it is open; safe to call more than once."""
        if self.pool is not None:
            # Drop the reference first so a later initialize() builds a fresh pool.
            pool, self.pool = self.pool, None
            await pool.close()
# Usage
db_pool = DatabasePool(DATABASE_URL)


async def init_db_pool():
    """Open the shared pool.

    ``await`` is only valid inside a coroutine, so the pool is opened here
    rather than at module level; await this once from the application's
    startup code.
    """
    await db_pool.initialize()
Caching Strategy
import redis.asyncio as redis
from functools import wraps
import json
import hashlib
class Cache:
    """Small JSON cache on top of an async Redis client.

    Values are serialised with ``json.dumps`` on write and parsed with
    ``json.loads`` on read.
    """

    def __init__(self, redis_url):
        """Build the Redis client from *redis_url*."""
        self.redis = redis.from_url(redis_url)

    async def get(self, key):
        """Return the decoded value stored under *key*, or None if nothing is stored."""
        raw = await self.redis.get(key)
        if not raw:
            return None
        return json.loads(raw)

    async def set(self, key, value, expire=300):
        """Store *value* under *key*, passing *expire* to Redis as ``ex``."""
        payload = json.dumps(value)
        await self.redis.set(key, payload, ex=expire)

    async def delete(self, key):
        """Remove *key* from the cache."""
        await self.redis.delete(key)
def cache_result(expire=300, backend=None):
    """Decorator factory that caches the result of an async function.

    Args:
        expire: value passed as the third argument to the backend's ``set``
            (the entry's lifetime).
        backend: object with async ``get(key)`` and ``set(key, value,
            expire)`` methods. When omitted, the module-level ``cache``
            object is looked up at call time.

    Notes:
        * A stored value of ``None`` cannot be told apart from a miss, so
          functions returning ``None`` are re-run on every call. Every other
          value, including falsy ones such as ``0``, ``[]`` or ``False``, is
          served from the cache.
        * The key is derived from ``str()`` of the arguments; arguments whose
          string form is not stable across calls will never produce a hit.
    """
    def decorator(func):
        # Module + qualified name: same-named functions in different modules
        # or classes do not share cache entries.
        key_prefix = f"{func.__module__}.{func.__qualname__}"

        @wraps(func)
        async def wrapper(*args, **kwargs):
            store = backend if backend is not None else cache

            # Create cache key; keyword arguments are sorted by name so that
            # f(a=1, b=2) and f(b=2, a=1) map to the same entry.
            call_signature = str(args + tuple(sorted(kwargs.items())))
            digest = hashlib.md5(call_signature.encode()).hexdigest()
            cache_key = f"{key_prefix}:{digest}"

            # Try to get from cache
            cached_result = await store.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Execute function and cache result
            result = await func(*args, **kwargs)
            await store.set(cache_key, result, expire)
            return result
        return wrapper
    return decorator
This comprehensive guide covers production deployment patterns for both WSGI and ASGI applications with various server configurations, containerization, and monitoring strategies.