Skip to main content

Fundamentals

Comprehensive guide to Web Server Gateway Interface (WSGI) concepts, implementation, and best practices.

WSGI Specification Overview

Basic WSGI Application Structure

def application(environ, start_response):
"""
WSGI application callable

Args:
environ: Dict containing request environment variables
start_response: Callable to initiate HTTP response

Returns:
Iterable of byte strings representing response body
"""
status = '200 OK'
headers = [('Content-type', 'text/plain')]
start_response(status, headers)
return [b'Hello World']

Environment Dictionary

def print_environ(environ, start_response):
"""Print all environment variables"""
import pprint

# Key environment variables
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO']
query = environ.get('QUERY_STRING', '')
content_type = environ.get('CONTENT_TYPE', '')
content_length = environ.get('CONTENT_LENGTH', '')

# Headers are in HTTP_* format
host = environ.get('HTTP_HOST', '')
user_agent = environ.get('HTTP_USER_AGENT', '')

# Server information
server_name = environ['SERVER_NAME']
server_port = environ['SERVER_PORT']

response_body = f"""
Method: {method}
Path: {path}
Query: {query}
Host: {host}
User-Agent: {user_agent}
Content-Type: {content_type}
Content-Length: {content_length}
Server: {server_name}:{server_port}

All environ:
{pprint.pformat(environ)}
"""

start_response('200 OK', [('Content-Type', 'text/plain')])
return [response_body.encode('utf-8')]

WSGI Application Patterns

Class-Based Application

class WSGIApplication:
def __init__(self, config=None):
self.config = config or {}
self.routes = {}

def __call__(self, environ, start_response):
"""Make the class callable as WSGI app"""
return self.handle_request(environ, start_response)

def handle_request(self, environ, start_response):
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO']

# Route handling
handler = self.routes.get((method, path))
if handler:
return handler(environ, start_response)

# 404 Not Found
start_response('404 Not Found', [('Content-Type', 'text/plain')])
return [b'Not Found']

def add_route(self, method, path, handler):
"""Add a route handler"""
self.routes[(method, path)] = handler

# Usage
app = WSGIApplication()

def home_handler(environ, start_response):
start_response('200 OK', [('Content-Type', 'text/html')])
return [b'<h1>Home Page</h1>']

def api_handler(environ, start_response):
start_response('200 OK', [('Content-Type', 'application/json')])
return [b'{"message": "API Response"}']

app.add_route('GET', '/', home_handler)
app.add_route('GET', '/api', api_handler)

Request Processing

import urllib.parse
import json

def parse_request(environ):
"""Parse WSGI environ into request data"""
# Parse query string
query_string = environ.get('QUERY_STRING', '')
query_params = urllib.parse.parse_qs(query_string)

# Parse form data (POST)
form_data = {}
if environ['REQUEST_METHOD'] == 'POST':
try:
content_length = int(environ.get('CONTENT_LENGTH', 0))
if content_length > 0:
body = environ['wsgi.input'].read(content_length)
content_type = environ.get('CONTENT_TYPE', '')

if content_type.startswith('application/x-www-form-urlencoded'):
form_data = urllib.parse.parse_qs(body.decode('utf-8'))
elif content_type.startswith('application/json'):
form_data = json.loads(body.decode('utf-8'))
except (ValueError, UnicodeDecodeError):
pass

# Parse cookies
cookies = {}
cookie_header = environ.get('HTTP_COOKIE', '')
if cookie_header:
for cookie in cookie_header.split(';'):
if '=' in cookie:
key, value = cookie.strip().split('=', 1)
cookies[key] = value

return {
'method': environ['REQUEST_METHOD'],
'path': environ['PATH_INFO'],
'query_params': query_params,
'form_data': form_data,
'cookies': cookies,
'headers': {k[5:].replace('_', '-'): v
for k, v in environ.items()
if k.startswith('HTTP_')},
'content_type': environ.get('CONTENT_TYPE', ''),
'content_length': environ.get('CONTENT_LENGTH', ''),
}

