Async Operations
A complete guide to asynchronous programming in Tornado with async/await, coroutines, and non-blocking patterns. It includes copy-paste-ready examples for every async scenario covered below.
Async Request Handlers
Basic Async Handler
import tornado.web
import tornado.httpclient
import asyncio
class AsyncHandler(tornado.web.RequestHandler):
    """Show a handler awaiting one coroutine (GET) or several at once (POST)."""

    async def fetch_data(self):
        """Stand in for a slow lookup; resolves to a fixed string."""
        await asyncio.sleep(0.1)
        return "fetched data"

    async def process_data(self, data):
        """Stand in for processing; echoes *data* inside a status string."""
        await asyncio.sleep(0.2)
        return f"processed: {data}"

    async def log_request(self):
        """Stand in for asynchronous request logging."""
        await asyncio.sleep(0.05)
        return "logged"

    async def update_cache(self):
        """Stand in for an asynchronous cache refresh."""
        await asyncio.sleep(0.1)
        return "cache updated"

    async def get(self):
        """Await the simulated fetch and return its value as JSON."""
        fetched = await self.fetch_data()
        self.write({"result": fetched})

    async def post(self):
        """Decode the JSON body, then run three coroutines concurrently."""
        payload = tornado.escape.json_decode(self.request.body)
        # gather() starts all three together and returns their results in
        # the order the coroutines were passed.
        pending = (
            self.process_data(payload),
            self.log_request(),
            self.update_cache(),
        )
        outcomes = await asyncio.gather(*pending)
        self.write({"status": "success", "results": outcomes})
Async with Database Operations
import tornado.web
import asyncio
import aiopg
class DatabaseHandler(tornado.web.RequestHandler):
    """Read and insert ``users`` rows over a per-request aiopg connection."""

    async def get(self):
        """Write every active user as JSON under the ``users`` key."""
        conn = await self.get_db_connection()
        try:
            async with conn.cursor() as cursor:
                await cursor.execute(
                    "SELECT * FROM users WHERE active = %s", (True,)
                )
                users = await cursor.fetchall()
            self.write({"users": users})
        finally:
            # Always release the connection, even if the query failed.
            conn.close()

    async def post(self):
        """Insert a user taken from the JSON body and report the new row id.

        Any failure while inserting is reported as a 500 response whose body
        carries the exception text.
        """
        data = tornado.escape.json_decode(self.request.body)
        conn = await self.get_db_connection()
        try:
            async with conn.cursor() as cursor:
                # aiopg connections always run in autocommit mode, so this
                # single INSERT is committed as soon as it succeeds.
                # conn.commit() / conn.rollback() are not supported by aiopg
                # and raise, so they must not be called here.
                await cursor.execute(
                    "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
                    (data["name"], data["email"]),
                )
                user_id = await cursor.fetchone()
            self.write({"status": "created", "user_id": user_id[0]})
        except Exception as e:
            # A failed statement leaves nothing to roll back in autocommit
            # mode; just report the error.
            self.set_status(500)
            self.write({"error": str(e)})
        finally:
            conn.close()

    async def get_db_connection(self):
        """Open and return a new aiopg connection using the example DSN."""
        dsn = "host=localhost dbname=mydb user=myuser password=mypass"
        return await aiopg.connect(dsn)
