Skip to main content

Performance & Optimization

Profiling Tools

cProfile - Built-in Profiler

import cProfile
import pstats

# Profile a function
def slow_function():
    """Return the sum of ``i * i`` for every ``i`` in ``range(1000000)``.

    Used as a deliberately CPU-heavy target for the profiling examples.
    """
    return sum(i * i for i in range(1000000))

# Basic profiling
cProfile.run('slow_function()')

# Save profile to file
cProfile.run('slow_function()', 'profile_output.prof')

# Analyze profile data
stats = pstats.Stats('profile_output.prof')
stats.sort_stats('cumulative')
stats.print_stats(10) # Top 10 functions

# Profile script from command line
# python -m cProfile -o profile.prof script.py

Line Profiler - Line-by-Line Analysis

# Install: pip install line_profiler

# Add @profile decorator to functions
@profile
def function_to_profile():
    """Return the sum of the squares of 0..999.

    ``profile`` is not imported in this snippet; per the "Run with" note below
    it is expected to be supplied by ``kernprof -l``.
    """
    squares = [i ** 2 for i in range(1000)]
    return sum(squares)

# Run with: kernprof -l -v script.py

# Alternative: programmatic usage
from line_profiler import LineProfiler

def test_function():
    """Build 0..999, square each value, and return the total of the squares."""
    base = list(range(1000))
    squared = [value ** 2 for value in base]
    return sum(squared)

# Create a profiler and register the function whose lines should be timed.
profiler = LineProfiler()
profiler.add_function(test_function)
# Run the target while the profiler is switched on.
profiler.enable()
test_function()
profiler.disable()
# Print the collected statistics.
profiler.print_stats()

Memory Profiler

# Install: pip install memory-profiler

import memory_profiler

# Memory usage of a function
@memory_profiler.profile
def memory_intensive_function():
    """Allocate a one-million-element list, sum it, free it, and return the sum."""
    # Large allocation that the memory profiler should make visible.
    big_list = list(range(1000000))
    total = sum(big_list)
    # Drop the list explicitly before returning.
    del big_list
    return total

# Monitor memory usage over time
from memory_profiler import profile

@profile
def process_data():
    """Return a list holding ``i * 2`` for each ``i`` in ``range(100000)``."""
    return [i * 2 for i in range(100000)]

# Command line usage
# python -m memory_profiler script.py

# Memory usage monitoring
import psutil
import os

def get_memory_usage():
    """Return the current process's resident set size in megabytes."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return rss_bytes / 1024 / 1024  # bytes -> KB -> MB

# Report the value returned by get_memory_usage(), formatted to two decimals.
print(f"Memory usage: {get_memory_usage():.2f} MB")

Py-Spy - System-level Profiler

# Install: pip install py-spy

# Profile running Python process
py-spy record -o profile.svg --pid 12345

# Profile with duration
py-spy record -o profile.svg --duration 60 --pid 12345

# Top-like interface
py-spy top --pid 12345

# Profile subprocess
py-spy record -o profile.svg -- python script.py

Memory Management & Optimization

Memory Profiling Techniques

import sys
import gc
from pympler import tracker, muppy, summary

# Track memory usage
def track_memory():
    """Print a pympler summary diff taken around a sample allocation."""
    summary_tracker = tracker.SummaryTracker()

    # Placeholder workload; the list stays referenced until the diff is printed.
    data = [i for i in range(100000)]

    # Show what changed since the tracker was created.
    summary_tracker.print_diff()

# Object size inspection
def get_object_size(obj):
    """Return ``sys.getsizeof(obj)``, i.e. the shallow size of *obj* in bytes."""
    size_in_bytes = sys.getsizeof(obj)
    return size_in_bytes

# Memory snapshot
def memory_snapshot():
    """Summarize the objects returned by ``muppy.get_objects()`` and print the summary."""
    rows = summary.summarize(muppy.get_objects())
    summary.print_(rows)

# Generator vs list memory comparison
def memory_comparison():
    """Print the shallow size of a generator next to that of a fully built list.

    Both produce the squares of ``range(1000000)``. ``sys.getsizeof`` is
    shallow, so the list figure counts the list object itself, not the ints.
    """
    count = 1000000

    def number_list(n):
        # Eager: every square is materialized up front.
        return [i ** 2 for i in range(n)]

    def number_generator(n):
        # Lazy: squares are produced one at a time on demand.
        for i in range(n):
            yield i ** 2

    lazy = number_generator(count)
    print(f"Generator size: {sys.getsizeof(lazy)} bytes")

    eager = number_list(count)
    print(f"List size: {sys.getsizeof(eager)} bytes")

Garbage Collection Optimization

import gc
import weakref

# Manual garbage collection
def optimize_garbage_collection():
    """Run ``heavy_computation()`` with the cyclic garbage collector paused.

    Returns:
        Whatever ``heavy_computation()`` returns.

    The collector is restored in a ``finally`` block, so an exception in the
    workload cannot leave GC disabled for the rest of the process. It is only
    re-enabled if it was enabled on entry; a full collection is always forced
    afterwards.
    """
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        # Performance-critical section: no automatic collections happen here.
        return heavy_computation()
    finally:
        if was_enabled:
            gc.enable()
        # Reclaim any cycles that built up while collection was paused.
        gc.collect()

# Weak references to avoid circular references
class Parent:
    """Owns a list of children; each child receives only a weak back-reference."""

    def __init__(self):
        self.children = list()

    def add_child(self, child):
        """Append *child* and set ``child.parent`` to a ``weakref.ref`` of this parent."""
        self.children.append(child)
        # A weak back-pointer avoids a strong parent <-> child reference cycle.
        back_ref = weakref.ref(self)
        child.parent = back_ref

class Child:
    """Node whose ``parent`` attribute starts out as ``None``."""

    def __init__(self):
        # Parent.add_child replaces this with a weakref.ref to the parent.
        self.parent = None

# Memory pool pattern
class ObjectPool:
    """Simple free-list pool that hands out recycled objects before creating new ones.

    Args:
        factory: zero-argument callable used when the pool is empty.
        reset_func: optional callable applied to a recycled object just before
            it is handed out again.
    """

    def __init__(self, factory, reset_func=None):
        self._factory, self._reset_func = factory, reset_func
        self._pool = []

    def acquire(self):
        """Return a recycled object (reset if a reset function was given) or a fresh one."""
        if not self._pool:
            return self._factory()
        recycled = self._pool.pop()
        if self._reset_func:
            self._reset_func(recycled)
        return recycled

    def release(self, obj):
        """Put *obj* back into the pool; it is reset on the next acquire, not here."""
        self._pool.append(obj)

# Usage
def create_list():
    """Factory for the pool example: return a new empty list."""
    return list()

def reset_list(lst):
    """Empty *lst* in place so a pooled list can be reused."""
    del lst[:]

# Pool of lists: create_list builds new ones, reset_list clears recycled ones.
pool = ObjectPool(create_list, reset_list)

Memory-Efficient Data Structures

import array
from collections import deque
import struct

# Use array for numeric data
def efficient_numeric_storage():
# Regular list (more memory)
regular_list = [1, 2, 3, 4, 5]

# Array (less memory for numbers)
int_array = array.array('i', [1, 2, 3, 4, 5]) # 'i' for int
float_array = array.array('f', [1.0, 2.0, 3.0, 4.0, 5.0]) # 'f' for float

print(f"List size: {sys.getsizeof(regular_list)}")
print(f"Array size: {sys.getsizeof(int_array)}")

# Use deque for frequent insertions/deletions
def efficient_queue_operations():
    """Exercise the four end operations of ``collections.deque`` (append/pop at both ends)."""
    from collections import deque

    queue = deque()
    # Add one item at each end ...
    queue.append(1)
    queue.appendleft(0)
    # ... then remove one from each end, leaving the deque empty.
    queue.pop()
    queue.popleft()

# Slots for memory-efficient classes
class EfficientClass:
    """Three-field record that declares ``__slots__`` so instances carry no ``__dict__``."""

    __slots__ = ['x', 'y', 'z']

    def __init__(self, x, y, z):
        self.x, self.y, self.z = x, y, z

# Struct for binary data
def binary_data_handling():
    """Round-trip ``(10, 20, 3.14)`` through the ``'iif'`` struct layout and return the tuple.

    The float is stored as a 32-bit value, so the third element comes back as
    the nearest single-precision value to 3.14.
    """
    layout = struct.Struct('iif')
    packed = layout.pack(10, 20, 3.14)
    first, second, third = layout.unpack(packed)
    return first, second, third

Concurrency & Parallelism

Threading for I/O-bound Tasks

import threading
import concurrent.futures
import time
import requests

# Basic threading
def io_bound_task(url, timeout=10):
    """Fetch *url* with an HTTP GET and return the response status code.

    Args:
        url: address to request.
        timeout: seconds passed to ``requests.get``. Without a timeout the
            call can block indefinitely on an unresponsive server, which would
            stall a worker thread in the pool examples.

    Returns:
        The integer ``status_code`` of the response.
    """
    response = requests.get(url, timeout=timeout)
    return response.status_code

def threading_example():
    """Time ten requests run one after another, then the same ten on a 5-thread pool, and print both."""
    urls = ['http://example.com'] * 10

    # Baseline: one request at a time.
    began = time.time()
    for url in urls:
        io_bound_task(url)
    sequential_time = time.time() - began

    # Same work fanned out over a thread pool; results arrive in completion order.
    began = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        pending = [executor.submit(io_bound_task, url) for url in urls]
        results = []
        for finished in concurrent.futures.as_completed(pending):
            results.append(finished.result())
    threaded_time = time.time() - began

    print(f"Sequential: {sequential_time:.2f}s")
    print(f"Threaded: {threaded_time:.2f}s")

# Thread-safe operations
class ThreadSafeCounter:
    """Integer counter whose reads and increments are guarded by a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = 0

    def increment(self):
        """Add one to the counter while holding the lock."""
        self._lock.acquire()
        try:
            self._value = self._value + 1
        finally:
            self._lock.release()

    def get_value(self):
        """Return the current count, read while holding the lock."""
        self._lock.acquire()
        try:
            return self._value
        finally:
            self._lock.release()

