Fundamentals
Comprehensive guide to Asynchronous Server Gateway Interface (ASGI) concepts, implementation, and async patterns.
ASGI Specification Overview
Basic ASGI Application Structure
async def application(scope, receive, send):
    """
    Route an ASGI connection to the handler matching its scope type.

    Args:
        scope: Dict containing connection information
        receive: Awaitable callable that yields messages from the client
        send: Awaitable callable that delivers messages to the client

    Raises:
        ValueError: If the scope type is neither 'http' nor 'websocket'.
    """
    scope_type = scope['type']

    if scope_type == 'http':
        await handle_http(scope, receive, send)
        return

    if scope_type == 'websocket':
        await handle_websocket(scope, receive, send)
        return

    # Anything else is rejected explicitly
    raise ValueError(f"Unknown scope type: {scope['type']}")
async def handle_http(scope, receive, send):
    """Answer any HTTP request with a plain-text 200 greeting."""
    start_event = {
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    }
    body_event = {
        'type': 'http.response.body',
        'body': b'Hello World',
    }
    # The start event must precede the body event
    for event in (start_event, body_event):
        await send(event)
Scope Dictionary Structure
async def debug_scope(scope, receive, send):
    """Respond with a JSON dump of the HTTP connection scope.

    Non-HTTP scopes are ignored: nothing is sent for them.

    Args:
        scope: ASGI connection scope.
        receive: ASGI receive callable (unused).
        send: ASGI send callable used to emit the JSON response.
    """
    import json

    if scope['type'] != 'http':
        return

    def as_text(raw):
        # Header and query bytes are not guaranteed to be valid UTF-8;
        # replacing undecodable bytes keeps this debug endpoint from
        # failing on exactly the requests one would want to inspect.
        return raw.decode('utf-8', errors='replace')

    # NOTE: repeated header names collapse to the last value seen.
    headers = {
        as_text(name): as_text(value)
        for name, value in scope.get('headers', [])
    }

    scope_info = {
        'type': scope['type'],
        'method': scope['method'],
        'path': scope['path'],
        'query_string': as_text(scope.get('query_string', b'')),
        'headers': headers,
        'server': scope.get('server'),
        'client': scope.get('client'),
        'asgi': scope.get('asgi'),
    }
    response_body = json.dumps(scope_info, indent=2).encode('utf-8')

    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'application/json']],
    })
    await send({
        'type': 'http.response.body',
        'body': response_body,
    })
ASGI Application Patterns
Class-Based Application
class ASGIApplication:
    """Class-based ASGI application with exact-match route tables.

    HTTP routes are keyed by ``(method, path)``; WebSocket routes by path.
    """

    def __init__(self, config=None):
        self.config = config if config else {}
        self.http_routes = {}       # (method, path) -> handler
        self.websocket_routes = {}  # path -> handler

    async def __call__(self, scope, receive, send):
        """ASGI entry point: dispatch on the scope type.

        Raises:
            ValueError: If the scope type is neither 'http' nor 'websocket'.
        """
        kind = scope['type']
        if kind == 'http':
            await self.handle_http(scope, receive, send)
            return
        if kind == 'websocket':
            await self.handle_websocket(scope, receive, send)
            return
        raise ValueError(f"Unknown scope type: {scope['type']}")

    async def handle_http(self, scope, receive, send):
        """Run the handler registered for this method and path, else reply 404."""
        route_key = (scope['method'], scope['path'])
        handler = self.http_routes.get(route_key)
        if not handler:
            await self.not_found(scope, receive, send)
            return
        await handler(scope, receive, send)

    async def handle_websocket(self, scope, receive, send):
        """Run the handler registered for this path, else close the socket."""
        handler = self.websocket_routes.get(scope['path'])
        if not handler:
            await send({'type': 'websocket.close', 'code': 1000})
            return
        await handler(scope, receive, send)

    async def not_found(self, scope, receive, send):
        """Send a plain-text 404 response."""
        start_event = {
            'type': 'http.response.start',
            'status': 404,
            'headers': [[b'content-type', b'text/plain']],
        }
        body_event = {
            'type': 'http.response.body',
            'body': b'Not Found',
        }
        await send(start_event)
        await send(body_event)

    def add_http_route(self, method, path, handler):
        """Register ``handler`` for an exact method and path pair."""
        self.http_routes[(method, path)] = handler

    def add_websocket_route(self, path, handler):
        """Register ``handler`` for an exact WebSocket path."""
        self.websocket_routes[path] = handler
# Usage: create the application instance; it starts with empty route tables
app = ASGIApplication()
async def home_handler(scope, receive, send):
    """Serve the static HTML home page with a 200 status."""
    response_start = {
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/html']],
    }
    response_body = {
        'type': 'http.response.body',
        'body': b'<h1>Home Page</h1>',
    }
    await send(response_start)
    await send(response_body)
async def api_handler(scope, receive, send):
    """Serve a fixed JSON payload with a 200 status."""
    response_start = {
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'application/json']],
    }
    response_body = {
        'type': 'http.response.body',
        'body': b'{"message": "API Response"}',
    }
    await send(response_start)
    await send(response_body)