Async with Error Handling
import tornado.web
import asyncio
import logging
class ErrorHandlingHandler(tornado.web.RequestHandler):
    """Run several operations concurrently and report partial failures."""

    async def get(self):
        """Gather three operations and split the outcome into results/errors.

        Writes ``success_count`` and ``results`` for the operations that
        succeeded, plus an ``errors`` list when any of them failed. Anything
        unexpected is logged and answered with a generic 500.
        """
        try:
            # return_exceptions=True makes gather() hand back exception
            # objects in place of results instead of raising the first one.
            results = await asyncio.gather(
                self.safe_operation_1(),
                self.safe_operation_2(),
                self.safe_operation_3(),
                return_exceptions=True,
            )

            success_results = []
            errors = []
            for position, result in enumerate(results, start=1):
                # BaseException also covers asyncio.CancelledError, which
                # gather() can return for a cancelled child.
                if isinstance(result, BaseException):
                    errors.append(f"Operation {position} failed: {str(result)}")
                else:
                    success_results.append(result)

            response = {
                "success_count": len(success_results),
                "results": success_results,
            }
            if errors:
                response["errors"] = errors
            self.write(response)
        except Exception as e:
            # logging.exception keeps the traceback alongside the message.
            logging.exception(f"Unexpected error: {e}")
            self.set_status(500)
            self.write({"error": "Internal server error"})

    async def safe_operation_1(self):
        """Simulated operation that always succeeds."""
        await asyncio.sleep(0.1)
        return "Operation 1 success"

    async def safe_operation_2(self):
        """Simulated operation that fails roughly 30% of the time."""
        # Imported locally so the snippet does not depend on a module-level
        # ``import random`` that the surrounding imports do not provide.
        import random

        await asyncio.sleep(0.2)
        if random.random() < 0.3:
            raise Exception("Operation 2 failed")
        return "Operation 2 success"

    async def safe_operation_3(self):
        """Simulated operation that always succeeds."""
        await asyncio.sleep(0.15)
        return "Operation 3 success"
HTTP Client Operations
Basic HTTP Client
import tornado.httpclient
import tornado.web
class HTTPClientHandler(tornado.web.RequestHandler):
    """Fetch JSON from an upstream URL and relay it to the caller."""

    async def get(self):
        """GET the upstream resource and write its decoded JSON body.

        An upstream ``HTTPError`` is mirrored with the same status code; any
        other failure is answered with a 500 carrying the exception text.
        """
        # AsyncHTTPClient() hands back an instance shared by every caller on
        # this IOLoop. It must not be closed per request: doing so would tear
        # down a client that other in-flight handlers are still using.
        http_client = tornado.httpclient.AsyncHTTPClient()
        try:
            response = await http_client.fetch("https://api.example.com/data")
            data = tornado.escape.json_decode(response.body)
            self.write(data)
        except tornado.httpclient.HTTPError as e:
            self.set_status(e.code)
            self.write({"error": f"HTTP Error: {e.code}"})
        except Exception as e:
            self.set_status(500)
            self.write({"error": str(e)})
HTTP Client with Custom Headers
class CustomHTTPClientHandler(tornado.web.RequestHandler):
    """Relay GET/POST calls upstream using explicit ``HTTPRequest`` objects."""

    async def get(self):
        """Fetch the secure resource with custom headers and timeouts."""
        headers = {
            "User-Agent": "MyApp/1.0",
            "Authorization": "Bearer token123",
            "Content-Type": "application/json",
        }
        request = tornado.httpclient.HTTPRequest(
            url="https://api.example.com/secure-data",
            method="GET",
            headers=headers,
            connect_timeout=5.0,
            request_timeout=10.0,
            validate_cert=True,
        )
        await self._relay(request)

    async def post(self):
        """Forward the JSON request body to the upstream create endpoint."""
        data = tornado.escape.json_decode(self.request.body)
        request = tornado.httpclient.HTTPRequest(
            url="https://api.example.com/create",
            method="POST",
            headers={"Content-Type": "application/json"},
            body=tornado.escape.json_encode(data),
        )
        await self._relay(request)

    async def _relay(self, request):
        """Send *request* upstream and write the decoded JSON reply.

        An upstream ``HTTPError`` is mirrored to the caller with the same
        status code and an ``error`` message.
        """
        # The client is shared across the whole IOLoop, so it is deliberately
        # not closed here; closing it per request would break other handlers
        # that are using the same instance concurrently.
        http_client = tornado.httpclient.AsyncHTTPClient()
        try:
            response = await http_client.fetch(request)
            self.write(tornado.escape.json_decode(response.body))
        except tornado.httpclient.HTTPError as e:
            self.set_status(e.code)
            self.write({"error": f"HTTP Error: {e.code}"})
