Skip to content
Home / Skills / Python / Performance
PY

Performance

Python performance v1.0.0

Python Performance Optimization

Performance optimization in Python requires understanding the language’s execution model, profiling bottlenecks, and applying appropriate optimization strategies. This skill covers profiling tools like cProfile and memory_profiler, optimization techniques for CPU-bound and memory-intensive workloads, leveraging NumPy for numerical computations, multiprocessing for parallel execution, and working around the Global Interpreter Lock (GIL). Effective performance tuning balances code clarity with execution speed for production systems.

Key Concepts

  • Profiling - Measuring code execution time and resource usage to identify bottlenecks
  • cProfile - Built-in deterministic profiler for function-level performance analysis
  • Memory Profiling - Tracking memory allocation, leaks, and optimization opportunities
  • Global Interpreter Lock (GIL) - CPython lock preventing true multi-threaded parallelism for CPU-bound tasks
  • NumPy Vectorization - Replacing Python loops with optimized array operations for numerical computing
  • Multiprocessing - True parallelism using separate Python processes, bypassing GIL
  • Algorithmic Complexity - Big-O analysis for selecting efficient data structures and algorithms
  • Lazy Evaluation - Deferring computation using generators and itertools for memory efficiency
  • Cython - Compiling Python to C for performance-critical code sections
  • Concurrency Models - Threading (I/O-bound), multiprocessing (CPU-bound), asyncio (I/O-bound)

Best Practices

  1. Profile before optimizing - Measure actual bottlenecks with profilers; don’t guess where to optimize
  2. Optimize algorithms first - Improving O(n²) to O(n log n) beats micro-optimizations
  3. Use appropriate data structures - Choose set/dict lookups (O(1)) over list searches (O(n))
  4. Vectorize with NumPy - Replace Python loops with NumPy array operations for 10-100x speedups
  5. Leverage built-in functions - Prefer C-implemented built-ins like sum(), min(), max(), any(), and str.join() — and map()/filter() when paired with C-level callables — over hand-written loops
  6. Avoid premature optimization - Write clear code first; optimize proven bottlenecks later
  7. Use multiprocessing for CPU-bound work - Bypass GIL with process-based parallelism
  8. Cache expensive computations - Use functools.lru_cache for repeated function calls
  9. Profile memory usage - Identify memory leaks and optimize large data processing
  10. Consider Cython for hot paths - Compile performance-critical sections to C extensions

Code Examples

CPU Profiling with cProfile

# ✅ GOOD: Comprehensive profiling workflow
import cProfile
import pstats
from io import StringIO
from functools import wraps
from typing import Callable, TypeVar, ParamSpec

P = ParamSpec('P')
T = TypeVar('T')

def profile_function(func: Callable[P, T]) -> Callable[P, T]:
    """
    Decorator to profile function execution.
    
    Prints top 20 functions by cumulative time.
    """
    @wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
        profiler = cProfile.Profile()
        profiler.enable()
        
        try:
            result = func(*args, **kwargs)
            return result
        finally:
            profiler.disable()
            
            # Print statistics
            stream = StringIO()
            stats = pstats.Stats(profiler, stream=stream)
            stats.sort_stats('cumulative')
            stats.print_stats(20)
            print(stream.getvalue())
    
    return wrapper

@profile_function
def process_large_dataset(data: list[dict]) -> list[dict]:
    """Run expensive_computation over every record, with profiling enabled."""
    return [expensive_computation(record) for record in data]

# Command-line profiling
"""
# Profile entire script
python -m cProfile -o profile.stats my_script.py

# Analyze profile data
python -m pstats profile.stats
% sort cumulative
% stats 20
% callers expensive_function
% callees expensive_function
"""