# Register both handlers for GET; lookups use the exact (method, path) pair
app.add_http_route('GET', '/', home_handler)
app.add_http_route('GET', '/api', api_handler)
Request Body Processing
async def parse_request_body(scope, receive):
    """Collect the complete HTTP request body from ASGI messages.

    Reads 'http.request' messages until one arrives without ``more_body``.
    Any other message type (for example 'http.disconnect') stops the read
    early, and whatever was received so far is returned.

    Args:
        scope: ASGI connection scope (unused; kept for a uniform signature).
        receive: ASGI receive callable.

    Returns:
        The request body as ``bytes`` (empty if no body was sent).
    """
    # Collect chunks and join once: repeated ``bytes +=`` re-copies the
    # accumulated body on every chunk, which is quadratic for large uploads.
    chunks = []
    while True:
        message = await receive()
        if message['type'] != 'http.request':
            break
        chunks.append(message.get('body', b''))
        if not message.get('more_body', False):
            break
    return b''.join(chunks)
async def parse_json_request(scope, receive, send):
    """Parse a JSON request body and echo it back as JSON.

    An empty body is treated as ``{}``. A body that is not valid UTF-8 or
    not valid JSON gets a plain-text 400 response.

    Args:
        scope: ASGI HTTP connection scope ('method' and 'path' are echoed).
        receive: ASGI receive callable.
        send: ASGI send callable.
    """
    import json

    body = await parse_request_body(scope, receive)

    # Only decoding/parsing is guarded, so a failure while sending the
    # success response cannot be mistaken for bad client input.
    try:
        data = json.loads(body.decode('utf-8')) if body else {}
    except (UnicodeDecodeError, json.JSONDecodeError):
        await send({
            'type': 'http.response.start',
            'status': 400,
            'headers': [[b'content-type', b'text/plain']],
        })
        await send({
            'type': 'http.response.body',
            'body': b'Invalid JSON',
        })
        return

    # Echo back the JSON data
    response_data = {
        'received': data,
        'method': scope['method'],
        'path': scope['path'],
    }
    response_body = json.dumps(response_data).encode('utf-8')
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'application/json']],
    })
    await send({
        'type': 'http.response.body',
        'body': response_body,
    })
async def parse_form_request(scope, receive, send):
    """Parse a form-encoded request body and echo it back as JSON.

    The body is parsed only when the Content-Type is
    ``application/x-www-form-urlencoded``; otherwise ``form_data`` is empty.

    Args:
        scope: ASGI HTTP connection scope.
        receive: ASGI receive callable.
        send: ASGI send callable.
    """
    # Imported locally, like parse_qs, so this function does not rely on a
    # module-level ``import json`` being present.
    import json
    from urllib.parse import parse_qs

    body = await parse_request_body(scope, receive)

    # latin-1 never fails to decode, whatever bytes the client sent
    headers = dict(scope.get('headers', []))
    content_type = headers.get(b'content-type', b'').decode('latin-1')

    form_data = {}
    # Media types are case-insensitive, so compare in lower case
    if content_type.lower().startswith('application/x-www-form-urlencoded'):
        form_data = parse_qs(body.decode('utf-8', errors='replace'))

    response_data = {
        'form_data': form_data,
        'method': scope['method'],
        'path': scope['path'],
    }
    response_body = json.dumps(response_data).encode('utf-8')
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'application/json']],
    })
    await send({
        'type': 'http.response.body',
        'body': response_body,
    })
WebSocket Support
Basic WebSocket Handler
async def websocket_handler(scope, receive, send):
    """Echo WebSocket handler.

    Accepts the connection, then echoes text frames back prefixed with
    ``Echo: `` and binary frames unchanged, until the client disconnects.
    If anything raises, the socket is closed with code 1011.

    Args:
        scope: ASGI WebSocket connection scope (unused).
        receive: ASGI receive callable.
        send: ASGI send callable.
    """
    # Accept the WebSocket connection
    await send({'type': 'websocket.accept'})

    try:
        while True:
            message = await receive()
            kind = message['type']

            if kind == 'websocket.disconnect':
                break
            if kind != 'websocket.receive':
                # e.g. the initial 'websocket.connect' event
                continue

            # A frame may carry both keys with the unused one set to None,
            # so test the value rather than key presence.
            text = message.get('text')
            data = message.get('bytes')
            if text is not None:
                await send({
                    'type': 'websocket.send',
                    'text': f'Echo: {text}'
                })
            elif data is not None:
                await send({
                    'type': 'websocket.send',
                    'bytes': data
                })
    except Exception as e:
        # A close reason is limited to 123 bytes of UTF-8 (125-byte control
        # frame minus the 2-byte status code), so truncate it.
        reason = str(e).encode('utf-8')[:123].decode('utf-8', errors='ignore')
        await send({
            'type': 'websocket.close',
            'code': 1011,  # Internal error
            'reason': reason
        })