Multiple HTTP Requests
class MultipleRequestsHandler(tornado.web.RequestHandler):
    """Fetch several upstream endpoints concurrently and merge the replies."""

    async def get(self):
        """Fetch all URLs at once and write them keyed ``api_1``..``api_N``.

        If any fetch or decode fails, the whole request is answered with a
        500 carrying the exception text.
        """
        # Shared per-IOLoop client: intentionally not closed per request,
        # because other handlers may be using the same instance.
        http_client = tornado.httpclient.AsyncHTTPClient()
        urls = [
            "https://api.example.com/users",
            "https://api.example.com/posts",
            "https://api.example.com/comments",
        ]
        try:
            # gather() preserves argument order, so responses line up with
            # the urls list.
            responses = await asyncio.gather(
                *(http_client.fetch(url) for url in urls)
            )
            results = {
                f"api_{position}": tornado.escape.json_decode(response.body)
                for position, response in enumerate(responses, start=1)
            }
            self.write(results)
        except Exception as e:
            self.set_status(500)
            self.write({"error": str(e)})
Background Tasks
Long-running Background Tasks
import tornado.web
import tornado.ioloop
import asyncio
import uuid
class BackgroundTaskHandler(tornado.web.RequestHandler):
    """Start long-running background tasks (POST) and poll them (GET)."""

    # Task registry keyed by task id. It is a class attribute, so every
    # handler instance reads and writes the same dict.
    tasks = {}

    async def post(self):
        """Start a background task and reply with its ``task_id``.

        The JSON body may carry ``duration`` (seconds, default 10). A body
        that is not a JSON object, or a duration that is not a non-negative
        number, is rejected with a 400 instead of failing later inside the
        background task.
        """
        data = tornado.escape.json_decode(self.request.body)
        duration = data.get("duration", 10) if isinstance(data, dict) else None
        # bool is a subclass of int, so it has to be excluded explicitly.
        is_number = isinstance(duration, (int, float)) and not isinstance(
            duration, bool
        )
        if not is_number or duration < 0:
            self.set_status(400)
            self.write({"error": "duration must be a non-negative number"})
            return

        task_id = str(uuid.uuid4())
        # Keeping the Task object in the registry also keeps a reference to
        # it alive while it runs.
        task = asyncio.create_task(self.long_running_task(task_id, duration))
        self.tasks[task_id] = {
            "task": task,
            "status": "running",
            "progress": 0,
            "result": None,
        }
        self.write({"task_id": task_id, "status": "started"})

    async def get(self):
        """Report status, progress and result for the ``task_id`` argument.

        Replies 404 when the argument is missing or the id is unknown.
        """
        task_id = self.get_argument("task_id", default=None)
        if not task_id or task_id not in self.tasks:
            self.set_status(404)
            self.write({"error": "Task not found"})
            return

        task_info = self.tasks[task_id]
        if task_info["task"].done():
            # result() re-raises whatever the task raised.
            try:
                result = task_info["task"].result()
                task_info["status"] = "completed"
                task_info["result"] = result
                task_info["progress"] = 100
            except Exception as e:
                task_info["status"] = "failed"
                task_info["result"] = str(e)

        self.write({
            "task_id": task_id,
            "status": task_info["status"],
            "progress": task_info["progress"],
            "result": task_info["result"],
        })

    async def long_running_task(self, task_id, duration):
        """Simulate long-running task with progress updates.

        Sleeps for *duration* seconds in ten equal steps, updating the
        registry's ``progress`` after each one, and returns a completion
        message. Exceptions are printed and re-raised.
        """
        try:
            steps = 10
            for i in range(steps):
                await asyncio.sleep(duration / steps)
                progress = int((i + 1) / steps * 100)
                self.tasks[task_id]["progress"] = progress
                print(f"Task {task_id}: Step {i+1}/{steps} completed")
            return f"Task {task_id} completed successfully"
        except Exception as e:
            print(f"Task {task_id} failed: {e}")
            raise
