Web Development & APIs
HTTP Client - Requests Library
Basic HTTP Requests
import requests
# GET request
response = requests.get('https://api.github.com/users/octocat')
print(response.status_code) # 200
print(response.headers['content-type']) # application/json
data = response.json() # Parse JSON response
# POST request with JSON data
payload = {'name': 'John', 'age': 30}
response = requests.post('https://api.example.com/users', json=payload)
# POST request with form data
form_data = {'username': 'john', 'password': 'secret'}
response = requests.post('https://api.example.com/login', data=form_data)
# PUT request
response = requests.put('https://api.example.com/users/123', json={'name': 'Jane'})
# DELETE request
response = requests.delete('https://api.example.com/users/123')
Request Headers and Parameters
# Custom headers
headers = {
'User-Agent': 'MyApp/1.0',
'Authorization': 'Bearer token123',
'Content-Type': 'application/json'
}
response = requests.get('https://api.example.com/data', headers=headers)
# Query parameters
params = {'page': 1, 'limit': 10, 'search': 'python'}
response = requests.get('https://api.example.com/search', params=params)
# URL becomes: https://api.example.com/search?page=1&limit=10&search=python
# Timeout
response = requests.get('https://api.example.com/slow', timeout=5)
# Cookies
cookies = {'session_id': 'abc123'}
response = requests.get('https://api.example.com/protected', cookies=cookies)
Authentication
# Basic authentication
from requests.auth import HTTPBasicAuth
response = requests.get('https://api.example.com/protected',
auth=HTTPBasicAuth('username', 'password'))
# API key authentication
headers = {'X-API-Key': 'your-api-key'}
response = requests.get('https://api.example.com/data', headers=headers)
# Bearer token authentication
headers = {'Authorization': 'Bearer your-jwt-token'}
response = requests.get('https://api.example.com/protected', headers=headers)
# OAuth 2.0 (using requests-oauthlib)
from requests_oauthlib import OAuth2Session
oauth = OAuth2Session(client_id='your-client-id')
response = oauth.get('https://api.example.com/protected')
Sessions and Connection Reuse
# Using sessions for connection reuse and cookie persistence
with requests.Session() as session:
# Login
session.post('https://api.example.com/login', data={'user': 'john', 'pass': 'secret'})
# Session maintains cookies automatically
response = session.get('https://api.example.com/profile')
# Set default headers for all requests in session
session.headers.update({'User-Agent': 'MyApp/1.0'})
# Multiple requests with same session
for i in range(10):
response = session.get(f'https://api.example.com/data/{i}')
Error Handling
import requests
from requests.exceptions import RequestException, Timeout, ConnectionError
try:
response = requests.get('https://api.example.com/data', timeout=5)
response.raise_for_status() # Raises HTTPError for bad responses
data = response.json()
except Timeout:
print("Request timed out")
except ConnectionError:
print("Connection error")
except requests.HTTPError as e:
print(f"HTTP error: {e}")
except requests.RequestException as e:
print(f"Request error: {e}")
except ValueError: # JSON decode error
print("Invalid JSON response")
HTTP Server - Built-in Modules
urllib - Lower Level HTTP Client
import urllib.request
import urllib.parse
import json
# GET request
response = urllib.request.urlopen('https://api.github.com/users/octocat')
data = response.read().decode('utf-8')
user_data = json.loads(data)
# POST request
data = {'name': 'John', 'age': 30}
post_data = urllib.parse.urlencode(data).encode('utf-8')
req = urllib.request.Request('https://api.example.com/users', data=post_data)
response = urllib.request.urlopen(req)
# With headers
req = urllib.request.Request('https://api.example.com/data')
req.add_header('User-Agent', 'MyApp/1.0')
req.add_header('Authorization', 'Bearer token123')
response = urllib.request.urlopen(req)
Simple HTTP Server
import http.server
import socketserver
from urllib.parse import urlparse, parse_qs
class CustomHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
# Parse URL and query parameters
parsed_url = urlparse(self.path)
params = parse_qs(parsed_url.query)
if parsed_url.path == '/api/users':
# Return JSON response
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
response_data = {'users': ['John', 'Jane', 'Bob']}
self.wfile.write(json.dumps(response_data).encode())
else:
# Serve static files
super().do_GET()
def do_POST(self):
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
response = {'message': 'Data received', 'data': post_data.decode()}
self.wfile.write(json.dumps(response).encode())
# Start server
PORT = 8000
with socketserver.TCPServer(("", PORT), CustomHandler) as httpd:
print(f"Server running on port {PORT}")
httpd.serve_forever()
JSON APIs
JSON Parsing and Creation
import json
# Parse JSON string
json_string = '{"name": "John", "age": 30, "city": "New York"}'
data = json.loads(json_string)
print(data['name']) # John
# Convert Python object to JSON
person = {'name': 'Jane', 'age': 25, 'skills': ['Python', 'JavaScript']}
json_string = json.dumps(person)
print(json_string) # {"name": "Jane", "age": 25, "skills": ["Python", "JavaScript"]}
# Pretty print JSON
json_string = json.dumps(person, indent=2)
print(json_string)
# Handle special types
from datetime import datetime
data = {'timestamp': datetime.now()}
json_string = json.dumps(data, default=str) # Convert non-serializable to string
Working with JSON Files
# Read JSON from file
with open('data.json', 'r') as f:
data = json.load(f)
# Write JSON to file
data = {'users': [{'name': 'John', 'age': 30}]}
with open('output.json', 'w') as f:
json.dump(data, f, indent=2)
# Handle encoding issues
with open('data.json', 'r', encoding='utf-8') as f:
data = json.load(f)
API Response Handling
import requests
def fetch_user_data(user_id):
"""Fetch user data from API with proper error handling"""
try:
response = requests.get(f'https://api.example.com/users/{user_id}')
response.raise_for_status()
# Check content type
if 'application/json' in response.headers.get('content-type', ''):
return response.json()
else:
print("Response is not JSON")
return None
except requests.exceptions.JSONDecodeError:
print("Invalid JSON response")
return None
except requests.exceptions.RequestException as e:
print(f"Request failed: {e}")
return None
# Usage
user_data = fetch_user_data(123)
if user_data:
print(f"User: {user_data['name']}")
Web Scraping with BeautifulSoup
Basic Web Scraping
import requests
from bs4 import BeautifulSoup
# Fetch and parse HTML
url = 'https://example.com'
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
# Find elements
title = soup.find('title').text
headings = soup.find_all('h1')
paragraphs = soup.find_all('p')
# Find by class and ID
content = soup.find('div', class_='content')
header = soup.find('div', id='header')
# CSS selectors
articles = soup.select('article.post')
links = soup.select('a[href]')
Advanced Scraping Techniques
from bs4 import BeautifulSoup
import requests
import time
def scrape_with_retry(url, max_retries=3):
"""Scrape with retry logic"""
for attempt in range(max_retries):
try:
headers = {'User-Agent': 'Mozilla/5.0 (compatible; Web Scraper)'}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
return BeautifulSoup(response.content, 'html.parser')
except requests.RequestException as e:
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
continue
raise e
# Extract data from tables
soup = scrape_with_retry('https://example.com/table-data')
table = soup.find('table')
rows = table.find_all('tr')
data = []
for row in rows[1:]: # Skip header
cells = row.find_all(['td', 'th'])
row_data = [cell.text.strip() for cell in cells]
data.append(row_data)
# Handle forms
form = soup.find('form')
inputs = form.find_all('input')
form_data = {}
for input_tag in inputs:
name = input_tag.get('name')
value = input_tag.get('value', '')
if name:
form_data[name] = value
Handling Dynamic Content
# For JavaScript-heavy sites, use Selenium
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
def scrape_dynamic_content(url):
"""Scrape JavaScript-rendered content"""
driver = webdriver.Chrome() # Requires chromedriver
try:
driver.get(url)
# Wait for specific element to load
wait = WebDriverWait(driver, 10)
element = wait.until(EC.presence_of_element_located((By.CLASS_NAME, "content")))
# Get page source after JS execution
soup = BeautifulSoup(driver.page_source, 'html.parser')
return soup
finally:
driver.quit()
# Headless browser option
options = webdriver.ChromeOptions()
options.add_argument('--headless')
driver = webdriver.Chrome(options=options)
Flask Web Framework
Basic Flask Application
from flask import Flask, request, jsonify, render_template
app = Flask(__name__)
# Basic route
@app.route('/')
def home():
return '<h1>Hello, World!</h1>'
# Route with parameters
@app.route('/user/<string:name>')
def user_profile(name):
return f'<h1>Hello, {name}!</h1>'
# Multiple HTTP methods
@app.route('/api/users', methods=['GET', 'POST'])
def users():
if request.method == 'GET':
return jsonify({'users': ['John', 'Jane']})
elif request.method == 'POST':
data = request.get_json()
return jsonify({'message': 'User created', 'user': data})
# Query parameters
@app.route('/search')
def search():
query = request.args.get('q', '')
page = request.args.get('page', 1, type=int)
return f'Searching for: {query} (page {page})'
if __name__ == '__main__':
app.run(debug=True)
Templates and Forms
from flask import Flask, render_template, request, redirect, url_for, flash
app = Flask(__name__)
app.secret_key = 'your-secret-key'
@app.route('/')
def index():
return render_template('index.html', title='Home')
@app.route('/contact', methods=['GET', 'POST'])
def contact():
if request.method == 'POST':
name = request.form['name']
email = request.form['email']
message = request.form['message']
# Process form data
flash('Message sent successfully!', 'success')
return redirect(url_for('contact'))
return render_template('contact.html')
# File upload
@app.route('/upload', methods=['GET', 'POST'])
def upload():
if request.method == 'POST':
file = request.files['file']
if file and file.filename:
filename = secure_filename(file.filename)
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
return 'File uploaded successfully'
return render_template('upload.html')
Flask REST API
from flask import Flask, request, jsonify
from flask_cors import CORS
app = Flask(__name__)
CORS(app) # Enable CORS for all routes
# In-memory database
users = []
next_id = 1
@app.route('/api/users', methods=['GET'])
def get_users():
return jsonify({'users': users})
@app.route('/api/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
user = next((u for u in users if u['id'] == user_id), None)
if not user:
return jsonify({'error': 'User not found'}), 404
return jsonify({'user': user})
@app.route('/api/users', methods=['POST'])
def create_user():
global next_id
data = request.get_json()
# Validation
if not data or 'name' not in data:
return jsonify({'error': 'Name is required'}), 400
user = {
'id': next_id,
'name': data['name'],
'email': data.get('email', '')
}
users.append(user)
next_id += 1
return jsonify({'user': user}), 201
@app.route('/api/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
user = next((u for u in users if u['id'] == user_id), None)
if not user:
return jsonify({'error': 'User not found'}), 404
data = request.get_json()
user.update(data)
return jsonify({'user': user})
@app.route('/api/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
global users
users = [u for u in users if u['id'] != user_id]
return jsonify({'message': 'User deleted'}), 204
FastAPI Framework
Basic FastAPI Application
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI()
# Pydantic models for request/response validation
class User(BaseModel):
name: str
email: str
age: Optional[int] = None
class UserResponse(BaseModel):
id: int
name: str
email: str
age: Optional[int] = None
# In-memory database
users_db = []
next_id = 1
@app.get("/")
async def root():
return {"message": "Hello World"}
@app.get("/users", response_model=List[UserResponse])
async def get_users():
return users_db
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
user = next((u for u in users_db if u["id"] == user_id), None)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.post("/users", response_model=UserResponse)
async def create_user(user: User):
global next_id
user_dict = user.dict()
user_dict["id"] = next_id
users_db.append(user_dict)
next_id += 1
return user_dict
@app.put("/users/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user: User):
existing_user = next((u for u in users_db if u["id"] == user_id), None)
if not existing_user:
raise HTTPException(status_code=404, detail="User not found")
existing_user.update(user.dict())
return existing_user
@app.delete("/users/{user_id}")
async def delete_user(user_id: int):
global users_db
users_db = [u for u in users_db if u["id"] != user_id]
return {"message": "User deleted"}
Advanced FastAPI Features
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import asyncio
import aiohttp
app = FastAPI()
security = HTTPBearer()
# Dependency injection
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
token = credentials.credentials
if token != "valid-token":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
return {"user_id": 1, "username": "john"}
# Background tasks
from fastapi import BackgroundTasks
def send_email(email: str, message: str):
# Simulate sending email
print(f"Sending email to {email}: {message}")
@app.post("/send-notification/")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(send_email, email, "Welcome!")
return {"message": "Notification sent"}
# Async endpoint with external API call
@app.get("/external-data")
async def get_external_data():
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com/data") as response:
data = await response.json()
return data
# WebSocket support
from fastapi import WebSocket
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
except:
pass
FastAPI with Database
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from pydantic import BaseModel
# Database setup
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Database model
class UserDB(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
email = Column(String, unique=True, index=True)
Base.metadata.create_all(bind=engine)
# Pydantic models
class UserCreate(BaseModel):
name: str
email: str
class UserResponse(BaseModel):
id: int
name: str
email: str
class Config:
orm_mode = True
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users", response_model=UserResponse)
async def create_user(user: UserCreate, db: Session = Depends(get_db)):
db_user = UserDB(**user.dict())
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(UserDB).filter(UserDB.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
Authentication and Authorization
JWT Token Authentication
import jwt
from datetime import datetime, timedelta
from flask import Flask, request, jsonify
from functools import wraps
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
def generate_token(user_id):
payload = {
'user_id': user_id,
'exp': datetime.utcnow() + timedelta(hours=24)
}
return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')
def verify_token(token):
try:
payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
return payload['user_id']
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'error': 'Token is missing'}), 401
if token.startswith('Bearer '):
token = token[7:]
user_id = verify_token(token)
if not user_id:
return jsonify({'error': 'Invalid token'}), 401
return f(user_id, *args, **kwargs)
return decorated
@app.route('/login', methods=['POST'])
def login():
data = request.get_json()
username = data.get('username')
password = data.get('password')
# Validate credentials (implement your logic)
if username == 'admin' and password == 'password':
token = generate_token(user_id=1)
return jsonify({'token': token})
return jsonify({'error': 'Invalid credentials'}), 401
@app.route('/protected')
@token_required
def protected_route(user_id):
return jsonify({'message': f'Hello user {user_id}', 'data': 'sensitive data'})
Session-Based Authentication
from flask import Flask, request, session, jsonify
import hashlib
app = Flask(__name__)
app.secret_key = 'your-secret-key'
# Mock user database
users = {
'admin': {
'password': hashlib.sha256('password'.encode()).hexdigest(),
'role': 'admin'
}
}
def hash_password(password):
return hashlib.sha256(password.encode()).hexdigest()
@app.route('/login', methods=['POST'])
def login():
data = request.get_json()
username = data.get('username')
password = data.get('password')
user = users.get(username)
if user and user['password'] == hash_password(password):
session['user_id'] = username
session['role'] = user['role']
return jsonify({'message': 'Login successful'})
return jsonify({'error': 'Invalid credentials'}), 401
@app.route('/logout', methods=['POST'])
def logout():
session.clear()
return jsonify({'message': 'Logout successful'})
def login_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if 'user_id' not in session:
return jsonify({'error': 'Authentication required'}), 401
return f(*args, **kwargs)
return decorated
@app.route('/profile')
@login_required
def profile():
return jsonify({
'user_id': session['user_id'],
'role': session['role']
})
OAuth 2.0 Implementation
from flask import Flask, request, redirect, session, jsonify
import requests
from urllib.parse import urlencode
app = Flask(__name__)
app.secret_key = 'your-secret-key'
# OAuth configuration
OAUTH_CONFIG = {
'client_id': 'your-github-client-id',
'client_secret': 'your-github-client-secret',
'authorization_url': 'https://github.com/login/oauth/authorize',
'token_url': 'https://github.com/login/oauth/access_token',
'user_info_url': 'https://api.github.com/user',
'redirect_uri': 'http://localhost:5000/callback'
}
@app.route('/login')
def login():
params = {
'client_id': OAUTH_CONFIG['client_id'],
'redirect_uri': OAUTH_CONFIG['redirect_uri'],
'scope': 'user:email'
}
auth_url = f"{OAUTH_CONFIG['authorization_url']}?{urlencode(params)}"
return redirect(auth_url)
@app.route('/callback')
def callback():
code = request.args.get('code')
if not code:
return jsonify({'error': 'Authorization code not provided'}), 400
# Exchange code for access token
token_data = {
'client_id': OAUTH_CONFIG['client_id'],
'client_secret': OAUTH_CONFIG['client_secret'],
'code': code,
'redirect_uri': OAUTH_CONFIG['redirect_uri']
}
headers = {'Accept': 'application/json'}
response = requests.post(OAUTH_CONFIG['token_url'], data=token_data, headers=headers)
token_info = response.json()
if 'access_token' not in token_info:
return jsonify({'error': 'Failed to get access token'}), 400
# Get user info
headers = {'Authorization': f"token {token_info['access_token']}"}
user_response = requests.get(OAUTH_CONFIG['user_info_url'], headers=headers)
user_info = user_response.json()
# Store user info in session
session['user_id'] = user_info['id']
session['username'] = user_info['login']
session['access_token'] = token_info['access_token']
return jsonify({'message': 'Login successful', 'user': user_info})
API Design Patterns and Best Practices
RESTful API Design
from flask import Flask, request, jsonify
from http import HTTPStatus
app = Flask(__name__)
# Resource-based URLs
@app.route('/api/v1/users', methods=['GET'])
def get_users():
"""GET /api/v1/users - Get all users"""
users = get_all_users() # Your database logic
return jsonify({
'data': users,
'meta': {
'count': len(users),
'page': 1,
'per_page': 10
}
})
@app.route('/api/v1/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
"""GET /api/v1/users/123 - Get specific user"""
user = get_user_by_id(user_id)
if not user:
return jsonify({'error': 'User not found'}), HTTPStatus.NOT_FOUND
return jsonify({'data': user})
@app.route('/api/v1/users', methods=['POST'])
def create_user():
"""POST /api/v1/users - Create new user"""
data = request.get_json()
# Validation
if not data or 'name' not in data:
return jsonify({'error': 'Name is required'}), HTTPStatus.BAD_REQUEST
user = create_new_user(data)
return jsonify({'data': user}), HTTPStatus.CREATED
@app.route('/api/v1/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
"""PUT /api/v1/users/123 - Update entire user"""
data = request.get_json()
user = update_user_by_id(user_id, data)
if not user:
return jsonify({'error': 'User not found'}), HTTPStatus.NOT_FOUND
return jsonify({'data': user})
@app.route('/api/v1/users/<int:user_id>', methods=['PATCH'])
def partial_update_user(user_id):
"""PATCH /api/v1/users/123 - Partial update"""
data = request.get_json()
user = partial_update_user_by_id(user_id, data)
if not user:
return jsonify({'error': 'User not found'}), HTTPStatus.NOT_FOUND
return jsonify({'data': user})
@app.route('/api/v1/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
"""DELETE /api/v1/users/123 - Delete user"""
success = delete_user_by_id(user_id)
if not success:
return jsonify({'error': 'User not found'}), HTTPStatus.NOT_FOUND
return '', HTTPStatus.NO_CONTENT
API Versioning
from flask import Flask, Blueprint
app = Flask(__name__)
# Version 1 API
v1_bp = Blueprint('v1', __name__, url_prefix='/api/v1')
@v1_bp.route('/users')
def get_users_v1():
return jsonify({'users': [], 'version': '1.0'})
# Version 2 API
v2_bp = Blueprint('v2', __name__, url_prefix='/api/v2')
@v2_bp.route('/users')
def get_users_v2():
return jsonify({
'data': [],
'meta': {'version': '2.0', 'count': 0}
})
app.register_blueprint(v1_bp)
app.register_blueprint(v2_bp)
# Header-based versioning
@app.route('/api/users')
def get_users():
version = request.headers.get('API-Version', '1.0')
if version == '2.0':
return get_users_v2()
return get_users_v1()
Rate Limiting
from flask import Flask, request, jsonify
from functools import wraps
import time
from collections import defaultdict
app = Flask(__name__)
# Simple in-memory rate limiter
rate_limit_storage = defaultdict(list)
def rate_limit(max_requests=100, window=3600):
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
client_ip = request.remote_addr
now = time.time()
# Clean old requests
rate_limit_storage[client_ip] = [
req_time for req_time in rate_limit_storage[client_ip]
if now - req_time < window
]
# Check rate limit
if len(rate_limit_storage[client_ip]) >= max_requests:
return jsonify({
'error': 'Rate limit exceeded',
'retry_after': window
}), 429
# Add current request
rate_limit_storage[client_ip].append(now)
return f(*args, **kwargs)
return decorated
return decorator
@app.route('/api/data')
@rate_limit(max_requests=10, window=60) # 10 requests per minute
def get_data():
return jsonify({'data': 'sensitive information'})
API Documentation with OpenAPI
from flask import Flask
from flask_restx import Api, Resource, fields
app = Flask(__name__)
api = Api(app, doc='/docs/', title='User API', version='1.0')
# Define models for documentation
user_model = api.model('User', {
'id': fields.Integer(required=True, description='User ID'),
'name': fields.String(required=True, description='User name'),
'email': fields.String(required=True, description='User email')
})
user_input_model = api.model('UserInput', {
'name': fields.String(required=True, description='User name'),
'email': fields.String(required=True, description='User email')
})
@api.route('/users')
class UserList(Resource):
@api.doc('get_users')
@api.marshal_list_with(user_model)
def get(self):
"""Get all users"""
return users_db
@api.doc('create_user')
@api.expect(user_input_model)
@api.marshal_with(user_model, code=201)
def post(self):
"""Create a new user"""
data = api.payload
user = create_user(data)
return user, 201
@api.route('/users/<int:user_id>')
class User(Resource):
@api.doc('get_user')
@api.marshal_with(user_model)
def get(self, user_id):
"""Get a specific user"""
user = get_user_by_id(user_id)
if not user:
api.abort(404, 'User not found')
return user
Error Handling in Web Applications
Flask Error Handling
from flask import Flask, jsonify, request
from werkzeug.exceptions import HTTPException
import logging
app = Flask(__name__)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Custom exception classes
class ValidationError(Exception):
def __init__(self, message, status_code=400):
super().__init__(message)
self.message = message
self.status_code = status_code
class DatabaseError(Exception):
pass
# Global error handlers
@app.errorhandler(ValidationError)
def handle_validation_error(error):
logger.warning(f"Validation error: {error.message}")
return jsonify({
'error': 'Validation Error',
'message': error.message
}), error.status_code
@app.errorhandler(DatabaseError)
def handle_database_error(error):
logger.error(f"Database error: {str(error)}")
return jsonify({
'error': 'Internal Server Error',
'message': 'Database operation failed'
}), 500
@app.errorhandler(HTTPException)
def handle_http_exception(error):
logger.info(f"HTTP error: {error.code} - {error.description}")
return jsonify({
'error': error.name,
'message': error.description
}), error.code
@app.errorhandler(Exception)
def handle_generic_error(error):
logger.error(f"Unexpected error: {str(error)}")
return jsonify({
'error': 'Internal Server Error',
'message': 'An unexpected error occurred'
}), 500
# Route with error handling
@app.route('/api/users', methods=['POST'])
def create_user():
try:
data = request.get_json()
# Validation
if not data:
raise ValidationError('No data provided')
if 'name' not in data or len(data['name']) < 2:
raise ValidationError('Name must be at least 2 characters long')
if 'email' not in data or '@' not in data['email']:
raise ValidationError('Valid email is required')
# Database operation
user = create_user_in_db(data)
return jsonify({'data': user}), 201
except ValidationError:
raise # Re-raise to be handled by error handler
except Exception as e:
logger.error(f"Error creating user: {str(e)}")
raise DatabaseError(f"Failed to create user: {str(e)}")
FastAPI Error Handling
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
import logging
app = FastAPI()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Custom exception classes
class DatabaseException(Exception):
def __init__(self, message: str):
self.message = message
class BusinessLogicException(Exception):
def __init__(self, message: str, status_code: int = 400):
self.message = message
self.status_code = status_code
# Custom exception handlers
@app.exception_handler(DatabaseException)
async def database_exception_handler(request: Request, exc: DatabaseException):
logger.error(f"Database error: {exc.message}")
return JSONResponse(
status_code=500,
content={
"error": "Database Error",
"message": "Internal server error occurred"
}
)
@app.exception_handler(BusinessLogicException)
async def business_logic_exception_handler(request: Request, exc: BusinessLogicException):
logger.warning(f"Business logic error: {exc.message}")
return JSONResponse(
status_code=exc.status_code,
content={
"error": "Business Logic Error",
"message": exc.message
}
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
logger.warning(f"Validation error: {exc.errors()}")
return JSONResponse(
status_code=422,
content={
"error": "Validation Error",
"message": "Request validation failed",
"details": exc.errors()
}
)
# Route with error handling
@app.post("/users")
async def create_user(user_data: dict):
try:
# Business logic validation
if len(user_data.get("name", "")) < 2:
raise BusinessLogicException("Name must be at least 2 characters long")
# Database operation
user = await create_user_in_db(user_data)
return {"data": user}
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
raise DatabaseException(f"Failed to create user: {str(e)}")
Testing Web Applications
Testing Flask Applications
import unittest
import json
from app import app, users_db # Your Flask app
class TestUserAPI(unittest.TestCase):
def setUp(self):
self.app = app.test_client()
self.app.testing = True
users_db.clear() # Clear test database
def test_get_users_empty(self):
response = self.app.get('/api/users')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertEqual(len(data['users']), 0)
def test_create_user(self):
user_data = {'name': 'John Doe', 'email': 'john@example.com'}
response = self.app.post('/api/users',
json=user_data,
content_type='application/json')
self.assertEqual(response.status_code, 201)
data = json.loads(response.data)
self.assertEqual(data['user']['name'], 'John Doe')
def test_create_user_validation(self):
# Test missing name
user_data = {'email': 'john@example.com'}
response = self.app.post('/api/users',
json=user_data,
content_type='application/json')
self.assertEqual(response.status_code, 400)
data = json.loads(response.data)
self.assertIn('error', data)
def test_get_user_not_found(self):
response = self.app.get('/api/users/999')
self.assertEqual(response.status_code, 404)
def test_authentication_required(self):
response = self.app.get('/api/protected')
self.assertEqual(response.status_code, 401)
def test_with_authentication(self):
# Login first
login_data = {'username': 'admin', 'password': 'password'}
login_response = self.app.post('/login', json=login_data)
token = json.loads(login_response.data)['token']
# Make authenticated request
headers = {'Authorization': f'Bearer {token}'}
response = self.app.get('/api/protected', headers=headers)
self.assertEqual(response.status_code, 200)
if __name__ == '__main__':
unittest.main()
Testing with pytest and fixtures
import pytest
import json
from app import app, db, User
@pytest.fixture
def client():
app.config['TESTING'] = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
with app.test_client() as client:
with app.app_context():
db.create_all()
yield client
db.drop_all()
@pytest.fixture
def sample_user():
return {
'name': 'John Doe',
'email': 'john@example.com',
'age': 30
}
def test_create_user(client, sample_user):
response = client.post('/api/users', json=sample_user)
assert response.status_code == 201
data = json.loads(response.data)
assert data['user']['name'] == sample_user['name']
def test_get_users(client, sample_user):
# Create a user first
client.post('/api/users', json=sample_user)
# Get all users
response = client.get('/api/users')
assert response.status_code == 200
data = json.loads(response.data)
assert len(data['users']) == 1
@pytest.mark.parametrize("invalid_data,expected_error", [
({}, "Name is required"),
({'name': ''}, "Name is required"),
({'name': 'John'}, "Email is required"),
({'name': 'John', 'email': 'invalid'}, "Invalid email format"),
])
def test_create_user_validation(client, invalid_data, expected_error):
response = client.post('/api/users', json=invalid_data)
assert response.status_code == 400
data = json.loads(response.data)
assert expected_error in data['error']
Testing FastAPI Applications
from fastapi.testclient import TestClient
import pytest
from app import app
client = TestClient(app)
def test_read_root():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"message": "Hello World"}
def test_create_user():
user_data = {"name": "John Doe", "email": "john@example.com"}
response = client.post("/users", json=user_data)
assert response.status_code == 201
data = response.json()
assert data["name"] == "John Doe"
assert "id" in data
def test_get_user():
# Create user first
user_data = {"name": "Jane Doe", "email": "jane@example.com"}
create_response = client.post("/users", json=user_data)
user_id = create_response.json()["id"]
# Get user
response = client.get(f"/users/{user_id}")
assert response.status_code == 200
data = response.json()
assert data["name"] == "Jane Doe"
def test_get_user_not_found():
response = client.get("/users/999")
assert response.status_code == 404
assert response.json()["detail"] == "User not found"
@pytest.mark.asyncio
async def test_async_endpoint():
response = client.get("/external-data")
assert response.status_code == 200
# Add more assertions based on your endpoint
Mock External APIs
import unittest
from unittest.mock import patch, Mock
import requests
from my_app import fetch_user_data
class TestExternalAPI(unittest.TestCase):
@patch('requests.get')
def test_fetch_user_data_success(self, mock_get):
# Mock successful API response
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
'id': 1,
'name': 'John Doe',
'email': 'john@example.com'
}
mock_get.return_value = mock_response
# Test the function
result = fetch_user_data(1)
# Assertions
mock_get.assert_called_once_with('https://api.example.com/users/1')
self.assertEqual(result['name'], 'John Doe')
@patch('requests.get')
def test_fetch_user_data_timeout(self, mock_get):
# Mock timeout exception
mock_get.side_effect = requests.exceptions.Timeout()
# Test the function
result = fetch_user_data(1)
# Assertions
self.assertIsNone(result)
@patch('requests.get')
def test_fetch_user_data_http_error(self, mock_get):
# Mock HTTP error
mock_response = Mock()
mock_response.status_code = 404
mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError()
mock_get.return_value = mock_response
# Test the function
result = fetch_user_data(1)
# Assertions
self.assertIsNone(result)
# Using pytest with fixtures
@pytest.fixture
def mock_api_response():
return {
'id': 1,
'name': 'John Doe',
'email': 'john@example.com'
}
def test_fetch_user_data_with_pytest(mock_api_response):
with patch('requests.get') as mock_get:
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = mock_api_response
mock_get.return_value = mock_response
result = fetch_user_data(1)
assert result['name'] == 'John Doe'
assert result['email'] == 'john@example.com'
Performance and Optimization
Caching Strategies
from functools import lru_cache
import time
import redis
from flask import Flask
# In-memory caching with LRU
@lru_cache(maxsize=128)
def expensive_computation(n):
time.sleep(1) # Simulate expensive operation
return n * n
# Redis caching
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def cached_api_call(url, expiry=300):
# Check cache first
cached_result = redis_client.get(f"api_cache:{url}")
if cached_result:
return json.loads(cached_result)
# Make API call
response = requests.get(url)
data = response.json()
# Cache result
redis_client.setex(f"api_cache:{url}", expiry, json.dumps(data))
return data
# Flask-Caching
from flask_caching import Cache
app = Flask(__name__)
cache = Cache(app, config={'CACHE_TYPE': 'simple'})
@app.route('/users')
@cache.cached(timeout=300) # Cache for 5 minutes
def get_users():
users = fetch_users_from_db() # Expensive DB query
return jsonify(users)
# Cache invalidation
@app.route('/users', methods=['POST'])
def create_user():
user = create_user_in_db(request.json)
cache.delete('view//users') # Invalidate cache
return jsonify(user)
Async Programming
import asyncio
import aiohttp
from fastapi import FastAPI
app = FastAPI()
async def fetch_data(session, url):
async with session.get(url) as response:
return await response.json()
@app.get("/aggregate-data")
async def aggregate_data():
urls = [
"https://api.service1.com/data",
"https://api.service2.com/data",
"https://api.service3.com/data"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_data(session, url) for url in urls]
results = await asyncio.gather(*tasks)
# Process and combine results
aggregated = {"services": results}
return aggregated
# Database async operations
import asyncpg
async def get_users_async():
conn = await asyncpg.connect('postgresql://user:pass@localhost/db')
try:
users = await conn.fetch("SELECT * FROM users")
return [dict(user) for user in users]
finally:
await conn.close()
@app.get("/users-async")
async def get_users():
users = await get_users_async()
return {"users": users}
This comprehensive cheatsheet covers the essential aspects of Python web development and APIs. It includes practical examples for HTTP clients, web frameworks, authentication, API design, error handling, and testing - all formatted for quick reference and copy-paste usage.