WebSocket Chat Application
import asyncio
import json
from collections import defaultdict
class WebSocketChatApp:
    """Multi-room WebSocket chat application.

    WebSocket connections to ``/chat/<room>`` join that room; every other
    scope is handled as HTTP, where ``/`` serves a small demo client.
    """

    # Demo page served at '/'. Incoming messages are rendered through
    # textContent, never innerHTML, because chat text comes from other
    # users and must not be parsed as markup. The socket URL is derived
    # from the page's own origin so the demo works on any host and port.
    CLIENT_HTML = """
<!DOCTYPE html>
<html>
<head><title>WebSocket Chat</title></head>
<body>
    <div id="messages"></div>
    <input type="text" id="messageInput" placeholder="Type message...">
    <button onclick="sendMessage()">Send</button>
    <script>
        const scheme = location.protocol === 'https:' ? 'wss://' : 'ws://';
        const ws = new WebSocket(scheme + location.host + '/chat/room1');
        const messages = document.getElementById('messages');
        ws.onmessage = function(event) {
            const data = JSON.parse(event.data);
            const line = document.createElement('div');
            line.textContent = JSON.stringify(data);
            messages.appendChild(line);
        };
        function sendMessage() {
            const input = document.getElementById('messageInput');
            ws.send(JSON.stringify({
                type: 'chat',
                message: input.value
            }));
            input.value = '';
        }
        document.getElementById('messageInput').addEventListener('keypress', function(e) {
            if (e.key === 'Enter') {
                sendMessage();
            }
        });
    </script>
</body>
</html>
"""

    def __init__(self):
        self.connections = defaultdict(set)  # room_id -> set of send callables
        self.user_rooms = {}  # connection_id -> room_id

    async def __call__(self, scope, receive, send):
        """ASGI entry point: WebSocket scopes go to chat, the rest to HTTP."""
        if scope['type'] == 'websocket':
            await self.handle_websocket(scope, receive, send)
        else:
            await self.handle_http(scope, receive, send)

    async def handle_websocket(self, scope, receive, send):
        """Join the room named by the last path segment of ``/chat/...``.

        Connections to any other path are closed with code 1000.
        """
        path = scope['path']
        if path.startswith('/chat/'):
            room_id = path.split('/')[-1]
            await self.chat_handler(scope, receive, send, room_id)
        else:
            await send({'type': 'websocket.close', 'code': 1000})

    async def chat_handler(self, scope, receive, send, room_id):
        """Run one connection's lifetime inside ``room_id``.

        Accepts the socket, registers it, relays JSON messages until the
        client disconnects, then unregisters it and notifies the room.
        """
        # Accept connection
        await send({'type': 'websocket.accept'})

        # The send callable doubles as the connection handle; its id() is
        # the identifier shown to other users.
        connection_id = id(send)

        # Add to room
        self.connections[room_id].add(send)
        self.user_rooms[connection_id] = room_id

        # Notify room of new user
        await self.broadcast_to_room(room_id, {
            'type': 'user_joined',
            'message': f'User {connection_id} joined the room',
            'room': room_id,
            'user_count': len(self.connections[room_id])
        })

        try:
            while True:
                message = await receive()
                if message['type'] == 'websocket.disconnect':
                    break
                if message['type'] != 'websocket.receive':
                    continue

                # 'text' may be present but None for binary frames
                text = message.get('text')
                if text is None:
                    continue

                try:
                    data = json.loads(text)
                except json.JSONDecodeError:
                    await send({
                        'type': 'websocket.send',
                        'text': json.dumps({'error': 'Invalid JSON'})
                    })
                    continue

                # Valid JSON that is not an object (e.g. a list) cannot be
                # a chat message; report it instead of dropping the client.
                if not isinstance(data, dict):
                    await send({
                        'type': 'websocket.send',
                        'text': json.dumps({'error': 'Invalid message'})
                    })
                    continue

                await self.handle_chat_message(data, room_id, connection_id)
        except Exception as e:
            print(f"WebSocket error: {e}")
        finally:
            # Clean up
            members = self.connections[room_id]
            members.discard(send)
            self.user_rooms.pop(connection_id, None)
            remaining = len(members)
            if not members:
                # Drop empty rooms so the mapping does not grow forever
                del self.connections[room_id]

            # Notify room of user leaving
            await self.broadcast_to_room(room_id, {
                'type': 'user_left',
                'message': f'User {connection_id} left the room',
                'room': room_id,
                'user_count': remaining
            })

    async def handle_chat_message(self, data, room_id, connection_id):
        """Relay a parsed client message to the room.

        ``chat`` messages go to everyone; ``typing`` indicators go to
        everyone except the sender. Other message types are ignored.
        """
        import time

        message_type = data.get('type')
        if message_type == 'chat':
            # Broadcast chat message to room
            await self.broadcast_to_room(room_id, {
                'type': 'chat',
                'message': data.get('message', ''),
                'user': connection_id,
                'room': room_id,
                # Wall-clock epoch seconds: the event loop's clock is
                # monotonic with an arbitrary origin, so it means nothing
                # to a client.
                'timestamp': time.time()
            })
        elif message_type == 'typing':
            # Broadcast typing indicator to room (except sender)
            await self.broadcast_to_room(room_id, {
                'type': 'typing',
                'user': connection_id,
                'room': room_id
            }, exclude=[connection_id])

    async def broadcast_to_room(self, room_id, message, exclude=None):
        """Send ``message`` as JSON text to every connection in a room.

        Args:
            room_id: Room to broadcast to.
            message: JSON-serialisable payload.
            exclude: Optional iterable of connection ids to skip.
        """
        excluded = set(exclude or ())
        # .get() avoids recreating an empty defaultdict entry for a room
        # that was just removed.
        members = self.connections.get(room_id)
        if not members:
            return
        message_text = json.dumps(message)

        # Iterate over a snapshot: the set may change while we await
        for connection in tuple(members):
            if id(connection) in excluded:
                continue
            try:
                await connection({
                    'type': 'websocket.send',
                    'text': message_text
                })
            except Exception as e:
                # Remove dead connections
                members.discard(connection)
                print(f"Removed dead connection: {e}")

    async def handle_http(self, scope, receive, send):
        """Serve the demo client at '/', and a plain-text 404 elsewhere."""
        if scope['path'] == '/':
            await send({
                'type': 'http.response.start',
                'status': 200,
                'headers': [[b'content-type', b'text/html']],
            })
            await send({
                'type': 'http.response.body',
                'body': self.CLIENT_HTML.encode('utf-8'),
            })
        else:
            await send({
                'type': 'http.response.start',
                'status': 404,
                'headers': [[b'content-type', b'text/plain']],
            })
            await send({
                'type': 'http.response.body',
                'body': b'Not Found',
            })