Periodic Tasks
import tornado.ioloop
import asyncio
import logging
class PeriodicTaskManager:
    """Own a set of named asyncio tasks that each repeat a job on a timer."""

    def __init__(self):
        # Maps a task name ("cleanup", "heartbeat", "monitor") to its Task.
        self.tasks = {}
        # Loop condition shared by all periodic coroutines.
        self.running = False

    def start(self):
        """Start all periodic tasks.

        Uses asyncio.create_task(), so it must be called while an event loop
        is running.
        """
        self.running = True
        self.tasks["cleanup"] = asyncio.create_task(self.cleanup_task())
        self.tasks["heartbeat"] = asyncio.create_task(self.heartbeat_task())
        self.tasks["monitor"] = asyncio.create_task(self.monitor_task())
        logging.info("Periodic tasks started")

    def stop(self):
        """Stop all periodic tasks by cancelling those still running."""
        self.running = False
        for task in self.tasks.values():
            if not task.done():
                task.cancel()
        logging.info("Periodic tasks stopped")

    async def _run_every(self, interval, job, label):
        """Sleep *interval* seconds, await ``job()``, repeat while running.

        Cancellation ends the loop quietly; any other exception raised by
        the job is logged under *label* and the loop carries on.
        """
        while self.running:
            try:
                await asyncio.sleep(interval)
                await job()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logging.error(f"{label} task error: {e}")

    async def cleanup_task(self):
        """Run cleanup every 30 minutes"""
        await self._run_every(30 * 60, self.perform_cleanup, "Cleanup")

    async def heartbeat_task(self):
        """Send heartbeat every 60 seconds"""
        await self._run_every(60, self.send_heartbeat, "Heartbeat")

    async def monitor_task(self):
        """Monitor system every 5 minutes"""
        await self._run_every(5 * 60, self.monitor_system, "Monitor")

    async def perform_cleanup(self):
        """Placeholder cleanup job; logs and sleeps to simulate work."""
        logging.info("Performing cleanup...")
        await asyncio.sleep(1)

    async def send_heartbeat(self):
        """Placeholder heartbeat job; logs and sleeps to simulate work."""
        logging.info("Sending heartbeat...")
        await asyncio.sleep(0.1)

    async def monitor_system(self):
        """Placeholder monitoring job; logs and sleeps to simulate work."""
        logging.info("Monitoring system...")
        await asyncio.sleep(0.5)
# Usage in application
task_manager = PeriodicTaskManager()


def make_app():
    """Build the Tornado application with its single route."""
    # NOTE: MainHandler is not defined in this snippet; it is assumed to be
    # provided by the surrounding application.
    return tornado.web.Application([
        (r"/", MainHandler),
    ])


async def main():
    """Listen on port 8888 and run the periodic tasks until interrupted."""
    app = make_app()
    app.listen(8888)
    # task_manager.start() calls asyncio.create_task(), which raises
    # RuntimeError unless an event loop is already running. Calling it from
    # inside this coroutine guarantees that.
    task_manager.start()
    try:
        # Wait forever; Ctrl+C ends this wait.
        await asyncio.Event().wait()
    finally:
        task_manager.stop()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Normal shutdown path; main() has already stopped the tasks.
        pass
Async Context Managers
Database Connection Manager
import asyncio
import aiopg
class DatabaseManager:
    """Async context manager that owns an aiopg connection pool.

    The pool is created on ``async with`` entry and closed on exit.
    """

    def __init__(self, dsn):
        self.dsn = dsn
        # Created in __aenter__; None until the context is entered.
        self.pool = None

    async def __aenter__(self):
        self.pool = await aiopg.create_pool(self.dsn)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.pool:
            self.pool.close()
            # close() only marks the pool as closing; wait_closed() waits
            # for its connections to actually shut down.
            await self.pool.wait_closed()

    async def execute_query(self, query, params=None):
        """Run *query* with *params* and return all fetched rows."""
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(query, params)
                return await cursor.fetchall()

    async def execute_transaction(self, queries):
        """Run ``(query, params)`` pairs atomically; return True on success.

        If any statement fails the transaction is rolled back and the
        original exception is re-raised.
        """
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                # aiopg connections are always in autocommit mode and their
                # commit()/rollback() methods are unsupported, so the
                # transaction is delimited with explicit SQL statements.
                await cursor.execute("BEGIN")
                try:
                    for query, params in queries:
                        await cursor.execute(query, params)
                except Exception:
                    await cursor.execute("ROLLBACK")
                    raise
                else:
                    await cursor.execute("COMMIT")
                    return True