# Producer-consumer pattern
import queue

def producer_consumer_example():
    """Run one producer and one consumer thread that communicate through a ``queue.Queue``.

    The producer enqueues ``item_0`` .. ``item_9`` (one every 0.1 s) followed
    by a ``None`` sentinel; the consumer prints each item and stops at the
    sentinel. Both threads are joined before the function returns.
    """
    q = queue.Queue()

    def producer():
        for i in range(10):
            q.put(f"item_{i}")
            time.sleep(0.1)
        q.put(None)  # Sentinel: tells the consumer to stop

    def consumer():
        while True:
            item = q.get()
            try:
                if item is None:
                    break
                print(f"Processing {item}")
            finally:
                # Every get() is matched by a task_done(), including the
                # sentinel; otherwise a q.join() would block forever.
                q.task_done()

    # Start threads
    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join()
    consumer_thread.join()

Multiprocessing for CPU-bound Tasks

import multiprocessing
import concurrent.futures
import time

# CPU-bound task
def cpu_intensive_task(n):
    """Return the sum of ``i * i`` for ``i`` in ``range(n)`` (pure-Python CPU work)."""
    return sum(i * i for i in range(n))

def multiprocessing_example():
    """Time eight ``cpu_intensive_task`` calls sequentially, then on a process pool, and print both."""
    numbers = [1000000] * 8

    # Baseline: one call after another in this process.
    tick = time.time()
    results = []
    for n in numbers:
        results.append(cpu_intensive_task(n))
    sequential_time = time.time() - tick

    # Same inputs mapped over a ProcessPoolExecutor with its default worker count.
    tick = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(cpu_intensive_task, numbers))
    parallel_time = time.time() - tick

    print(f"Sequential: {sequential_time:.2f}s")
    print(f"Parallel: {parallel_time:.2f}s")

# Process pool with shared memory
def _fill_squares(shared_array, start, end):
    """Store ``i * i`` at index ``i`` of *shared_array* for every ``i`` in ``[start, end)``."""
    for i in range(start, end):
        shared_array[i] = i * i


def shared_memory_example():
    """Fill a shared one-million-element array with squares using one process per CPU.

    The work only runs when this module is executed as ``__main__`` (the guard
    from the original example is kept); otherwise the call is a no-op.

    Two fixes relative to the original snippet:
    * the array holds 64-bit integers, because ``i * i`` exceeds the range of
      the 32-bit ``'i'`` type for most indices and ctypes does not check for
      overflow;
    * the worker is a module-level function, so it can be pickled when the
      start method is not ``fork``.
    """
    import ctypes
    import multiprocessing as mp

    if __name__ == '__main__':
        # Create shared array (64-bit elements, see docstring)
        shared_array = mp.Array(ctypes.c_int64, range(1000000))
        total = len(shared_array)
        worker_count = mp.cpu_count()

        # Split work among processes; the last one also takes the remainder.
        chunk_size = total // worker_count
        processes = []
        for index in range(worker_count):
            start = index * chunk_size
            end = start + chunk_size if index < worker_count - 1 else total
            p = mp.Process(target=_fill_squares, args=(shared_array, start, end))
            processes.append(p)
            p.start()

        # Wait for completion
        for p in processes:
            p.join()

Asyncio for Asynchronous Programming

import asyncio
import aiohttp
import time

# Basic async/await
async def async_task(delay):
    """Sleep for *delay* seconds without blocking the event loop, then return a status string."""
    await asyncio.sleep(delay)
    message = f"Task completed after {delay}s"
    return message

async def basic_async_example():
    """Await three ``async_task`` coroutines (1 s, 2 s, 3 s) together and print their results."""
    pending = [async_task(delay) for delay in (1, 2, 3)]
    # gather() runs the awaitables concurrently and returns results in input order.
    outcomes = await asyncio.gather(*pending)
    print(outcomes)

# Async HTTP requests
async def fetch_url(session, url):
    """GET *url* through *session* and return the awaited ``response.text()``."""
    async with session.get(url) as response:
        body = await response.text()
    return body

async def async_http_example():
    """Fetch ``http://example.com`` ten times concurrently over one ``aiohttp.ClientSession``.

    Returns:
        The list produced by ``asyncio.gather`` over the ten ``fetch_url`` calls.
    """
    targets = ['http://example.com'] * 10

    async with aiohttp.ClientSession() as session:
        pending = []
        for target in targets:
            pending.append(fetch_url(session, target))
        bodies = await asyncio.gather(*pending)

    return bodies

# Async context manager
class AsyncContextManager:
    """Minimal ``async with`` example that prints a message on entry and on exit."""

    async def __aenter__(self):
        print("Entering context")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised in the body is not suppressed.
        print("Exiting context")
        return None

# Async generator
async def async_generator():
    """Asynchronously yield 0, 1, 2, 3, 4, awaiting a 0.1 s sleep before each value."""
    value = 0
    while value < 5:
        await asyncio.sleep(0.1)
        yield value
        value += 1

async def consume_async_generator():
    """Print every value produced by ``async_generator()``."""
    stream = async_generator()
    async for value in stream:
        print(value)

# Run async code
# Entry point: asyncio.run() drives basic_async_example() to completion.
if __name__ == '__main__':
    asyncio.run(basic_async_example())

Caching Strategies

Built-in LRU Cache

from functools import lru_cache
import time

# Basic LRU cache
@lru_cache(maxsize=128)
def expensive_function(n):
    """Return ``n * n`` after a one-second sleep that stands in for costly work.

    Wrapped in ``lru_cache`` with room for 128 distinct arguments.
    """
    time.sleep(1)  # Simulate expensive operation
    return n * n

# Usage
print(expensive_function(5))  # First call with 5: runs the body, including the 1 s sleep
print(expensive_function(5))  # Same argument again: answered from the cache

# Cache statistics exposed by lru_cache
print(expensive_function.cache_info())

# Drop all cached entries
expensive_function.cache_clear()

# Parameterized cache
@lru_cache(maxsize=None)
def fibonacci(n):
    """Return the n-th Fibonacci number (``fibonacci(0) == 0``, ``fibonacci(1) == 1``).

    The cache is unbounded, so each distinct ``n`` is computed only once.
    """
    return n if n < 2 else fibonacci(n - 1) + fibonacci(n - 2)