# Usage: instantiate the chat application once at module level
chat_app = WebSocketChatApp()
ASGI Middleware
Basic Middleware Pattern
class ASGIMiddleware:
    """Pass-through middleware skeleton: wraps an ASGI app and delegates."""

    def __init__(self, app):
        # The wrapped ASGI application
        self.app = app

    async def __call__(self, scope, receive, send):
        """Forward the connection to the wrapped application unchanged."""
        # Pre-processing hook: inspect or adjust the scope here
        wrapped = self.app

        # Hand the connection over to the wrapped application
        await wrapped(scope, receive, send)
Logging Middleware
import time
import logging
class LoggingMiddleware:
    """Log one line per HTTP request and per WebSocket connection.

    Args:
        app: The wrapped ASGI application.
        logger: Logger to write to; defaults to this module's logger.
    """

    def __init__(self, app, logger=None):
        self.app = app
        self.logger = logger if logger is not None else logging.getLogger(__name__)

    async def __call__(self, scope, receive, send):
        if scope['type'] == 'http':
            await self.log_http_request(scope, receive, send)
        elif scope['type'] == 'websocket':
            await self.log_websocket_connection(scope, receive, send)
        else:
            await self.app(scope, receive, send)

    async def log_http_request(self, scope, receive, send):
        """Run the app and log method, path, status and duration.

        The line is written even when the wrapped app raises; the status
        is then whatever had been sent so far (``None`` if nothing).
        """
        # perf_counter is monotonic; time.time() can jump on clock changes
        start_time = time.perf_counter()

        # Capture response info
        response_info = {'status': None}

        async def logging_send(message):
            if message['type'] == 'http.response.start':
                response_info['status'] = message['status']
            await send(message)

        try:
            await self.app(scope, receive, logging_send)
        finally:
            duration = time.perf_counter() - start_time
            # Lazy %-formatting: skipped when INFO logging is disabled
            self.logger.info(
                "HTTP %s %s %s %.3fs",
                scope['method'], scope['path'],
                response_info['status'], duration,
            )

    async def log_websocket_connection(self, scope, receive, send):
        """Log the connection attempt and the app-initiated close, if any."""
        self.logger.info("WebSocket connection to %s", scope['path'])

        async def logging_send(message):
            if message['type'] == 'websocket.close':
                self.logger.info("WebSocket closed %s", scope['path'])
            await send(message)

        await self.app(scope, receive, logging_send)