class DatabaseHandler(tornado.web.RequestHandler):
    """List users through a DatabaseManager opened for this one request."""

    async def get(self):
        """Query every row of ``users`` and write it under the ``users`` key."""
        connection_string = (
            "host=localhost dbname=mydb user=myuser password=mypass"
        )
        manager = DatabaseManager(connection_string)
        # Entering the context creates the pool; leaving it closes the pool.
        async with manager as db:
            rows = await db.execute_query("SELECT * FROM users")
            self.write({"users": rows})
HTTP Client Session Manager
import tornado.httpclient
class HTTPSessionManager:
    """Async context manager for calls against one base URL.

    Headers given at construction are sent with every request; per-call
    ``headers`` are merged on top of them.
    """

    def __init__(self, base_url, headers=None):
        self.base_url = base_url
        self.headers = headers or {}
        # Created in __aenter__; None until the context is entered.
        self.client = None

    async def __aenter__(self):
        # force_instance=True gives this session a private client. Without
        # it AsyncHTTPClient() returns the instance shared by the whole
        # IOLoop, and closing that in __aexit__ would affect other users.
        self.client = tornado.httpclient.AsyncHTTPClient(force_instance=True)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.client:
            self.client.close()

    def _build_request(self, method, path, options):
        """Create an HTTPRequest for *path* with merged headers.

        *options* are the caller's extra HTTPRequest keyword arguments. Its
        ``headers`` entry is removed and merged over the session headers, so
        it is never passed to HTTPRequest twice.
        """
        options = dict(options)
        extra_headers = options.pop("headers", None) or {}
        return tornado.httpclient.HTTPRequest(
            url=f"{self.base_url}{path}",
            method=method,
            headers={**self.headers, **extra_headers},
            **options,
        )

    async def get(self, path, **kwargs):
        """GET ``base_url + path``; extra kwargs go to HTTPRequest."""
        request = self._build_request("GET", path, kwargs)
        return await self.client.fetch(request)

    async def post(self, path, data=None, **kwargs):
        """POST *data* as JSON to ``base_url + path``.

        Any *data* other than None is JSON-encoded, including falsy values
        such as ``{}`` or ``0``. With no data, a ``body`` passed in kwargs is
        used, otherwise an empty body is sent.
        """
        options = dict(kwargs)
        if data is not None:
            options["body"] = tornado.escape.json_encode(data)
        else:
            # A POST needs a body (Tornado rejects body=None for POST), so
            # default to an empty one.
            options.setdefault("body", "")
        request = self._build_request("POST", path, options)
        return await self.client.fetch(request)
class APIHandler(tornado.web.RequestHandler):
    """Combine the upstream users and posts listings into one reply."""

    async def get(self):
        """Fetch ``/users`` then ``/posts`` and write both decoded bodies."""
        auth_headers = {"Authorization": "Bearer token123"}
        decode = tornado.escape.json_decode
        api = HTTPSessionManager("https://api.example.com", auth_headers)
        async with api as session:
            # The two requests are awaited one after the other.
            users_reply = await session.get("/users")
            posts_reply = await session.get("/posts")
            combined = {
                "users": decode(users_reply.body),
                "posts": decode(posts_reply.body),
            }
            self.write(combined)
Async Decorators
Rate Limiting Decorator
import asyncio
import time
from functools import wraps
class RateLimiter:
    """Allow at most ``max_calls`` per key within ``time_window`` seconds."""

    def __init__(self, max_calls, time_window):
        self.max_calls = max_calls
        self.time_window = time_window
        # Maps each key to the timestamps of its recently accepted calls.
        self.calls = {}

    async def is_allowed(self, key):
        """Return True and record the call if *key* is under its limit.

        Timestamps older than the window are dropped first. A rejected call
        is not recorded.
        """
        now = time.time()
        recent = [
            stamp
            for stamp in self.calls.get(key, [])
            if now - stamp < self.time_window
        ]
        self.calls[key] = recent

        if len(recent) < self.max_calls:
            recent.append(now)
            return True
        return False