# Time-based cache expiration
import time
from functools import wraps

def timed_cache(seconds):
    """Decorator factory: cache a function's results for *seconds* seconds per argument set.

    Args:
        seconds: lifetime of a cached result. An entry is served while its age
            is strictly less than this value.

    Returns:
        A decorator. The wrapped function recomputes on a miss or when the
        stored entry has expired.

    Notes:
        * Ages are measured with ``time.monotonic()`` so wall-clock adjustments
          cannot extend or cut short an entry's lifetime.
        * Keys are built from the ``repr`` of the arguments, which keeps
          unhashable arguments usable. Keyword arguments are sorted by name so
          ``f(a=1, b=2)`` and ``f(b=2, a=1)`` share one entry.
        * Expired entries are replaced on the next call with the same
          arguments; they are not evicted otherwise.
    """
    def decorator(func):
        cache = {}

        @wraps(func)
        def wrapper(*args, **kwargs):
            key = (repr(args), repr(sorted(kwargs.items())))
            now = time.monotonic()

            if key in cache:
                value, timestamp = cache[key]
                if now - timestamp < seconds:
                    return value

            result = func(*args, **kwargs)
            cache[key] = (result, now)
            return result

        return wrapper
    return decorator

@timed_cache(30)  # Cache for 30 seconds
def get_data():
    """Return a fixed placeholder string; stands in for an expensive fetch."""
    return "expensive data"

Custom Caching Solutions

import threading
import pickle
import hashlib
from collections import OrderedDict

# Thread-safe LRU cache
class ThreadSafeLRUCache:
    """Least-recently-used cache backed by an ``OrderedDict`` and guarded by an ``RLock``.

    Args:
        maxsize: maximum number of entries. A value of 0 or less disables
            storage entirely (``put`` becomes a no-op) instead of raising
            ``KeyError`` when evicting from an empty cache.
    """

    def __init__(self, maxsize=128):
        self.maxsize = maxsize
        self.cache = OrderedDict()
        self.lock = threading.RLock()

    def get(self, key):
        """Return the value for *key* and mark it most recently used; ``None`` if absent."""
        with self.lock:
            if key in self.cache:
                # Move to end (most recently used)
                self.cache.move_to_end(key)
                return self.cache[key]
            return None

    def put(self, key, value):
        """Insert or update *key*, evicting the least recently used entry when full."""
        with self.lock:
            if self.maxsize <= 0:
                # Nothing can be stored; also avoids popitem() on an empty dict.
                return
            if key in self.cache:
                self.cache.move_to_end(key)
            elif len(self.cache) >= self.maxsize:
                # Remove least recently used
                self.cache.popitem(last=False)
            self.cache[key] = value

# Persistent cache
class PersistentCache:
    """Dictionary cache that is pickled to *filename* after every ``set``.

    NOTE(review): the file is read with ``pickle.load``, which can execute
    arbitrary code; only point this at files the application itself wrote.
    """

    def __init__(self, filename):
        self.filename = filename
        self.cache = self._load_cache()

    def _load_cache(self):
        """Return the unpickled cache, or an empty dict if the file is missing or unreadable."""
        try:
            with open(self.filename, 'rb') as f:
                return pickle.load(f)
        except (FileNotFoundError, EOFError, pickle.PickleError):
            # EOFError: pickle.load raises it for an empty or truncated file,
            # e.g. after an interrupted write.
            return {}

    def _save_cache(self):
        """Overwrite the cache file with the current contents."""
        with open(self.filename, 'wb') as f:
            pickle.dump(self.cache, f)

    def get(self, key):
        """Return the value stored for *key*, or ``None`` if it is absent."""
        return self.cache.get(key)

    def set(self, key, value):
        """Store *value* under *key* and persist the whole cache immediately."""
        self.cache[key] = value
        self._save_cache()

# Memoization decorator
def memoize(func):
    """Cache *func*'s results forever, keyed by its positional and keyword arguments.

    The returned wrapper exposes the backing dict as ``wrapper.cache`` and a
    ``wrapper.cache_clear()`` helper. All arguments must be hashable.
    """
    results = {}

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Sort keyword items so the key does not depend on call-site ordering.
        lookup = (args, tuple(sorted(kwargs.items())))
        try:
            return results[lookup]
        except KeyError:
            pass
        results[lookup] = func(*args, **kwargs)
        return results[lookup]

    def cache_clear():
        results.clear()

    wrapper.cache = results
    wrapper.cache_clear = cache_clear
    return wrapper

# Redis-based caching
import redis
import json

class RedisCache:
    """Thin JSON-serializing wrapper around a ``redis.Redis`` client."""

    def __init__(self, host='localhost', port=6379, db=0):
        self.redis = redis.Redis(host=host, port=port, db=db)

    def get(self, key):
        """Return the JSON-decoded value for *key*, or ``None`` when the stored value is falsy."""
        raw = self.redis.get(key)
        return json.loads(raw) if raw else None

    def set(self, key, value, expiry=3600):
        """Store *value* as JSON under *key* via ``setex`` with *expiry* as the time argument."""
        payload = json.dumps(value)
        self.redis.setex(key, expiry, payload)

    def delete(self, key):
        """Delete *key* through the client."""
        self.redis.delete(key)

Performance Best Practices

Algorithm & Data Structure Optimization

import bisect
from collections import defaultdict, Counter

# Use appropriate data structures
def efficient_lookups():
    """Run the same membership test against a one-million-element list and against a set."""
    large_list = list(range(1000000))
    large_set = set(large_list)

    needle = 999999

    # List membership scans element by element: O(n).
    found = needle in large_list

    # Set membership is a hash lookup: O(1) on average.
    found = needle in large_set

# Binary search for sorted data
def binary_search_example():
    """Show ``bisect`` on a sorted list: locate 500000, then insert 500001 keeping order."""
    ordered = list(range(1000000))

    # Leftmost position at which 500000 could be inserted.
    position = bisect.bisect_left(ordered, 500000)

    # insort finds the slot by bisection and inserts there.
    bisect.insort(ordered, 500001)

# Efficient counting
def efficient_counting():
    """Tally a small list of letters with ``Counter`` and take its two most common entries."""
    letters = ['a', 'b', 'a', 'c', 'b', 'a']

    tally = Counter(letters)
    top_two = tally.most_common(2)

# Efficient grouping
def efficient_grouping():
    """Group ``(key, value)`` pairs by key using a ``defaultdict(list)``."""
    pairs = [('a', 1), ('b', 2), ('a', 3), ('c', 4)]

    # Missing keys start out as an empty list, so no existence check is needed.
    buckets = defaultdict(list)
    for label, number in pairs:
        buckets[label].append(number)

# String operations optimization
def string_optimization():
    """Contrast repeated ``+=`` concatenation with ``str.join``, then build an f-string."""
    words = ['hello', 'world', 'python']

    # Slow pattern: grows the string piece by piece (leaves a trailing space).
    joined = ''
    for word in words:
        joined += word + ' '

    # Preferred: a single join over the parts.
    joined = ' '.join(words)

    # f-string formatting of two values.
    name, age = "Alice", 30
    message = f"Name: {name}, Age: {age}"

Code Optimization Techniques

# List comprehensions vs loops
def comprehension_vs_loop():
    """Build the squares of 0..999 twice: with a list comprehension, then with an append loop."""
    limit = 1000

    # Comprehension form.
    squares = [x*x for x in range(limit)]

    # Equivalent explicit loop.
    squares = []
    for x in range(limit):
        squares.append(x*x)

# Generator expressions for memory efficiency
def generator_efficiency():
    """Sum one million squares twice: streamed from a generator, then from a full list."""
    count = 1000000

    # Streaming: no intermediate list is built.
    sum_squares = sum(x*x for x in range(count))

    # Materialized: the whole list exists before it is summed.
    squares = [x*x for x in range(count)]
    sum_squares = sum(squares)

# Use built-in functions
def builtin_functions():
    """Apply ``sum``, ``max``, ``any`` and ``all`` to ``range(1000000)``."""
    numbers = range(1000000)

    total = sum(numbers)
    maximum = max(numbers)

    # any()/all() stop at the first deciding element.
    has_even = any(value % 2 == 0 for value in numbers)
    all_positive = all(value > 0 for value in numbers)

