Error Handling & Debugging
Exception Handling Fundamentals
Basic Try/Except Structure
# Basic exception handling
try:
result = risky_operation()
except SpecificError:
handle_specific_error()
except (Error1, Error2):
handle_multiple_errors()
except Exception as e:
handle_any_error(e)
else:
# Runs if no exception occurred
success_operation()
finally:
# Always runs
cleanup_operation()
Exception Flow Control
# Complete exception handling structure
def divide_numbers(a, b):
try:
print(f"Attempting to divide {a} by {b}")
result = a / b
except ZeroDivisionError:
print("Cannot divide by zero!")
return None
except TypeError:
print("Both arguments must be numbers!")
return None
except Exception as e:
print(f"Unexpected error: {e}")
return None
else:
print("Division successful!")
return result
finally:
print("Division operation completed")
# Usage
result = divide_numbers(10, 2) # Normal case
result = divide_numbers(10, 0) # ZeroDivisionError
result = divide_numbers(10, "2") # TypeError
Exception Chaining and Context
# Exception chaining with raise ... from
def process_data(data):
try:
return int(data)
except ValueError as e:
raise ProcessingError(f"Failed to process {data}") from e
# Implicit exception chaining
def nested_error():
try:
1 / 0
except ZeroDivisionError:
raise ValueError("Something went wrong") # Chained automatically
# Suppress exception chaining
def suppress_chain():
try:
1 / 0
except ZeroDivisionError:
raise ValueError("New error") from None # Suppress chain
# Access exception context
try:
nested_error()
except ValueError as e:
print(f"Current exception: {e}")
print(f"Caused by: {e.__cause__}")
print(f"Context: {e.__context__}")
Built-in Exception Types
Common Built-in Exceptions
# ValueError - Wrong value type
int("not_a_number") # ValueError
# TypeError - Wrong type
"string" + 5 # TypeError
# KeyError - Missing dictionary key
my_dict = {"a": 1}
my_dict["b"] # KeyError
# IndexError - List index out of range
my_list = [1, 2, 3]
my_list[10] # IndexError
# AttributeError - Missing attribute
my_string = "hello"
my_string.nonexistent_method() # AttributeError
# FileNotFoundError - File doesn't exist
open("nonexistent_file.txt") # FileNotFoundError
# ImportError/ModuleNotFoundError - Module issues
import nonexistent_module # ModuleNotFoundError
# SyntaxError - Invalid syntax (compile time)
# eval("invalid syntax here") # SyntaxError
# NameError - Undefined variable
print(undefined_variable) # NameError
# ZeroDivisionError - Division by zero
10 / 0 # ZeroDivisionError
Exception Hierarchy Example
# Understanding exception hierarchy
def handle_exceptions():
try:
# Various operations that might fail
pass
except KeyError:
print("Specific key error")
except LookupError:
print("General lookup error (parent of KeyError)")
except Exception:
print("Any other exception")
except BaseException:
print("System-level exceptions")
# Check exception inheritance
print(issubclass(KeyError, LookupError)) # True
print(issubclass(ValueError, Exception)) # True
Custom Exceptions
Creating Custom Exceptions
# Basic custom exception
class CustomError(Exception):
pass
# Custom exception with attributes
class ValidationError(Exception):
def __init__(self, message, field=None, value=None):
super().__init__(message)
self.field = field
self.value = value
def __str__(self):
base = super().__str__()
if self.field:
return f"{base} (field: {self.field}, value: {self.value})"
return base
# Exception with custom behavior
class RetryableError(Exception):
def __init__(self, message, max_retries=3):
super().__init__(message)
self.max_retries = max_retries
self.retry_count = 0
def can_retry(self):
return self.retry_count < self.max_retries
def increment_retry(self):
self.retry_count += 1
# Usage examples
try:
raise ValidationError("Invalid email", field="email", value="invalid@")
except ValidationError as e:
print(f"Validation failed: {e}")
print(f"Field: {e.field}, Value: {e.value}")
Exception Hierarchies
# Create exception hierarchy for a web application
class AppError(Exception):
"""Base exception for application"""
pass
class ValidationError(AppError):
"""Data validation errors"""
pass
class AuthenticationError(AppError):
"""Authentication failures"""
pass
class PermissionError(AppError):
"""Permission denied errors"""
pass
class DatabaseError(AppError):
"""Database operation errors"""
pass
class ConnectionError(DatabaseError):
"""Database connection errors"""
pass
# Usage with specific handling
def handle_app_errors():
try:
# Application operations
pass
except ValidationError:
# Handle validation specifically
pass
except AuthenticationError:
# Handle auth specifically
pass
except DatabaseError:
# Handle all database errors
pass
except AppError:
# Handle any app error
pass
Debugging Techniques
Using pdb Debugger
import pdb
def problematic_function(data):
result = []
for item in data:
# Set breakpoint here
pdb.set_trace()
processed = item * 2
result.append(processed)
return result
# Python 3.7+ breakpoint() function
def modern_debugging(data):
result = []
for item in data:
breakpoint() # Modern way to set breakpoint
processed = item * 2
result.append(processed)
return result
# Conditional breakpoint
def conditional_debug(data):
for i, item in enumerate(data):
if i > 5: # Only break after 5 iterations
breakpoint()
process_item(item)
pdb Commands Reference
# Common pdb commands (use in debugger)
"""
l (list) - Show current code
n (next) - Execute next line
s (step) - Step into function calls
c (continue) - Continue execution
p <var> - Print variable value
pp <var> - Pretty print variable
w (where) - Show stack trace
u (up) - Go up in stack
d (down) - Go down in stack
q (quit) - Quit debugger
h (help) - Show help
"""
# Post-mortem debugging
def run_with_postmortem():
try:
problematic_function([1, 2, 3, "invalid"])
except Exception:
import pdb
pdb.post_mortem() # Debug at point of failure
Advanced Debugging Techniques
import traceback
import sys
# Custom exception handler
def debug_exception_handler(exc_type, exc_value, exc_traceback):
if exc_type is KeyboardInterrupt:
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
print("=" * 60)
print("UNCAUGHT EXCEPTION")
print("=" * 60)
traceback.print_exception(exc_type, exc_value, exc_traceback)
# Start debugger on uncaught exception
import pdb
pdb.post_mortem(exc_traceback)
# Install custom exception handler
sys.excepthook = debug_exception_handler
# Trace function calls
def trace_function_calls(frame, event, arg):
if event == 'call':
filename = frame.f_code.co_filename
lineno = frame.f_lineno
function_name = frame.f_code.co_name
print(f"Calling {function_name} at {filename}:{lineno}")
return trace_function_calls
# Enable tracing
def debug_with_trace():
sys.settrace(trace_function_calls)
# Your code here
problematic_function([1, 2, 3])
sys.settrace(None) # Disable tracing
Logging
Basic Logging Setup
import logging
# Basic configuration
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('app.log'),
logging.StreamHandler()
]
)
# Create logger
logger = logging.getLogger(__name__)
# Log levels
logger.debug("Debug message")
logger.info("Info message")
logger.warning("Warning message")
logger.error("Error message")
logger.critical("Critical message")
# Log with exception info
try:
1 / 0
except ZeroDivisionError:
logger.exception("Division by zero occurred")
# Or manually:
logger.error("Division by zero", exc_info=True)
Advanced Logging Configuration
import logging
import logging.handlers
from datetime import datetime
# Custom formatter
class ColoredFormatter(logging.Formatter):
COLORS = {
'DEBUG': '\033[36m', # Cyan
'INFO': '\033[32m', # Green
'WARNING': '\033[33m', # Yellow
'ERROR': '\033[31m', # Red
'CRITICAL': '\033[35m', # Magenta
'RESET': '\033[0m' # Reset
}
def format(self, record):
color = self.COLORS.get(record.levelname, self.COLORS['RESET'])
record.levelname = f"{color}{record.levelname}{self.COLORS['RESET']}"
return super().format(record)
# Setup comprehensive logging
def setup_logging():
# Create logger
logger = logging.getLogger('my_app')
logger.setLevel(logging.DEBUG)
# Console handler with colors
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_formatter = ColoredFormatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(console_formatter)
# File handler with rotation
file_handler = logging.handlers.RotatingFileHandler(
'app.log', maxBytes=10*1024*1024, backupCount=5
)
file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
)
file_handler.setFormatter(file_formatter)
# Add handlers
logger.addHandler(console_handler)
logger.addHandler(file_handler)
return logger
# Usage
logger = setup_logging()
logger.info("Application started")
Structured Logging
import logging
import json
from datetime import datetime
class JsonFormatter(logging.Formatter):
def format(self, record):
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno
}
# Add exception info if present
if record.exc_info:
log_entry['exception'] = self.formatException(record.exc_info)
# Add extra fields
if hasattr(record, 'user_id'):
log_entry['user_id'] = record.user_id
if hasattr(record, 'request_id'):
log_entry['request_id'] = record.request_id
return json.dumps(log_entry)
# Context manager for request logging
class RequestContext:
def __init__(self, request_id, user_id=None):
self.request_id = request_id
self.user_id = user_id
self.old_factory = logging.getLogRecordFactory()
def __enter__(self):
def record_factory(*args, **kwargs):
record = self.old_factory(*args, **kwargs)
record.request_id = self.request_id
if self.user_id:
record.user_id = self.user_id
return record
logging.setLogRecordFactory(record_factory)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
logging.setLogRecordFactory(self.old_factory)
# Usage
with RequestContext("req-123", "user-456"):
logger.info("Processing user request")
Testing
unittest Framework
import unittest
from unittest.mock import Mock, patch, MagicMock
class TestCalculator(unittest.TestCase):
def setUp(self):
"""Run before each test method"""
self.calc = Calculator()
def tearDown(self):
"""Run after each test method"""
pass
def test_addition(self):
"""Test basic addition"""
result = self.calc.add(2, 3)
self.assertEqual(result, 5)
def test_division_by_zero(self):
"""Test exception handling"""
with self.assertRaises(ZeroDivisionError):
self.calc.divide(10, 0)
def test_division_by_zero_message(self):
"""Test exception message"""
with self.assertRaisesRegex(ZeroDivisionError, "division by zero"):
self.calc.divide(10, 0)
def test_with_mock(self):
"""Test with mocked dependencies"""
with patch('calculator.database.get_value') as mock_get:
mock_get.return_value = 42
result = self.calc.get_stored_value()
self.assertEqual(result, 42)
mock_get.assert_called_once()
@unittest.skip("Temporarily disabled")
def test_skip_example(self):
"""This test will be skipped"""
pass
@unittest.skipIf(sys.platform == "win32", "Not supported on Windows")
def test_unix_specific(self):
"""This test only runs on Unix"""
pass
# Test suite
def suite():
suite = unittest.TestSuite()
suite.addTest(TestCalculator('test_addition'))
suite.addTest(TestCalculator('test_division_by_zero'))
return suite
if __name__ == '__main__':
unittest.main(verbosity=2)
pytest Framework
import pytest
from unittest.mock import Mock, patch
# Basic test
def test_addition():
assert add(2, 3) == 5
# Exception testing
def test_division_by_zero():
with pytest.raises(ZeroDivisionError):
divide(10, 0)
def test_exception_message():
with pytest.raises(ZeroDivisionError, match="division by zero"):
divide(10, 0)
# Parametrized tests
@pytest.mark.parametrize("a,b,expected", [
(2, 3, 5),
(0, 5, 5),
(-1, 1, 0),
(10, -5, 5)
])
def test_addition_parametrized(a, b, expected):
assert add(a, b) == expected
# Fixtures
@pytest.fixture
def calculator():
"""Create a calculator instance for testing"""
return Calculator()
@pytest.fixture
def sample_data():
"""Provide sample data for tests"""
return [1, 2, 3, 4, 5]
def test_with_fixture(calculator, sample_data):
"""Test using fixtures"""
result = calculator.sum_list(sample_data)
assert result == 15
# Mocking with pytest
def test_with_mock(mocker):
"""Test with pytest-mock plugin"""
mock_api = mocker.patch('my_module.external_api_call')
mock_api.return_value = {'status': 'success'}
result = my_function()
assert result == {'status': 'success'}
mock_api.assert_called_once()
# Custom markers
@pytest.mark.slow
def test_slow_operation():
"""Test marked as slow"""
pass
@pytest.mark.integration
def test_database_integration():
"""Integration test"""
pass
# Run specific markers: pytest -m "not slow"
doctest for Documentation Testing
def factorial(n):
"""
Calculate factorial of n.
>>> factorial(0)
1
>>> factorial(1)
1
>>> factorial(5)
120
>>> factorial(-1)
Traceback (most recent call last):
...
ValueError: Factorial is not defined for negative numbers
"""
if n < 0:
raise ValueError("Factorial is not defined for negative numbers")
if n <= 1:
return 1
return n * factorial(n - 1)
# Run doctests
if __name__ == "__main__":
import doctest
doctest.testmod(verbose=True)
# Or from command line: python -m doctest module.py
Profiling and Performance Analysis
cProfile for Performance Analysis
import cProfile
import pstats
from functools import wraps
def profile_function(func):
"""Decorator to profile a function"""
@wraps(func)
def wrapper(*args, **kwargs):
profiler = cProfile.Profile()
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
# Print stats
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(10) # Top 10 functions
return result
return wrapper
@profile_function
def slow_function():
"""Function to profile"""
total = 0
for i in range(1000000):
total += i ** 2
return total
# Manual profiling
def manual_profiling():
profiler = cProfile.Profile()
profiler.enable()
# Code to profile
slow_function()
profiler.disable()
# Save to file
profiler.dump_stats('profile_results.prof')
# Analyze results
stats = pstats.Stats('profile_results.prof')
stats.sort_stats('cumulative')
stats.print_stats()
# Command line profiling: python -m cProfile -o profile.prof script.py
Memory Profiling
import tracemalloc
import psutil
import os
from functools import wraps
def memory_profile(func):
"""Decorator to profile memory usage"""
@wraps(func)
def wrapper(*args, **kwargs):
# Start memory tracing
tracemalloc.start()
# Get initial memory
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
# Run function
result = func(*args, **kwargs)
# Get final memory
final_memory = process.memory_info().rss / 1024 / 1024 # MB
# Get memory trace
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Function: {func.__name__}")
print(f"Memory change: {final_memory - initial_memory:.2f} MB")
print(f"Peak memory usage: {peak / 1024 / 1024:.2f} MB")
print(f"Current memory usage: {current / 1024 / 1024:.2f} MB")
return result
return wrapper
# Line-by-line memory profiling with memory_profiler
# pip install memory-profiler
# @profile # Uncomment when using line_profiler
def memory_intensive_function():
"""Function that uses a lot of memory"""
data = [i ** 2 for i in range(1000000)]
return sum(data)
# Usage: python -m memory_profiler script.py
Timing and Performance Measurement
import time
import timeit
from functools import wraps
from contextlib import contextmanager
def timing_decorator(func):
"""Decorator to time function execution"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
print(f"{func.__name__} took {end_time - start_time:.4f} seconds")
return result
return wrapper
@contextmanager
def timer():
"""Context manager for timing code blocks"""
start = time.perf_counter()
try:
yield
finally:
end = time.perf_counter()
print(f"Elapsed time: {end - start:.4f} seconds")
# Usage examples
@timing_decorator
def slow_operation():
time.sleep(1)
return "done"
def benchmark_functions():
"""Benchmark multiple implementations"""
# Implementation 1
def list_comprehension():
return [x ** 2 for x in range(10000)]
# Implementation 2
def map_function():
return list(map(lambda x: x ** 2, range(10000)))
# Implementation 3
def for_loop():
result = []
for x in range(10000):
result.append(x ** 2)
return result
# Benchmark each
funcs = [list_comprehension, map_function, for_loop]
for func in funcs:
time_taken = timeit.timeit(func, number=100)
print(f"{func.__name__}: {time_taken:.4f} seconds")
# Context manager usage
with timer():
slow_operation()
Code Quality Tools
Linting with flake8 and pylint
# Example code with various issues for linting
import os, sys # Multiple imports on one line (E401)
import json
def calculate_area(radius): # Missing docstring
pi = 3.14159
area = pi * radius * radius # Could use ** operator
return area
def process_data(data):
result = []
for item in data:
if item > 0:
result.append(item * 2)
else:
pass # Unnecessary pass statement
return result
# Unused variable
unused_var = "This is not used"
# Line too long (over 79 characters)
very_long_line = "This is a very long line that exceeds the recommended 79 character limit for Python code"
# Configuration files:
# .flake8
"""
[flake8]
max-line-length = 88
exclude = .git,__pycache__,venv
ignore = E203,W503
"""
# .pylintrc (generate with: pylint --generate-rcfile > .pylintrc)
"""
[MESSAGES CONTROL]
disable = C0111,R0903,C0103
[FORMAT]
max-line-length = 88
"""
Code Formatting with black and isort
# Before formatting
from os import path
import sys
from typing import List,Dict
import json
def calculate_stats(data:List[int])->Dict[str,float]:
if not data:return {}
total=sum(data)
average=total/len(data)
return {"total":total,"average":average,"count":len(data)}
class DataProcessor:
def __init__(self,data):
self.data=data
def process(self):
return [x*2 for x in self.data if x>0]
# After black formatting (automatic)
from os import path
import sys
from typing import Dict, List
import json
def calculate_stats(data: List[int]) -> Dict[str, float]:
if not data:
return {}
total = sum(data)
average = total / len(data)
return {"total": total, "average": average, "count": len(data)}
class DataProcessor:
def __init__(self, data):
self.data = data
def process(self):
return [x * 2 for x in self.data if x > 0]
# pyproject.toml configuration
"""
[tool.black]
line-length = 88
target-version = ['py38']
include = '\.pyi?$'
exclude = '''
/(
\.eggs
| \.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| _build
| buck-out
| build
| dist
)/
'''
[tool.isort]
profile = "black"
multi_line_output = 3
line_length = 88
"""
Type Checking with mypy
from typing import List, Dict, Optional, Union, Callable, Any
import mypy
# Type annotations
def process_numbers(numbers: List[int]) -> Dict[str, float]:
"""Process a list of numbers and return statistics."""
if not numbers:
return {}
total: int = sum(numbers)
average: float = total / len(numbers)
return {
"total": total,
"average": average,
"count": len(numbers)
}
# Optional types
def find_user(user_id: int) -> Optional[Dict[str, Any]]:
"""Find user by ID, return None if not found."""
# Simulate database lookup
if user_id > 0:
return {"id": user_id, "name": "John Doe"}
return None
# Union types
def format_value(value: Union[int, float, str]) -> str:
"""Format different types of values."""
if isinstance(value, (int, float)):
return f"{value:.2f}"
return str(value)
# Callable types
def apply_operation(numbers: List[int], operation: Callable[[int], int]) -> List[int]:
"""Apply an operation to each number."""
return [operation(num) for num in numbers]
# Generic types
from typing import TypeVar, Generic
T = TypeVar('T')
class Stack(Generic[T]):
def __init__(self) -> None:
self._items: List[T] = []
def push(self, item: T) -> None:
self._items.append(item)
def pop(self) -> T:
return self._items.pop()
def is_empty(self) -> bool:
return len(self._items) == 0
# mypy.ini configuration
"""
[mypy]
python_version = 3.8
warn_return_any = True
warn_unused_configs = True
disallow_untyped_defs = True
disallow_incomplete_defs = True
check_untyped_defs = True
disallow_untyped_decorators = True
no_implicit_optional = True
warn_redundant_casts = True
warn_unused_ignores = True
warn_no_return = True
warn_unreachable = True
strict_equality = True
"""
Error Handling Best Practices
Exception Handling Patterns
# 1. Specific exception handling
def read_config_file(filename: str) -> dict:
"""Read configuration from file with specific error handling."""
try:
with open(filename, 'r') as f:
return json.load(f)
except FileNotFoundError:
print(f"Config file {filename} not found, using defaults")
return {}
except json.JSONDecodeError as e:
print(f"Invalid JSON in {filename}: {e}")
return {}
except PermissionError:
print(f"No permission to read {filename}")
return {}
# 2. Error context and logging
import logging
logger = logging.getLogger(__name__)
def process_user_data(user_id: int) -> Optional[dict]:
"""Process user data with proper error context."""
try:
user_data = fetch_user_data(user_id)
processed_data = transform_data(user_data)
save_processed_data(processed_data)
return processed_data
except UserNotFoundError:
logger.warning(f"User {user_id} not found")
return None
except DataValidationError as e:
logger.error(f"Data validation failed for user {user_id}: {e}")
return None
except DatabaseError as e:
logger.error(f"Database error processing user {user_id}: {e}")
raise # Re-raise for upstream handling
except Exception as e:
logger.exception(f"Unexpected error processing user {user_id}")
raise
# 3. Retry mechanism
import time
import random
def retry_on_failure(max_retries: int = 3, delay: float = 1.0):
"""Decorator for retrying failed operations."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except (ConnectionError, TimeoutError) as e:
if attempt == max_retries - 1:
raise
wait_time = delay * (2 ** attempt) + random.uniform(0, 1)
logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time:.2f}s")
time.sleep(wait_time)
return wrapper
return decorator
@retry_on_failure(max_retries=3, delay=1.0)
def fetch_data_from_api(url: str) -> dict:
"""Fetch data from API with retry logic."""
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.json()
Error Recovery Strategies
# 1. Graceful degradation
def get_user_preferences(user_id: int) -> dict:
"""Get user preferences with fallback to defaults."""
try:
# Try to get from cache first
preferences = cache.get(f"user_prefs_{user_id}")
if preferences:
return preferences
# Fall back to database
preferences = database.get_user_preferences(user_id)
cache.set(f"user_prefs_{user_id}", preferences, timeout=300)
return preferences
except CacheError:
logger.warning("Cache unavailable, using database")
try:
return database.get_user_preferences(user_id)
except DatabaseError:
logger.error("Database unavailable, using defaults")
return get_default_preferences()
except DatabaseError:
logger.error("Database unavailable, using defaults")
return get_default_preferences()
# 2. Circuit breaker pattern
class CircuitBreaker:
def __init__(self, failure_threshold: int = 5, timeout: float = 60.0):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.last_failure_time = None
self.state = 'closed' # closed, open, half-open
def call(self, func, *args, **kwargs):
if self.state == 'open':
if time.time() - self.last_failure_time > self.timeout:
self.state = 'half-open'
else:
raise CircuitBreakerError("Circuit breaker is open")
try:
result = func(*args, **kwargs)
self.reset()
return result
except Exception as e:
self.record_failure()
raise
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'open'
def reset(self):
self.failure_count = 0
self.state = 'closed'
# Usage
api_circuit_breaker = CircuitBreaker()
def call_external_api():
return api_circuit_breaker.call(requests.get, "https://api.example.com/data")
Troubleshooting Common Errors
Import and Module Errors
# Common import issues and solutions
# 1. ModuleNotFoundError
try:
import some_module
except ModuleNotFoundError as e:
print(f"Module not found: {e}")
print("Solutions:")
print("- Install module: pip install some_module")
print("- Check virtual environment")
print("- Verify PYTHONPATH")
# 2. Circular import detection
import sys
def detect_circular_imports():
"""Detect potential circular imports."""
modules = list(sys.modules.keys())
for module_name in modules:
module = sys.modules[module_name]
if hasattr(module, '__file__') and module.__file__:
print(f"Loaded: {module_name} from {module.__file__}")
# 3. Import path debugging
def debug_import_path():
"""Debug Python import path."""
import sys
print("Python path:")
for path in sys.path:
print(f" {path}")
print("\nLoaded modules:")
for name, module in sys.modules.items():
if hasattr(module, '__file__') and module.__file__:
print(f" {name}: {module.__file__}")
Memory and Performance Issues
import gc
import weakref
from collections import defaultdict
def debug_memory_leaks():
"""Debug memory leaks and reference cycles."""
# Force garbage collection
gc.collect()
# Get object counts by type
obj_counts = defaultdict(int)
for obj in gc.get_objects():
obj_type = type(obj).__name__
obj_counts[obj_type] += 1
# Show top object types
print("Top object types by count:")
for obj_type, count in sorted(obj_counts.items(), key=lambda x: x[1], reverse=True)[:10]:
print(f" {obj_type}: {count}")
# Check for reference cycles
print(f"\nGarbage collector stats: {gc.get_stats()}")
# Find objects with references
referrers = gc.get_referrers
for obj in gc.get_objects():
if isinstance(obj, (list, dict)) and len(referrers(obj)) > 3:
print(f"Object with many references: {type(obj)}")
# Weak reference example to prevent memory leaks
class Observer:
def __init__(self):
self._observers = weakref.WeakSet()
def add_observer(self, observer):
self._observers.add(observer)
def notify(self, message):
# Observers are automatically removed when garbage collected
for observer in self._observers:
observer.handle_notification(message)
# Memory-efficient data processing
def process_large_file(filename):
"""Process large file without loading everything into memory."""
def read_chunks(file_obj, chunk_size=8192):
while True:
chunk = file_obj.read(chunk_size)
if not chunk:
break
yield chunk
with open(filename, 'r') as f:
for chunk in read_chunks(f):
# Process chunk
yield process_chunk(chunk)
Debugging Async/Await Issues
import asyncio
import logging
# Enable asyncio debugging
logging.basicConfig(level=logging.DEBUG)
asyncio.get_event_loop().set_debug(True)
async def debug_async_operations():
"""Debug common async/await issues."""
# 1. Deadlock detection
async def potential_deadlock():
# This could cause a deadlock
await asyncio.sleep(1)
return "done"
# Use timeout to prevent hanging
try:
result = await asyncio.wait_for(potential_deadlock(), timeout=5.0)
print(f"Result: {result}")
except asyncio.TimeoutError:
print("Operation timed out - possible deadlock")
# 2. Exception handling in async code
async def async_operation_with_error():
await asyncio.sleep(0.1)
raise ValueError("Something went wrong")
try:
await async_operation_with_error()
except ValueError as e:
print(f"Caught async exception: {e}")
# 3. Debugging task cancellation
async def cancellable_task():
try:
while True:
await asyncio.sleep(1)
print("Task running...")
except asyncio.CancelledError:
print("Task was cancelled")
raise
task = asyncio.create_task(cancellable_task())
await asyncio.sleep(2)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Handled task cancellation")
# Run async debugging
if __name__ == "__main__":
asyncio.run(debug_async_operations())
Integration with Development Tools
IDE Integration
# .vscode/settings.json for VS Code
"""
{
"python.defaultInterpreterPath": "./venv/bin/python",
"python.linting.enabled": true,
"python.linting.pylintEnabled": true,
"python.linting.flake8Enabled": true,
"python.formatting.provider": "black",
"python.sortImports.args": ["--profile", "black"],
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false,
"python.testing.nosetestsEnabled": false
}
"""
# PyCharm debugging configurations
def pycharm_debug_config():
"""
PyCharm Debug Configuration:
1. Run/Debug Configurations
2. Add Python configuration
3. Set script path and parameters
4. Configure environment variables
5. Set working directory
6. Enable "Debug" mode
"""
pass
# Debugging in Jupyter notebooks
def jupyter_debug_setup():
"""
Jupyter Debugging Setup:
1. Install ipdb: pip install ipdb
2. Use %debug magic command
3. Enable automatic debugging: %pdb on
4. Use breakpoint() in cells
"""
pass
Pre-commit Hooks
# .pre-commit-config.yaml
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- id: check-json
- id: check-toml
- id: check-xml
- id: debug-statements
- id: name-tests-test
- id: requirements-txt-fixer
- repo: https://github.com/psf/black
rev: 23.3.0
hooks:
- id: black
language_version: python3.8
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
args: ['--profile', 'black']
- repo: https://github.com/pycqa/flake8
rev: 6.0.0
hooks:
- id: flake8
additional_dependencies: [flake8-docstrings]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.3.0
hooks:
- id: mypy
additional_dependencies: [types-all]
This comprehensive Python Error Handling & Debugging cheatsheet covers all the essential topics for effectively handling errors and debugging Python applications. Keep it handy for quick reference during development and debugging sessions!