def advanced_app(environ, start_response):
"""Advanced WSGI app with request parsing"""
request = parse_request(environ)

# Route based on path
if request['path'] == '/':
response_body = b'<h1>Home</h1>'
headers = [('Content-Type', 'text/html')]

elif request['path'] == '/api/echo':
response_data = {
'method': request['method'],
'query_params': request['query_params'],
'form_data': request['form_data'],
'cookies': request['cookies']
}
response_body = json.dumps(response_data).encode('utf-8')
headers = [('Content-Type', 'application/json')]

else:
start_response('404 Not Found', [('Content-Type', 'text/plain')])
return [b'Not Found']

start_response('200 OK', headers)
return [response_body]

WSGI Middleware

Basic Middleware Pattern

class WSGIMiddleware:
def __init__(self, app):
self.app = app

def __call__(self, environ, start_response):
# Pre-processing
# ... modify environ or perform checks

# Call the wrapped application
return self.app(environ, start_response)

Logging Middleware

import time
import logging

class LoggingMiddleware:
def __init__(self, app, logger=None):
self.app = app
self.logger = logger or logging.getLogger(__name__)

def __call__(self, environ, start_response):
start_time = time.time()

# Capture response info
response_info = {}

def logging_start_response(status, headers, exc_info=None):
response_info['status'] = status
response_info['headers'] = headers
return start_response(status, headers, exc_info)

# Call the application
result = self.app(environ, logging_start_response)

# Log the request
duration = time.time() - start_time
self.logger.info(
f"{environ['REQUEST_METHOD']} {environ['PATH_INFO']} "
f"{response_info.get('status', 'Unknown')} "
f"{duration:.3f}s"
)

return result

Authentication Middleware

import base64

class BasicAuthMiddleware:
def __init__(self, app, users=None):
self.app = app
self.users = users or {} # {'username': 'password'}

def __call__(self, environ, start_response):
# Check for Authorization header
auth_header = environ.get('HTTP_AUTHORIZATION', '')

if not auth_header.startswith('Basic '):
return self.unauthorized(start_response)

# Decode credentials
try:
credentials = base64.b64decode(auth_header[6:]).decode('utf-8')
username, password = credentials.split(':', 1)
except (ValueError, UnicodeDecodeError):
return self.unauthorized(start_response)

# Validate credentials
if username not in self.users or self.users[username] != password:
return self.unauthorized(start_response)

# Add user info to environ
environ['AUTH_USER'] = username

# Call the application
return self.app(environ, start_response)

def unauthorized(self, start_response):
start_response('401 Unauthorized', [
('Content-Type', 'text/plain'),
('WWW-Authenticate', 'Basic realm="Protected Area"')
])
return [b'Unauthorized']

# Usage
app = WSGIApplication()
auth_app = BasicAuthMiddleware(app, {'admin': 'password123'})

CORS Middleware

class CORSMiddleware:
def __init__(self, app, allow_origin='*', allow_methods=None, allow_headers=None):
self.app = app
self.allow_origin = allow_origin
self.allow_methods = allow_methods or ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS']
self.allow_headers = allow_headers or ['Content-Type', 'Authorization']

def __call__(self, environ, start_response):
# Handle preflight requests
if environ['REQUEST_METHOD'] == 'OPTIONS':
headers = [
('Access-Control-Allow-Origin', self.allow_origin),
('Access-Control-Allow-Methods', ', '.join(self.allow_methods)),
('Access-Control-Allow-Headers', ', '.join(self.allow_headers)),
]
start_response('200 OK', headers)
return [b'']

# Modify start_response to add CORS headers
def cors_start_response(status, headers, exc_info=None):
headers.append(('Access-Control-Allow-Origin', self.allow_origin))
return start_response(status, headers, exc_info)

return self.app(environ, cors_start_response)

WSGI Server Integration

Gunicorn Configuration

# gunicorn.conf.py
bind = "0.0.0.0:8000"
workers = 4
worker_class = "sync"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
timeout = 30
keepalive = 2
preload_app = True

