logging.Filter
The Filter class provides fine-grained control over which log records are processed. Filters can be attached to both Logger and Handler objects to provide more sophisticated filtering than is achievable with levels alone.
Basic Usage
Simple Name-Based Filter
import logging
# Create a filter that only allows records from one logger subtree
class ModuleFilter(logging.Filter):
    """Allow only records emitted by logger ``name`` or its descendants.

    A bare ``str.startswith`` check would also accept unrelated loggers that
    merely share a textual prefix (``myapp.databasebackup`` for
    ``myapp.database``), so the match is anchored on the dotted hierarchy.

    Args:
        name: Logger name that roots the allowed subtree. An empty string
            allows every record.
    """

    def __init__(self, name):
        # The base class stores the name on ``self.name``.
        super().__init__(name)

    def filter(self, record):
        """Return True when ``record`` comes from the configured subtree."""
        if not self.name:
            return True
        return (
            record.name == self.name
            or record.name.startswith(self.name + '.')
        )
# Set up logging
logger = logging.getLogger('myapp.database')
handler = logging.StreamHandler()
handler.addFilter(ModuleFilter('myapp.database'))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.info("This will be logged")
# This logger's messages will be filtered out
other_logger = logging.getLogger('otherapp.module')
other_logger.addHandler(handler)
# Enable INFO here too; otherwise the record is dropped by the inherited
# WARNING level before it ever reaches the handler's filter.
other_logger.setLevel(logging.DEBUG)
other_logger.info("This will be filtered out")
Built-in Filter
import logging
# Using the built-in Filter class: only records from the 'myapp' logger or
# its dotted descendants pass.
filter_obj = logging.Filter('myapp')
logger = logging.getLogger('myapp.module')
handler = logging.StreamHandler()
handler.addFilter(filter_obj)
logger.addHandler(handler)
# Without an explicit level the logger inherits WARNING from the root logger
# and the INFO call below would never reach the handler.
logger.setLevel(logging.INFO)
logger.info("This passes the filter")
# Messages from loggers that don't match are filtered
other_logger = logging.getLogger('other.module')
other_logger.addHandler(handler)
# Same reason: let INFO through the logger so the filter is what rejects it.
other_logger.setLevel(logging.INFO)
other_logger.info("This is filtered out")
Level-Based Custom Filter
import logging
class LevelRangeFilter(logging.Filter):
    """Filter that only allows records within an inclusive level range.

    Args:
        min_level: Lowest level allowed, as a number (``logging.INFO``) or a
            level name (``'INFO'``).
        max_level: Highest level allowed, in the same forms.

    Raises:
        ValueError: If a level name is not known to the ``logging`` module.
    """

    def __init__(self, min_level, max_level):
        super().__init__()
        # Normalise once so filter() is a plain numeric comparison. Level
        # names arrive as strings when the filter is built from a
        # configuration dictionary.
        self.min_level = self._to_levelno(min_level)
        self.max_level = self._to_levelno(max_level)

    @staticmethod
    def _to_levelno(level):
        """Translate a level number or level name into its numeric value."""
        if isinstance(level, (int, float)):
            return level
        text = str(level).strip()
        if text.isdigit():
            return int(text)
        # getLevelName maps a registered name to its number and returns a
        # "Level <name>" string for anything it does not recognise.
        resolved = logging.getLevelName(text.upper())
        if not isinstance(resolved, int):
            raise ValueError(f"Unknown logging level: {level!r}")
        return resolved

    def filter(self, record):
        """Return True when the record's level lies inside the range."""
        return self.min_level <= record.levelno <= self.max_level