Authentication Middleware
import jwt
import json
class JWTAuthMiddleware:
    """Require a valid HS256 JWT on a configured set of paths.

    HTTP requests carry the token in ``Authorization: Bearer <token>``;
    WebSocket connections carry it in the ``token`` query parameter. On
    success the decoded payload reaches the inner app as ``scope['user']``.

    Args:
        app: The wrapped ASGI application.
        secret_key: Key used to verify token signatures.
        protected_paths: Exact paths that require authentication.
    """

    def __init__(self, app, secret_key, protected_paths=None):
        self.app = app
        self.secret_key = secret_key
        self.protected_paths = protected_paths or []

    async def __call__(self, scope, receive, send):
        # Lifespan scopes carry no 'path'; .get() lets them pass through
        # instead of raising KeyError.
        if scope.get('path') in self.protected_paths:
            if scope['type'] == 'http':
                await self.authenticate_http(scope, receive, send)
                return
            if scope['type'] == 'websocket':
                await self.authenticate_websocket(scope, receive, send)
                return
        await self.app(scope, receive, send)

    async def authenticate_http(self, scope, receive, send):
        """Validate the bearer token, then call the app or reply 401."""
        headers = dict(scope.get('headers', []))
        # latin-1 never fails to decode, whatever bytes the client sent
        auth_header = headers.get(b'authorization', b'').decode('latin-1')

        # The auth scheme name is case-insensitive
        scheme, _, token = auth_header.partition(' ')
        token = token.strip()
        if scheme.lower() != 'bearer' or not token:
            await self.unauthorized(send)
            return

        # Only token validation is guarded, so an exception raised by the
        # inner app is never mistaken for an authentication failure.
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
        except jwt.InvalidTokenError:
            await self.unauthorized(send)
            return

        # Copy before adding 'user' so the change does not leak upstream
        await self.app({**scope, 'user': payload}, receive, send)

    async def authenticate_websocket(self, scope, receive, send):
        """Validate the ``token`` query parameter, else close with 1008."""
        from urllib.parse import parse_qs

        query_string = scope.get('query_string', b'').decode('latin-1')
        token = parse_qs(query_string).get('token', [None])[0]
        if not token:
            await send({'type': 'websocket.close', 'code': 1008})
            return

        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
        except jwt.InvalidTokenError:
            await send({'type': 'websocket.close', 'code': 1008})
            return

        await self.app({**scope, 'user': payload}, receive, send)

    async def unauthorized(self, send):
        """Send a JSON 401 response that names the expected auth scheme."""
        await send({
            'type': 'http.response.start',
            'status': 401,
            'headers': [
                [b'content-type', b'application/json'],
                [b'www-authenticate', b'Bearer'],
            ],
        })
        await send({
            'type': 'http.response.body',
            'body': json.dumps({'error': 'Unauthorized'}).encode('utf-8'),
        })
Background Tasks and Async Operations
Background Task Execution
import asyncio
from typing import Callable, Any
class BackgroundTaskMiddleware:
    """Expose a task scheduler to the inner app as ``scope['background_tasks']``.

    Args:
        app: The wrapped ASGI application.
    """

    def __init__(self, app):
        self.app = app
        # Strong references: the event loop keeps only weak references to
        # tasks, so an unreferenced task could be garbage-collected mid-run.
        self.background_tasks = set()

    async def __call__(self, scope, receive, send):
        # Copy before adding our key so the change does not leak upstream
        scope = dict(scope)
        scope['background_tasks'] = self.add_background_task
        await self.app(scope, receive, send)

    def add_background_task(self, func: Callable, *args, **kwargs):
        """Schedule ``func(*args, **kwargs)`` on the running event loop.

        The task is scheduled immediately and runs concurrently with the
        rest of the request; it is not deferred until the response is sent.

        Args:
            func: Coroutine function to run.
            *args: Positional arguments for ``func``.
            **kwargs: Keyword arguments for ``func``.

        Returns:
            The created ``asyncio.Task``, so callers may await or cancel it.
        """
        task = asyncio.create_task(func(*args, **kwargs))
        self.background_tasks.add(task)
        task.add_done_callback(self.background_tasks.discard)
        return task
# Usage in application
async def send_email(to_email, subject, body):
    """Pretend to deliver an email: wait two seconds, then print a notice."""
    simulated_delay = 2  # seconds, standing in for the network delay
    await asyncio.sleep(simulated_delay)
    notice = f"Email sent to {to_email}: {subject}"
    print(notice)
async def user_signup_handler(scope, receive, send):
    """Handle user signup and queue a welcome email in the background.

    Expects a JSON object body with an ``email`` field. Malformed input
    gets a JSON 400 response; otherwise the reply is 201.

    Args:
        scope: ASGI HTTP scope; ``scope['background_tasks']`` is used to
            schedule the welcome email when present.
        receive: ASGI receive callable.
        send: ASGI send callable.
    """
    # Imported locally so this function does not rely on a module-level
    # ``import json`` being present.
    import json

    async def respond(status, payload):
        await send({
            'type': 'http.response.start',
            'status': status,
            'headers': [[b'content-type', b'application/json']],
        })
        await send({
            'type': 'http.response.body',
            'body': json.dumps(payload).encode('utf-8'),
        })

    body = await parse_request_body(scope, receive)

    # Bad client input is a 400, not an unhandled exception: this covers
    # non-UTF-8 bytes, invalid JSON, a non-object body (TypeError) and a
    # missing 'email' key.
    try:
        data = json.loads(body.decode('utf-8'))
        user_email = data['email']
    except (UnicodeDecodeError, json.JSONDecodeError, KeyError, TypeError):
        await respond(400, {'error': 'A JSON object with an "email" field is required'})
        return

    # Process signup (save to database, etc.)

    # Add background task for sending welcome email
    background_tasks = scope.get('background_tasks')
    if background_tasks:
        background_tasks(
            send_email,
            user_email,
            'Welcome!',
            'Thank you for signing up!'
        )

    # Send immediate response
    await respond(201, {'message': 'User created successfully'})