uWSGI Configuration

# uwsgi.ini
[uwsgi]
module = app:application
master = true
processes = 4
socket = /tmp/uwsgi.sock
chmod-socket = 666
vacuum = true
die-on-term = true

Mod_WSGI Configuration

# Apache configuration
LoadModule wsgi_module modules/mod_wsgi.so

<VirtualHost *:80>
ServerName example.com
DocumentRoot /var/www/html

WSGIDaemonProcess myapp python-path=/path/to/app
WSGIProcessGroup myapp
WSGIScriptAlias / /path/to/app/app.wsgi

<Directory /path/to/app>
WSGIApplicationGroup %{GLOBAL}
Require all granted
</Directory>
</VirtualHost>

Error Handling and Debugging

Error Middleware

import traceback
import sys

class ErrorMiddleware:
def __init__(self, app, debug=False):
self.app = app
self.debug = debug

def __call__(self, environ, start_response):
try:
return self.app(environ, start_response)
except Exception as e:
if self.debug:
return self.debug_response(e, start_response)
else:
return self.error_response(start_response)

def debug_response(self, exception, start_response):
"""Show detailed error in debug mode"""
tb = traceback.format_exc()
error_html = f"""
<html>
<body>
<h1>Application Error</h1>
<h2>{exception.__class__.__name__}: {exception}</h2>
<pre>{tb}</pre>
</body>
</html>
"""
start_response('500 Internal Server Error', [
('Content-Type', 'text/html')
])
return [error_html.encode('utf-8')]

def error_response(self, start_response):
"""Generic error response for production"""
start_response('500 Internal Server Error', [
('Content-Type', 'text/plain')
])
return [b'Internal Server Error']

Custom Exception Handling

class HTTPException(Exception):
def __init__(self, status_code, message, headers=None):
self.status_code = status_code
self.message = message
self.headers = headers or [('Content-Type', 'text/plain')]
super().__init__(message)

class HTTPErrorMiddleware:
def __init__(self, app):
self.app = app

def __call__(self, environ, start_response):
try:
return self.app(environ, start_response)
except HTTPException as e:
status_line = f"{e.status_code} {self.get_status_text(e.status_code)}"
start_response(status_line, e.headers)
return [e.message.encode('utf-8')]

def get_status_text(self, code):
status_texts = {
400: 'Bad Request',
401: 'Unauthorized',
403: 'Forbidden',
404: 'Not Found',
405: 'Method Not Allowed',
500: 'Internal Server Error',
}
return status_texts.get(code, 'Unknown')

# Usage in application
def app_with_errors(environ, start_response):
path = environ['PATH_INFO']

if path == '/error':
raise HTTPException(400, 'Bad Request: Invalid path')
elif path == '/forbidden':
raise HTTPException(403, 'Access Denied')

start_response('200 OK', [('Content-Type', 'text/plain')])
return [b'OK']

Performance Optimization

Response Streaming

def streaming_response(environ, start_response):
"""Stream large responses"""
def generate_data():
for i in range(1000):
yield f"Line {i}\n".encode('utf-8')

start_response('200 OK', [('Content-Type', 'text/plain')])
return generate_data()

Connection Pooling

import threading
from contextlib import contextmanager

class ConnectionPool:
def __init__(self, create_connection, max_connections=10):
self.create_connection = create_connection
self.max_connections = max_connections
self.pool = []
self.lock = threading.Lock()

@contextmanager
def get_connection(self):
with self.lock:
if self.pool:
connection = self.pool.pop()
else:
connection = self.create_connection()

try:
yield connection
finally:
with self.lock:
if len(self.pool) < self.max_connections:
self.pool.append(connection)
else:
# Close excess connections
if hasattr(connection, 'close'):
connection.close()

Testing WSGI Applications

Test Client

import io
import sys
from urllib.parse import urlencode

class WSGITestClient:
def __init__(self, app):
self.app = app

def get(self, path, query_params=None, headers=None):
return self.request('GET', path, query_params=query_params, headers=headers)