# Only allow INFO and WARNING messages
info_warning_filter = LevelRangeFilter(logging.INFO, logging.WARNING)
logger = logging.getLogger('filtered_app')
handler = logging.StreamHandler()
# The filter sits on the handler, so it only affects this handler's output.
handler.addFilter(info_warning_filter)
logger.addHandler(handler)
# Let every level through the logger so the filter alone decides.
logger.setLevel(logging.DEBUG)
logger.debug("This is filtered out") # Below min level
logger.info("This passes") # Within range
logger.warning("This passes") # Within range
logger.error("This is filtered out") # Above max level
Filter API Reference
Constructor
| Method | Description | Parameters | Return Type |
|---|---|---|---|
| Filter(name='') | Create filter | name: Logger name prefix | Filter |
Core Methods
| Method | Description | Parameters | Return Type |
|---|---|---|---|
| filter(record) | Determine if record should be processed | record: LogRecord instance | bool |
Integration Methods
| Method | Description | Parameters | Return Type |
|---|---|---|---|
| Logger.addFilter(filter) | Add filter to logger | filter: Filter instance | None |
| Logger.removeFilter(filter) | Remove filter from logger | filter: Filter instance | None |
| Handler.addFilter(filter) | Add filter to handler | filter: Filter instance | None |
| Handler.removeFilter(filter) | Remove filter from handler | filter: Filter instance | None |
Advanced Filtering Patterns
Context-Based Filtering
import logging
import threading
class UserFilter(logging.Filter):
    """Pass records only while an allowed user is active on this thread.

    The active user is read from a ``user_id`` attribute set on the current
    ``threading.Thread`` object; a thread without that attribute counts as
    user ``None``.

    Args:
        allowed_users: Iterable of user ids that may log. ``None`` or an
            empty iterable disables the restriction.
    """

    def __init__(self, allowed_users=None):
        super().__init__()
        self.allowed_users = set(allowed_users or [])

    def filter(self, record):
        """Return True when no restriction applies or the user is allowed."""
        if not self.allowed_users:
            # Nothing configured: every record passes.
            return True
        active_user = getattr(threading.current_thread(), 'user_id', None)
        return active_user in self.allowed_users
# Usage
admin_filter = UserFilter(['admin', 'superuser'])
logger = logging.getLogger('admin_only')
handler = logging.FileHandler('admin.log')
handler.addFilter(admin_filter)
logger.addHandler(handler)
# INFO is below the WARNING level inherited from the root logger; enable it
# so the filter, not the level, decides what reaches admin.log.
logger.setLevel(logging.INFO)
# Simulate user context
threading.current_thread().user_id = 'admin'
logger.info("Admin action performed") # Will be logged
threading.current_thread().user_id = 'regular_user'
logger.info("Regular user action") # Will be filtered out
Rate-Limiting Filter
import logging
import time
from collections import defaultdict
class RateLimitFilter(logging.Filter):
    """Filter that limits how many records each logger may emit per minute.

    Args:
        max_per_minute: Number of records allowed per logger name within any
            60-second window.
    """

    def __init__(self, max_per_minute=60):
        super().__init__()
        self.max_per_minute = max_per_minute
        # logger name -> timestamps of the records accepted in the window
        self.log_counts = defaultdict(list)

    def filter(self, record):
        """Return True while the record's logger is under its budget."""
        # A monotonic clock keeps the window correct when the system clock is
        # adjusted; with wall-clock time a backwards jump would leave stored
        # timestamps "in the future" and keep the logger throttled.
        now = time.monotonic()
        cutoff = now - 60  # start of the one-minute window
        recent = [
            stamp for stamp in self.log_counts[record.name]
            if stamp > cutoff
        ]
        self.log_counts[record.name] = recent
        if len(recent) >= self.max_per_minute:
            return False
        recent.append(now)
        return True
# Limit to 10 messages per minute
rate_limiter = RateLimitFilter(max_per_minute=10)
logger = logging.getLogger('rate_limited')
handler = logging.StreamHandler()
handler.addFilter(rate_limiter)
logger.addHandler(handler)
# INFO must be enabled on the logger, otherwise the inherited WARNING level
# drops every message before the rate limiter sees it.
logger.setLevel(logging.INFO)
# Rapid logging - only first 10 will pass
for i in range(20):
    logger.info(f"Message {i}")