# Global rate limiter instance
rate_limiter = RateLimiter(max_calls=10, time_window=60)  # 10 calls per minute
def rate_limit(limiter):
    """Build a decorator that guards an async handler method with *limiter*.

    The wrapped method runs only when ``limiter.is_allowed`` accepts the
    request's ``remote_ip``. Otherwise the handler answers 429 with an error
    body and the wrapped method is not called.
    """

    def decorator(method):
        @wraps(method)
        async def wrapper(self, *args, **kwargs):
            allowed = await limiter.is_allowed(self.request.remote_ip)
            if allowed:
                return await method(self, *args, **kwargs)
            self.set_status(429)
            self.write({"error": "Rate limit exceeded"})
            return None

        return wrapper

    return decorator
class RateLimitedHandler(tornado.web.RequestHandler):
    """Example endpoint guarded by the module-level ``rate_limiter``."""

    @rate_limit(rate_limiter)
    async def get(self):
        """Pause briefly to simulate work, then confirm the request."""
        await asyncio.sleep(0.1)
        reply = {"message": "Request processed"}
        self.write(reply)
Async Timeout Decorator
import asyncio
from functools import wraps
def async_timeout(seconds):
    """Build a decorator that bounds an async handler method to *seconds*.

    When the wrapped coroutine does not finish in time, the handler answers
    408 with an error body and the wrapper returns None. Otherwise the
    coroutine's own return value is passed through.
    """

    def decorator(method):
        @wraps(method)
        async def wrapper(self, *args, **kwargs):
            try:
                outcome = await asyncio.wait_for(
                    method(self, *args, **kwargs), timeout=seconds
                )
            except asyncio.TimeoutError:
                self.set_status(408)
                self.write({"error": "Request timeout"})
                return None
            else:
                return outcome

        return wrapper

    return decorator
class TimeoutHandler(tornado.web.RequestHandler):
    """Example endpoint whose GET is capped at five seconds."""

    @async_timeout(5.0)  # 5 second timeout
    async def get(self):
        """Sleep for the ``duration`` query argument (default 1 second).

        A duration that is not a number, is NaN, or is negative is rejected
        with a 400 instead of surfacing as an unhandled ValueError.
        """
        raw_duration = self.get_argument("duration", "1")
        try:
            duration = float(raw_duration)
        except ValueError:
            duration = None

        # ``not duration >= 0`` is also true for NaN, which compares false
        # against everything.
        if duration is None or not duration >= 0:
            self.set_status(400)
            self.write({"error": "duration must be a non-negative number"})
            return

        await asyncio.sleep(duration)
        self.write({"message": f"Completed after {duration} seconds"})