# Programmatic profiling analysis
def analyze_profile_stats(profile_file: str) -> None:
    """Print the main bottleneck views for a saved cProfile run.

    Shows the top 10 entries by cumulative time, by total (self) time,
    and by call count.

    Args:
        profile_file: Path to a .stats file produced by cProfile.
    """
    stats = pstats.Stats(profile_file)
    banner = "=" * 80

    # (title, pstats sort key, separator printed before the section)
    sections = [
        ("Top 10 functions by cumulative time:", 'cumulative', ""),
        ("Top 10 functions by total time:", 'tottime', "\n"),
        ("Functions called most frequently:", 'ncalls', "\n"),
    ]
    for title, sort_key, leading in sections:
        print(f"{leading}{banner}")
        print(title)
        print(banner)
        stats.sort_stats(sort_key)
        stats.print_stats(10)

# ❌ BAD: No profiling, guessing at bottlenecks
def process_large_dataset(data):
    # Anti-pattern: this rewrite was chosen because "loops are slow",
    # without profiling to confirm the loop is the bottleneck — the real
    # cost is probably inside expensive_computation itself.
    return [expensive_computation(item) for item in data]

Memory Profiling and Optimization

# ✅ GOOD: Memory profiling with memory_profiler
from memory_profiler import profile
from typing import Iterator
import sys

@profile
def memory_intensive_function(n: int) -> list[int]:
    """Build a list of n ints, then feed it to process_chunk in 1000-item slices.

    Decorated with memory_profiler's @profile so a run under
    `python -m memory_profiler` reports line-by-line memory usage.
    """
    # Allocates the entire range up front (~40 MB for n=10M).
    values = list(range(n))

    # Hand the data to process_chunk in fixed-size slices.
    processed: list[int] = []
    for start in range(0, len(values), 1000):
        processed.extend(process_chunk(values[start:start + 1000]))

    return processed

# Run with: python -m memory_profiler script.py
"""
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     8     50.5 MiB     50.5 MiB           1   @profile
     9                                         def memory_intensive_function(n: int):
    14     91.2 MiB     40.7 MiB           1       data = list(range(n))
    17     91.2 MiB      0.0 MiB           1       result = []
    18     95.3 MiB      4.1 MiB       10001       for i in range(0, len(data), 1000):
    19     95.3 MiB      0.0 MiB       10000           chunk = data[i:i+1000]
    20     95.3 MiB      0.0 MiB       10000           result.extend(process_chunk(chunk))
"""

# Memory-efficient generator approach
def memory_efficient_processor(n: int) -> Iterator[int]:
    """Lazily produce expensive_computation(i) for each i in range(n).

    A generator keeps only one result alive at a time instead of
    materialising the whole output list.
    """
    yield from (expensive_computation(i) for i in range(n))

# Memory usage comparison
def compare_memory_usage(n: int = 10_000_000) -> None:
    """Demonstrate tracemalloc-based accounting of a large allocation.

    Args:
        n: Number of integers to allocate (default 10M, matching the
           original example). Parameterised so small, fast runs are
           possible; existing no-argument callers are unaffected.
    """
    import tracemalloc

    # Track memory allocation from here on.
    tracemalloc.start()

    # Snapshot before and after the allocation so the diff isolates it.
    snapshot_before = tracemalloc.take_snapshot()
    data = list(range(n))
    snapshot_after = tracemalloc.take_snapshot()

    stats = snapshot_after.compare_to(snapshot_before, 'lineno')
    print("Memory allocated:")
    for stat in stats[:3]:
        print(stat)

    # Peak covers everything traced since start(), not just `data`.
    current, peak = tracemalloc.get_traced_memory()
    print(f"Peak memory: {peak / 1024 / 1024:.1f} MB")

    # Keep `data` alive until after measurement, then stop tracing.
    del data
    tracemalloc.stop()

# ❌ BAD: Unnecessary memory allocation
def process_data(n: int) -> int:
    # Anti-pattern: three full intermediate lists are materialised just
    # to compute a single scalar sum.
    data = list(range(n))  # 40 MB
    squared = [x**2 for x in data]  # Another 40 MB
    filtered = [x for x in squared if x % 2 == 0]  # Another list
    return sum(filtered)