# Avoid repeated attribute access
def avoid_repeated_access():
    """Define two square-root helpers: one looks up ``math.sqrt`` per item, one binds it once.

    Neither helper is called here; the function only defines them.
    """
    import math

    def calculate_slow(data):
        # Looks up the sqrt attribute on math for every element.
        roots = []
        for x in data:
            roots.append(math.sqrt(x))
        return roots

    def calculate_fast(data):
        # Bind the function to a local name once, then reuse it.
        sqrt = math.sqrt
        return [sqrt(x) for x in data]

# Use local variables
def local_variables():
    """Define two summing helpers that read a value from the enclosing scope.

    ``global_var`` is a local of this function, so the inner functions reach
    it as a closure variable rather than as a module global. Neither helper is
    called here.
    """
    global_var = 100

    def slow_function():
        # Reads the enclosing-scope variable on every iteration.
        total = 0
        for _ in range(1000):
            total += global_var
        return total

    def fast_function():
        # Copy the value into a local first, then read the local in the loop.
        local_var = global_var
        total = 0
        for _ in range(1000):
            total += local_var
        return total

NumPy Optimization

import numpy as np

# Vectorized operations
def numpy_optimization():
    """Add two random one-million-element arrays: vectorized, element by element, then vectorized again."""
    size = 1000000
    a = np.random.rand(size)
    b = np.random.rand(size)

    # One vectorized expression.
    c = a + b

    # Element-wise Python loop over the same data (the slow pattern).
    result = []
    for left, right in zip(a, b):
        result.append(left + right)

    # Vectorized form of the loop above.
    result = a + b

# Memory-efficient array operations
def memory_efficient_numpy():
    """Show slicing, an explicit copy, an in-place update, and explicit dtypes."""
    arr = np.arange(1000000)

    # Basic slicing: every second element, without copying.
    view = arr[::2]

    # The same slice turned into an independent array.
    copy = arr[::2].copy()

    # In-place add: modifies arr itself.
    arr += 1

    # Element width is chosen through dtype: int8 is 1 byte, int64 is 8 bytes.
    values = [1, 2, 3]
    small_ints = np.array(values, dtype=np.int8)
    large_ints = np.array(values, dtype=np.int64)

# Broadcasting for efficient operations
def broadcasting_example():
    """Add a length-1000 vector to every row of a 1000x1000 matrix, two ways."""
    side = 1000
    matrix = np.random.rand(side, side)
    row_vector = np.random.rand(side)

    # Implicit broadcasting of the 1-D vector across the rows.
    result = matrix + row_vector

    # Same sum with the vector first reshaped to an explicit (1, 1000) row.
    as_row = row_vector.reshape(1, -1)
    result = matrix + as_row

Benchmark & Timing Tools

Built-in Timing

import time
import timeit
from contextlib import contextmanager

# Basic timing
def basic_timing():
    """Time ``sum(range(1000000))`` with ``time.time()`` and print the elapsed seconds."""
    began = time.time()

    # Workload being measured
    result = sum(range(1000000))

    finished = time.time()
    print(f"Execution time: {finished - began:.4f} seconds")

# Context manager for timing
@contextmanager
def timer():
    """Context manager that prints how long its ``with`` body took.

    The elapsed time is printed even when the body raises.
    """
    began = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - began
        print(f"Execution time: {elapsed:.4f} seconds")

# Usage
# Time a single summation; timer() prints the elapsed time when the block exits.
with timer():
    result = sum(range(1000000))

# Timing decorator
def timing_decorator(func):
    """Wrap *func* so each call prints its duration and then returns the original result.

    Nothing is printed if *func* raises.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        began = time.perf_counter()
        outcome = func(*args, **kwargs)
        elapsed = time.perf_counter() - began
        print(f"{func.__name__} took {elapsed:.4f} seconds")
        return outcome
    return wrapper

# NOTE(review): this reuses the name of the lru_cache example defined earlier
# in the file; if both snippets live in one module, this definition replaces it.
@timing_decorator
def expensive_function():
    """Return ``sum(range(1000000))``; the decorator prints how long the call took."""
    return sum(range(1000000))

Timeit Module

import timeit

# Basic usage
def timeit_example():
    """Show three ways to call ``timeit.timeit``: statement, statement with setup, callable.

    Only the first measurement is printed.
    """
    runs = 10000

    # Statement given as a string.
    time_taken = timeit.timeit('sum(range(100))', number=runs)
    print(f"Time taken: {time_taken:.6f} seconds")

    # The setup code is passed separately from the timed statement.
    time_taken = timeit.timeit('sum(data)', setup='data = list(range(100))', number=runs)

    # A zero-argument callable can be timed directly.
    def test_function():
        return sum(range(100))

    time_taken = timeit.timeit(test_function, number=runs)

# Compare different approaches
def compare_approaches():
    """Time three ways of squaring 0..99 (10000 runs each) and print one line per approach."""
    candidates = (
        ("List comprehension", '[x*x for x in range(100)]'),
        ("Map function", 'list(map(lambda x: x*x, range(100)))'),
        ("Generator expression", 'list(x*x for x in range(100))'),
    )

    # Measure everything first, then report, as the original did.
    timings = [(label, timeit.timeit(stmt, number=10000)) for label, stmt in candidates]

    for label, elapsed in timings:
        print(f"{label}: {elapsed:.6f}")

# Timing with different inputs
def timing_with_inputs(sizes=(100, 500, 1000), number=100):
    """Time a bubble sort on random inputs of several sizes and print one line per size.

    Args:
        sizes: input lengths to try. Each must be at most 1000 because the
            input is drawn with ``random.sample(range(1000), size)``.
        number: how many times ``timeit`` repeats each measurement.

    The timed statement includes generating the random sample, not just the sort.
    The setup source below is indented so that it compiles as valid Python.
    """
    setup = '''
def bubble_sort(arr):
    n = len(arr)
    for i in range(n):
        for j in range(0, n-i-1):
            if arr[j] > arr[j+1]:
                arr[j], arr[j+1] = arr[j+1], arr[j]
    return arr

import random
'''

    # Time with different input sizes
    for size in sizes:
        time_taken = timeit.timeit(
            f'bubble_sort(random.sample(range(1000), {size}))',
            setup=setup,
            number=number
        )
        print(f"Size {size}: {time_taken:.6f} seconds")

Benchmarking Framework

import statistics
import time
from dataclasses import dataclass
from typing import List, Callable

@dataclass
class BenchmarkResult:
    """Timing statistics for one benchmarked callable, as filled in by ``Benchmark.run``."""

    name: str  # label used in reports
    times: List[float]  # raw per-iteration durations (perf_counter differences)
    mean: float  # statistics.mean(times)
    median: float  # statistics.median(times)
    std_dev: float  # statistics.stdev(times), or 0 when there is a single sample
    min_time: float  # min(times)
    max_time: float  # max(times)

class Benchmark:
    """Small benchmarking harness: warm-up runs, timed runs, summary statistics.

    Args:
        warmup: number of untimed calls made before measuring.
        iterations: number of timed calls per benchmarked function.

    Every result produced by ``run`` is also appended to ``self.results``.
    """

    def __init__(self, warmup=3, iterations=10):
        self.warmup = warmup
        self.iterations = iterations
        self.results = []

    def run(self, func: Callable, name: str = None):
        """Benchmark the zero-argument callable *func* and return a ``BenchmarkResult``.

        Args:
            func: callable invoked with no arguments.
            name: report label; defaults to ``func.__name__``.
        """
        if name is None:
            name = func.__name__

        # Warmup runs
        for _ in range(self.warmup):
            func()

        # Actual timing runs
        times = []
        for _ in range(self.iterations):
            start = time.perf_counter()
            func()
            end = time.perf_counter()
            times.append(end - start)

        # Calculate statistics (stdev needs at least two samples)
        result = BenchmarkResult(
            name=name,
            times=times,
            mean=statistics.mean(times),
            median=statistics.median(times),
            std_dev=statistics.stdev(times) if len(times) > 1 else 0,
            min_time=min(times),
            max_time=max(times)
        )

        self.results.append(result)
        return result

    def compare(self, functions: List[tuple]):
        """Compare multiple functions: [(func, name), ...]

        Runs each function, prints a ranking by mean time, and returns the
        results sorted fastest first.
        """
        results = []
        for func, name in functions:
            results.append(self.run(func, name))

        # Sort by mean time
        results.sort(key=lambda x: x.mean)

        print("Benchmark Results:")
        print("-" * 60)
        for i, result in enumerate(results):
            if i == 0:
                print(f"🏆 {result.name}: {result.mean:.6f}s (baseline)")
                continue
            baseline = results[0].mean
            if baseline > 0:
                speedup = result.mean / baseline
                print(f"{i+1}. {result.name}: {result.mean:.6f}s ({speedup:.2f}x slower)")
            else:
                # A baseline that measured as 0.0 (timer resolution) cannot be
                # used as a divisor; report the raw time only.
                print(f"{i+1}. {result.name}: {result.mean:.6f}s (baseline too fast to compare)")

        return results

# Usage example
def benchmark_example():
    """Benchmark three ways of squaring 0..999 with ``Benchmark`` (5 warm-ups, 100 iterations)."""
    harness = Benchmark(warmup=5, iterations=100)

    def list_comp():
        return [x*x for x in range(1000)]

    def map_func():
        return list(map(lambda x: x*x, range(1000)))

    def generator():
        return list(x*x for x in range(1000))

    # compare() runs each candidate and prints the ranking.
    contenders = [
        (list_comp, "List Comprehension"),
        (map_func, "Map Function"),
        (generator, "Generator Expression"),
    ]
    harness.compare(contenders)

Memory Profiling & Leak Detection

Memory Profiling Tools

import tracemalloc
import gc
import sys
from pympler import tracker, muppy, summary

# Built-in memory tracing
def memory_tracing():
    """Trace a sample allocation with ``tracemalloc`` and print usage plus the top allocation sites."""
    tracemalloc.start()

    # Sample workload
    data = [i for i in range(100000)]

    # Current and peak traced sizes, converted from bytes to MB for display.
    current, peak = tracemalloc.get_traced_memory()
    megabyte = 1024
    print(f"Current memory usage: {current / megabyte / megabyte:.2f} MB")
    print(f"Peak memory usage: {peak / megabyte / megabyte:.2f} MB")

    # Per-line statistics from a snapshot
    by_line = tracemalloc.take_snapshot().statistics('lineno')

    print("\nTop 10 memory consumers:")
    for entry in by_line[:10]:
        print(entry)

    tracemalloc.stop()

# Memory leak detection
def detect_memory_leaks():
    """Compare ``tracemalloc`` snapshots taken before and after a workload and print the top diffs.

    Tracing is stopped in a ``finally`` block, so it is not left running after
    the function returns or raises.
    """
    tracemalloc.start()
    try:
        # Take initial snapshot
        initial_snapshot = tracemalloc.take_snapshot()

        # Run potentially leaky code
        for i in range(10):
            # Simulated allocation that is never explicitly cleaned up
            leak_data = [j for j in range(10000)]

        # Take final snapshot
        final_snapshot = tracemalloc.take_snapshot()

        # Compare snapshots, grouped by source line
        top_stats = final_snapshot.compare_to(initial_snapshot, 'lineno')

        print("Memory usage differences:")
        for stat in top_stats[:10]:
            print(stat)
    finally:
        tracemalloc.stop()

# Object tracking
def track_object_creation():
    """Print a pympler diff after allocating 10000 lists, then another after freeing them."""
    summary_tracker = tracker.SummaryTracker()

    # Allocate 10000 lists of 100 elements each.
    data = [[i] * 100 for i in range(10000)]

    # Diff since the tracker was created.
    summary_tracker.print_diff()

    # Release the data and force a collection.
    del data
    gc.collect()

    # Diff since the previous print_diff call.
    summary_tracker.print_diff()

# Memory monitoring decorator
def memory_monitor(func):
    """Decorator that prints the current and peak ``tracemalloc`` usage of each call to *func*.

    Tracing is started before the call and always stopped afterwards, even if
    *func* raises. The usage report is printed only when the call succeeds.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        tracemalloc.start()
        try:
            result = func(*args, **kwargs)

            current, peak = tracemalloc.get_traced_memory()
            print(f"{func.__name__} memory usage:")
            print(f" Current: {current / 1024 / 1024:.2f} MB")
            print(f" Peak: {peak / 1024 / 1024:.2f} MB")

            return result
        finally:
            # Without this, an exception in func would leave tracing switched on.
            tracemalloc.stop()
    return wrapper