def post(self, path, data=None, json_data=None, headers=None):
return self.request('POST', path, data=data, json_data=json_data, headers=headers)

def request(self, method, path, query_params=None, data=None, json_data=None, headers=None):
# Build environ
environ = {
'REQUEST_METHOD': method,
'PATH_INFO': path,
'QUERY_STRING': urlencode(query_params or {}),
'CONTENT_TYPE': '',
'CONTENT_LENGTH': '0',
'SERVER_NAME': 'localhost',
'SERVER_PORT': '80',
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': io.BytesIO(),
'wsgi.errors': sys.stderr,
'wsgi.multithread': True,
'wsgi.multiprocess': False,
'wsgi.run_once': False,
}

# Add headers
for key, value in (headers or {}).items():
key = key.upper().replace('-', '_')
if key not in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
key = f'HTTP_{key}'
environ[key] = value

# Add body data
if json_data:
import json
body = json.dumps(json_data).encode('utf-8')
environ['CONTENT_TYPE'] = 'application/json'
environ['CONTENT_LENGTH'] = str(len(body))
environ['wsgi.input'] = io.BytesIO(body)
elif data:
if isinstance(data, dict):
body = urlencode(data).encode('utf-8')
environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
else:
body = data.encode('utf-8') if isinstance(data, str) else data
environ['CONTENT_LENGTH'] = str(len(body))
environ['wsgi.input'] = io.BytesIO(body)

# Capture response
response_data = {}

def start_response(status, headers, exc_info=None):
response_data['status'] = status
response_data['headers'] = headers
return lambda x: None

# Call application
result = self.app(environ, start_response)
body = b''.join(result)

return {
'status': response_data['status'],
'headers': dict(response_data['headers']),
'body': body,
'text': body.decode('utf-8'),
}

# Usage
def test_app():
client = WSGITestClient(my_app)

# Test GET request
response = client.get('/')
assert response['status'] == '200 OK'
assert b'Hello' in response['body']

# Test POST request
response = client.post('/api/echo', json_data={'message': 'test'})
assert response['status'] == '200 OK'
assert 'application/json' in response['headers']['Content-Type']

Production Deployment

Environment Configuration

import os
from functools import lru_cache

class Config:
def __init__(self):
self.DEBUG = os.getenv('DEBUG', 'False').lower() == 'true'
self.PORT = int(os.getenv('PORT', 8000))
self.HOST = os.getenv('HOST', '0.0.0.0')
self.WORKERS = int(os.getenv('WORKERS', 4))
self.LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
self.DATABASE_URL = os.getenv('DATABASE_URL')
self.SECRET_KEY = os.getenv('SECRET_KEY', 'dev-key-change-in-production')

@lru_cache()
def get_config():
return Config()

# Application factory
def create_app():
config = get_config()

# Create base application
app = MyWSGIApp()

# Add middleware based on config
if config.DEBUG:
app = ErrorMiddleware(app, debug=True)

app = LoggingMiddleware(app)
app = CORSMiddleware(app)

return app

# For deployment
application = create_app()

Health Check Endpoint

def health_check_app(environ, start_response):
"""Health check endpoint for load balancers"""
path = environ['PATH_INFO']

if path == '/health':
# Perform health checks
checks = {
'database': check_database(),
'cache': check_cache(),
'disk_space': check_disk_space(),
}

all_healthy = all(checks.values())
status = '200 OK' if all_healthy else '503 Service Unavailable'

response = {
'status': 'healthy' if all_healthy else 'unhealthy',
'checks': checks,
'timestamp': time.time(),
}

start_response(status, [('Content-Type', 'application/json')])
return [json.dumps(response).encode('utf-8')]

# Call main application
return main_app(environ, start_response)

def check_database():
# Implement database health check
return True

def check_cache():
# Implement cache health check
return True

def check_disk_space():
# Implement disk space check
return True

This comprehensive guide covers WSGI fundamentals from basic concepts to production deployment patterns. Use these patterns as building blocks for your own WSGI applications and middleware.