Python performance v1.0.0
Python Performance Optimization
Performance optimization in Python requires understanding the language’s execution model, profiling bottlenecks, and applying appropriate optimization strategies. This skill covers profiling tools like cProfile and memory_profiler, optimization techniques for CPU-bound and memory-intensive workloads, leveraging NumPy for numerical computations, multiprocessing for parallel execution, and working around the Global Interpreter Lock (GIL). Effective performance tuning balances code clarity with execution speed for production systems.
Key Concepts
- Profiling - Measuring code execution time and resource usage to identify bottlenecks
- cProfile - Built-in deterministic profiler for function-level performance analysis
- Memory Profiling - Tracking memory allocation, leaks, and optimization opportunities
- Global Interpreter Lock (GIL) - CPython lock preventing true multi-threaded parallelism for CPU-bound tasks
- NumPy Vectorization - Replacing Python loops with optimized array operations for numerical computing
- Multiprocessing - True parallelism using separate Python processes, bypassing GIL
- Algorithmic Complexity - Big-O analysis for selecting efficient data structures and algorithms
- Lazy Evaluation - Deferring computation using generators and itertools for memory efficiency
- Cython - Compiling Python to C for performance-critical code sections
- Concurrency Models - Threading (I/O-bound), multiprocessing (CPU-bound), asyncio (I/O-bound)
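Lazy evaluation is worth a concrete look before diving into the practices below. A minimal sketch (the `squares` generator is a made-up example) showing how `itertools.islice` consumes only what is needed from a potentially unbounded generator:

```python
import itertools

def squares():
    """Infinite generator: nothing is computed until a value is requested."""
    n = 0
    while True:
        yield n * n
        n += 1

# Take only the first five squares; no full list is ever built
first_five = list(itertools.islice(squares(), 5))
print(first_five)  # [0, 1, 4, 9, 16]
```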
Best Practices
- Profile before optimizing - Measure actual bottlenecks with profilers; don’t guess where to optimize
- Optimize algorithms first - Improving O(n²) to O(n log n) beats micro-optimizations
- Use appropriate data structures - Choose set/dict lookups (O(1)) over list searches (O(n))
- Vectorize with NumPy - Replace Python loops with NumPy array operations for 10-100x speedups
- Leverage built-in functions - Use map(), filter(), sum() implemented in C for better performance
- Avoid premature optimization - Write clear code first; optimize proven bottlenecks later
- Use multiprocessing for CPU-bound work - Bypass GIL with process-based parallelism
- Cache expensive computations - Use functools.lru_cache for repeated function calls
- Profile memory usage - Identify memory leaks and optimize large data processing
- Consider Cython for hot paths - Compile performance-critical sections to C extensions
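As a small illustration of the built-ins practice above, here is a hedged sketch comparing a hand-written accumulator against `sum()`, whose loop runs in C (absolute timings vary by machine, so treat the numbers as indicative only):

```python
import timeit

def manual_sum(values: list[int]) -> int:
    """Hand-written accumulation loop, executed bytecode-by-bytecode."""
    total = 0
    for v in values:
        total += v
    return total

values = list(range(100_000))
assert manual_sum(values) == sum(values)  # identical results

# sum() is typically several times faster because its loop runs in C
t_manual = timeit.timeit(lambda: manual_sum(values), number=20)
t_builtin = timeit.timeit(lambda: sum(values), number=20)
print(f"manual: {t_manual:.3f}s  built-in: {t_builtin:.3f}s")
```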
Code Examples
CPU Profiling with cProfile
# ✅ GOOD: Comprehensive profiling workflow
import cProfile
import pstats
from io import StringIO
from functools import wraps
from typing import Callable, TypeVar, ParamSpec

P = ParamSpec('P')
T = TypeVar('T')

def profile_function(func: Callable[P, T]) -> Callable[P, T]:
    """
    Decorator to profile function execution.
    Prints top 20 functions by cumulative time.
    """
    @wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
        profiler = cProfile.Profile()
        profiler.enable()
        try:
            return func(*args, **kwargs)
        finally:
            profiler.disable()
            # Print statistics
            stream = StringIO()
            stats = pstats.Stats(profiler, stream=stream)
            stats.sort_stats('cumulative')
            stats.print_stats(20)
            print(stream.getvalue())
    return wrapper
@profile_function
def process_large_dataset(data: list[dict]) -> list[dict]:
    """Process dataset with profiling enabled."""
    result = []
    for item in data:
        result.append(expensive_computation(item))
    return result
# Command-line profiling
"""
# Profile entire script
python -m cProfile -o profile.stats my_script.py
# Analyze profile data
python -m pstats profile.stats
% sort cumulative
% stats 20
% callers expensive_function
% callees expensive_function
"""
# Programmatic profiling analysis
def analyze_profile_stats(profile_file: str) -> None:
    """
    Analyze profile statistics and identify bottlenecks.

    Args:
        profile_file: Path to .stats file from cProfile
    """
    stats = pstats.Stats(profile_file)

    print("=" * 80)
    print("Top 10 functions by cumulative time:")
    print("=" * 80)
    stats.sort_stats('cumulative')
    stats.print_stats(10)

    print("\n" + "=" * 80)
    print("Top 10 functions by total time:")
    print("=" * 80)
    stats.sort_stats('tottime')
    stats.print_stats(10)

    print("\n" + "=" * 80)
    print("Functions called most frequently:")
    print("=" * 80)
    stats.sort_stats('ncalls')
    stats.print_stats(10)
# ❌ BAD: No profiling, guessing at bottlenecks
def process_large_dataset(data):
    # Optimizing this loop because "loops are slow"
    # without profiling to confirm it's the bottleneck
    return [expensive_computation(item) for item in data]
Memory Profiling and Optimization
# ✅ GOOD: Memory profiling with memory_profiler
from memory_profiler import profile
from typing import Iterator

@profile
def memory_intensive_function(n: int) -> list[int]:
    """
    Process large dataset with memory profiling.
    Decorator shows line-by-line memory usage.
    """
    # Memory spike: creates large list
    data = list(range(n))  # ~40 MB for n=10M
    # Process in fixed-size chunks
    result = []
    for i in range(0, len(data), 1000):
        chunk = data[i:i + 1000]
        result.extend(process_chunk(chunk))
    return result
# Run with: python -m memory_profiler script.py
"""
Line # Mem usage Increment Occurrences Line Contents
=============================================================
8 50.5 MiB 50.5 MiB 1 @profile
9 def memory_intensive_function(n: int):
14 91.2 MiB 40.7 MiB 1 data = list(range(n))
17 91.2 MiB 0.0 MiB 1 result = []
18 95.3 MiB 4.1 MiB 10001 for i in range(0, len(data), 1000):
19 95.3 MiB 0.0 MiB 10000 chunk = data[i:i+1000]
20 95.3 MiB 0.0 MiB 10000 result.extend(process_chunk(chunk))
"""
# Memory-efficient generator approach
def memory_efficient_processor(n: int) -> Iterator[int]:
    """
    Process large dataset using a generator.
    Yields results one at a time without loading all into memory.
    """
    for i in range(n):
        yield expensive_computation(i)
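The difference is visible directly with `sys.getsizeof`: a generator object has a small fixed size regardless of how many values it will yield, while the equivalent list grows with n (a rough sketch; exact byte counts vary by Python version):

```python
import sys

n = 1_000_000

materialized = list(range(n))     # one pointer per element, roughly 8 MB
lazy = (i * i for i in range(n))  # small fixed-size generator object

print(f"list:      {sys.getsizeof(materialized):,} bytes")
print(f"generator: {sys.getsizeof(lazy):,} bytes")

# The generator still produces every value when consumed
count = sum(1 for _ in lazy)
print(count)  # 1000000
```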
# Memory usage measurement with tracemalloc
def compare_memory_usage() -> None:
    """Measure the memory allocated by loading everything into memory."""
    import tracemalloc

    tracemalloc.start()
    snapshot1 = tracemalloc.take_snapshot()

    # Load all into memory
    data1 = list(range(10_000_000))

    snapshot2 = tracemalloc.take_snapshot()
    stats = snapshot2.compare_to(snapshot1, 'lineno')
    print("Memory allocated:")
    for stat in stats[:3]:
        print(stat)

    # Peak memory usage
    current, peak = tracemalloc.get_traced_memory()
    print(f"Peak memory: {peak / 1024 / 1024:.1f} MB")
    tracemalloc.stop()
# ❌ BAD: Unnecessary memory allocation
def process_data(n: int) -> int:
    # Creates unnecessary intermediate lists
    data = list(range(n))                          # 40 MB
    squared = [x**2 for x in data]                 # Another 40 MB
    filtered = [x for x in squared if x % 2 == 0]  # Another list
    return sum(filtered)

# ✅ Fix: Use a single generator expression for constant memory
def process_data_efficient(n: int) -> int:
    return sum(x**2 for x in range(n) if (x**2) % 2 == 0)
NumPy Vectorization for Performance
# ✅ GOOD: NumPy vectorization for numerical computing
import numpy as np
from typing import Final
import time

def benchmark_comparison() -> None:
    """Compare pure Python vs NumPy performance."""
    size: Final[int] = 1_000_000

    # Pure Python approach
    start = time.perf_counter()
    python_list = list(range(size))
    python_result = [x**2 + 2*x + 1 for x in python_list]
    python_time = time.perf_counter() - start

    # NumPy vectorized approach
    start = time.perf_counter()
    numpy_array = np.arange(size)
    numpy_result = numpy_array**2 + 2*numpy_array + 1
    numpy_time = time.perf_counter() - start

    print(f"Python time: {python_time:.3f}s")
    print(f"NumPy time:  {numpy_time:.3f}s")
    print(f"Speedup:     {python_time/numpy_time:.1f}x")

# Typical output:
# Python time: 0.234s
# NumPy time:  0.008s
# Speedup:     29.3x
# NumPy operations for data processing
def process_numerical_data(data: np.ndarray) -> dict[str, float]:
    """
    Process numerical data using vectorized operations.

    Args:
        data: NumPy array of numerical values

    Returns:
        Dictionary of computed statistics
    """
    # All operations vectorized, computed in C
    return {
        "mean": np.mean(data),
        "std": np.std(data),
        "median": np.median(data),
        "min": np.min(data),
        "max": np.max(data),
        "percentile_95": np.percentile(data, 95),
    }
# Matrix operations with NumPy
def matrix_computation(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """
    Perform matrix multiplication using NumPy.
    Uses optimized BLAS libraries for massive speedup.
    """
    # A pure Python triple loop would be O(n³) and extremely slow;
    # NumPy delegates to optimized BLAS: 100-1000x faster
    return a @ b  # equivalent to np.dot(a, b) for 2D arrays
# Broadcasting for element-wise operations
def normalize_features(features: np.ndarray) -> np.ndarray:
    """
    Normalize features using broadcasting.

    Args:
        features: 2D array of shape (n_samples, n_features)

    Returns:
        Normalized features with zero mean and unit variance
    """
    mean = np.mean(features, axis=0)  # Shape: (n_features,)
    std = np.std(features, axis=0)    # Shape: (n_features,)
    # Broadcasting: mean/std automatically expand to match features' shape
    normalized = (features - mean) / std
    return normalized
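A quick check of this broadcasting behavior (assumes NumPy is installed; the seed and shapes are arbitrary):

```python
import numpy as np

rng = np.random.default_rng(seed=0)
features = rng.normal(loc=5.0, scale=2.0, size=(1000, 3))

# (1000, 3) minus (3,) broadcasts the per-column mean/std across all rows
mean = features.mean(axis=0)
std = features.std(axis=0)
normalized = (features - mean) / std

# Each column now has ~zero mean and ~unit variance
print(np.allclose(normalized.mean(axis=0), 0.0, atol=1e-9))  # True
print(np.allclose(normalized.std(axis=0), 1.0))              # True
```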
# ❌ BAD: Python loops instead of NumPy vectorization
def process_numerical_data_slow(data: list[float]) -> dict[str, float]:
    total = sum(data)
    mean = total / len(data)
    variance = sum((x - mean)**2 for x in data) / len(data)
    std = variance ** 0.5
    sorted_data = sorted(data)
    median = sorted_data[len(data) // 2]
    # 10-100x slower than NumPy
    return {"mean": mean, "std": std, "median": median}
Multiprocessing for CPU-Bound Parallelism
# ✅ GOOD: Multiprocessing to bypass GIL
import multiprocessing as mp
from multiprocessing import Pool
from typing import Callable, TypeVar, Sequence
import time

T = TypeVar('T')
R = TypeVar('R')

def parallel_map(
    func: Callable[[T], R],
    items: Sequence[T],
    *,
    processes: int | None = None
) -> list[R]:
    """
    Apply function to items in parallel using multiprocessing.

    Args:
        func: Function to apply to each item
        items: Sequence of items to process
        processes: Number of worker processes (default: CPU count)

    Returns:
        List of results in same order as input

    Example:
        >>> results = parallel_map(expensive_computation, range(100))
    """
    if processes is None:
        processes = mp.cpu_count()
    with Pool(processes=processes) as pool:
        return pool.map(func, items)
def cpu_intensive_task(n: int) -> int:
    """Simulate CPU-intensive computation."""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

def benchmark_parallelism() -> None:
    """Compare sequential vs parallel execution."""
    tasks = [10_000_000] * 8  # 8 heavy tasks

    # Sequential execution
    start = time.perf_counter()
    results_seq = [cpu_intensive_task(n) for n in tasks]
    seq_time = time.perf_counter() - start

    # Parallel execution
    start = time.perf_counter()
    results_par = parallel_map(cpu_intensive_task, tasks)
    par_time = time.perf_counter() - start

    print(f"Sequential: {seq_time:.2f}s")
    print(f"Parallel:   {par_time:.2f}s")
    print(f"Speedup:    {seq_time/par_time:.1f}x")

# On an 8-core machine:
# Sequential: 12.45s
# Parallel:   1.78s
# Speedup:    7.0x
# Process pool for long-running workers
import multiprocessing.pool  # actual Pool class, needed for the annotation below

class DataProcessor:
    """Parallel data processor with worker pool."""

    def __init__(self, num_workers: int | None = None):
        self.num_workers = num_workers or mp.cpu_count()
        self.pool: mp.pool.Pool | None = None

    def __enter__(self) -> "DataProcessor":
        self.pool = Pool(processes=self.num_workers)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        if self.pool:
            self.pool.close()
            self.pool.join()

    def process_batch(
        self,
        items: Sequence[dict],
        chunk_size: int = 100
    ) -> list[dict]:
        """
        Process items in parallel chunks.

        Args:
            items: Items to process
            chunk_size: Number of items per worker

        Returns:
            Processed results
        """
        if not self.pool:
            raise RuntimeError("Processor not initialized")
        # chunksize batches items per worker for lower IPC overhead
        return self.pool.map(
            self._process_item,
            items,
            chunksize=chunk_size
        )

    @staticmethod
    def _process_item(item: dict) -> dict:
        """Process single item (must be picklable)."""
        # CPU-intensive processing
        return {"result": item["value"] ** 2}

# Usage (the guard lets spawned workers import this module safely)
if __name__ == "__main__":
    with DataProcessor(num_workers=4) as processor:
        items = [{"value": i} for i in range(10000)]
        results = processor.process_batch(items)
# ❌ BAD: Threading for CPU-bound work (GIL prevents parallelism)
import threading

def process_with_threading(items: list[int]) -> list[int]:
    results = []
    threads = []

    def worker(item):
        results.append(cpu_intensive_task(item))

    for item in items:
        thread = threading.Thread(target=worker, args=(item,))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    return results
# No speedup! The GIL prevents parallel execution of Python bytecode
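Threading is the right tool when the work is I/O-bound, since the GIL is released while waiting. A hedged sketch using `concurrent.futures` (the `fetch` function simulates network latency with `time.sleep`; the URLs are placeholders):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(url: str) -> str:
    """Simulated I/O-bound call: time.sleep stands in for network latency."""
    time.sleep(0.1)  # the GIL is released while sleeping, so threads overlap
    return f"response from {url}"

urls = [f"https://example.com/{i}" for i in range(8)]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(fetch, urls))
elapsed = time.perf_counter() - start

# Eight 0.1s waits overlap: wall time is ~0.1s rather than ~0.8s
print(f"{len(results)} responses in {elapsed:.2f}s")
```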
Caching and Memoization
# ✅ GOOD: Caching expensive computations
from functools import lru_cache, cache
import time

@lru_cache(maxsize=128)
def expensive_computation(n: int) -> int:
    """
    Expensive computation with LRU cache.
    Cache stores up to 128 most recent results;
    subsequent calls with the same argument return the cached result.
    """
    time.sleep(0.1)  # Simulate expensive operation
    return n ** 2 + 2 * n + 1

@cache  # Python 3.9+: unbounded cache
def fibonacci(n: int) -> int:
    """
    Fibonacci with memoization.
    Without cache: O(2ⁿ) - exponential time
    With cache: O(n) - linear time
    """
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)
# Benchmark caching impact
def benchmark_caching() -> None:
    """Demonstrate caching performance improvement."""
    # First call: executes function
    start = time.perf_counter()
    result1 = expensive_computation(42)
    time1 = time.perf_counter() - start

    # Second call: returns cached result
    start = time.perf_counter()
    result2 = expensive_computation(42)
    time2 = time.perf_counter() - start

    print(f"First call: {time1*1000:.1f}ms")
    print(f"Cached call: {time2*1000:.3f}ms")
    print(f"Speedup: {time1/time2:.0f}x")

# Typical output:
# First call: 100.2ms
# Cached call: 0.001ms
# Speedup: 100200x
# Cache info and clearing
def analyze_cache_performance() -> None:
    """Analyze cache hit rate and efficiency."""
    # Warm up cache
    for i in range(100):
        fibonacci(i % 20)

    # Check cache statistics
    info = fibonacci.cache_info()
    print(f"Hits: {info.hits}")
    print(f"Misses: {info.misses}")
    print(f"Hit rate: {info.hits / (info.hits + info.misses):.1%}")

    # Clear cache if needed
    fibonacci.cache_clear()
# Custom caching for complex scenarios
from functools import wraps
from pathlib import Path
from typing import Callable, TypeVar, ParamSpec
import pickle

P = ParamSpec('P')
T = TypeVar('T')

class DiskCache:
    """Cache function results to disk for persistence."""

    def __init__(self, cache_dir: str = ".cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)

    def __call__(self, func: Callable[P, T]) -> Callable[P, T]:
        @wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            # Generate cache key from arguments (args must be hashable)
            key = f"{func.__name__}_{hash((args, tuple(sorted(kwargs.items()))))}"
            cache_file = self.cache_dir / f"{key}.pkl"

            # Return cached result if it exists
            if cache_file.exists():
                with open(cache_file, 'rb') as f:
                    return pickle.load(f)

            # Compute and cache result
            result = func(*args, **kwargs)
            with open(cache_file, 'wb') as f:
                pickle.dump(result, f)
            return result
        return wrapper

@DiskCache(cache_dir=".cache")
def expensive_data_processing(input_file: str) -> dict:
    """Process file with disk-persisted cache."""
    # Expensive processing
    return {"processed": True}
# ❌ BAD: No caching, recomputing same values
def fibonacci_slow(n: int) -> int:
    if n < 2:
        return n
    return fibonacci_slow(n - 1) + fibonacci_slow(n - 2)

# fibonacci_slow(40) takes ~30 seconds
# fibonacci(40) with cache takes ~0.0001 seconds
Algorithm and Data Structure Optimization
# ✅ GOOD: Choosing optimal data structures
from collections import deque, defaultdict, Counter
from typing import Sequence, Hashable

# Use sets for membership testing
def find_common_elements_optimized(
    list1: Sequence[Hashable],
    list2: Sequence[Hashable]
) -> set[Hashable]:
    """
    Find common elements using sets (O(n)).
    Converting to sets makes lookup O(1) instead of O(n).
    """
    set1 = set(list1)
    set2 = set(list2)
    return set1 & set2  # Set intersection: O(min(len(set1), len(set2)))

# ❌ BAD: Nested loops (O(n²))
def find_common_elements_slow(list1: list, list2: list) -> list:
    common = []
    for item1 in list1:      # O(n)
        if item1 in list2:   # O(n) for list lookup
            common.append(item1)
    return common
# Total: O(n²)
# Use deque for efficient queue operations
def process_queue_optimized(items: Sequence[str]) -> None:
    """Use deque for O(1) append and popleft."""
    queue: deque[str] = deque(items)
    while queue:
        item = queue.popleft()  # O(1)
        process(item)
        if needs_reprocessing(item):
            queue.append(item)  # O(1)

# ❌ BAD: List as queue (O(n) for pop(0))
def process_queue_slow(items: list[str]) -> None:
    queue = list(items)
    while queue:
        item = queue.pop(0)  # O(n) - shifts all elements
        process(item)
# Use Counter for frequency counting
def find_most_common_optimized(items: Sequence[str], n: int = 10) -> list[tuple[str, int]]:
    """Find the n most common items efficiently."""
    return Counter(items).most_common(n)  # heap-based: O(N log n)

# ❌ BAD: Manual counting
def find_most_common_slow(items: list[str], n: int = 10) -> list[tuple[str, int]]:
    counts = {}
    for item in items:
        counts[item] = counts.get(item, 0) + 1
    sorted_items = sorted(counts.items(), key=lambda x: x[1], reverse=True)
    return sorted_items[:n]
# More code, same complexity, but slower due to Python-level overhead
# Use defaultdict to avoid key checking
def group_by_category_optimized(
    items: Sequence[dict]
) -> dict[str, list[dict]]:
    """Group items by category using defaultdict."""
    groups: defaultdict[str, list[dict]] = defaultdict(list)
    for item in items:
        groups[item["category"]].append(item)
    return dict(groups)

# ❌ BAD: Manual key checking
def group_by_category_slow(items: list[dict]) -> dict[str, list[dict]]:
    groups = {}
    for item in items:
        category = item["category"]
        if category not in groups:
            groups[category] = []
        groups[category].append(item)
    return groups
Anti-Patterns
Premature Optimization
# ❌ Avoid: Optimizing before profiling
def process_data(items):
    # Complex, unreadable optimization attempt
    return [y for x in items if (y := expensive_func(x)) and condition(y)]

# ✅ Fix: Write clear code first, profile, then optimize if needed
def process_data(items):
    results = []
    for item in items:
        result = expensive_func(item)
        if condition(result):
            results.append(result)
    return results
# Profile shows expensive_func is the bottleneck? Add @lru_cache
Using Threading for CPU-Bound Work
# ❌ Avoid: Threading doesn't help CPU-bound tasks (GIL)
import threading

def parallel_compute(data):
    threads = [threading.Thread(target=cpu_work, args=(x,)) for x in data]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
# No speedup due to GIL

# ✅ Fix: Use multiprocessing for CPU-bound work
import multiprocessing

def parallel_compute(data):
    with multiprocessing.Pool() as pool:
        return pool.map(cpu_work, data)
Unnecessary List Comprehensions
# ❌ Avoid: Creating intermediate lists
def process_pipeline(data):
    step1 = [transform1(x) for x in data]   # Full list
    step2 = [transform2(x) for x in step1]  # Another full list
    step3 = [transform3(x) for x in step2]  # Another full list
    return sum(step3)

# ✅ Fix: Use a generator expression
def process_pipeline(data):
    return sum(
        transform3(transform2(transform1(x)))
        for x in data
    )  # Constant memory
Testing Strategies
Performance Regression Testing
import pytest
import time

def test_query_performance() -> None:
    """Ensure query performance doesn't regress."""
    start = time.perf_counter()
    result = expensive_query()
    duration = time.perf_counter() - start

    assert duration < 1.0, f"Query took {duration:.2f}s, threshold 1.0s"
    assert len(result) > 0

@pytest.mark.benchmark
def test_processing_throughput(benchmark) -> None:
    """Benchmark processing throughput."""
    data = generate_test_data(size=10000)
    result = benchmark(process_data, data)
    assert result is not None
# pytest-benchmark provides statistics automatically
Memory Leak Detection
import tracemalloc
import gc

def test_no_memory_leak() -> None:
    """Test that repeated operations don't leak memory."""
    gc.collect()
    tracemalloc.start()

    # Baseline
    snapshot1 = tracemalloc.take_snapshot()

    # Run operations multiple times
    for _ in range(100):
        process_data(generate_data())
    gc.collect()

    snapshot2 = tracemalloc.take_snapshot()

    # Compare memory usage
    stats = snapshot2.compare_to(snapshot1, 'lineno')
    total_diff = sum(stat.size_diff for stat in stats)

    # Allow small growth, but catch leaks
    assert total_diff < 10 * 1024 * 1024, f"Memory leaked: {total_diff / 1024 / 1024:.1f} MB"
    tracemalloc.stop()
References
- Python Profilers
- cProfile Documentation
- memory_profiler
- NumPy Documentation
- multiprocessing
- Understanding the GIL
- Python Performance Tips
- Cython Documentation
Related Skills
- python-best-practices.md - Profile code quality and type coverage
- async-python.md - Async for I/O-bound performance
- pythonic-patterns.md - Generators and itertools for efficiency
- python-testing.md - Performance testing and benchmarking