Async Utilities
Async Queue Processing
import asyncio
import logging
class AsyncQueueProcessor:
    """Process queued jobs with a fixed number of asyncio worker tasks."""

    def __init__(self, max_workers=5):
        self.queue = asyncio.Queue()
        # Worker Task objects created by start().
        self.workers = []
        self.max_workers = max_workers
        # Loop condition checked by every worker.
        self.running = False

    async def start(self):
        """Spawn ``max_workers`` worker tasks named ``worker-0``, ``worker-1``, ..."""
        self.running = True
        for i in range(self.max_workers):
            worker = asyncio.create_task(self.worker(f"worker-{i}"))
            self.workers.append(worker)
        logging.info(f"Started {self.max_workers} queue workers")

    async def stop(self):
        """Cancel every worker and wait until all of them have finished."""
        self.running = False
        for worker in self.workers:
            worker.cancel()
        # return_exceptions=True keeps the workers' cancellation from being
        # re-raised here.
        await asyncio.gather(*self.workers, return_exceptions=True)
        logging.info("Queue workers stopped")

    async def add_job(self, job_data):
        """Put *job_data* on the queue."""
        await self.queue.put(job_data)

    async def worker(self, worker_name):
        """Pull jobs off the queue and process them while ``running`` is set.

        A failing job is logged and the worker moves on to the next one.
        """
        while self.running:
            try:
                # The timeout lets the loop re-check self.running regularly.
                job_data = await asyncio.wait_for(self.queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue

            try:
                await self.process_job(job_data)
            except Exception as e:
                logging.error(f"Worker {worker_name} error: {e}")
            finally:
                # Every get() needs a matching task_done(), including for
                # jobs that failed; otherwise queue.join() never returns.
                self.queue.task_done()

    async def process_job(self, job_data):
        """Simulate a job: log its ``type`` and sleep for its ``duration``."""
        job_type = job_data.get("type", "unknown")
        duration = job_data.get("duration", 1)
        logging.info(f"Processing job: {job_type}")
        await asyncio.sleep(duration)
        logging.info(f"Job completed: {job_type}")
# Usage
# NOTE: jobs are only consumed by the workers that
# ``await queue_processor.start()`` creates; this snippet does not call it.
queue_processor = AsyncQueueProcessor(max_workers=3)


class QueueHandler(tornado.web.RequestHandler):
    """Accept a JSON job description and hand it to ``queue_processor``."""

    async def post(self):
        """Decode the request body, enqueue it, and acknowledge."""
        job = tornado.escape.json_decode(self.request.body)
        await queue_processor.add_job(job)
        acknowledgement = {"status": "job queued"}
        self.write(acknowledgement)
Async Cache
import asyncio
import time
from typing import Any, Optional
class AsyncCache:
    """In-memory key/value cache with per-entry expiry, guarded by a lock."""

    def __init__(self, default_ttl=3600):
        # Maps key -> (value, absolute expiry timestamp from time.time()).
        self.cache = {}
        self.default_ttl = default_ttl
        self.lock = asyncio.Lock()

    async def get(self, key: str) -> Optional[Any]:
        """Return the live value for *key*, or None if absent or expired.

        An expired entry is removed as a side effect of the lookup.
        """
        async with self.lock:
            try:
                value, expiry = self.cache[key]
            except KeyError:
                return None
            if time.time() >= expiry:
                del self.cache[key]
                return None
            return value

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store *value* under *key* for *ttl* seconds (default ``default_ttl``)."""
        lifetime = self.default_ttl if ttl is None else ttl
        expiry = time.time() + lifetime
        async with self.lock:
            self.cache[key] = (value, expiry)

    async def delete(self, key: str) -> bool:
        """Remove *key*; return True if it was present, False otherwise."""
        async with self.lock:
            if key not in self.cache:
                return False
            del self.cache[key]
            return True

    async def clear(self) -> None:
        """Drop every entry."""
        async with self.lock:
            self.cache.clear()

    async def cleanup_expired(self) -> None:
        """Remove every entry whose expiry time has been reached."""
        # The reference time is taken before waiting for the lock.
        now = time.time()
        async with self.lock:
            stale = [
                key for key, (_, expiry) in self.cache.items() if now >= expiry
            ]
            for key in stale:
                del self.cache[key]
# Usage
cache = AsyncCache(default_ttl=300)  # 5 minutes


class CachedHandler(tornado.web.RequestHandler):
    """Serve data from the module-level cache, refreshing it on a miss."""

    async def get(self):
        """Write the entry for the ``id`` argument and where it came from.

        The reply's ``source`` is ``"cache"`` on a hit and ``"fresh"`` when
        the data had to be fetched and stored (for 10 minutes).
        """
        cache_key = f"data:{self.get_argument('id', 'default')}"

        cached_data = await cache.get(cache_key)
        # cache.get() returns None on a miss. Testing truthiness instead
        # would treat falsy cached values (0, "", {}, []) as misses and
        # refetch them on every request.
        if cached_data is not None:
            self.write({"data": cached_data, "source": "cache"})
            return

        data = await self.fetch_fresh_data()
        await cache.set(cache_key, data, ttl=600)  # 10 minutes
        self.write({"data": data, "source": "fresh"})

    async def fetch_fresh_data(self):
        """Simulate a slow fetch; returns a timestamped payload."""
        await asyncio.sleep(0.5)
        return {"timestamp": time.time(), "value": "fresh data"}
This comprehensive guide covers all the essential async patterns in Tornado, from basic async handlers to advanced queue processing and caching systems.