@memory_monitor
def memory_intensive_function():
    """Build a one-million-element list and return its sum; the decorator reports memory use."""
    data = [i for i in range(1000000)]
    return sum(data)

Leak Detection Strategies

import weakref
import gc
from collections import defaultdict

# Reference counting
def reference_counting():
    """Print ``sys.getrefcount`` for an object before, during and after holding a second reference."""
    import sys

    class TestObject:
        def __init__(self, name):
            self.name = name

    obj = TestObject("test")
    print(f"Reference count: {sys.getrefcount(obj)}")

    # Bind a second name to the same object.
    alias = obj
    print(f"Reference count: {sys.getrefcount(obj)}")

    # Drop the second name again.
    del alias
    print(f"Reference count: {sys.getrefcount(obj)}")

# Circular reference detection
def circular_reference_example():
    """Create a parent/child reference cycle, drop it, and print what ``gc.collect()`` reports."""
    class Node:
        def __init__(self, value):
            self.value = value
            self.parent = None
            self.children = []

        def add_child(self, child):
            # Strong back-reference: parent -> child -> parent forms a cycle.
            child.parent = self
            self.children.append(child)

    # Build the cycle.
    root = Node("root")
    leaf = Node("child")
    root.add_child(leaf)

    print(f"Garbage before: {len(gc.garbage)}")

    # Drop both names, then run a full collection.
    del root
    del leaf
    collected = gc.collect()
    print(f"Objects collected: {collected}")
    print(f"Garbage after: {len(gc.garbage)}")

# Weak reference solution
def weak_reference_solution():
    """Define a ``Node`` class whose children point back to their parent through a weak reference.

    The class is only defined here; nothing is instantiated or returned.
    """
    class Node:
        def __init__(self, value):
            self.value = value
            self.children = []
            self.parent = None  # replaced by a weakref.ref in add_child

        def add_child(self, child):
            # Weak back-reference, so no strong parent <-> child cycle is formed.
            child.parent = weakref.ref(self)
            self.children.append(child)

        def get_parent(self):
            """Return the object the stored weak reference resolves to, or None if unset."""
            if not self.parent:
                return None
            return self.parent()

# Memory leak detector
class MemoryLeakDetector:
    """Collects coarse process snapshots and reports what changed between two of them.

    Each snapshot records the number of GC-tracked objects, the process RSS in
    MB (via ``psutil``), and a per-type object count.
    """

    def __init__(self):
        self.snapshots = []

    def take_snapshot(self):
        """Force a collection, record a snapshot, append it to ``self.snapshots`` and return it."""
        gc.collect()  # Collect first so already-dead objects are not counted
        snapshot = {
            'objects': len(gc.get_objects()),
            'memory': self._get_memory_usage(),
            'types': self._get_object_types()
        }
        self.snapshots.append(snapshot)
        return snapshot

    def _get_memory_usage(self):
        """Return the resident set size of the current process in MB."""
        import psutil
        import os
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024  # MB

    def _get_object_types(self):
        """Return ``{type name: count}`` over ``gc.get_objects()``."""
        type_counts = defaultdict(int)
        for obj in gc.get_objects():
            type_counts[type(obj).__name__] += 1
        return dict(type_counts)

    def compare_snapshots(self, start_idx=0, end_idx=-1):
        """Print object-count, memory and per-type differences between two snapshots.

        Args:
            start_idx: index of the earlier snapshot (default: first).
            end_idx: index of the later snapshot (default: last).

        Raises:
            IndexError: if either index does not refer to a recorded snapshot.
        """
        start = self.snapshots[start_idx]
        end = self.snapshots[end_idx]

        print(f"Object count change: {end['objects'] - start['objects']}")
        print(f"Memory change: {end['memory'] - start['memory']:.2f} MB")

        # Type differences
        print("\nType count changes:")
        for obj_type in set(start['types'].keys()) | set(end['types'].keys()):
            start_count = start['types'].get(obj_type, 0)
            end_count = end['types'].get(obj_type, 0)
            if start_count != end_count:
                # Separator added: the two counts used to be printed fused
                # together (e.g. 5 and 10 rendered as "510").
                print(f" {obj_type}: {start_count} -> {end_count}")

