Performance & Optimization
Advanced performance optimization techniques for WSGI and ASGI applications, including connection pooling, caching, request batching and streaming, database query optimization, and monitoring.
Connection Pooling
Database Connection Pooling
import asyncio
import inspect
import logging
import uuid
from collections import OrderedDict, deque
from contextlib import asynccontextmanager, contextmanager
from functools import wraps

import asyncpg
import psycopg2.pool
# Async connection pool (ASGI)
class AsyncDatabasePool:
    """Thin wrapper around an ``asyncpg`` connection pool for ASGI apps.

    The pool is created by :meth:`initialize`; each query helper acquires a
    connection, runs one statement and releases the connection again.

    Args:
        database_url: DSN passed to ``asyncpg.create_pool``.
        min_size: Connections the pool opens when it is created.
        max_size: Upper bound on simultaneously open connections.
    """

    def __init__(self, database_url, min_size=10, max_size=20):
        self.database_url = database_url
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None  # set by initialize(), cleared again by close()

    async def initialize(self):
        """Create the underlying asyncpg pool."""
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=self.min_size,
            max_size=self.max_size,
            max_queries=50000,  # recycle a connection after this many queries
            max_inactive_connection_lifetime=300,  # seconds an idle conn may live
            command_timeout=60,  # default per-statement timeout, seconds
        )

    def _require_pool(self):
        """Return the live pool, or fail clearly if it is not initialized."""
        if self.pool is None:
            raise RuntimeError(
                "AsyncDatabasePool is not initialized; await initialize() first"
            )
        return self.pool

    @asynccontextmanager
    async def acquire(self):
        """Yield a pooled connection and release it when the block exits.

        Raises:
            RuntimeError: if :meth:`initialize` has not been awaited, or the
                pool has been closed.
        """
        pool = self._require_pool()
        async with pool.acquire() as connection:
            yield connection

    async def execute(self, query, *args):
        """Run *query* and return asyncpg's command status string."""
        async with self.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query, *args):
        """Run *query* and return all result rows."""
        async with self.acquire() as conn:
            return await conn.fetch(query, *args)

    async def fetchrow(self, query, *args):
        """Run *query* and return its first row, or ``None`` if there is none."""
        async with self.acquire() as conn:
            return await conn.fetchrow(query, *args)

    async def close(self):
        """Close the pool. Safe to call more than once."""
        pool, self.pool = self.pool, None
        if pool is not None:
            await pool.close()
# Sync connection pool (WSGI)
class SyncDatabasePool:
    """psycopg2 ``ThreadedConnectionPool`` wrapper for threaded WSGI servers.

    Args:
        database_url: connection string handed to psycopg2.
        min_conn: connections the pool opens up front.
        max_conn: most connections the pool will hand out at once.
    """

    def __init__(self, database_url, min_conn=5, max_conn=20):
        self.pool = psycopg2.pool.ThreadedConnectionPool(min_conn, max_conn, database_url)

    @contextmanager
    def get_connection(self):
        """Lend a pooled connection to the ``with`` body, then hand it back."""
        borrowed = self.pool.getconn()
        try:
            yield borrowed
        finally:
            # Returned to the pool even when the body raised.
            self.pool.putconn(borrowed)

    def execute(self, query, params=None):
        """Run *query*, commit, and return the cursor's ``rowcount``."""
        with self.get_connection() as conn, conn.cursor() as cur:
            cur.execute(query, params)
            conn.commit()
            return cur.rowcount

    def fetch(self, query, params=None):
        """Run *query* and return every row from ``fetchall()``.

        NOTE(review): no commit is issued here, so don't route writes such as
        ``INSERT ... RETURNING`` through this method.
        """
        with self.get_connection() as conn, conn.cursor() as cur:
            cur.execute(query, params)
            return cur.fetchall()
Redis Connection Pooling
import redis.asyncio as redis
import redis as sync_redis
from redis.connection import ConnectionPool
# Async Redis pool
class AsyncRedisPool:
    """asyncio Redis client backed by an explicit, shared connection pool.

    Args:
        redis_url: ``redis://`` URL understood by ``ConnectionPool.from_url``.
        max_connections: Cap on simultaneously open connections.

    Replies are decoded to ``str`` (``decode_responses=True``), so this
    client is not suited to arbitrary binary payloads such as pickled data.
    """

    def __init__(self, redis_url, max_connections=20):
        self.pool = redis.ConnectionPool.from_url(
            redis_url,
            max_connections=max_connections,
            retry_on_timeout=True,
            decode_responses=True,
        )
        self.client = redis.Redis(connection_pool=self.pool)

    async def get(self, key):
        """Return the value stored at *key*, or ``None`` if it is absent."""
        return await self.client.get(key)

    async def set(self, key, value, ex=None):
        """Store *value* at *key*, expiring after *ex* seconds when given."""
        return await self.client.set(key, value, ex=ex)

    async def delete(self, key):
        """Delete *key*; returns the client's DEL result."""
        return await self.client.delete(key)

    async def pipeline(self):
        """Return a new pipeline. Note: this is a coroutine and must be awaited."""
        return self.client.pipeline()

    async def close(self):
        """Close the client and disconnect every connection in the pool.

        A Redis client constructed with a caller-supplied ``connection_pool``
        does not close that pool itself, so it is disconnected explicitly here;
        otherwise the pooled sockets stay open.
        """
        # redis-py >= 5.0.1 names the coroutine ``aclose``; older releases use ``close``.
        close_client = getattr(self.client, "aclose", None) or self.client.close
        await close_client()
        await self.pool.disconnect()
