Skip to main content

logging.Filter

The Filter class provides fine-grained control over which log records are processed. Filters can be attached to both Logger and Handler objects to provide more sophisticated filtering than is achievable with levels alone.

Basic Usage

Simple Name-Based Filter

import logging

# Create a filter that only allows records from a specific logger hierarchy
class ModuleFilter(logging.Filter):
    """Allow only records emitted by ``name`` or one of its child loggers.

    Args:
        name: Dotted logger name to accept. An empty string accepts
            every record.
    """

    def __init__(self, name):
        # Hand the name to the base class so the inherited matching logic
        # and ``self.name`` stay in sync.
        super().__init__(name)

    def filter(self, record):
        """Return True when ``record.name`` equals ``name`` or is below it."""
        # A bare ``startswith`` would also admit unrelated siblings such as
        # "myapp.database_backup"; the base implementation requires an exact
        # match or a "." right after the prefix.
        return super().filter(record)

# Set up logging
logger = logging.getLogger('myapp.database')
handler = logging.StreamHandler()
handler.addFilter(ModuleFilter('myapp.database'))

logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

logger.info("This will be logged")

# This logger's messages will be filtered out by the handler's filter.
# The level is lowered explicitly: without it the logger inherits the root
# logger's WARNING threshold and the INFO record would be discarded before
# the filter is ever consulted.
other_logger = logging.getLogger('otherapp.module')
other_logger.addHandler(handler)
other_logger.setLevel(logging.DEBUG)
other_logger.info("This will be filtered out")

Built-in Filter

import logging

# Using the built-in Filter class: it admits records whose logger name is
# 'myapp' or starts with 'myapp.'
filter_obj = logging.Filter('myapp')

logger = logging.getLogger('myapp.module')
handler = logging.StreamHandler()
handler.addFilter(filter_obj)

logger.addHandler(handler)
# Without an explicit level the logger inherits WARNING from the root logger
# and the INFO call below would never reach the handler.
logger.setLevel(logging.INFO)
logger.info("This passes the filter")

# Messages from loggers that don't match are filtered
other_logger = logging.getLogger('other.module')
other_logger.addHandler(handler)
other_logger.setLevel(logging.INFO)  # so the filter, not the level, rejects it
other_logger.info("This is filtered out")

Level-Based Custom Filter

import logging

class LevelRangeFilter(logging.Filter):
    """Filter that only allows records within an inclusive level range.

    Args:
        min_level: Lowest level admitted, either numeric (``logging.INFO``)
            or a registered level name (``'INFO'``).
        max_level: Highest level admitted, in the same forms.

    Raises:
        ValueError: If a level name is not known to ``logging``.
    """

    def __init__(self, min_level, max_level):
        super().__init__()
        self.min_level = self._to_levelno(min_level)
        self.max_level = self._to_levelno(max_level)

    @staticmethod
    def _to_levelno(level):
        """Return the numeric value for a level given as int or name."""
        if isinstance(level, int):
            return level
        # getLevelName maps a registered name to its number; for an unknown
        # name it returns a "Level ..." string instead of raising.
        resolved = logging.getLevelName(str(level).upper())
        if not isinstance(resolved, int):
            raise ValueError(f"Unknown log level: {level!r}")
        return resolved

    def filter(self, record):
        """Return True when ``record.levelno`` lies within the range."""
        return self.min_level <= record.levelno <= self.max_level

# Admit INFO through WARNING (inclusive) on this handler
info_warning_filter = LevelRangeFilter(min_level=logging.INFO, max_level=logging.WARNING)

handler = logging.StreamHandler()
handler.addFilter(info_warning_filter)

logger = logging.getLogger('filtered_app')
logger.setLevel(logging.DEBUG)  # let every level reach the handler's filter
logger.addHandler(handler)

logger.debug("This is filtered out")    # DEBUG is below min_level
logger.info("This passes")              # inside the range
logger.warning("This passes")           # inside the range
logger.error("This is filtered out")    # ERROR is above max_level