# Usage
# Record a baseline snapshot before the workload.
detector = MemoryLeakDetector()
detector.take_snapshot()

# Workload under observation: `data` is rebound on every iteration.
for i in range(1000):
    data = [j for j in range(100)]

# Record a second snapshot and print the difference (first vs. last by default).
detector.take_snapshot()
detector.compare_snapshots()

GIL Considerations & Workarounds

Understanding the GIL

import threading
import time
import multiprocessing

# GIL demonstration
def cpu_bound_task():
    """CPU-bound task affected by GIL"""
    # Sum of squares of 0..9_999_999, computed in pure Python.
    return sum(i * i for i in range(10000000))

# NOTE(review): an io_bound_task(url) is defined earlier in the file; if both
# snippets share a module, this zero-argument version replaces it.
def io_bound_task():
    """I/O-bound task not affected by GIL"""
    # The one-second sleep stands in for a blocking I/O wait.
    time.sleep(1)
    return "Task completed"

def gil_demonstration():
    """Run ``cpu_bound_task`` four times in sequence and four times on threads, then print the timings."""
    runs = 4

    # One call after another on the current thread.
    began = time.time()
    results = []
    for _ in range(runs):
        results.append(cpu_bound_task())
    single_time = time.time() - began

    # The same number of calls, each on its own thread.
    began = time.time()
    threads = [threading.Thread(target=cpu_bound_task) for _ in range(runs)]
    for worker in threads:
        worker.start()
    for worker in threads:
        worker.join()
    multi_threaded_time = time.time() - began

    print(f"Single-threaded: {single_time:.2f}s")
    print(f"Multi-threaded: {multi_threaded_time:.2f}s")
    print(f"Speedup: {single_time / multi_threaded_time:.2f}x")

# GIL workarounds
def gil_workarounds():
    """Show three ways around the GIL: processes for CPU work, threads and asyncio for I/O.

    Fixes relative to the original snippet:
    * ``cpu_bound_task`` takes no arguments, so it is dispatched with
      ``starmap`` over empty argument tuples. ``pool.map(cpu_bound_task,
      range(4))`` would pass one argument per call and raise ``TypeError``.
    * ``async_io_task`` was referenced but never defined; a local coroutine
      that awaits a one-second sleep now fills that role.

    ``async_workaround`` is only defined here, not executed.
    """
    # 1. Use multiprocessing for CPU-bound tasks
    with multiprocessing.Pool() as pool:
        results = pool.starmap(cpu_bound_task, [()] * 4)

    # 2. Use threading for I/O-bound tasks
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(io_bound_task) for _ in range(4)]
        results = [f.result() for f in futures]

    # 3. Use asyncio for I/O-bound tasks
    async def async_io_task():
        # Non-blocking counterpart of io_bound_task.
        await asyncio.sleep(1)
        return "Task completed"

    async def async_workaround():
        tasks = [asyncio.create_task(async_io_task()) for _ in range(4)]
        results = await asyncio.gather(*tasks)
        return results

C Extensions & Cython

# Cython example (save as fast_math.pyx)
"""
# fast_math.pyx
def fast_sum(int n):
cdef int i
cdef long long total = 0
for i in range(n):
total += i
return total

def matrix_multiply(double[:, :] A, double[:, :] B):
cdef int i, j, k
cdef int n = A.shape[0]
cdef int m = A.shape[1]
cdef int p = B.shape[1]

cdef double[:, :] C = np.zeros((n, p))

for i in range(n):
for j in range(p):
for k in range(m):
C[i, j] += A[i, k] * B[k, j]

return np.asarray(C)
"""

# setup.py for Cython
"""
from setuptools import setup
from Cython.Build import cythonize
import numpy

setup(
ext_modules = cythonize("fast_math.pyx"),
include_dirs=[numpy.get_include()]
)
"""

# NumPy + Cython integration
import numpy as np

def numpy_cython_example():
    """Time a Python-level loop against ``np.sum`` on one million random floats and print the ratio."""
    def python_sum(arr):
        # Element-by-element accumulation in the interpreter.
        running = 0
        for item in arr:
            running += item
        return running

    def numpy_sum(arr):
        # Single call into NumPy's reduction.
        return np.sum(arr)

    data = np.random.rand(1000000)

    import timeit

    repeats = 10
    python_time = timeit.timeit(lambda: python_sum(data), number=repeats)
    numpy_time = timeit.timeit(lambda: numpy_sum(data), number=repeats)

    print(f"Python: {python_time:.4f}s")
    print(f"NumPy: {numpy_time:.4f}s")
    print(f"Speedup: {python_time / numpy_time:.2f}x")

# Releasing the GIL in C extensions
"""
// Example C extension that releases GIL
#include <Python.h>

static PyObject* cpu_intensive_function(PyObject* self, PyObject* args) {
int n;
if (!PyArg_ParseTuple(args, "i", &n)) {
return NULL;
}

// Release GIL for CPU-intensive work
Py_BEGIN_ALLOW_THREADS

long long result = 0;
for (int i = 0; i < n; i++) {
result += i * i;
}

Py_END_ALLOW_THREADS

return PyLong_FromLongLong(result);
}
"""

Alternative Python Implementations

PyPy - JIT Compilation

# PyPy optimization tips
def pypy_optimization():
    """Define three plain-Python helpers, then time ``fibonacci_recursive(35)`` and print the result.

    Only ``fibonacci_recursive`` is executed; ``sum_range`` and
    ``process_data`` are defined as illustrations.
    """
    # 1. Plain recursive Python
    def fibonacci_recursive(n):
        return n if n <= 1 else fibonacci_recursive(n-1) + fibonacci_recursive(n-2)

    # 2. A straightforward accumulation loop
    def sum_range(n):
        running = 0
        for i in range(n):
            running += i
        return running

    # 3. Frequency count with a plain dict
    def process_data(data):
        counts = {}
        for item in data:
            if item in counts:
                counts[item] += 1
            else:
                counts[item] = 1
        return counts

    # 4. Wall-clock timing of the recursive function
    import time

    began = time.time()
    result = fibonacci_recursive(35)
    finished = time.time()

    print(f"Fibonacci(35) = {result}")
    print(f"Time: {finished - began:.4f}s")

# PyPy vs CPython comparison
def pypy_vs_cpython():
    """
    Performance comparison between PyPy and CPython

    Typical results:
    - Pure Python loops: PyPy 5-50x faster
    - Recursive functions: PyPy 10-100x faster
    - NumPy operations: CPython often faster
    - String operations: PyPy 2-10x faster
    """

    def mandelbrot(c, max_iter=100):
        # Escape-time iteration: number of steps until |z| exceeds 2.
        z = 0
        for step in range(max_iter):
            if abs(z) > 2:
                return step
            z = z*z + c
        return max_iter

    def compute_mandelbrot_set():
        # Evaluate an 800x600 grid, row by row.
        width, height = 800, 600
        return [
            mandelbrot(complex(x/width * 2 - 1, y/height * 2 - 1))
            for y in range(height)
            for x in range(width)
        ]

    import time
    began = time.time()
    mandelbrot_set = compute_mandelbrot_set()
    finished = time.time()

    print(f"Mandelbrot computation: {finished - began:.4f}s")

Numba - JIT Compilation

# Install: pip install numba
from numba import jit, njit, vectorize
import numpy as np

# Basic JIT compilation
# NOTE(review): bare @jit; its compilation mode depends on the installed Numba
# version. Confirm against the Numba docs.
@jit
def jit_function(n):
    """Return the sum of ``i * i`` for ``i`` in ``range(n)``."""
    total = 0
    for i in range(n):
        total += i * i
    return total

# No-python mode (fastest)
@njit
def njit_function(n):
    """Return the sum of ``i * i`` for ``i`` in ``range(n)``; same loop as ``jit_function`` under ``@njit``."""
    total = 0
    for i in range(n):
        total += i * i
    return total