# Sync Redis pool
class SyncRedisPool:
    """Blocking Redis client backed by an explicit connection pool (WSGI).

    Args:
        redis_url: ``redis://`` URL understood by ``ConnectionPool.from_url``.
        max_connections: Cap on simultaneously open connections.

    Replies are decoded to ``str`` (``decode_responses=True``), so this
    client is not suited to arbitrary binary payloads such as pickled data.
    """

    def __init__(self, redis_url, max_connections=20):
        self.pool = ConnectionPool.from_url(
            redis_url,
            max_connections=max_connections,
            decode_responses=True,
        )
        self.client = sync_redis.Redis(connection_pool=self.pool)

    def get(self, key):
        """Return the value stored at *key*, or ``None`` if it is absent."""
        return self.client.get(key)

    def set(self, key, value, ex=None):
        """Store *value* at *key*, expiring after *ex* seconds when given."""
        return self.client.set(key, value, ex=ex)

    def delete(self, key):
        """Delete *key*; returns the client's DEL result."""
        return self.client.delete(key)

    def pipeline(self):
        """Return a new pipeline bound to this client."""
        return self.client.pipeline()

    def close(self):
        """Close the client and disconnect every connection in the pool.

        The client does not own the explicitly supplied pool, so the pool is
        disconnected here as well.
        """
        self.client.close()
        self.pool.disconnect()