Sensitive Data Filter
import logging
import re
class SensitiveDataFilter(logging.Filter):
    """Filter that masks sensitive information in log messages.

    The record is always allowed through. Its text is rewritten so that card
    numbers, email addresses, SSN-shaped values and ``password=...`` pairs
    are replaced by placeholders. The record itself is modified, so the
    change is visible to every handler that processes it afterwards.
    """

    def __init__(self):
        super().__init__()
        # (compiled pattern, replacement) pairs, applied in this order
        self.patterns = [
            (re.compile(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'), '****-****-****-****'),  # Credit cards
            # TLD class is [A-Za-z]; the previous [A-Z|a-z] also accepted a literal '|'
            (re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'), '***@***.***'),  # Emails
            (re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), '***-**-****'),  # SSN
            (re.compile(r'password[=:\s]+[^\s]+', re.IGNORECASE), 'password=***'),  # Passwords
        ]

    def filter(self, record):
        """Mask sensitive data in ``record`` and return True."""
        if not getattr(record, 'msg', None):
            return True
        # Mask the fully formatted text: values passed as %-style arguments
        # (logger.info("email: %s", addr)) would otherwise bypass the masking.
        try:
            message = record.getMessage()
            args_merged = True
        except (TypeError, ValueError, KeyError):
            # Malformed format string/args: mask the template only and leave
            # the arguments for the handler's normal error reporting.
            message = str(record.msg)
            args_merged = False
        for pattern, replacement in self.patterns:
            message = pattern.sub(replacement, message)
        record.msg = message
        if args_merged:
            # The arguments are already interpolated; clearing them prevents
            # a second formatting pass over the masked text.
            record.args = ()
        return True  # Always allow the record, just modified
# Usage
sensitive_filter = SensitiveDataFilter()
logger = logging.getLogger('secure_app')
handler = logging.StreamHandler()
handler.addFilter(sensitive_filter)
logger.addHandler(handler)
# Enable INFO; the default effective level (WARNING) would discard the
# example messages before they are masked and printed.
logger.setLevel(logging.INFO)
logger.info("User email: john.doe@example.com") # Will be masked
logger.info("Credit card: 1234-5678-9012-3456") # Will be masked
Error Classification Filter
import logging
class ErrorClassificationFilter(logging.Filter):
    """Filter that tags WARNING-and-above records with an error category.

    The category is stored on the record as ``error_category``. Records
    below WARNING are passed through untouched. No record is ever rejected.
    """

    def __init__(self):
        super().__init__()
        # Categories are tried in insertion order and the first hit wins.
        # DATABASE precedes NETWORK because NETWORK's generic keywords
        # ('connection', 'timeout') also occur in database errors; with the
        # opposite order "Database connection timeout" was tagged NETWORK
        # and the 'connection pool' keyword could never match.
        self.error_patterns = {
            'DATABASE': ['sql', 'database', 'connection pool', 'deadlock'],
            'NETWORK': ['connection', 'timeout', 'socket', 'network'],
            'SECURITY': ['unauthorized', 'forbidden', 'authentication', 'permission'],
            'VALIDATION': ['invalid', 'validation', 'format', 'schema'],
        }

    def filter(self, record):
        """Attach ``error_category`` to warnings/errors; always return True."""
        if record.levelno >= logging.WARNING:
            message = str(record.msg).lower()
            # Classify the error
            for category, keywords in self.error_patterns.items():
                if any(keyword in message for keyword in keywords):
                    record.error_category = category
                    break
            else:
                # Loop finished without a break: no keyword matched.
                record.error_category = 'UNKNOWN'
        return True
# Formatter that puts the record's error category in front of the line
class ClassifiedFormatter(logging.Formatter):
    """Prefix formatted output with ``[<error_category>]`` when present."""

    def format(self, record):
        """Return the formatted line, prefixed if the record is classified."""
        if not hasattr(record, 'error_category'):
            return super().format(record)
        record.message = record.getMessage()
        rendered = super().format(record)
        return f"[{record.error_category}] {rendered}"
# Wire the classifier and the prefixing formatter onto one handler: the
# filter attaches error_category, the formatter renders it.
error_filter = ErrorClassificationFilter()
formatter = ClassifiedFormatter('%(levelname)s - %(message)s')
logger = logging.getLogger('classified_app')
handler = logging.StreamHandler()
handler.addFilter(error_filter)
handler.setFormatter(formatter)
logger.addHandler(handler)
# No setLevel is needed here: ERROR and WARNING already pass the WARNING
# level inherited from the root logger.
logger.error("Database connection timeout") # [DATABASE] ERROR - Database connection timeout
logger.warning("Invalid user input format") # [VALIDATION] WARNING - Invalid user input format
Filter Chaining
Multiple Filters
import logging
class DebugModeFilter(logging.Filter):
    """Suppress DEBUG records unless debug mode is switched on.

    Args:
        debug_mode: When truthy, DEBUG records are allowed. Records at any
            other level always pass.
    """

    def __init__(self, debug_mode=False):
        super().__init__()
        self.debug_mode = debug_mode

    def filter(self, record):
        """Return True for non-DEBUG records, else the debug-mode flag."""
        return record.levelno != logging.DEBUG or self.debug_mode
class BusinessHoursFilter(logging.Filter):
    """Drop non-critical records outside business hours.

    ERROR and CRITICAL records always pass. Anything lower passes only
    Monday to Friday between 09:00 and 17:00, judged by the local time
    returned from ``datetime.datetime.now()``.
    """

    def filter(self, record):
        """Return True for urgent records or during business hours."""
        import datetime
        moment = datetime.datetime.now()
        is_urgent = record.levelno >= logging.ERROR
        is_weekday = moment.weekday() < 5  # Monday = 0, Sunday = 6
        in_office_hours = 9 <= moment.hour < 17
        return is_urgent or (is_weekday and in_office_hours)
# Chain multiple filters
logger = logging.getLogger('business_app')
handler = logging.StreamHandler()
# Add multiple filters; a record is emitted only if every filter accepts it
handler.addFilter(DebugModeFilter(debug_mode=True))
handler.addFilter(BusinessHoursFilter())
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.debug("Debug message") # Passes if in debug mode AND business hours
logger.error("Error message") # Always passes (ERROR and above skip the business-hours check)
Conditional Filter Activation
import logging
import os
class ConditionalFilter(logging.Filter):
    """Delegate the pass/block decision to a zero-argument callable.

    Args:
        condition_func: Called with no arguments for every record; its
            return value is returned from ``filter`` unchanged.
    """

    def __init__(self, condition_func):
        super().__init__()
        self.condition_func = condition_func

    def filter(self, record):
        """Return whatever the condition callable returns right now."""
        verdict = self.condition_func()
        return verdict
# Example conditions
def is_development():
    """Return True when the ENVIRONMENT variable equals 'development'."""
    current_env = os.environ.get('ENVIRONMENT')
    return current_env == 'development'
def is_high_traffic():
    """Report whether the application is under high load.

    Placeholder: always returns False. A real implementation could inspect
    current load, request counts, and similar signals.
    """
    return False
# Usage
# Each filter re-evaluates its condition for every record it is given.
dev_filter = ConditionalFilter(is_development)
traffic_filter = ConditionalFilter(lambda: not is_high_traffic())
logger = logging.getLogger('conditional_app')
handler = logging.StreamHandler()
# Both filters are on the same handler, so both conditions must hold.
handler.addFilter(dev_filter)
handler.addFilter(traffic_filter)
logger.addHandler(handler)
Advanced Filter Techniques
Dynamic Message Enrichment
import logging
import uuid
import threading
class EnrichmentFilter(logging.Filter):
    """Filter that enriches log records with additional context.

    Adds these attributes to every record and always lets it through:

    * ``correlation_id`` - first 8 characters of a random UUID, unless the
      record already carries one.
    * ``thread_name`` - name of the thread running the filter.
    * ``memory_mb`` - resident memory of this process in MiB, or ``None``
      when the optional ``psutil`` package is not installed.
    """

    def filter(self, record):
        """Attach the context attributes to ``record`` and return True."""
        # Add correlation ID
        if not hasattr(record, 'correlation_id'):
            record.correlation_id = str(uuid.uuid4())[:8]
        # Add thread information
        record.thread_name = threading.current_thread().name
        # Add memory usage (example). psutil is third-party; a missing
        # package must not turn every logging call into an ImportError.
        try:
            import psutil
        except ImportError:
            record.memory_mb = None
        else:
            rss_bytes = psutil.Process().memory_info().rss
            record.memory_mb = round(rss_bytes / 1024 / 1024, 1)
        return True
# Use with custom formatter
enriched_formatter = logging.Formatter(
    '%(asctime)s [%(correlation_id)s] [%(thread_name)s] '
    '[MEM:%(memory_mb)sMB] %(levelname)s - %(message)s'
)
logger = logging.getLogger('enriched_app')
handler = logging.StreamHandler()
# The filter must sit on the same handler as the formatter: the format
# string refers to attributes that only the filter adds.
handler.addFilter(EnrichmentFilter())
handler.setFormatter(enriched_formatter)
logger.addHandler(handler)
# Enable INFO so the example message is not dropped by the inherited
# WARNING level.
logger.setLevel(logging.INFO)
logger.info("Enriched log message")
Performance Monitoring Filter
import logging
import time
import functools
class PerformanceFilter(logging.Filter):
    """Filter that times its own filtering logic and flags slow loggers.

    The decision is made by ``_apply``, which subclasses override. The base
    implementation accepts every record. Keeping the decision in a separate
    method gives the timer real work to measure; previously the timed region
    contained only a constant assignment.

    Args:
        threshold_ms: Duration in milliseconds above which the record's
            logger name is added to ``slow_loggers`` and a notice is printed.
    """

    def __init__(self, threshold_ms=10):
        super().__init__()
        self.threshold_ms = threshold_ms
        # names of loggers whose filtering exceeded the threshold
        self.slow_loggers = set()

    def _apply(self, record):
        """Filtering logic being timed; override in subclasses."""
        return True  # Always pass through

    def filter(self, record):
        """Run ``_apply`` under a timer and return its verdict."""
        start_time = time.perf_counter()
        result = self._apply(record)
        # Measure time taken
        duration_ms = (time.perf_counter() - start_time) * 1000
        if duration_ms > self.threshold_ms:
            self.slow_loggers.add(record.name)
            # Could log this to a separate performance logger
            print(f"SLOW LOGGING: {record.name} took {duration_ms:.2f}ms")
        return result
# Monitor logging performance
# Flag filtering that takes longer than 5 ms
perf_filter = PerformanceFilter(threshold_ms=5)
logger = logging.getLogger('monitored_app')
handler = logging.StreamHandler()
handler.addFilter(perf_filter)
logger.addHandler(handler)
Filter Configuration
Configuration-Based Filters
import logging
import logging.config
# Configuration dictionary
LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'filters': {
# The '()' key names a factory; the remaining keys in each entry are passed
# to it as keyword arguments.
'level_range': {
'()': 'logging_filters.LevelRangeFilter',
# NOTE(review): these level names reach the factory as strings, so the
# filter must translate them to numeric levels - confirm it does.
'min_level': 'INFO',
'max_level': 'WARNING',
},
'rate_limit': {
'()': 'logging_filters.RateLimitFilter',
'max_per_minute': 30,
},
},
'formatters': {
'standard': {
'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'DEBUG',
'formatter': 'standard',
# Filters are referenced by the ids defined under 'filters' above
'filters': ['level_range', 'rate_limit'],
},
},
'loggers': {
'my_app': {
'handlers': ['console'],
'level': 'DEBUG',
# Do not also hand records to ancestor loggers' handlers
'propagate': False,
},
},
}
# Apply configuration
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger('my_app')
Environment-Based Filter Selection
import logging
import os
def create_filters():
    """Build the list of filters for the current environment.

    The environment is read from the ENVIRONMENT variable and defaults to
    'development'.

    Returns:
        A new list of filter instances: rate limiting and sensitive-data
        masking for 'production', debug and classification filters for
        'development', and an empty list for 'testing' or any other value.
    """
    env = os.getenv('ENVIRONMENT', 'development')
    if env == 'production':
        # Production: Rate limiting and sensitive data masking
        return [
            RateLimitFilter(max_per_minute=100),
            SensitiveDataFilter(),
        ]
    if env == 'development':
        # Development: More verbose, classification
        return [
            DebugModeFilter(debug_mode=True),
            ErrorClassificationFilter(),
        ]
    # Testing (and anything unrecognised): minimal filtering
    return []
# Attach the environment-specific filters to a handler, then install it
handler = logging.StreamHandler()
for filter_obj in create_filters():
    handler.addFilter(filter_obj)
logger = logging.getLogger('env_app')
logger.addHandler(handler)
Common Pitfalls
Filter Return Values
import logging
class IncorrectFilter(logging.Filter):
    """Anti-example: blocks records by returning None instead of False."""

    def filter(self, record):
        """Return True for WARNING and above, None for anything lower."""
        if record.levelno >= logging.WARNING:
            return True
        # WRONG: None is falsy, so the record is blocked, but the intent is
        # hidden; an explicit False should be returned instead.
        return None
class CorrectFilter(logging.Filter):
    """Allow WARNING and above; block everything lower with a real bool."""

    def filter(self, record):
        """Return an explicit True/False verdict for ``record``."""
        # CORRECT: the comparison itself yields the bool that is returned.
        return record.levelno >= logging.WARNING
Modifying Records Safely
import logging
class UnsafeFilter(logging.Filter):
    """Anti-example: rewrites the record's message in place."""

    def filter(self, record):
        """Upper-case ``record.msg`` on the record itself and return True."""
        # DANGEROUS: the original text is overwritten on the record object,
        # so the change is permanent for whoever uses this record next.
        shouted = record.msg.upper()
        record.msg = shouted
        return True
class SafeFilter(logging.Filter):
    """Upper-case the message for this handler without touching the original.

    The filter returns a modified *copy* of the record. From Python 3.12 a
    record returned by a filter replaces the original for the rest of that
    handler's processing, so other handlers still see the untouched record.
    On older versions the returned object only counts as "allow": the record
    passes unmodified and nothing is corrupted.
    """

    def filter(self, record):
        """Return an upper-cased copy of ``record`` (True if it has no msg)."""
        import copy  # local import: only this example needs it

        if not hasattr(record, 'msg'):
            return True
        # SAFE: work on a shallow copy so the caller's record keeps its
        # original message.
        clone = copy.copy(record)
        clone.msg = str(record.msg).upper()
        return clone
Memory Leaks in Filters
import logging
import weakref
class MemoryLeakFilter(logging.Filter):
    """Anti-example: remembers every record it has ever seen."""

    def __init__(self):
        super().__init__()
        # DANGEROUS: nothing ever removes entries from this list
        self.all_records = []

    def filter(self, record):
        """Store ``record`` and let it through."""
        seen = self.all_records
        seen.append(record)  # Memory leak: the list only grows
        return True
class MemoryEfficientFilter(logging.Filter):
    """Keep a bounded history of the most recent records.

    Args:
        max_records: Maximum number of records retained in
            ``recent_records``; the oldest entry is discarded first.
    """

    def __init__(self, max_records=1000):
        super().__init__()
        self.recent_records = []
        self.max_records = max_records

    def filter(self, record):
        """Remember ``record``, evict the oldest if over the cap, pass it."""
        history = self.recent_records
        history.append(record)
        if len(history) > self.max_records:
            del history[0]  # drop the oldest entry
        return True
Related Components
- logging.Logger - Can have filters attached
- logging.Handler - Can have filters attached
- logging.Formatter - Processes records after filtering
- logging.config - Configuration options for filters