Database Connection Pooling
import asyncio
import aiopg # For PostgreSQL
from contextlib import asynccontextmanager
class DatabaseMiddleware:
    """Create a connection pool lazily and expose it as ``scope['db']``.

    ``scope['db']`` is a zero-argument callable returning an async context
    manager that yields a pooled connection.

    Args:
        app: The wrapped ASGI application.
        database_url: DSN passed to ``aiopg.create_pool``.
    """

    def __init__(self, app, database_url):
        self.app = app
        self.database_url = database_url
        self.pool = None
        # Created on first use, inside the running event loop
        self._pool_lock = None

    async def __call__(self, scope, receive, send):
        if self.pool is None:
            await self._ensure_pool()

        # Copy before adding our key so the change does not leak upstream
        scope = dict(scope)
        scope['db'] = self.get_db_connection
        await self.app(scope, receive, send)

    async def _ensure_pool(self):
        """Create the pool exactly once, even under concurrent first requests."""
        if self._pool_lock is None:
            self._pool_lock = asyncio.Lock()
        async with self._pool_lock:
            # Re-check under the lock: another request may have created
            # the pool while this one was waiting.
            if self.pool is None:
                self.pool = await aiopg.create_pool(self.database_url)

    @asynccontextmanager
    async def get_db_connection(self):
        """Yield a connection from the pool, releasing it on exit."""
        async with self.pool.acquire() as conn:
            yield conn
# Usage in application
async def get_users_handler(scope, receive, send):
    """Return all users from the database as JSON.

    Replies 500 (plain text) when ``scope['db']`` is missing, and 500
    (JSON) when the query or serialisation fails.

    Args:
        scope: ASGI HTTP scope; ``scope['db']`` must be a callable that
            returns an async context manager yielding a connection.
        receive: ASGI receive callable (unused).
        send: ASGI send callable.
    """
    # Imported locally so this function does not rely on a module-level
    # ``import json`` being present.
    import json

    async def respond(status, content_type, body):
        await send({
            'type': 'http.response.start',
            'status': status,
            'headers': [[b'content-type', content_type]],
        })
        await send({
            'type': 'http.response.body',
            'body': body,
        })

    get_db = scope.get('db')
    if not get_db:
        await respond(500, b'text/plain', b'Database not available')
        return

    # Build the whole payload before sending anything: once the response
    # has started, a failure can no longer be turned into a 500 because a
    # second 'http.response.start' is a protocol error.
    try:
        async with get_db() as db:
            async with db.cursor() as cursor:
                await cursor.execute("SELECT id, name, email FROM users")
                users = await cursor.fetchall()
        response_data = {
            'users': [{'id': u[0], 'name': u[1], 'email': u[2]} for u in users]
        }
        status = 200
        payload = json.dumps(response_data).encode('utf-8')
    except Exception as e:
        # NOTE(review): str(e) can expose database details to the client;
        # consider logging it and returning a generic message instead.
        status = 500
        payload = json.dumps({'error': str(e)}).encode('utf-8')

    await respond(status, b'application/json', payload)
Server-Sent Events (SSE)
SSE Implementation
import asyncio
import json
import time
async def _wait_for_disconnect(receive):
    """Consume incoming ASGI messages until 'http.disconnect' arrives."""
    while True:
        message = await receive()
        if message['type'] == 'http.disconnect':
            return


async def sse_handler(scope, receive, send, interval=1.0):
    """Server-Sent Events handler.

    Streams one ``connected`` event, then an ``update`` event every
    ``interval`` seconds until the client disconnects.

    Args:
        scope: ASGI HTTP connection scope (unused).
        receive: ASGI receive callable, watched for 'http.disconnect'.
        send: ASGI send callable.
        interval: Seconds between updates (defaults to one per second).
    """
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [
            [b'content-type', b'text/event-stream'],
            [b'cache-control', b'no-cache'],
            [b'connection', b'keep-alive'],
            [b'access-control-allow-origin', b'*'],
        ],
    })

    # Send initial connection message
    await send({
        'type': 'http.response.body',
        'body': f'data: {json.dumps({"type": "connected", "timestamp": time.time()})}\n\n'.encode(),
        'more_body': True,
    })

    # A send() to a closed connection is not guaranteed to raise, so the
    # disconnect has to be observed through receive(); otherwise this loop
    # could keep running after the client is gone.
    disconnect_watcher = asyncio.create_task(_wait_for_disconnect(receive))

    # Send periodic updates
    counter = 0
    try:
        while not disconnect_watcher.done():
            counter += 1
            data = {
                'type': 'update',
                'counter': counter,
                'timestamp': time.time(),
                'message': f'Update #{counter}'
            }
            message = f'data: {json.dumps(data)}\n\n'
            try:
                await send({
                    'type': 'http.response.body',
                    'body': message.encode(),
                    'more_body': True,
                })
            except Exception:
                # A failed send is treated as a lost client
                break

            # Sleep until the next tick, waking early on disconnect
            await asyncio.wait({disconnect_watcher}, timeout=interval)
    finally:
        if not disconnect_watcher.done():
            disconnect_watcher.cancel()
        elif not disconnect_watcher.cancelled():
            # Retrieve any receive() error so asyncio does not warn about
            # it; a failed receive is handled like a disconnect.
            disconnect_watcher.exception()

    # End the response. Best effort: the client is normally gone by now,
    # and a send on a closed connection may fail.
    try:
        await send({
            'type': 'http.response.body',
            'body': b'',
            'more_body': False,
        })
    except Exception:
        pass