Caching Strategies
Multi-Level Caching
import time
import hashlib
import pickle
from typing import Any, Optional
class MultiLevelCache:
    """Three-tier read-through cache: in-process LRU -> Redis -> database table.

    Reads try each level in order and copy a hit into the faster levels;
    writes go to every configured level.

    Values are serialized with ``pickle`` for the Redis and database levels.
    SECURITY: ``pickle.loads`` can execute code embedded in the payload, so
    only use Redis/DB instances that untrusted parties cannot write to.

    Args:
        l1_size: Max entries in the in-process LRU (``<= 0`` disables L1).
        l2_redis_client: Async client with ``get(key)`` and
            ``set(key, value, ex=seconds)`` that returns raw ``bytes``;
            ``pickle.loads`` cannot read the ``str`` values a
            ``decode_responses=True`` client returns.
        l3_db_pool: Async pool whose ``acquire()`` yields a connection with
            ``fetchrow``/``execute``; expects a ``cache(key, data, expires_at)``
            table where ``expires_at`` is a Unix timestamp.
    """

    # Lifetime (seconds) of L1 entries copied up from L2/L3.
    L1_DEFAULT_TTL = 300

    def __init__(self, l1_size=1000, l2_redis_client=None, l3_db_pool=None):
        # Level 1: in-memory LRU. OrderedDict order is recency order
        # (least recently used first), giving O(1) touch and eviction.
        self.l1_cache = OrderedDict()
        self.l1_max_size = l1_size
        # Level 2: Redis cache
        self.l2_cache = l2_redis_client
        # Level 3: Database cache table
        self.l3_cache = l3_db_pool

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or ``None`` on a miss.

        Expired L1 entries are dropped on read. Note that a cached ``None``
        is indistinguishable from a miss.
        """
        now = time.time()

        # L1: in-process LRU.
        entry = self.l1_cache.get(key)
        if entry is not None:
            if entry['expires_at'] > now:
                self._update_l1_access(key)
                return entry['data']
            del self.l1_cache[key]  # stale: discard and fall through

        # L2: Redis.
        if self.l2_cache:
            raw = await self.l2_cache.get(f"cache:{key}")
            if raw:
                value = pickle.loads(raw)  # trusted data only, see class docstring
                self._set_l1(key, value)
                return value

        # L3: database table. The connection is released before touching Redis.
        if self.l3_cache:
            async with self.l3_cache.acquire() as conn:
                row = await conn.fetchrow(
                    "SELECT data, expires_at FROM cache WHERE key = $1", key
                )
            if row:
                expires_at = float(row['expires_at'])
                if expires_at > now:
                    value = pickle.loads(row['data'])
                    if self.l2_cache:
                        # The Redis copy must not outlive the DB row's expiry.
                        remaining = max(1, int(expires_at - now))
                        await self.l2_cache.set(f"cache:{key}", row['data'], ex=remaining)
                    self._set_l1(key, value, min(expires_at, now + self.L1_DEFAULT_TTL))
                    return value
        return None

    async def set(self, key: str, value: Any, ttl: int = 3600):
        """Store *value* under *key* in every configured level for *ttl* seconds."""
        serialized_data = pickle.dumps(value)
        expires_at = time.time() + ttl
        self._set_l1(key, value, expires_at)
        if self.l2_cache:
            await self.l2_cache.set(f"cache:{key}", serialized_data, ex=ttl)
        if self.l3_cache:
            async with self.l3_cache.acquire() as conn:
                await conn.execute(
                    "INSERT INTO cache (key, data, expires_at) VALUES ($1, $2, $3) "
                    "ON CONFLICT (key) DO UPDATE SET data = $2, expires_at = $3",
                    key, serialized_data, expires_at
                )

    def _set_l1(self, key: str, value: Any, expires_at: Optional[float] = None):
        """Insert or overwrite an L1 entry, evicting the LRU entry when full."""
        if self.l1_max_size <= 0:
            return  # L1 disabled
        if expires_at is None:
            expires_at = time.time() + self.L1_DEFAULT_TTL
        if key in self.l1_cache:
            # Overwrite in place: mark most recent, no eviction needed.
            self.l1_cache.move_to_end(key)
        else:
            while len(self.l1_cache) >= self.l1_max_size:
                self.l1_cache.popitem(last=False)  # evict least recently used
        self.l1_cache[key] = {
            'data': value,
            'expires_at': expires_at,
        }

    def _update_l1_access(self, key: str):
        """Mark *key* as most recently used."""
        self.l1_cache.move_to_end(key)
# Cache decorator
def cache_result(ttl=300, key_prefix="", *, async_cache=None, sync_cache_client=None):
    """Memoize a function's result in a shared cache for *ttl* seconds.

    Works on both ``async def`` and plain functions.

    Args:
        ttl: Expiry, in seconds, for stored results.
        key_prefix: Prepended to the key material to namespace entries.
        async_cache: Object with ``await get(key)`` and
            ``await set(key, value, ttl)`` that serializes values itself
            (e.g. ``MultiLevelCache``). Defaults to the module-level name
            ``cache``, looked up at call time.
        sync_cache_client: Object with ``get(key)`` and
            ``set(key, data, ex=ttl)`` that stores raw bytes; results are
            pickled. Defaults to the module-level name ``sync_cache``, looked
            up at call time.

    Notes:
        * On the async path a cached ``None`` looks like a miss, so functions
          returning ``None`` are re-executed every time.
        * Keys are built from the ``repr`` of the arguments; arguments whose
          repr contains an object address never produce a hit.
        * SECURITY: the sync path unpickles whatever the backend returns.
          Only use a backend that untrusted parties cannot write to.
    """

    def _make_key(func, args, kwargs):
        # Shared by both wrappers so sync and async keys are built identically.
        key_data = f"{key_prefix}{func.__name__}:{args}:{sorted(kwargs.items())}"
        # MD5 only condenses the key into a fixed length; it is not a security boundary.
        return hashlib.md5(key_data.encode()).hexdigest()

    def decorator(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            backend = async_cache if async_cache is not None else cache
            cache_key = _make_key(func, args, kwargs)
            cached_result = await backend.get(cache_key)
            if cached_result is not None:
                return cached_result
            result = await func(*args, **kwargs)
            await backend.set(cache_key, result, ttl)
            return result

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            backend = sync_cache_client if sync_cache_client is not None else sync_cache
            cache_key = _make_key(func, args, kwargs)
            cached_result = backend.get(cache_key)
            if cached_result is not None:
                return pickle.loads(cached_result)
            result = func(*args, **kwargs)
            backend.set(cache_key, pickle.dumps(result), ex=ttl)
            return result

        return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper

    return decorator
Cache Invalidation
class CacheInvalidator:
    """Delete cache keys when the entities they were derived from change.

    Args:
        cache_client: Async Redis-style client providing ``delete(*keys)`` and
            either ``scan_iter(match=, count=)`` or ``keys(pattern)``.
        db_pool: Stored as ``self.db``; not used by the methods below.
    """

    def __init__(self, cache_client, db_pool):
        self.cache = cache_client
        self.db = db_pool
        self.invalidation_patterns = {}

    def register_pattern(self, entity_type, pattern):
        """Register *pattern*, a callable ``entity_id -> iterable of cache keys``."""
        self.invalidation_patterns[entity_type] = pattern

    async def invalidate_entity(self, entity_type, entity_id):
        """Delete every key the registered pattern yields for *entity_id*.

        Unknown entity types, and patterns yielding no keys, are no-ops:
        Redis ``DEL`` rejects an empty argument list.
        """
        pattern = self.invalidation_patterns.get(entity_type)
        if pattern is None:
            return
        keys_to_invalidate = list(pattern(entity_id))
        if keys_to_invalidate:
            await self.cache.delete(*keys_to_invalidate)

    async def invalidate_pattern(self, pattern, batch_size=500):
        """Delete all keys matching the glob *pattern*.

        Uses incremental ``SCAN`` instead of ``KEYS``. ``KEYS`` walks the
        whole keyspace in one blocking command, which stalls Redis on large
        datasets. Matching keys are deleted in batches of *batch_size*.
        """
        scan_iter = getattr(self.cache, 'scan_iter', None)
        if scan_iter is None:
            # Client without SCAN support: fall back to a single KEYS call.
            keys = await self.cache.keys(pattern)
            if keys:
                await self.cache.delete(*keys)
            return

        batch_size = max(1, batch_size)
        batch = []
        async for key in scan_iter(match=pattern, count=batch_size):
            batch.append(key)
            if len(batch) >= batch_size:
                await self.cache.delete(*batch)
                batch = []
        if batch:
            await self.cache.delete(*batch)
# Usage example
cache_invalidator = CacheInvalidator(redis_client, db_pool)
# Register patterns: each maps an entity id to the cache keys derived from it.
for _entity_type, _keys_for in (
    ('user', lambda user_id: [f"user:{user_id}", f"user_posts:{user_id}", f"user_profile:{user_id}"]),
    ('post', lambda post_id: [f"post:{post_id}", f"post_comments:{post_id}"]),
):
    cache_invalidator.register_pattern(_entity_type, _keys_for)
del _entity_type, _keys_for  # don't leave loop variables in the module namespace
# Invalidate on updates
async def update_user(user_id, data):
    """Persist changes for *user_id*, then drop every cache key derived from it.

    NOTE(review): the SQL is a placeholder (``SET ...``) and *data* is not
    used yet. Fill in the real column assignments before use.
    """
    update_sql = "UPDATE users SET ... WHERE id = $1"
    await db.execute(update_sql, user_id)
    # Reached only if the write succeeded, so a failed update keeps the cache intact.
    await cache_invalidator.invalidate_entity('user', user_id)
Request Optimization
Request Batching
import asyncio
from collections import defaultdict
class RequestBatcher:
    """Coalesce many single-item lookups into one batched call (DataLoader style).

    Requests of the same ``request_type`` are queued until either
    ``batch_size`` items are waiting or ``max_wait_time`` seconds have passed
    since the first one. The whole batch is then resolved by one handler call.

    Built-in handlers are ``'user_fetch'`` and ``'post_fetch'``; both use the
    module-level ``db_pool``. Add more with :meth:`register_handler`.
    Requests of an unknown type resolve to ``None``.
    """

    def __init__(self, batch_size=10, max_wait_time=0.1):
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time
        self.pending_requests = defaultdict(list)
        self.timers = {}
        # request_type -> async handler(list_of_data) -> list_of_results
        self.handlers = {
            'user_fetch': self._batch_fetch_users,
            'post_fetch': self._batch_fetch_posts,
        }

    def register_handler(self, request_type, handler):
        """Register ``async handler(items) -> results`` (same length and order as items)."""
        self.handlers[request_type] = handler

    async def batch_request(self, request_type, request_data):
        """Queue one item and wait for its result from the batched handler.

        Raises:
            Exception: whatever the handler raised for the batch, or
                RuntimeError if it returned the wrong number of results.
        """
        future = asyncio.get_running_loop().create_future()
        queue = self.pending_requests[request_type]
        queue.append({'data': request_data, 'future': future})

        if len(queue) >= self.batch_size:
            # Batch is full: stop its timer and flush immediately.
            timer = self.timers.pop(request_type, None)
            if timer is not None:
                timer.cancel()
            await self._process_batch(request_type)
        elif request_type not in self.timers:
            # First item of a new batch: flush after max_wait_time.
            self.timers[request_type] = asyncio.create_task(
                self._wait_and_process(request_type)
            )
        return await future

    async def _wait_and_process(self, request_type):
        """Timer task: wait max_wait_time, then flush the batch."""
        try:
            await asyncio.sleep(self.max_wait_time)
        except asyncio.CancelledError:
            return  # batch filled early and batch_request flushed it
        await self._process_batch(request_type)

    @staticmethod
    def _fail_all(requests, exc):
        """Propagate *exc* to every still-pending future in *requests*."""
        for request in requests:
            if not request['future'].done():
                request['future'].set_exception(exc)

    async def _process_batch(self, request_type):
        """Take all queued requests of *request_type* and resolve them in one call."""
        self.timers.pop(request_type, None)
        requests = self.pending_requests.pop(request_type, None)
        if not requests:
            return

        handler = self.handlers.get(request_type)
        try:
            if handler is None:
                results = [None] * len(requests)
            else:
                results = list(await handler([r['data'] for r in requests]))
        except Exception as exc:
            self._fail_all(requests, exc)
            return

        if len(results) != len(requests):
            # zip() would silently drop the tail and leave those callers hanging.
            self._fail_all(requests, RuntimeError(
                f"batch handler for {request_type!r} returned {len(results)} "
                f"results for {len(requests)} requests"
            ))
            return

        for request, result in zip(requests, results):
            # Skip futures whose awaiting caller was cancelled in the meantime.
            if not request['future'].done():
                request['future'].set_result(result)

    async def _batch_fetch_users(self, user_ids):
        """Fetch many users with one query; results follow request order, ``None`` if missing."""
        async with db_pool.acquire() as conn:
            users = await conn.fetch(
                "SELECT * FROM users WHERE id = ANY($1)", user_ids
            )
        user_map = {user['id']: user for user in users}
        return [user_map.get(user_id) for user_id in user_ids]

    async def _batch_fetch_posts(self, post_ids):
        """Fetch many posts with one query; results follow request order, ``None`` if missing."""
        async with db_pool.acquire() as conn:
            posts = await conn.fetch(
                "SELECT * FROM posts WHERE id = ANY($1)", post_ids
            )
        post_map = {post['id']: post for post in posts}
        return [post_map.get(post_id) for post_id in post_ids]
# Usage
batcher = RequestBatcher()  # one shared instance, so concurrent callers join the same batch


async def get_user(user_id):
    """Fetch a single user through the shared batcher."""
    request_type = 'user_fetch'
    return await batcher.batch_request(request_type, user_id)


async def get_post(post_id):
    """Fetch a single post through the shared batcher."""
    request_type = 'post_fetch'
    return await batcher.batch_request(request_type, post_id)
Response Streaming
import asyncio
import json
# ASGI streaming response
async def streaming_response(scope, receive, send):
    """ASGI app that streams every user as a ``{"users": [...]}`` JSON document.

    Rows come from ``get_users_stream()`` one at a time, so the full result
    set is never held in memory. Each row goes out as one body chunk, with
    its separating comma folded into the same message, halving the number of
    ``send`` calls.

    NOTE(review): once the response has started, an exception from the row
    stream leaves the client with truncated JSON; the status can't change.
    """

    async def send_chunk(body, more_body=True):
        await send({
            'type': 'http.response.body',
            'body': body,
            'more_body': more_body,
        })

    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'application/json']],
    })
    await send_chunk(b'{"users": [')

    separator = b''  # empty before the first item, a comma afterwards
    async for user in get_users_stream():
        await send_chunk(separator + json.dumps(user).encode('utf-8'))
        separator = b','

    await send_chunk(b']}', more_body=False)
async def get_users_stream():
    """Yield every row of ``users``, ordered by id, as a plain dict.

    Rows are pulled through a server-side cursor 100 at a time, so large
    tables are streamed rather than loaded at once. asyncpg cursors must be
    used inside a transaction, hence ``conn.transaction()``.
    """
    async with db_pool.acquire() as conn, conn.transaction():
        async for record in conn.cursor("SELECT * FROM users ORDER BY id", prefetch=100):
            yield dict(record)
# WSGI streaming response
def wsgi_streaming_response(environ, start_response):
    """WSGI app that streams all users as a ``{"users": [...]}`` JSON document.

    Returns a generator, so the server sends one chunk at a time instead of
    buffering the whole document. Rows are not read until iteration starts.
    """
    start_response('200 OK', [('Content-Type', 'application/json')])

    def generate():
        yield b'{"users": ['
        for position, user in enumerate(get_users_sync_stream()):
            if position:
                yield b','
            yield json.dumps(user).encode('utf-8')
        yield b']}'

    return generate()
Database Optimization
Query Optimization
class QueryOptimizer:
    """Statement registry and bulk-write helpers over an asyncpg-style pool.

    SECURITY: ``table``, ``columns`` and ``id_column`` are interpolated into
    the SQL text unquoted. Only pass trusted, developer-supplied identifiers.
    Row values always travel as bind parameters.
    """

    # asyncpg rejects statements with more than 32767 bind parameters.
    MAX_BIND_PARAMS = 32767

    def __init__(self, db_pool):
        self.db = db_pool
        self.query_cache = {}  # NOTE(review): not used by any method here
        # name -> SQL text. An asyncpg PreparedStatement is tied to the
        # connection that created it and becomes unusable once that connection
        # returns to the pool. So only the text is kept, and it is run on
        # whichever connection is acquired; asyncpg caches prepared statements
        # per connection.
        self.prepared_statements = {}

    async def prepare_statement(self, name, query):
        """Register *query* under *name*, preparing it once to surface SQL errors early."""
        async with self.db.acquire() as conn:
            await conn.prepare(query)
        self.prepared_statements[name] = query

    async def execute_prepared(self, name, *args):
        """Run the statement registered as *name* with *args* and return its rows.

        Raises:
            ValueError: if *name* was never registered.
        """
        query = self.prepared_statements.get(name)
        if query is None:
            raise ValueError(f"Statement {name} not prepared")
        async with self.db.acquire() as conn:
            return await conn.fetch(query, *args)

    async def bulk_insert(self, table, columns, data, conn=None):
        """Insert *data* (row sequences ordered like *columns*) into *table*.

        Rows are sent as multi-row ``INSERT ... VALUES`` statements, chunked
        so no statement exceeds ``MAX_BIND_PARAMS``. When more than one
        statement is needed they run in a single transaction.

        Args:
            conn: Connection to use. Required for session-local objects such
                as temp tables; otherwise one is acquired from the pool.

        Returns:
            The status string of the last INSERT, or ``None`` for empty *data*.
        """
        rows = list(data)
        if not rows:
            return None
        if conn is None:
            async with self.db.acquire() as pooled:
                return await self._insert_rows(pooled, table, columns, rows)
        return await self._insert_rows(conn, table, columns, rows)

    @staticmethod
    def _build_insert(table, columns, rows):
        """Build ``(sql, flat_args)`` for one multi-row INSERT, numbering $n row by row."""
        width = len(columns)
        groups = []
        for row_index in range(len(rows)):
            base = row_index * width
            groups.append('(' + ', '.join(f'${base + c + 1}' for c in range(width)) + ')')
        sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES {', '.join(groups)}"
        flat_args = [value for row in rows for value in row]
        return sql, flat_args

    async def _insert_rows(self, conn, table, columns, rows):
        """Insert *rows* on *conn* in parameter-limit-sized chunks; return the last status."""
        per_stmt = max(1, self.MAX_BIND_PARAMS // max(1, len(columns)))
        chunks = [rows[i:i + per_stmt] for i in range(0, len(rows), per_stmt)]
        if len(chunks) == 1:
            sql, args = self._build_insert(table, columns, chunks[0])
            return await conn.execute(sql, *args)
        status = None
        async with conn.transaction():  # several statements: all or nothing
            for chunk in chunks:
                sql, args = self._build_insert(table, columns, chunk)
                status = await conn.execute(sql, *args)
        return status

    async def bulk_update(self, table, updates, id_column='id'):
        """Apply many row updates in one statement via a temporary staging table.

        Args:
            updates: Non-empty list of dicts sharing the same keys. Each must
                contain *id_column*; the other keys are the columns to overwrite.

        Returns:
            The UPDATE's status string, or ``None`` if *updates* is empty.

        Raises:
            ValueError: if *id_column* is missing or there is nothing to set.
        """
        if not updates:
            return None
        columns = list(updates[0].keys())
        set_columns = [col for col in columns if col != id_column]
        if id_column not in columns or not set_columns:
            raise ValueError(
                f"updates need {id_column!r} plus at least one column to set"
            )

        # Unique per call; not derived from `table`, which may be schema-qualified.
        temp_table = f"tmp_bulk_update_{uuid.uuid4().hex}"
        async with self.db.acquire() as conn:
            async with conn.transaction():
                # ON COMMIT DROP: the staging table vanishes with the transaction
                # instead of lingering on the pooled connection.
                await conn.execute(
                    f"CREATE TEMP TABLE {temp_table} ON COMMIT DROP AS "
                    f"SELECT * FROM {table} WHERE FALSE"
                )
                # Temp tables are session-local, so stage on this same connection.
                await self.bulk_insert(
                    temp_table,
                    columns,
                    [[update[col] for col in columns] for update in updates],
                    conn=conn,
                )
                set_clause = ', '.join(f"{col} = _bulk_src.{col}" for col in set_columns)
                return await conn.execute(
                    f"UPDATE {table} SET {set_clause} "
                    f"FROM {temp_table} AS _bulk_src "
                    f"WHERE {table}.{id_column} = _bulk_src.{id_column}"
                )
Connection Pool Optimization
class OptimizedConnectionPool:
    """asyncpg pool with TCP keepalive tuning, pool stats and acquisition metrics.

    Args:
        database_url: DSN passed to ``asyncpg.create_pool``.
        min_size: Connections opened when the pool is created.
        max_size: Upper bound on simultaneously open connections.
    """

    # Operations (acquire + body) slower than this many seconds are logged.
    SLOW_OPERATION_SECONDS = 1.0

    def __init__(self, database_url, min_size=5, max_size=20):
        self.database_url = database_url
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None  # created by initialize()
        self.connection_metrics = {
            'total_connections': 0,   # NOTE(review): never updated in this class
            'active_connections': 0,  # currently checked out via acquire_with_metrics
            'idle_connections': 0,    # NOTE(review): never updated in this class
            'failed_connections': 0,  # acquisitions (or their bodies) that raised
        }

    async def initialize(self):
        """Create the asyncpg pool with per-session server settings."""
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=self.min_size,
            max_size=self.max_size,
            max_queries=50000,
            max_inactive_connection_lifetime=300,
            command_timeout=60,
            server_settings={
                'application_name': 'my_app',
                # Server-side TCP keepalive: first probe after 600 s idle,
                # then every 30 s; drop the connection after 3 missed probes.
                'tcp_keepalives_idle': '600',
                'tcp_keepalives_interval': '30',
                'tcp_keepalives_count': '3',
            }
        )

    async def get_pool_stats(self):
        """Return pool sizing figures plus this wrapper's counters.

        Requires :meth:`initialize` to have been awaited.
        """
        return {
            'size': self.pool.get_size(),
            'min_size': self.pool.get_min_size(),
            'max_size': self.pool.get_max_size(),
            'idle_size': self.pool.get_idle_size(),
            'metrics': self.connection_metrics
        }

    @asynccontextmanager
    async def acquire_with_metrics(self):
        """Acquire a connection, tracking active/failed counts and slow operations.

        Usage: ``async with pool.acquire_with_metrics() as conn: ...``.
        Exceptions from acquiring or from the body are counted in
        ``failed_connections`` and re-raised.
        """
        # perf_counter is monotonic, so clock adjustments can't skew durations.
        start_time = time.perf_counter()
        try:
            async with self.pool.acquire() as conn:
                self.connection_metrics['active_connections'] += 1
                try:
                    yield conn
                finally:
                    # Decrement only what was incremented: a failed acquire
                    # never reaches this block.
                    self.connection_metrics['active_connections'] -= 1
        except Exception:
            self.connection_metrics['failed_connections'] += 1
            raise
        finally:
            duration = time.perf_counter() - start_time
            if duration > self.SLOW_OPERATION_SECONDS:
                logging.warning("Slow database operation: %.2fs", duration)
Monitoring and Profiling
Performance Monitoring
import time
import psutil
from collections import defaultdict
class PerformanceMonitor:
    """In-process store of recent metric samples plus host-level system stats.

    Args:
        max_samples: Samples kept per metric name; older ones are discarded.
    """

    def __init__(self, max_samples=1000):
        self.max_samples = max_samples
        # deque(maxlen) drops the oldest sample in O(1), instead of re-slicing
        # a list on every append once it is full.
        self.metrics = defaultdict(lambda: deque(maxlen=self.max_samples))
        self.start_time = time.time()

    def record_metric(self, name, value, timestamp=None):
        """Append a ``{'value', 'timestamp'}`` sample for *name* (timestamp defaults to now)."""
        if timestamp is None:
            timestamp = time.time()
        self.metrics[name].append({
            'value': value,
            'timestamp': timestamp
        })

    def get_system_metrics(self, cpu_interval=1):
        """Return current CPU, memory, disk and network figures plus uptime.

        Args:
            cpu_interval: Seconds ``psutil.cpu_percent`` samples over. The
                default (1) BLOCKS the calling thread, and the whole event
                loop if called from async code, for that long. Pass ``None``
                for an immediate reading relative to the previous call (the
                very first such reading is 0.0).
        """
        return {
            'cpu_percent': psutil.cpu_percent(interval=cpu_interval),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_percent': psutil.disk_usage('/').percent,
            'network_io': psutil.net_io_counters()._asdict(),
            'uptime': time.time() - self.start_time
        }

    def get_metric_summary(self, name, window_seconds=300):
        """Summarize samples of *name* from the last *window_seconds*.

        Returns ``None`` when there are no samples in the window. Querying an
        unknown name does not create an entry for it.
        """
        samples = self.metrics.get(name)
        if not samples:
            return None
        cutoff_time = time.time() - window_seconds
        values = [m['value'] for m in samples if m['timestamp'] > cutoff_time]
        if not values:
            return None
        return {
            'count': len(values),
            'min': min(values),
            'max': max(values),
            'avg': sum(values) / len(values),
            'latest': values[-1]
        }
# Performance monitoring middleware
class PerformanceMiddleware:
    """ASGI middleware that records per-request timing and payload sizes.

    For each HTTP request it records ``request_duration`` (seconds),
    ``request_size`` and ``response_size`` (body bytes), and a
    ``status_<code>`` sample of 1 on *monitor*. Requests slower than
    ``SLOW_REQUEST_SECONDS`` are logged. Non-HTTP scopes are passed straight
    through.

    Args:
        app: The wrapped ASGI application.
        monitor: Object with ``record_metric(name, value)``.
    """

    SLOW_REQUEST_SECONDS = 1.0

    def __init__(self, app, monitor):
        self.app = app
        self.monitor = monitor

    async def __call__(self, scope, receive, send):
        if scope['type'] != 'http':
            await self.app(scope, receive, send)
            return

        # perf_counter is monotonic; time.time() can jump with clock adjustments.
        start_time = time.perf_counter()
        request_size = 0
        response_size = 0
        response_status = None

        async def receive_with_size():
            # Count request body bytes as the app reads them.
            nonlocal request_size
            message = await receive()
            if message['type'] == 'http.request':
                request_size += len(message.get('body', b''))
            return message

        async def send_with_metrics(message):
            # Capture the status and count response body bytes on the way out.
            nonlocal response_size, response_status
            if message['type'] == 'http.response.start':
                response_status = message['status']
            elif message['type'] == 'http.response.body':
                response_size += len(message.get('body', b''))
            await send(message)

        try:
            await self.app(scope, receive_with_size, send_with_metrics)
        finally:
            # Recorded even when the app raised.
            duration = time.perf_counter() - start_time
            self.monitor.record_metric('request_duration', duration)
            self.monitor.record_metric('request_size', request_size)
            self.monitor.record_metric('response_size', response_size)
            if response_status:
                self.monitor.record_metric(f'status_{response_status}', 1)
            if duration > self.SLOW_REQUEST_SECONDS:
                logging.warning(
                    "Slow request: %s %s took %.2fs",
                    scope['method'], scope['path'], duration,
                )
Memory Profiling
import tracemalloc
import gc
from memory_profiler import profile
class MemoryProfiler:
    """Record named ``tracemalloc`` snapshots and diff them.

    Starts allocation tracing on construction if it is not already running.
    Call :meth:`stop` when done, so tracing overhead does not persist for the
    rest of the process. Snapshots are kept in ``self.snapshots`` until the
    profiler is discarded.
    """

    def __init__(self):
        self.snapshots = []
        # Only the profiler that switched tracing on may switch it off.
        self._started_tracing = not tracemalloc.is_tracing()
        if self._started_tracing:
            tracemalloc.start()

    def take_snapshot(self, name):
        """Take a tracemalloc snapshot and store it under *name*."""
        snapshot = tracemalloc.take_snapshot()
        self.snapshots.append({
            'name': name,
            'snapshot': snapshot,
            'timestamp': time.time()
        })

    def _find_snapshot(self, name):
        """Return the first snapshot recorded under *name*.

        Raises:
            KeyError: if no snapshot has that name. A leaked StopIteration
                would turn into RuntimeError inside coroutines and generators.
        """
        for entry in self.snapshots:
            if entry['name'] == name:
                return entry['snapshot']
        raise KeyError(f"no snapshot named {name!r}")

    def compare_snapshots(self, name1, name2, limit=10):
        """Print and return the top *limit* allocation differences from *name1* to *name2*.

        Differences are grouped by source line (``'lineno'``), largest first.

        Raises:
            KeyError: if either snapshot name was never recorded.
        """
        before = self._find_snapshot(name1)
        after = self._find_snapshot(name2)
        top_stats = after.compare_to(before, 'lineno')[:limit]
        print(f"Memory comparison between {name1} and {name2}:")
        for stat in top_stats:
            print(stat)
        return top_stats

    def get_memory_usage(self):
        """Return current/peak traced memory in MiB plus ``gc.get_stats()``."""
        current, peak = tracemalloc.get_traced_memory()
        return {
            'current': current / 1024 / 1024,  # MB
            'peak': peak / 1024 / 1024,  # MB
            'gc_stats': gc.get_stats()
        }

    def stop(self):
        """Stop tracing if this profiler started it. Stored snapshots remain usable."""
        if self._started_tracing and tracemalloc.is_tracing():
            tracemalloc.stop()
        self._started_tracing = False
# Memory monitoring decorator
def monitor_memory(func):
    """Decorator: print the top allocation differences caused by each call.

    Each call gets a fresh ``MemoryProfiler`` and compares a snapshot taken
    before *func* runs with one taken after it finishes, including when it
    raises. Works for both ``async def`` and regular functions; a sync
    function stays sync.

    NOTE: ``MemoryProfiler`` starts ``tracemalloc`` and this wrapper never
    stops it, so allocation tracing (and its overhead) stays on after the
    first call.
    """

    def _report(profiler):
        profiler.take_snapshot('end')
        profiler.compare_snapshots('start', 'end')

    if inspect.iscoroutinefunction(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            profiler = MemoryProfiler()
            profiler.take_snapshot('start')
            try:
                return await func(*args, **kwargs)
            finally:
                _report(profiler)

        return wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        profiler = MemoryProfiler()
        profiler.take_snapshot('start')
        try:
            return func(*args, **kwargs)
        finally:
            _report(profiler)

    return sync_wrapper
This comprehensive guide covers advanced performance optimization techniques including connection pooling, caching strategies, request optimization, database optimization, and monitoring for both WSGI and ASGI applications.