# Vectorized functions
# The signature string declares two float64 inputs and a float64 result.
@vectorize(['float64(float64, float64)'])
def vectorized_add(a, b):
    """Return ``a + b``; the body is written for a single pair of values."""
    return a + b

# Numba with NumPy
@njit
def numba_matrix_multiply(A, B):
    """Return ``np.dot(A, B)``."""
    # presumably A and B are 2-D float arrays with compatible shapes; TODO confirm against callers
    return np.dot(A, B)

# Parallel execution
from numba import prange

@njit(parallel=True)
def parallel_sum(arr):
    """Return the sum of the elements of *arr*, accumulated in a ``prange`` loop."""
    total = 0
    # prange marks this loop for Numba's parallel execution.
    for i in prange(len(arr)):
        total += arr[i]
    return total

# Performance comparison
def numba_comparison():
    """Time the same summation loop in plain Python and under ``@njit``, then print both and the ratio."""
    n = 10000000

    def python_sum(n):
        # Interpreted loop.
        running = 0
        for i in range(n):
            running += i
        return running

    @njit
    def numba_sum(n):
        # Identical loop, compiled by Numba.
        running = 0
        for i in range(n):
            running += i
        return running

    import timeit

    # Call once up front so compilation is not part of the timed run.
    numba_sum(100)

    python_time = timeit.timeit(lambda: python_sum(n), number=1)
    numba_time = timeit.timeit(lambda: numba_sum(n), number=1)

    print(f"Python: {python_time:.4f}s")
    print(f"Numba: {numba_time:.4f}s")
    print(f"Speedup: {python_time / numba_time:.2f}x")

Nuitka - Compilation to C++

# Nuitka compilation example
"""
# Install: pip install nuitka

# Compile single file
nuitka3 --module mymodule.py

# Compile with optimization
nuitka3 --module --optimize mymodule.py

# Standalone executable
nuitka3 --standalone --enable-plugin=tk-inter myapp.py

# With NumPy support
nuitka3 --module --enable-plugin=numpy-warnings mymodule.py
"""

# Code optimization for Nuitka
def nuitka_optimization():
    """Define four small functions illustrating the coding tips listed for Nuitka.

    Nothing is executed besides the definitions.
    """
    # 1. Annotated signature
    def typed_function(x: int, y: int) -> int:
        return x + y

    # 2. Named constant used by a helper
    PI = 3.14159

    def circle_area(radius: float) -> float:
        return PI * radius * radius

    # 3. Plain, static loop
    def static_computation():
        accumulator = 0
        for i in range(1000):
            accumulator += i * 2
        return accumulator

    # 4. Built-in sum over a generator
    def use_builtins(data):
        return sum(item * 2 for item in data)

# Performance monitoring
def performance_monitoring():
    """Define (but do not call) a helper that profiles ``slow_function()`` and prints the stats."""
    import cProfile
    import pstats

    def profile_implementations():
        profile_path = 'cpython_profile.prof'

        # CPython run: write the raw profile to disk.
        cProfile.run('slow_function()', profile_path)

        # Nuitka (compile first)
        # nuitka3 --module slow_function.py
        # import slow_function
        # cProfile.run('slow_function.slow_function()', 'nuitka_profile.prof')

        # Load and print the CPython profile.
        cpython_stats = pstats.Stats(profile_path)
        print("CPython profile:")
        cpython_stats.print_stats()

Real-world Optimization Examples

Web Application Performance

# Django/Flask optimization examples
from functools import lru_cache
import redis
import pickle

# Database query optimization
def optimize_database_queries():
    """
    Database optimization strategies:
    1. Use select_related() for foreign keys
    2. Use prefetch_related() for many-to-many
    3. Use database indexing
    4. Avoid N+1 queries

    The body builds a psycopg2 ``ThreadedConnectionPool`` as the
    connection-pooling example; nothing is returned.
    """

    # Example with ORM
    # users = User.objects.select_related('profile').all()
    # posts = Post.objects.prefetch_related('tags').all()

    # Connection pooling
    import psycopg2.pool

    min_connections = 1
    max_connections = 20
    connect_kwargs = {
        "database": "mydb",
        "user": "user",
        "password": "password",
        "host": "localhost",
        "port": "5432",
    }

    connection_pool = psycopg2.pool.ThreadedConnectionPool(
        min_connections,
        max_connections,
        **connect_kwargs,
    )

# Caching strategies
class CacheManager:
    """Two-tier cache: an in-process dict in front of a Redis server.

    Values are pickled before being written to Redis. The local tier now
    honours the same expiry as Redis; previously a locally cached value was
    served forever, even after its Redis entry had expired.
    """

    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.local_cache = {}
        # key -> time.monotonic() deadline after which the local copy is
        # stale. A key without a deadline never expires locally.
        self._local_deadlines = {}

    def get_cached_data(self, key):
        """Return the cached value for ``key``, or ``None`` on a miss.

        Lookup order: local dict (if not expired), then Redis. A Redis hit
        is copied into the local dict together with its remaining TTL.
        """
        import time

        # Try local cache first
        if key in self.local_cache:
            deadline = self._local_deadlines.get(key)
            if deadline is None or time.monotonic() < deadline:
                return self.local_cache[key]
            # Local copy has outlived its TTL: drop it and fall through.
            self.local_cache.pop(key, None)
            self._local_deadlines.pop(key, None)

        # Try Redis cache
        cached = self.redis_client.get(key)
        if not cached:
            return None

        # NOTE(security): pickle.loads can execute arbitrary code. Only safe
        # while every writer to this Redis instance is trusted.
        data = pickle.loads(cached)

        # Mirror the remaining Redis TTL locally. redis-py reports -1 for
        # "no expiry" and -2 for "key gone"; older clients return None.
        remaining = self.redis_client.ttl(key)
        if remaining is None or remaining == -1:
            self.local_cache[key] = data
            self._local_deadlines.pop(key, None)
        elif remaining > 0:
            self.local_cache[key] = data
            self._local_deadlines[key] = time.monotonic() + remaining
        # Otherwise the key is expiring right now: return the value but do
        # not keep a local copy.
        return data

    def set_cached_data(self, key, data, expire=3600):
        """Store ``data`` under ``key`` in both tiers for ``expire`` seconds."""
        import time

        # Store in both caches
        self.local_cache[key] = data
        self._local_deadlines[key] = time.monotonic() + expire
        self.redis_client.setex(key, expire, pickle.dumps(data))

# Session optimization
def optimize_sessions():
    """
    Session optimization strategies:
    1. Use database or Redis for session storage
    2. Minimize session data
    3. Use compression for large sessions
    4. Implement session cleanup

    Defines ``CompressedSession`` as the compression example; returns ``None``.
    """

    import json
    import zlib

    class CompressedSession:
        """Session whose ``data`` dict is stored as zlib-compressed JSON."""

        def __init__(self):
            self.data = {}

        def serialize(self):
            """Return ``self.data`` as compressed, encoded JSON bytes."""
            encoded = json.dumps(self.data).encode()
            return zlib.compress(encoded)

        def deserialize(self, compressed_data):
            """Replace ``self.data`` with the dict decoded from ``compressed_data``."""
            raw = zlib.decompress(compressed_data)
            self.data = json.loads(raw.decode())

Data Processing Pipeline

# Large dataset processing
import pandas as pd
import numpy as np
from multiprocessing import Pool
import dask.dataframe as dd

def optimize_data_processing():
    """Define example helpers for processing large tabular datasets.

    The helpers are only defined, not called; the function returns ``None``.
    """

    # Use chunked processing for large files
    def process_large_csv(filename):
        """Sum per ``category`` by reading the CSV in 10000-row chunks."""
        chunk_size = 10000
        reader = pd.read_csv(filename, chunksize=chunk_size)

        # Aggregate each chunk independently ...
        partials = [chunk.groupby('category').sum() for chunk in reader]

        # ... then merge the partial sums that share an index label.
        combined = pd.concat(partials)
        return combined.groupby(level=0).sum()

    # Use Dask for distributed processing
    def dask_processing(filename):
        """Same per-``category`` sum, expressed with a Dask dataframe."""
        lazy_frame = dd.read_csv(filename)
        return lazy_frame.groupby('category').sum().compute()

    # Memory-efficient operations
    def memory_efficient_operations():
        """Load ``data.csv`` and cast selected columns to the listed dtypes."""
        df = pd.read_csv('data.csv')

        # Column -> target dtype, applied in this order:
        # categorical for strings, explicit integer widths, sparse for
        # mostly-empty data.
        dtype_plan = {
            'category': 'category',
            'small_int': 'int8',
            'large_int': 'int64',
            'sparse_col': 'Sparse[float64]',
        }
        for column, target in dtype_plan.items():
            df[column] = df[column].astype(target)

        return df