Filter API Reference

Constructor

| Method | Description | Parameters | Return Type |
| --- | --- | --- | --- |
| Filter(name='') | Create filter | name: Logger name prefix | Filter |

Core Methods

| Method | Description | Parameters | Return Type |
| --- | --- | --- | --- |
| filter(record) | Determine if record should be processed | record: LogRecord instance | bool |

Integration Methods

| Method | Description | Parameters | Return Type |
| --- | --- | --- | --- |
| Logger.addFilter(filter) | Add filter to logger | filter: Filter instance | None |
| Logger.removeFilter(filter) | Remove filter from logger | filter: Filter instance | None |
| Handler.addFilter(filter) | Add filter to handler | filter: Filter instance | None |
| Handler.removeFilter(filter) | Remove filter from handler | filter: Filter instance | None |

Advanced Filtering Patterns

Context-Based Filtering

import logging
import threading

class UserFilter(logging.Filter):
    """Admit records only while the calling thread is tagged with an allowed user.

    Args:
        allowed_users: Iterable of user ids that may log. ``None`` or an
            empty iterable disables the restriction.
    """

    def __init__(self, allowed_users=None):
        super().__init__()
        self.allowed_users = set(allowed_users) if allowed_users else set()

    def filter(self, record):
        """Return True when there is no allow-list or the current user is on it."""
        # An empty allow-list means "no restriction".
        if not self.allowed_users:
            return True

        # The user id is read from a ``user_id`` attribute set on the current
        # Thread object; threads without that attribute count as user None.
        active_user = getattr(threading.current_thread(), 'user_id', None)
        return active_user in self.allowed_users

# Usage
admin_filter = UserFilter(['admin', 'superuser'])
logger = logging.getLogger('admin_only')
handler = logging.FileHandler('admin.log')
handler.addFilter(admin_filter)
logger.addHandler(handler)
# INFO is below the WARNING threshold inherited from the root logger, so the
# level must be lowered for the calls below to reach the handler at all.
logger.setLevel(logging.INFO)

# Simulate user context by tagging the current thread
threading.current_thread().user_id = 'admin'
logger.info("Admin action performed") # Will be logged

threading.current_thread().user_id = 'regular_user'
logger.info("Regular user action") # Will be filtered out

Rate-Limiting Filter

import logging
import time
from collections import defaultdict

class RateLimitFilter(logging.Filter):
    """Filter that limits the log rate per logger name over a sliding window.

    Args:
        max_per_minute: Maximum number of records admitted for one logger
            name within any ``WINDOW_SECONDS`` interval.
    """

    # Length of the sliding window, in seconds.
    WINDOW_SECONDS = 60

    def __init__(self, max_per_minute=60):
        super().__init__()
        self.max_per_minute = max_per_minute
        # logger name -> monotonic timestamps of recently admitted records
        self.log_counts = defaultdict(list)

    def filter(self, record):
        """Return True while the record's logger is under its quota."""
        # A monotonic clock is used for the interval arithmetic so that
        # wall-clock adjustments cannot reset or freeze the window.
        now = time.monotonic()
        cutoff = now - self.WINDOW_SECONDS

        # Drop entries that have left the window.
        recent = [stamp for stamp in self.log_counts[record.name] if stamp > cutoff]

        admitted = len(recent) < self.max_per_minute
        if admitted:
            recent.append(now)  # only admitted records count against the quota
        self.log_counts[record.name] = recent
        return admitted

# Limit to 10 messages per minute
rate_limiter = RateLimitFilter(max_per_minute=10)
logger = logging.getLogger('rate_limited')
handler = logging.StreamHandler()
handler.addFilter(rate_limiter)
logger.addHandler(handler)
# Lower the level from the inherited WARNING default; otherwise none of the
# INFO messages below would reach the rate limiter.
logger.setLevel(logging.INFO)

# Rapid logging - only first 10 will pass
for i in range(20):
    logger.info(f"Message {i}")

Sensitive Data Filter

import logging
import re

