Fundamentals
Comprehensive guide to Web Server Gateway Interface (WSGI) concepts, implementation, and best practices.
WSGI Specification Overview
Basic WSGI Application Structure
def application(environ, start_response):
"""
WSGI application callable
Args:
environ: Dict containing request environment variables
start_response: Callable to initiate HTTP response
Returns:
Iterable of byte strings representing response body
"""
status = '200 OK'
headers = [('Content-type', 'text/plain')]
start_response(status, headers)
return [b'Hello World']
Environment Dictionary
def print_environ(environ, start_response):
"""Print all environment variables"""
import pprint
# Key environment variables
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO']
query = environ.get('QUERY_STRING', '')
content_type = environ.get('CONTENT_TYPE', '')
content_length = environ.get('CONTENT_LENGTH', '')
# Headers are in HTTP_* format
host = environ.get('HTTP_HOST', '')
user_agent = environ.get('HTTP_USER_AGENT', '')
# Server information
server_name = environ['SERVER_NAME']
server_port = environ['SERVER_PORT']
response_body = f"""
Method: {method}
Path: {path}
Query: {query}
Host: {host}
User-Agent: {user_agent}
Content-Type: {content_type}
Content-Length: {content_length}
Server: {server_name}:{server_port}
All environ:
{pprint.pformat(environ)}
"""
start_response('200 OK', [('Content-Type', 'text/plain')])
return [response_body.encode('utf-8')]
WSGI Application Patterns
Class-Based Application
class WSGIApplication:
def __init__(self, config=None):
self.config = config or {}
self.routes = {}
def __call__(self, environ, start_response):
"""Make the class callable as WSGI app"""
return self.handle_request(environ, start_response)
def handle_request(self, environ, start_response):
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO']
# Route handling
handler = self.routes.get((method, path))
if handler:
return handler(environ, start_response)
# 404 Not Found
start_response('404 Not Found', [('Content-Type', 'text/plain')])
return [b'Not Found']
def add_route(self, method, path, handler):
"""Add a route handler"""
self.routes[(method, path)] = handler
# Usage
app = WSGIApplication()
def home_handler(environ, start_response):
start_response('200 OK', [('Content-Type', 'text/html')])
return [b'<h1>Home Page</h1>']
def api_handler(environ, start_response):
start_response('200 OK', [('Content-Type', 'application/json')])
return [b'{"message": "API Response"}']
app.add_route('GET', '/', home_handler)
app.add_route('GET', '/api', api_handler)
Request Processing
import urllib.parse
import json
def parse_request(environ):
"""Parse WSGI environ into request data"""
# Parse query string
query_string = environ.get('QUERY_STRING', '')
query_params = urllib.parse.parse_qs(query_string)
# Parse form data (POST)
form_data = {}
if environ['REQUEST_METHOD'] == 'POST':
try:
content_length = int(environ.get('CONTENT_LENGTH', 0))
if content_length > 0:
body = environ['wsgi.input'].read(content_length)
content_type = environ.get('CONTENT_TYPE', '')
if content_type.startswith('application/x-www-form-urlencoded'):
form_data = urllib.parse.parse_qs(body.decode('utf-8'))
elif content_type.startswith('application/json'):
form_data = json.loads(body.decode('utf-8'))
except (ValueError, UnicodeDecodeError):
pass
# Parse cookies
cookies = {}
cookie_header = environ.get('HTTP_COOKIE', '')
if cookie_header:
for cookie in cookie_header.split(';'):
if '=' in cookie:
key, value = cookie.strip().split('=', 1)
cookies[key] = value
return {
'method': environ['REQUEST_METHOD'],
'path': environ['PATH_INFO'],
'query_params': query_params,
'form_data': form_data,
'cookies': cookies,
'headers': {k[5:].replace('_', '-'): v
for k, v in environ.items()
if k.startswith('HTTP_')},
'content_type': environ.get('CONTENT_TYPE', ''),
'content_length': environ.get('CONTENT_LENGTH', ''),
}
def advanced_app(environ, start_response):
"""Advanced WSGI app with request parsing"""
request = parse_request(environ)
# Route based on path
if request['path'] == '/':
response_body = b'<h1>Home</h1>'
headers = [('Content-Type', 'text/html')]
elif request['path'] == '/api/echo':
response_data = {
'method': request['method'],
'query_params': request['query_params'],
'form_data': request['form_data'],
'cookies': request['cookies']
}
response_body = json.dumps(response_data).encode('utf-8')
headers = [('Content-Type', 'application/json')]
else:
start_response('404 Not Found', [('Content-Type', 'text/plain')])
return [b'Not Found']
start_response('200 OK', headers)
return [response_body]
WSGI Middleware
Basic Middleware Pattern
class WSGIMiddleware:
def __init__(self, app):
self.app = app
def __call__(self, environ, start_response):
# Pre-processing
# ... modify environ or perform checks
# Call the wrapped application
return self.app(environ, start_response)
Logging Middleware
import time
import logging
class LoggingMiddleware:
def __init__(self, app, logger=None):
self.app = app
self.logger = logger or logging.getLogger(__name__)
def __call__(self, environ, start_response):
start_time = time.time()
# Capture response info
response_info = {}
def logging_start_response(status, headers, exc_info=None):
response_info['status'] = status
response_info['headers'] = headers
return start_response(status, headers, exc_info)
# Call the application
result = self.app(environ, logging_start_response)
# Log the request
duration = time.time() - start_time
self.logger.info(
f"{environ['REQUEST_METHOD']} {environ['PATH_INFO']} "
f"{response_info.get('status', 'Unknown')} "
f"{duration:.3f}s"
)
return result
Authentication Middleware
import base64
class BasicAuthMiddleware:
def __init__(self, app, users=None):
self.app = app
self.users = users or {} # {'username': 'password'}
def __call__(self, environ, start_response):
# Check for Authorization header
auth_header = environ.get('HTTP_AUTHORIZATION', '')
if not auth_header.startswith('Basic '):
return self.unauthorized(start_response)
# Decode credentials
try:
credentials = base64.b64decode(auth_header[6:]).decode('utf-8')
username, password = credentials.split(':', 1)
except (ValueError, UnicodeDecodeError):
return self.unauthorized(start_response)
# Validate credentials
if username not in self.users or self.users[username] != password:
return self.unauthorized(start_response)
# Add user info to environ
environ['AUTH_USER'] = username
# Call the application
return self.app(environ, start_response)
def unauthorized(self, start_response):
start_response('401 Unauthorized', [
('Content-Type', 'text/plain'),
('WWW-Authenticate', 'Basic realm="Protected Area"')
])
return [b'Unauthorized']
# Usage
app = WSGIApplication()
auth_app = BasicAuthMiddleware(app, {'admin': 'password123'})
CORS Middleware
class CORSMiddleware:
def __init__(self, app, allow_origin='*', allow_methods=None, allow_headers=None):
self.app = app
self.allow_origin = allow_origin
self.allow_methods = allow_methods or ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS']
self.allow_headers = allow_headers or ['Content-Type', 'Authorization']
def __call__(self, environ, start_response):
# Handle preflight requests
if environ['REQUEST_METHOD'] == 'OPTIONS':
headers = [
('Access-Control-Allow-Origin', self.allow_origin),
('Access-Control-Allow-Methods', ', '.join(self.allow_methods)),
('Access-Control-Allow-Headers', ', '.join(self.allow_headers)),
]
start_response('200 OK', headers)
return [b'']
# Modify start_response to add CORS headers
def cors_start_response(status, headers, exc_info=None):
headers.append(('Access-Control-Allow-Origin', self.allow_origin))
return start_response(status, headers, exc_info)
return self.app(environ, cors_start_response)
WSGI Server Integration
Gunicorn Configuration
# gunicorn.conf.py
bind = "0.0.0.0:8000"
workers = 4
worker_class = "sync"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
timeout = 30
keepalive = 2
preload_app = True
uWSGI Configuration
# uwsgi.ini
[uwsgi]
module = app:application
master = true
processes = 4
socket = /tmp/uwsgi.sock
chmod-socket = 666
vacuum = true
die-on-term = true
Mod_WSGI Configuration
# Apache configuration
LoadModule wsgi_module modules/mod_wsgi.so
<VirtualHost *:80>
ServerName example.com
DocumentRoot /var/www/html
WSGIDaemonProcess myapp python-path=/path/to/app
WSGIProcessGroup myapp
WSGIScriptAlias / /path/to/app/app.wsgi
<Directory /path/to/app>
WSGIApplicationGroup %{GLOBAL}
Require all granted
</Directory>
</VirtualHost>
Error Handling and Debugging
Error Middleware
import traceback
import sys
class ErrorMiddleware:
def __init__(self, app, debug=False):
self.app = app
self.debug = debug
def __call__(self, environ, start_response):
try:
return self.app(environ, start_response)
except Exception as e:
if self.debug:
return self.debug_response(e, start_response)
else:
return self.error_response(start_response)
def debug_response(self, exception, start_response):
"""Show detailed error in debug mode"""
tb = traceback.format_exc()
error_html = f"""
<html>
<body>
<h1>Application Error</h1>
<h2>{exception.__class__.__name__}: {exception}</h2>
<pre>{tb}</pre>
</body>
</html>
"""
start_response('500 Internal Server Error', [
('Content-Type', 'text/html')
])
return [error_html.encode('utf-8')]
def error_response(self, start_response):
"""Generic error response for production"""
start_response('500 Internal Server Error', [
('Content-Type', 'text/plain')
])
return [b'Internal Server Error']
Custom Exception Handling
class HTTPException(Exception):
def __init__(self, status_code, message, headers=None):
self.status_code = status_code
self.message = message
self.headers = headers or [('Content-Type', 'text/plain')]
super().__init__(message)
class HTTPErrorMiddleware:
def __init__(self, app):
self.app = app
def __call__(self, environ, start_response):
try:
return self.app(environ, start_response)
except HTTPException as e:
status_line = f"{e.status_code} {self.get_status_text(e.status_code)}"
start_response(status_line, e.headers)
return [e.message.encode('utf-8')]
def get_status_text(self, code):
status_texts = {
400: 'Bad Request',
401: 'Unauthorized',
403: 'Forbidden',
404: 'Not Found',
405: 'Method Not Allowed',
500: 'Internal Server Error',
}
return status_texts.get(code, 'Unknown')
# Usage in application
def app_with_errors(environ, start_response):
path = environ['PATH_INFO']
if path == '/error':
raise HTTPException(400, 'Bad Request: Invalid path')
elif path == '/forbidden':
raise HTTPException(403, 'Access Denied')
start_response('200 OK', [('Content-Type', 'text/plain')])
return [b'OK']
Performance Optimization
Response Streaming
def streaming_response(environ, start_response):
"""Stream large responses"""
def generate_data():
for i in range(1000):
yield f"Line {i}\n".encode('utf-8')
start_response('200 OK', [('Content-Type', 'text/plain')])
return generate_data()
Connection Pooling
import threading
from contextlib import contextmanager
class ConnectionPool:
def __init__(self, create_connection, max_connections=10):
self.create_connection = create_connection
self.max_connections = max_connections
self.pool = []
self.lock = threading.Lock()
@contextmanager
def get_connection(self):
with self.lock:
if self.pool:
connection = self.pool.pop()
else:
connection = self.create_connection()
try:
yield connection
finally:
with self.lock:
if len(self.pool) < self.max_connections:
self.pool.append(connection)
else:
# Close excess connections
if hasattr(connection, 'close'):
connection.close()
Testing WSGI Applications
Test Client
import io
import sys
from urllib.parse import urlencode
class WSGITestClient:
def __init__(self, app):
self.app = app
def get(self, path, query_params=None, headers=None):
return self.request('GET', path, query_params=query_params, headers=headers)
def post(self, path, data=None, json_data=None, headers=None):
return self.request('POST', path, data=data, json_data=json_data, headers=headers)
def request(self, method, path, query_params=None, data=None, json_data=None, headers=None):
# Build environ
environ = {
'REQUEST_METHOD': method,
'PATH_INFO': path,
'QUERY_STRING': urlencode(query_params or {}),
'CONTENT_TYPE': '',
'CONTENT_LENGTH': '0',
'SERVER_NAME': 'localhost',
'SERVER_PORT': '80',
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': io.BytesIO(),
'wsgi.errors': sys.stderr,
'wsgi.multithread': True,
'wsgi.multiprocess': False,
'wsgi.run_once': False,
}
# Add headers
for key, value in (headers or {}).items():
key = key.upper().replace('-', '_')
if key not in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
key = f'HTTP_{key}'
environ[key] = value
# Add body data
if json_data:
import json
body = json.dumps(json_data).encode('utf-8')
environ['CONTENT_TYPE'] = 'application/json'
environ['CONTENT_LENGTH'] = str(len(body))
environ['wsgi.input'] = io.BytesIO(body)
elif data:
if isinstance(data, dict):
body = urlencode(data).encode('utf-8')
environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
else:
body = data.encode('utf-8') if isinstance(data, str) else data
environ['CONTENT_LENGTH'] = str(len(body))
environ['wsgi.input'] = io.BytesIO(body)
# Capture response
response_data = {}
def start_response(status, headers, exc_info=None):
response_data['status'] = status
response_data['headers'] = headers
return lambda x: None
# Call application
result = self.app(environ, start_response)
body = b''.join(result)
return {
'status': response_data['status'],
'headers': dict(response_data['headers']),
'body': body,
'text': body.decode('utf-8'),
}
# Usage
def test_app():
client = WSGITestClient(my_app)
# Test GET request
response = client.get('/')
assert response['status'] == '200 OK'
assert b'Hello' in response['body']
# Test POST request
response = client.post('/api/echo', json_data={'message': 'test'})
assert response['status'] == '200 OK'
assert 'application/json' in response['headers']['Content-Type']
Production Deployment
Environment Configuration
import os
from functools import lru_cache
class Config:
def __init__(self):
self.DEBUG = os.getenv('DEBUG', 'False').lower() == 'true'
self.PORT = int(os.getenv('PORT', 8000))
self.HOST = os.getenv('HOST', '0.0.0.0')
self.WORKERS = int(os.getenv('WORKERS', 4))
self.LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
self.DATABASE_URL = os.getenv('DATABASE_URL')
self.SECRET_KEY = os.getenv('SECRET_KEY', 'dev-key-change-in-production')
@lru_cache()
def get_config():
return Config()
# Application factory
def create_app():
config = get_config()
# Create base application
app = MyWSGIApp()
# Add middleware based on config
if config.DEBUG:
app = ErrorMiddleware(app, debug=True)
app = LoggingMiddleware(app)
app = CORSMiddleware(app)
return app
# For deployment
application = create_app()
Health Check Endpoint
def health_check_app(environ, start_response):
"""Health check endpoint for load balancers"""
path = environ['PATH_INFO']
if path == '/health':
# Perform health checks
checks = {
'database': check_database(),
'cache': check_cache(),
'disk_space': check_disk_space(),
}
all_healthy = all(checks.values())
status = '200 OK' if all_healthy else '503 Service Unavailable'
response = {
'status': 'healthy' if all_healthy else 'unhealthy',
'checks': checks,
'timestamp': time.time(),
}
start_response(status, [('Content-Type', 'application/json')])
return [json.dumps(response).encode('utf-8')]
# Call main application
return main_app(environ, start_response)
def check_database():
# Implement database health check
return True
def check_cache():
# Implement cache health check
return True
def check_disk_space():
# Implement disk space check
return True
This comprehensive guide covers WSGI fundamentals from basic concepts to production deployment patterns. Use these patterns as building blocks for your own WSGI applications and middleware.