# ✅ Fix: Use generators for memory efficiency
def process_data_efficient(n: int) -> int:
    """Sum the even squares of 0..n-1 using constant extra memory.

    x**2 is even exactly when x is even, so we filter on x itself and
    square each kept value once — the previous version computed x**2
    twice (once in the filter, once in the value).
    """
    return sum(x * x for x in range(n) if x % 2 == 0)
    # Constant memory usage: single generator, no intermediate lists.

NumPy Vectorization for Performance

# ✅ GOOD: NumPy vectorization for numerical computing
import numpy as np
from typing import Final
import time

def benchmark_comparison(size: int = 1_000_000) -> None:
    """Compare a pure-Python polynomial evaluation against NumPy.

    Args:
        size: Number of elements to process. Defaults to the original
              1M-element workload; parameterised so quick runs and tests
              can use a small size.
    """
    # Pure Python: one boxed int object per element, interpreted loop.
    start = time.perf_counter()
    python_list = list(range(size))
    python_result = [x**2 + 2*x + 1 for x in python_list]
    python_time = time.perf_counter() - start

    # NumPy: one vectorized expression over a contiguous array.
    start = time.perf_counter()
    numpy_array = np.arange(size)
    numpy_result = numpy_array**2 + 2*numpy_array + 1
    numpy_time = time.perf_counter() - start

    print(f"Python time: {python_time:.3f}s")
    print(f"NumPy time:  {numpy_time:.3f}s")
    print(f"Speedup:     {python_time/numpy_time:.1f}x")

    # Typical output for the default 1M elements:
    # Python time: 0.234s
    # NumPy time:  0.008s
    # Speedup:     29.3x

# NumPy operations for data processing
def process_numerical_data(data: np.ndarray) -> dict[str, float]:
    """Compute summary statistics with vectorized NumPy reductions.

    Args:
        data: NumPy array of numerical values.

    Returns:
        Statistics as plain Python floats. The reductions run in C;
        each NumPy scalar is converted with float() so the return value
        actually matches the dict[str, float] annotation (and stays
        JSON-serialisable) instead of leaking np.float64 objects.
    """
    return {
        "mean": float(np.mean(data)),
        "std": float(np.std(data)),
        "median": float(np.median(data)),
        "min": float(np.min(data)),
        "max": float(np.max(data)),
        "percentile_95": float(np.percentile(data, 95)),
    }