# HTML client for SSE
async def sse_client_handler(scope, receive, send):
    """Serve the HTML page that subscribes to the ``/sse`` event stream.

    Args:
        scope: ASGI HTTP connection scope (unused).
        receive: ASGI receive callable (unused).
        send: ASGI send callable.
    """
    # The page adds lines as text nodes rather than with ``innerHTML +=``,
    # which would re-parse the whole container on every event and treat
    # event text as markup. Events without a ``message`` field (the first
    # "connected" event) fall back to their type instead of "undefined".
    html = """
<!DOCTYPE html>
<html>
<head><title>Server-Sent Events</title></head>
<body>
    <h1>Server-Sent Events Demo</h1>
    <div id="messages"></div>
    <script>
        const eventSource = new EventSource('/sse');
        const messages = document.getElementById('messages');
        function appendLine(text) {
            const line = document.createElement('div');
            line.textContent = text;
            messages.appendChild(line);
        }
        eventSource.onmessage = function(event) {
            const data = JSON.parse(event.data);
            const label = data.message !== undefined ? data.message : data.type;
            appendLine(new Date(data.timestamp * 1000).toLocaleString() + ': ' + label);
        };
        eventSource.onerror = function(event) {
            appendLine('Error: Connection lost');
        };
    </script>
</body>
</html>
"""
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/html']],
    })
    await send({
        'type': 'http.response.body',
        'body': html.encode('utf-8'),
    })