# Parallel processing patterns
def _sum_of_squares(chunk):
    """Map step: return the sum of ``x * x`` over one chunk.

    Kept at module level because ``Pool.map`` must pickle the callable, and
    functions nested inside another function cannot be pickled.
    """
    return sum(x * x for x in chunk)


def parallel_processing_patterns():
    """Build two parallel-processing examples and return them.

    Returns:
        tuple: ``(map_reduce_example, pipeline_pattern)``. Earlier versions
        returned ``None``, which left both helpers unreachable.
    """

    # Map-reduce pattern
    def map_reduce_example(data):
        """Return the sum of squares of ``data`` using a process pool.

        ``data`` must support ``len()`` and slicing. Empty input yields 0
        without starting any worker processes.
        """
        def reducer(results):
            return sum(results)

        if len(data) == 0:
            return reducer([])

        # Split data into roughly four chunks. Never let the step reach 0,
        # which ``range`` rejects when len(data) < 4.
        chunk_size = max(1, len(data) // 4)
        chunks = [
            data[start:start + chunk_size]
            for start in range(0, len(data), chunk_size)
        ]

        # Parallel map
        with Pool() as pool:
            mapped_results = pool.map(_sum_of_squares, chunks)

        # Reduce
        return reducer(mapped_results)

    # Pipeline pattern
    def pipeline_pattern():
        """Push 0..99 through two threaded stages (``* 2`` then ``+ 1``).

        Returns the processed values as a list, in input order.
        """
        from queue import Queue
        import threading

        def run_stage(transform, input_queue, output_queue):
            """Apply ``transform`` to each item until the ``None`` sentinel.

            The sentinel is forwarded downstream only after every earlier
            item has been emitted, so a later stage cannot stop early.
            """
            while True:
                item = input_queue.get()
                if item is None:
                    input_queue.task_done()
                    output_queue.put(None)
                    break
                output_queue.put(transform(item))
                input_queue.task_done()

        def stage1(input_queue, output_queue):
            run_stage(lambda item: item * 2, input_queue, output_queue)

        def stage2(input_queue, output_queue):
            run_stage(lambda item: item + 1, input_queue, output_queue)

        # Set up pipeline
        q1 = Queue()
        q2 = Queue()
        q3 = Queue()

        # Start workers
        t1 = threading.Thread(target=stage1, args=(q1, q2))
        t2 = threading.Thread(target=stage2, args=(q2, q3))

        t1.start()
        t2.start()

        # Feed data
        for i in range(100):
            q1.put(i)

        # Signal completion. Only the head of the pipeline gets a sentinel;
        # each stage passes it on when it has drained its input.
        q1.put(None)

        # Collect results: block until stage2 forwards the sentinel, so no
        # item still in flight is missed.
        results = []
        while True:
            result = q3.get()
            if result is None:
                break
            results.append(result)

        t1.join()
        t2.join()

        return results

    return map_reduce_example, pipeline_pattern

Machine Learning Performance

# ML model optimization
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import joblib

def optimize_ml_performance():
    """Build example helpers for ML preprocessing, training and inference.

    Returns:
        tuple: ``(optimize_preprocessing, optimize_model_training,
        optimize_inference)``. Earlier versions returned ``None``, which
        left the helpers unreachable.
    """

    # Data preprocessing optimization
    def optimize_preprocessing(X, y):
        """Cast ``X`` to float32, standardise per column, and split 80/20.

        Returns ``(X_train, X_test, y_train, y_test)``.
        """
        # Use appropriate data types
        X = X.astype(np.float32)  # Use float32 instead of float64

        # Normalize data efficiently. A constant column has std 0; divide
        # it by 1 instead so it becomes all zeros rather than NaN/inf.
        mean = X.mean(axis=0)
        std = X.std(axis=0)
        safe_std = np.where(std == 0, np.float32(1.0), std)
        X_normalized = (X - mean) / safe_std

        # Use efficient train-test split
        X_train, X_test, y_train, y_test = train_test_split(
            X_normalized, y, test_size=0.2, random_state=42
        )

        return X_train, X_test, y_train, y_test

    # Model training optimization
    def optimize_model_training(X_train, y_train):
        """Create a parallel random forest and batch-train an SGD classifier.

        The training data is now passed in explicitly; the previous version
        read ``X_train``/``y_train`` from a scope where they did not exist.

        Returns ``(model, sgd_model)``. ``model`` is configured but not
        fitted here; ``sgd_model`` has been trained with ``partial_fit``.
        """
        # Use parallel processing
        model = RandomForestClassifier(
            n_estimators=100,
            n_jobs=-1,  # Use all available cores
            random_state=42
        )

        # Incremental learning for large datasets
        from sklearn.linear_model import SGDClassifier

        sgd_model = SGDClassifier()

        # partial_fit must be told the full label set up front, because a
        # single batch may not contain every class.
        classes = np.unique(y_train)

        # Train in batches
        batch_size = 1000
        for start in range(0, len(X_train), batch_size):
            stop = start + batch_size
            sgd_model.partial_fit(
                X_train[start:stop], y_train[start:stop], classes=classes
            )

        return model, sgd_model

    # Model inference optimization
    def optimize_inference():
        """Return ``(batch_predict, save_optimized_model, load_optimized_model)``."""

        # Batch predictions
        def batch_predict(model, X, batch_size=1000):
            """Call ``model.predict`` on ``X`` in slices of ``batch_size``.

            Returns all predictions concatenated into one NumPy array.
            """
            predictions = []
            for start in range(0, len(X), batch_size):
                batch = X[start:start + batch_size]
                predictions.extend(model.predict(batch))
            return np.array(predictions)

        # Model serialization
        def save_optimized_model(model, filename):
            """Write ``model`` to ``filename`` with joblib, compression level 3."""
            joblib.dump(model, filename, compress=3)

        def load_optimized_model(filename):
            """Load and return a model previously written with joblib."""
            return joblib.load(filename)

        return batch_predict, save_optimized_model, load_optimized_model

    return optimize_preprocessing, optimize_model_training, optimize_inference

# GPU acceleration with CuPy
def gpu_acceleration():
    """
    GPU acceleration examples with CuPy
    Install: pip install cupy

    When CuPy cannot be imported, a message is printed and a NumPy version
    of ``gpu_matrix_operations`` is defined instead. Returns ``None``.
    """

    try:
        import cupy as cp
    except ImportError:
        print("CuPy not available - GPU acceleration disabled")

        def gpu_matrix_operations():
            """CPU fallback: multiply two random 1000x1000 NumPy matrices."""
            lhs = np.random.rand(1000, 1000)
            rhs = np.random.rand(1000, 1000)
            return np.dot(lhs, rhs)
    else:
        # GPU array operations
        def gpu_matrix_operations():
            """Multiply two random 1000x1000 CuPy arrays; return a NumPy array."""
            lhs_gpu = cp.random.rand(1000, 1000)
            rhs_gpu = cp.random.rand(1000, 1000)

            product_gpu = cp.dot(lhs_gpu, rhs_gpu)

            # Convert the CuPy result to a NumPy array before returning.
            return cp.asnumpy(product_gpu)

        # Memory management on GPU
        def gpu_memory_management():
            """Run the matrix example, then free all blocks of the default pool."""
            pool = cp.get_default_memory_pool()

            outcome = gpu_matrix_operations()

            pool.free_all_blocks()

            return outcome

This comprehensive cheatsheet covers Python performance optimization from basic profiling to advanced techniques. Use these tools and strategies to identify bottlenecks and optimize your Python applications for maximum performance.