# Matrix operations with NumPy
def matrix_computation(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """
    Perform matrix multiplication using NumPy.

    Uses optimized BLAS libraries for massive speedup.
    """
    # Pure Python would be O(n³) and extremely slow
    # NumPy uses optimized BLAS: 100-1000x faster
    # NOTE(review): np.dot and the @ operator agree for 2-D inputs but
    # differ for ndim > 2 — keeping np.dot; confirm callers pass 2-D arrays.
    return np.dot(a, b)

# Broadcasting for element-wise operations
def normalize_features(features: np.ndarray) -> np.ndarray:
    """
    Normalize features to zero mean and (where possible) unit variance.

    Args:
        features: 2D array of shape (n_samples, n_features)

    Returns:
        Normalized features. Constant columns (std == 0) are mapped to 0
        instead of the NaN/inf a raw division by zero would produce.
    """
    mean = np.mean(features, axis=0)  # Shape: (n_features,)
    std = np.std(features, axis=0)    # Shape: (n_features,)

    # Guard constant features: dividing by their zero std would fill the
    # column with NaN (0/0) or inf; dividing by 1 leaves it at 0 instead.
    safe_std = np.where(std == 0, 1.0, std)

    # Broadcasting: automatically expands mean/std to match features shape
    normalized = (features - mean) / safe_std
    return normalized

# ❌ BAD: Python loops instead of NumPy vectorization
def process_numerical_data_slow(data: list[float]) -> dict[str, float]:
    # Anti-pattern: per-element Python loops instead of NumPy reductions.
    total = sum(data)
    mean = total / len(data)  # also raises ZeroDivisionError on empty input

    variance = sum((x - mean)**2 for x in data) / len(data)
    std = variance ** 0.5

    sorted_data = sorted(data)
    # Upper middle element — not the true median for even-length data.
    median = sorted_data[len(data) // 2]

    # 10-100x slower than NumPy
    return {"mean": mean, "std": std, "median": median}

Multiprocessing for CPU-Bound Parallelism

# ✅ GOOD: Multiprocessing to bypass GIL
import multiprocessing as mp
from multiprocessing import Pool
from typing import Callable, TypeVar, Sequence
from functools import partial
import time

T = TypeVar('T')
R = TypeVar('R')

def parallel_map(
    func: Callable[[T], R],
    items: Sequence[T],
    *,
    processes: int | None = None
) -> list[R]:
    """
    Apply function to items in parallel using multiprocessing.
    
    Args:
        func: Function to apply to each item
        items: Sequence of items to process
        processes: Number of worker processes (default: CPU count)
        
    Returns:
        List of results in same order as input
        
    Example:
        >>> results = parallel_map(expensive_computation, range(100))
    """
    if processes is None:
        processes = mp.cpu_count()
    
    with Pool(processes=processes) as pool:
        results = pool.map(func, items)
    
    return results

def cpu_intensive_task(n: int) -> int:
    """Burn CPU: return the sum of squares 0² + 1² + … + (n-1)²."""
    return sum(i * i for i in range(n))

def benchmark_parallelism(task_size: int = 10_000_000, num_tasks: int = 8) -> None:
    """Compare sequential vs multiprocessing execution of CPU-bound tasks.

    Args:
        task_size: Iterations per task (default matches the original heavy
                   benchmark; pass something small for a quick run).
        num_tasks: Number of identical tasks to execute.

    Raises:
        RuntimeError: If parallel and sequential results disagree.
    """
    tasks = [task_size] * num_tasks

    # Sequential baseline.
    start = time.perf_counter()
    results_seq = [cpu_intensive_task(n) for n in tasks]
    seq_time = time.perf_counter() - start

    # Parallel run across processes (bypasses the GIL).
    start = time.perf_counter()
    results_par = parallel_map(cpu_intensive_task, tasks)
    par_time = time.perf_counter() - start

    # Sanity check: parallelism must not change the answers.
    if results_seq != results_par:
        raise RuntimeError("parallel results diverged from sequential results")

    print(f"Sequential: {seq_time:.2f}s")
    print(f"Parallel:   {par_time:.2f}s")
    print(f"Speedup:    {seq_time/par_time:.1f}x")

    # On an 8-core machine with the defaults:
    # Sequential: 12.45s
    # Parallel:   1.78s
    # Speedup:    7.0x

# Process pool for long-running workers
class DataProcessor:
    """Parallel data processor with worker pool."""
    
    def __init__(self, num_workers: int | None = None):
        self.num_workers = num_workers or mp.cpu_count()
        self.pool: Pool | None = None
    
    def __enter__(self) -> "DataProcessor":
        self.pool = Pool(processes=self.num_workers)
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        if self.pool:
            self.pool.close()
            self.pool.join()
    
    def process_batch(
        self,
        items: Sequence[dict],
        chunk_size: int = 100
    ) -> list[dict]:
        """
        Process items in parallel chunks.
        
        Args:
            items: Items to process
            chunk_size: Number of items per worker
            
        Returns:
            Processed results
        """
        if not self.pool:
            raise RuntimeError("Processor not initialized")
        
        # Process in chunks for better efficiency
        results = self.pool.map(
            self._process_item,
            items,
            chunksize=chunk_size
        )
        return results
    
    @staticmethod
    def _process_item(item: dict) -> dict:
        """Process single item (must be picklable)."""
        # CPU-intensive processing
        return {"result": item["value"] ** 2}

# Usage
# Guard the example: multiprocessing's spawn start method re-imports the
# main module in every worker, so pool-creating code must not run at
# import time (see the multiprocessing programming guidelines).
if __name__ == "__main__":
    with DataProcessor(num_workers=4) as processor:
        items = [{"value": i} for i in range(10000)]
        results = processor.process_batch(items)

# ❌ BAD: Threading for CPU-bound work (GIL prevents parallelism)
import threading

def process_with_threading(items: list[int]) -> list[int]:
    # Anti-pattern: threads for CPU-bound work. The GIL lets only one
    # thread execute Python bytecode at a time, so this adds thread
    # overhead without parallelism. Note also that `results` fills in
    # thread-completion order, not input order.
    results = []
    threads = []

    def worker(item):
        result = cpu_intensive_task(item)
        results.append(result)

    for item in items:
        thread = threading.Thread(target=worker, args=(item,))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    return results
    # No speedup! GIL prevents parallel execution

Caching and Memoization

# ✅ GOOD: Caching expensive computations
from functools import lru_cache, cache
from typing import Final
import time

@lru_cache(maxsize=128)
def expensive_computation(n: int) -> int:
    """Slow polynomial: returns n² + 2n + 1, i.e. (n + 1)².

    The sleep simulates real work; lru_cache keeps the 128 most recently
    used results, so repeat calls with the same argument return instantly.
    """
    time.sleep(0.1)  # Simulate expensive operation
    return (n + 1) ** 2

@cache  # Python 3.9+: unbounded cache
def fibonacci(n: int) -> int:
    """Memoized Fibonacci.

    Naive recursion is O(2ⁿ); @cache computes each value once, making
    the whole recursion O(n).
    """
    return n if n < 2 else fibonacci(n - 1) + fibonacci(n - 2)

# Benchmark caching impact
def benchmark_caching() -> None:
    """Show the speedup lru_cache gives expensive_computation."""
    def timed_call(arg: int) -> tuple[int, float]:
        # Time one call; returns (value, elapsed seconds).
        t0 = time.perf_counter()
        value = expensive_computation(arg)
        return value, time.perf_counter() - t0

    # First call executes the body; the second is served from the cache.
    result1, time1 = timed_call(42)
    result2, time2 = timed_call(42)

    print(f"First call:  {time1*1000:.1f}ms")
    print(f"Cached call: {time2*1000:.3f}ms")
    print(f"Speedup:     {time1/time2:.0f}x")

    # Output:
    # First call:  100.2ms
    # Cached call: 0.001ms
    # Speedup:     100200x

# Cache info and clearing
def analyze_cache_performance() -> None:
    """Report hit/miss statistics for the memoized fibonacci."""
    # Warm the cache by cycling through arguments 0..19 repeatedly.
    for i in range(100):
        fibonacci(i % 20)

    info = fibonacci.cache_info()
    hit_rate = info.hits / (info.hits + info.misses)
    print(f"Hits:   {info.hits}")
    print(f"Misses: {info.misses}")
    print(f"Hit rate: {hit_rate:.1%}")

    # Clear cache if needed
    fibonacci.cache_clear()

# Custom caching for complex scenarios
from typing import Callable, TypeVar, ParamSpec
import pickle

P = ParamSpec('P')
T = TypeVar('T')

class DiskCache:
    """Cache function results to disk for persistence."""
    
    def __init__(self, cache_dir: str = ".cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
    
    def __call__(self, func: Callable[P, T]) -> Callable[P, T]:
        @wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            # Generate cache key from arguments
            key = f"{func.__name__}_{hash((args, tuple(sorted(kwargs.items()))))}"
            cache_file = self.cache_dir / f"{key}.pkl"
            
            # Return cached result if exists
            if cache_file.exists():
                with open(cache_file, 'rb') as f:
                    return pickle.load(f)
            
            # Compute and cache result
            result = func(*args, **kwargs)
            with open(cache_file, 'wb') as f:
                pickle.dump(result, f)
            
            return result
        
        return wrapper

@DiskCache(cache_dir=".cache")
def expensive_data_processing(input_file: str) -> dict:
    """Process file with disk-persisted cache."""
    # Placeholder for expensive work; DiskCache stores the returned dict
    # on disk keyed by input_file, so a rerun with the same path skips
    # the computation entirely — even across process restarts.
    return {"processed": True}

# ❌ BAD: No caching, recomputing same values
def fibonacci_slow(n: int) -> int:
    # Anti-pattern: uncached recursion recomputes every subproblem
    # exponentially many times — O(2ⁿ) calls in total.
    if n < 2:
        return n
    return fibonacci_slow(n - 1) + fibonacci_slow(n - 2)
    # fibonacci_slow(40) takes ~30 seconds
    # fibonacci(40) with cache takes ~0.0001 seconds

Algorithm and Data Structure Optimization

# ✅ GOOD: Choosing optimal data structures
from collections import deque, defaultdict, Counter
from typing import Sequence, Hashable

# Use sets for membership testing
def find_common_elements_optimized(
    list1: Sequence[Hashable],
    list2: Sequence[Hashable]
) -> set[Hashable]:
    """Return the elements present in both sequences.

    Building sets makes membership checks O(1), so the whole operation
    is linear in the input sizes rather than the O(n²) of nested loops.
    """
    return set(list1) & set(list2)

# ❌ BAD: Nested loops (O(n²))
def find_common_elements_slow(list1: list, list2: list) -> list:
    # Anti-pattern: list membership test inside a loop. Note it also
    # keeps duplicates from list1, unlike the set-based version.
    common = []
    for item1 in list1:  # O(n)
        if item1 in list2:  # O(n) for list lookup
            common.append(item1)
    return common
    # Total: O(n²)

# Use deque for efficient queue operations
def process_queue_optimized(items: Sequence[str]) -> None:
    """Drain a work queue, re-queueing items that need another pass.

    deque gives O(1) popleft/append; a list would shift every element
    on each pop(0).
    """
    pending: deque[str] = deque(items)

    while pending:
        current = pending.popleft()  # O(1)
        process(current)

        if needs_reprocessing(current):
            pending.append(current)  # O(1)

# ❌ BAD: List as queue (O(n) for pop(0))
def process_queue_slow(items: list[str]) -> None:
    # Anti-pattern: using a list as a FIFO queue.
    queue = list(items)

    while queue:
        item = queue.pop(0)  # O(n) - shifts all elements
        process(item)

# Use Counter for frequency counting
def find_most_common_optimized(items: Sequence[str], n: int = 10) -> list[tuple[str, int]]:
    """Return the n most frequent items as (item, count) pairs."""
    frequency = Counter(items)
    # Counter does both the counting and the top-n selection in one call.
    return frequency.most_common(n)

# ❌ BAD: Manual counting
def find_most_common_slow(items: list[str], n: int = 10) -> list[tuple[str, int]]:
    # Anti-pattern: hand-rolled counting and sorting that Counter already
    # provides in C-accelerated form.
    counts = {}
    for item in items:
        counts[item] = counts.get(item, 0) + 1

    sorted_items = sorted(counts.items(), key=lambda x: x[1], reverse=True)
    return sorted_items[:n]
    # More code, same complexity, but slower due to Python overhead

# Use defaultdict to avoid key checking
def group_by_category_optimized(
    items: Sequence[dict]
) -> dict[str, list[dict]]:
    """Bucket items by their "category" key.

    defaultdict(list) creates each bucket on first access, so no
    explicit key-existence check is needed.
    """
    buckets: defaultdict[str, list[dict]] = defaultdict(list)

    for record in items:
        buckets[record["category"]].append(record)

    return dict(buckets)

# ❌ BAD: Manual key checking
def group_by_category_slow(items: list[dict]) -> dict[str, list[dict]]:
    # Anti-pattern: manual key-existence check that defaultdict(list)
    # (or dict.setdefault) makes unnecessary.
    groups = {}

    for item in items:
        category = item["category"]
        if category not in groups:
            groups[category] = []
        groups[category].append(item)

    return groups

Anti-Patterns

Premature Optimization

# ❌ Avoid: Optimizing before profiling
def process_data(items):
    # Anti-pattern: dense walrus-in-comprehension written for "speed"
    # before any profiling. Note the `(y := ...) and` truthiness test
    # also silently drops falsy results — a behavior change in disguise.
    return [y for x in items if (y := expensive_func(x)) and condition(y)]

# ✅ Fix: Write clear code first, profile, then optimize if needed
def process_data(items):
    """Transform items with expensive_func, keeping results that pass condition."""
    kept = []
    for item in items:
        value = expensive_func(item)
        if condition(value):
            kept.append(value)
    return kept
    # Profile shows expensive_func is bottleneck? Add @lru_cache

Using Threading for CPU-Bound Work

# ❌ Avoid: Threading doesn't help CPU-bound tasks (GIL)
import threading

def parallel_compute(data):
    # Anti-pattern: one thread per CPU-bound task. The GIL serialises
    # bytecode execution, so this is sequential work plus thread
    # overhead. It also never collects the results of cpu_work.
    threads = [threading.Thread(target=cpu_work, args=(x,)) for x in data]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # No speedup due to GIL

# ✅ Fix: Use multiprocessing for CPU-bound work
import multiprocessing
def parallel_compute(data):
    """Run cpu_work over data with true parallelism via worker processes."""
    with multiprocessing.Pool() as pool:
        return pool.map(cpu_work, data)

Unnecessary List Comprehensions

# ❌ Avoid: Creating intermediate lists
def process_pipeline(data):
    # Anti-pattern: every stage materialises a full intermediate list
    # even though only the final sum is needed.
    step1 = [transform1(x) for x in data]  # Full list
    step2 = [transform2(x) for x in step1]  # Another full list
    step3 = [transform3(x) for x in step2]  # Another full list
    return sum(step3)

# ✅ Fix: Use generator expressions
def process_pipeline(data):
    """Sum the fully transformed items without materialising intermediates."""
    transformed = (
        transform3(transform2(transform1(item)))
        for item in data
    )
    return sum(transformed)  # Constant memory

Testing Strategies

Performance Regression Testing

import pytest
import time

def test_query_performance() -> None:
    """Guard against performance regressions in expensive_query."""
    started = time.perf_counter()
    rows = expensive_query()
    duration = time.perf_counter() - started

    # Fail loudly if the query slows past the budget or returns nothing.
    assert duration < 1.0, f"Query took {duration:.2f}s, threshold 1.0s"
    assert len(rows) > 0

@pytest.mark.benchmark
def test_processing_throughput(benchmark) -> None:
    """Benchmark processing throughput."""
    # `benchmark` is the pytest-benchmark fixture: it calls process_data
    # repeatedly, records timing statistics, and returns one result.
    data = generate_test_data(size=10000)

    result = benchmark(process_data, data)

    assert result is not None
    # pytest-benchmark provides statistics automatically

Memory Leak Detection

import tracemalloc
import gc

def test_no_memory_leak() -> None:
    """Repeatedly run process_data and assert heap growth stays bounded."""
    gc.collect()
    tracemalloc.start()

    # Baseline snapshot before the workload.
    baseline = tracemalloc.take_snapshot()

    for _ in range(100):
        process_data(generate_data())

    gc.collect()
    after = tracemalloc.take_snapshot()

    # Net allocation growth across all source lines.
    total_diff = sum(
        stat.size_diff for stat in after.compare_to(baseline, 'lineno')
    )

    # A little noise is fine; sustained growth beyond 10 MB looks like a leak.
    assert total_diff < 10 * 1024 * 1024, f"Memory leaked: {total_diff / 1024 / 1024:.1f} MB"

    tracemalloc.stop()

References