class SensitiveDataFilter(logging.Filter):
    """Filter that masks sensitive information in the rendered log message.

    The record is modified in place: ``msg`` is replaced by the masked,
    fully formatted text and ``args`` is cleared, so values passed as
    logging arguments are masked as well. The record is never rejected.
    """

    def __init__(self):
        super().__init__()
        # (compiled pattern, replacement) pairs, applied in order
        self.patterns = [
            (re.compile(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'), '****-****-****-****'),  # Credit cards
            # ``[A-Za-z]`` rather than ``[A-Z|a-z]``: inside a character
            # class "|" is a literal character, not alternation.
            (re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'), '***@***.***'),  # Emails
            (re.compile(r'\b\d{3}-\d{2}-\d{4}\b'), '***-**-****'),  # SSN
            (re.compile(r'password[=:\s]+[^\s]+', re.IGNORECASE), 'password=***'),  # Passwords
        ]

    def filter(self, record):
        """Mask sensitive data in ``record`` and return True."""
        try:
            # Render msg % args first so secrets passed as arguments are
            # covered, not only those written into the template.
            message = record.getMessage()
            remaining_args = None
        except (TypeError, ValueError, KeyError):
            # Mismatched format string/args: mask the template only and keep
            # the args so the handler reports the formatting error as usual.
            message = str(record.msg)
            remaining_args = record.args

        for pattern, replacement in self.patterns:
            message = pattern.sub(replacement, message)

        record.msg = message
        record.args = remaining_args
        return True  # Always allow the record, just modified

# Usage
sensitive_filter = SensitiveDataFilter()
logger = logging.getLogger('secure_app')
handler = logging.StreamHandler()
handler.addFilter(sensitive_filter)
logger.addHandler(handler)
# Lower the level from the inherited WARNING default so the INFO examples
# below are actually emitted.
logger.setLevel(logging.INFO)

logger.info("User email: john.doe@example.com") # Will be masked
logger.info("Credit card: 1234-5678-9012-3456") # Will be masked

Error Classification Filter

import logging

class ErrorClassificationFilter(logging.Filter):
    """Filter that tags WARNING-and-above records with an ``error_category``.

    Categories are tried in declaration order and the first one with a
    keyword contained in the lower-cased message wins; ``'UNKNOWN'`` is used
    when nothing matches. Records below WARNING are passed through untagged.
    """

    def __init__(self):
        super().__init__()
        # DATABASE is listed before NETWORK on purpose: NETWORK's generic
        # 'connection' keyword would otherwise shadow 'connection pool' and
        # claim messages such as "Database connection timeout".
        self.error_patterns = {
            'DATABASE': ['sql', 'database', 'connection pool', 'deadlock'],
            'NETWORK': ['connection', 'timeout', 'socket', 'network'],
            'SECURITY': ['unauthorized', 'forbidden', 'authentication', 'permission'],
            'VALIDATION': ['invalid', 'validation', 'format', 'schema'],
        }

    def filter(self, record):
        """Attach ``error_category`` when applicable; always return True."""
        if record.levelno >= logging.WARNING:
            try:
                # Classify the rendered text so keywords supplied through
                # logging arguments are taken into account.
                message = record.getMessage().lower()
            except (TypeError, ValueError, KeyError):
                # Mismatched format string/args: fall back to the template.
                message = str(record.msg).lower()

            record.error_category = next(
                (
                    category
                    for category, keywords in self.error_patterns.items()
                    if any(keyword in message for keyword in keywords)
                ),
                'UNKNOWN',
            )

        return True

# Usage with custom formatter
class ClassifiedFormatter(logging.Formatter):
    """Formatter that prefixes the output with ``[<error_category>]``.

    Records without an ``error_category`` attribute are formatted exactly as
    the base ``logging.Formatter`` would format them.
    """

    def format(self, record):
        """Return the formatted record, with the category prefix if present."""
        try:
            category = record.error_category
        except AttributeError:
            return super().format(record)

        record.message = record.getMessage()
        rendered = super().format(record)
        return f"[{category}] {rendered}"

formatter = ClassifiedFormatter('%(levelname)s - %(message)s')
error_filter = ErrorClassificationFilter()

# The filter tags the record first; the formatter then renders the tag.
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.addFilter(error_filter)

logger = logging.getLogger('classified_app')
logger.addHandler(handler)

logger.error("Database connection timeout")    # "[<category>] ERROR - Database connection timeout"
logger.warning("Invalid user input format")    # [VALIDATION] WARNING - Invalid user input format

Filter Chaining

Multiple Filters

import logging

class DebugModeFilter(logging.Filter):
    """Suppress DEBUG records unless debug mode is switched on.

    Args:
        debug_mode: When truthy, DEBUG records are admitted. Records of any
            other level are always admitted.
    """

    def __init__(self, debug_mode=False):
        super().__init__()
        self.debug_mode = debug_mode

    def filter(self, record):
        """Return ``debug_mode`` for DEBUG records and True for all others."""
        return record.levelno != logging.DEBUG or self.debug_mode

class BusinessHoursFilter(logging.Filter):
    """Only log during business hours (for non-critical messages).

    ERROR and CRITICAL records always pass. Lower levels pass only when the
    local time falls on a configured workday within the configured hours.

    Args:
        name: Accepted for compatibility with ``logging.Filter``; it is not
            consulted by this filter.
        start_hour: First hour of the day (local time) counted as business
            time. Defaults to 9.
        end_hour: Hour at which business time ends, exclusive. Defaults to 17.
        workdays: Iterable of ``datetime.weekday()`` numbers (Monday = 0,
            Sunday = 6) treated as working days. Defaults to Monday-Friday.
    """

    def __init__(self, name='', *, start_hour=9, end_hour=17, workdays=range(5)):
        super().__init__(name)
        self.start_hour = start_hour
        self.end_hour = end_hour
        self.workdays = frozenset(workdays)

    def filter(self, record):
        """Return True for ERROR+ records, or for any record in business hours."""
        # Always allow ERROR and CRITICAL
        if record.levelno >= logging.ERROR:
            return True

        import datetime
        now = datetime.datetime.now()

        return (
            now.weekday() in self.workdays
            and self.start_hour <= now.hour < self.end_hour
        )

# Build a handler guarded by two filters; a record is emitted only when
# every attached filter accepts it.
handler = logging.StreamHandler()
handler.addFilter(DebugModeFilter(debug_mode=True))
handler.addFilter(BusinessHoursFilter())

logger = logging.getLogger('business_app')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

logger.debug("Debug message")  # needs debug mode AND business hours
logger.error("Error message")  # ERROR and above pass both filters at any time

Conditional Filter Activation

import logging
import os

class ConditionalFilter(logging.Filter):
    """Filter whose verdict is delegated to a zero-argument callable.

    Args:
        condition_func: Callable invoked without arguments for every record;
            its return value is returned from ``filter`` unchanged.
    """

    def __init__(self, condition_func):
        super().__init__()
        self.condition_func = condition_func

    def filter(self, record):
        """Return whatever ``condition_func()`` returns; the record is not inspected."""
        decide = self.condition_func
        return decide()

# Example conditions
def is_development():
    """Return True when the ENVIRONMENT variable is exactly 'development'."""
    return os.environ.get('ENVIRONMENT') == 'development'

def is_high_traffic():
    """Placeholder load check that currently always reports False.

    A real implementation could inspect current load, request count, etc.
    """
    return False

# Usage: records pass only in development and while traffic is not high
dev_filter = ConditionalFilter(is_development)
traffic_filter = ConditionalFilter(lambda: not is_high_traffic())

handler = logging.StreamHandler()
handler.addFilter(dev_filter)
handler.addFilter(traffic_filter)

logger = logging.getLogger('conditional_app')
logger.addHandler(handler)

Advanced Filter Techniques

Dynamic Message Enrichment

import logging
import uuid
import threading

class EnrichmentFilter(logging.Filter):
    """Filter that enriches log records with additional context.

    Every record receives ``correlation_id`` (kept if already present),
    ``thread_name`` and ``memory_mb``. ``memory_mb`` is the resident set
    size in MiB when the optional ``psutil`` package is importable and
    ``None`` otherwise. The record is never rejected.
    """

    def __init__(self, name=''):
        super().__init__(name)
        # Resolve the optional dependency once: a missing package must not
        # raise from inside every logging call, and a failed import is not
        # cached by Python, so retrying per record would be wasteful.
        try:
            import psutil
        except ImportError:
            psutil = None
        self._psutil = psutil

    def filter(self, record):
        """Attach the context attributes to ``record`` and return True."""
        # Add correlation ID
        if not hasattr(record, 'correlation_id'):
            record.correlation_id = str(uuid.uuid4())[:8]

        # Add thread information
        record.thread_name = threading.current_thread().name

        # Add memory usage (example)
        if self._psutil is None:
            record.memory_mb = None
        else:
            process = self._psutil.Process()
            record.memory_mb = round(process.memory_info().rss / 1024 / 1024, 1)

        return True

# Use with custom formatter; the extra fields are supplied by EnrichmentFilter
enriched_formatter = logging.Formatter(
    '%(asctime)s [%(correlation_id)s] [%(thread_name)s] '
    '[MEM:%(memory_mb)sMB] %(levelname)s - %(message)s'
)

logger = logging.getLogger('enriched_app')
handler = logging.StreamHandler()
handler.addFilter(EnrichmentFilter())
handler.setFormatter(enriched_formatter)
logger.addHandler(handler)
# Lower the level from the inherited WARNING default so the INFO example
# below is actually emitted.
logger.setLevel(logging.INFO)

logger.info("Enriched log message")

Performance Monitoring Filter

import logging
import time
import functools

class PerformanceFilter(logging.Filter):
    """Filter that tracks how long records take to reach it.

    The measured duration is the wall-clock time between the creation of
    the record (``record.created``) and the moment this filter sees it.
    Timing only the filter's own trivial body would always yield roughly
    zero and never cross the threshold.

    Args:
        threshold_ms: Delay in milliseconds above which the record's logger
            is reported as slow.
    """

    def __init__(self, threshold_ms=10):
        super().__init__()
        self.threshold_ms = threshold_ms
        # names of loggers that produced at least one slow record
        self.slow_loggers = set()

    def filter(self, record):
        """Record slow loggers as a side effect; always return True."""
        # ``record.created`` is a time.time()-style timestamp, so the same
        # clock is used for the comparison.
        duration_ms = (time.time() - record.created) * 1000

        if duration_ms > self.threshold_ms:
            self.slow_loggers.add(record.name)
            # Could log this to a separate performance logger
            print(f"SLOW LOGGING: {record.name} took {duration_ms:.2f}ms")

        return True  # Always pass through

# Attach a performance monitor with a 5 ms threshold to a stream handler
perf_filter = PerformanceFilter(threshold_ms=5)

handler = logging.StreamHandler()
handler.addFilter(perf_filter)

logger = logging.getLogger('monitored_app')
logger.addHandler(handler)

Filter Configuration

Configuration-Based Filters

import logging
import logging.config

# Configuration dictionary
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'filters': {
        # '()' names the factory to call; the remaining keys are passed to it
        # as keyword arguments.
        'level_range': {
            '()': 'logging_filters.LevelRangeFilter',
            # Numeric levels: the filter compares these against
            # record.levelno, and comparing a name such as 'INFO' with an
            # int raises TypeError.
            'min_level': logging.INFO,
            'max_level': logging.WARNING,
        },
        'rate_limit': {
            '()': 'logging_filters.RateLimitFilter',
            'max_per_minute': 30,
        },
    },
    'formatters': {
        'standard': {
            'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'DEBUG',
            'formatter': 'standard',
            'filters': ['level_range', 'rate_limit'],
        },
    },
    'loggers': {
        'my_app': {
            'handlers': ['console'],
            'level': 'DEBUG',
            'propagate': False,
        },
    },
}

# Apply configuration (requires an importable ``logging_filters`` module)
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger('my_app')

Environment-Based Filter Selection

import logging
import os

def create_filters():
    """Return the list of filters for the environment named by ENVIRONMENT.

    ``ENVIRONMENT`` defaults to ``'development'`` when unset. ``'testing'``
    and any unrecognised value yield an empty list.
    """
    env = os.getenv('ENVIRONMENT', 'development')

    if env == 'production':
        # Production: rate limiting and sensitive data masking
        return [
            RateLimitFilter(max_per_minute=100),
            SensitiveDataFilter(),
        ]

    if env == 'development':
        # Development: more verbose output plus error classification
        return [
            DebugModeFilter(debug_mode=True),
            ErrorClassificationFilter(),
        ]

    # Testing (and anything else): no additional filters
    return []

# Attach whichever filters the current environment calls for
handler = logging.StreamHandler()
for filter_obj in create_filters():
    handler.addFilter(filter_obj)

logger = logging.getLogger('env_app')
logger.addHandler(handler)

Common Pitfalls

Filter Return Values

import logging

class IncorrectFilter(logging.Filter):
    """Anti-example: rejects records below WARNING by returning None."""

    def filter(self, record):
        if record.levelno >= logging.WARNING:
            return True
        # WRONG: None is falsy, so the record is blocked, but the intent is
        # hidden; an explicit False should be returned instead.
        return None

class CorrectFilter(logging.Filter):
    """Admit WARNING and above, always answering with an explicit bool."""

    def filter(self, record):
        # CORRECT: the comparison yields True (allow) or False (block).
        return record.levelno >= logging.WARNING

Modifying Records Safely

import logging

class UnsafeFilter(logging.Filter):
    """Anti-example: upper-cases ``record.msg`` in place."""

    def filter(self, record):
        # DANGEROUS: the record object itself is rewritten, so the change is
        # permanent and the original message is lost.
        shouted = record.msg.upper()
        record.msg = shouted
        return True

class SafeFilter(logging.Filter):
    """Upper-case the message on a copy, leaving the caller's record intact.

    Returns:
        A new ``LogRecord`` carrying the upper-cased message, or True when
        the record has no ``msg`` attribute.

    Note:
        Replacing the record by returning one from ``filter`` is honoured
        from Python 3.12 on. Earlier versions treat the return value as a
        truthy "allow", so the original record is processed unmodified.
    """

    def filter(self, record):
        if not hasattr(record, 'msg'):
            return True

        # SAFE: build a shallow copy and modify that, so the original
        # record object keeps its message.
        modified = logging.makeLogRecord(record.__dict__)
        modified.msg = str(record.msg).upper()
        return modified

Memory Leaks in Filters

import logging
import weakref
from collections import deque

class MemoryLeakFilter(logging.Filter):
    """Anti-example: remembers every record it has ever seen."""

    def __init__(self):
        super().__init__()
        # DANGEROUS: nothing is ever removed from this list
        self.all_records = []

    def filter(self, record):
        # Memory leak: the list grows by one reference per record
        self.all_records += [record]
        return True

class MemoryEfficientFilter(logging.Filter):
    """Remember only the most recent records, up to a fixed count.

    Args:
        max_records: Maximum number of records kept in ``recent_records``.
    """

    def __init__(self, max_records=1000):
        super().__init__()
        self.max_records = max_records
        # A bounded deque evicts the oldest entry itself on append, avoiding
        # the O(n) shift that ``list.pop(0)`` performs for every record.
        self.recent_records = deque(maxlen=max_records)

    def filter(self, record):
        """Store ``record`` (evicting the oldest if full) and return True."""
        self.recent_records.append(record)
        return True
  • logging.Logger - Can have filters attached
  • logging.Handler - Can have filters attached
  • logging.Formatter - Processes records after filtering
  • logging.config - Configuration options for filters

See Also