Testing ASGI Applications
ASGI Test Client
import asyncio
import json
from typing import Dict, Any, List, Optional
class ASGITestClient:
    """In-process HTTP client that calls an ASGI app directly.

    Args:
        app: The ASGI application under test.
    """

    def __init__(self, app):
        self.app = app

    async def get(self, path: str, headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Issue a GET request; see :meth:`request` for the return shape."""
        return await self.request('GET', path, headers=headers)

    async def post(self, path: str, data: Optional[Dict] = None,
                   json_data: Optional[Dict] = None, headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Issue a POST request; see :meth:`request` for the return shape."""
        return await self.request('POST', path, data=data, json_data=json_data, headers=headers)

    async def request(self, method: str, path: str, data: Optional[Dict] = None,
                      json_data: Optional[Dict] = None, headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Send one request to the app and collect its response.

        Args:
            method: HTTP method.
            path: Request path, optionally followed by ``?query``.
            data: Form fields, sent url-encoded (ignored if ``json_data``
                is given).
            json_data: Payload sent as a JSON body.
            headers: Extra request headers.

        Returns:
            Dict with ``status``, ``headers`` (bytes keys and values),
            ``body`` (bytes) and ``text`` (body decoded as UTF-8, with
            undecodable bytes replaced).
        """
        from urllib.parse import urlencode

        # Servers deliver the query string separately from the path
        url_path, _, query = path.partition('?')

        # Build scope
        scope = {
            'type': 'http',
            'http_version': '1.1',
            'method': method,
            'scheme': 'http',
            'path': url_path,
            'query_string': query.encode('utf-8'),
            'headers': [],
            'server': ('testserver', 80),
            'client': ('testclient', 12345),
            'asgi': {'version': '3.0', 'spec_version': '2.1'},
        }

        # Add headers
        if headers:
            for key, value in headers.items():
                scope['headers'].append([key.lower().encode(), value.encode()])

        # Prepare request body. Compare against None so that falsy but
        # valid payloads such as {} are still sent.
        body = b''
        if json_data is not None:
            body = json.dumps(json_data).encode('utf-8')
            scope['headers'].append([b'content-type', b'application/json'])
        elif data is not None:
            body = urlencode(data).encode('utf-8')
            scope['headers'].append([b'content-type', b'application/x-www-form-urlencoded'])

        # The body is delivered once; later reads report a disconnect
        # instead of replaying the same body.
        pending = [{
            'type': 'http.request',
            'body': body,
            'more_body': False,
        }]

        async def receive():
            if pending:
                return pending.pop()
            return {'type': 'http.disconnect'}

        # Capture response
        response_data = {'status': None, 'headers': [], 'body': b''}

        async def send(message):
            if message['type'] == 'http.response.start':
                response_data['status'] = message['status']
                response_data['headers'] = message.get('headers', [])
            elif message['type'] == 'http.response.body':
                response_data['body'] += message.get('body', b'')

        # Call the application
        await self.app(scope, receive, send)

        return {
            'status': response_data['status'],
            'headers': dict(response_data['headers']),
            'body': response_data['body'],
            # Binary responses must not make the client itself raise
            'text': response_data['body'].decode('utf-8', errors='replace'),
        }

    async def websocket_connect(self, path: str, query_params: Optional[Dict] = None) -> 'WebSocketTestSession':
        """Create a WebSocket session; call ``connect()`` on it to start."""
        return WebSocketTestSession(self.app, path, query_params)
class WebSocketTestSession:
    """In-process WebSocket client for driving an ASGI app in tests.

    Args:
        app: The ASGI application under test.
        path: Path to connect to.
        query_params: Optional mapping encoded into the query string.
    """

    # Upper bound, in seconds, on how long connect() waits for the app to
    # accept or close the handshake.
    handshake_timeout = 1.0

    def __init__(self, app, path: str, query_params: Optional[Dict] = None):
        self.app = app
        self.path = path
        self.query_params = query_params or {}
        self.sent_messages = []      # client -> app events not yet delivered
        self.received_messages = []  # app -> client 'websocket.send' events
        self.connected = False
        self.closed = False
        self.task = None
        self._connect_delivered = False
        # Events are created in connect(), inside the running event loop
        self._inbound = None    # set when sent_messages gains an item
        self._outbound = None   # set when the app sends, closes or exits

    async def connect(self):
        """Start the app for this connection and wait for the handshake.

        Returns:
            This session, for chaining.
        """
        from urllib.parse import urlencode

        self._inbound = asyncio.Event()
        self._outbound = asyncio.Event()
        handshake = asyncio.Event()

        query_string = urlencode(self.query_params).encode('utf-8')
        scope = {
            'type': 'websocket',
            'path': self.path,
            'query_string': query_string,
            'headers': [],
            'server': ('testserver', 80),
            'client': ('testclient', 12345),
            'asgi': {'version': '3.0', 'spec_version': '2.1'},
        }

        async def receive():
            # The connect event is delivered exactly once, first
            if not self._connect_delivered:
                self._connect_delivered = True
                return {'type': 'websocket.connect'}
            # Block until the test sends something; never invent frames
            while not self.sent_messages:
                self._inbound.clear()
                await self._inbound.wait()
            return self.sent_messages.pop(0)

        async def send(message):
            kind = message['type']
            if kind == 'websocket.accept':
                self.connected = True
                handshake.set()
            elif kind == 'websocket.send':
                self.received_messages.append(message)
                self._outbound.set()
            elif kind == 'websocket.close':
                self.connected = False
                self.closed = True
                handshake.set()
                self._outbound.set()

        # Start the WebSocket connection
        self.task = asyncio.create_task(self.app(scope, receive, send))
        # Wake any pending receive_text() when the app exits
        self.task.add_done_callback(lambda _task: self._outbound.set())

        # Wait for accept/close or app exit rather than a fixed sleep
        handshake_waiter = asyncio.ensure_future(handshake.wait())
        try:
            await asyncio.wait(
                {self.task, handshake_waiter},
                timeout=self.handshake_timeout,
                return_when=asyncio.FIRST_COMPLETED,
            )
        finally:
            handshake_waiter.cancel()
        return self

    async def send_text(self, text: str):
        """Queue a text frame for the app."""
        self.sent_messages.append({
            'type': 'websocket.receive',
            'text': text
        })
        if self._inbound is not None:
            self._inbound.set()

    async def send_json(self, data: Dict):
        """Queue ``data`` for the app as a JSON text frame."""
        await self.send_text(json.dumps(data))

    async def receive_text(self) -> str:
        """Return the text of the next message sent by the app.

        Raises:
            RuntimeError: If ``connect()`` has not been awaited.
            ConnectionError: If the app closed the socket or exited
                without a message pending.
        """
        while not self.received_messages:
            if self.task is None:
                raise RuntimeError('connect() must be awaited before receiving')
            if self.closed or self.task.done():
                raise ConnectionError('WebSocket closed before a message arrived')
            self._outbound.clear()
            await self._outbound.wait()
        message = self.received_messages.pop(0)
        return message.get('text', '')

    async def receive_json(self) -> Dict:
        """Return the next message from the app, parsed as JSON."""
        text = await self.receive_text()
        return json.loads(text)

    async def close(self):
        """Deliver a disconnect event and wait for the app to finish.

        Any exception raised by the app propagates from here.
        """
        self.sent_messages.append({'type': 'websocket.disconnect'})
        if self._inbound is not None:
            self._inbound.set()
        if self.task is not None:
            await self.task
# Usage
async def test_asgi_app():
    """Example end-to-end test exercising HTTP and WebSocket paths.

    NOTE(review): ``my_app`` is not defined in this snippet; substitute
    the ASGI application under test.
    """
    client = ASGITestClient(my_app)

    # Test HTTP request
    response = await client.get('/')
    assert response['status'] == 200
    assert b'Hello' in response['body']

    # Test JSON POST
    response = await client.post('/api/echo', json_data={'message': 'test'})
    assert response['status'] == 200

    # Test WebSocket
    ws = await client.websocket_connect('/ws')
    await ws.connect()
    try:
        await ws.send_json({'type': 'hello', 'message': 'test'})
        reply = await ws.receive_json()
        assert reply['type'] == 'hello'
    finally:
        # Always close, so a failed assertion does not leave the app's
        # task running
        await ws.close()
This guide covers ASGI fundamentals: HTTP handling, request body parsing, WebSocket support, middleware patterns, background tasks, database connection pooling, Server-Sent Events, and testing. Use these patterns to build modern async Python web applications.