Asynchronous Python

Python concurrency v1.0.0

Asynchronous programming in Python enables efficient I/O-bound operations through non-blocking code execution. Using the asyncio framework and async/await syntax, applications can handle thousands of concurrent connections without thread overhead. This skill covers coroutine patterns, event loop management, async HTTP clients with aiohttp, and building high-performance APIs with FastAPI. Mastering async Python is essential for building scalable web services, real-time applications, and I/O-intensive workloads.

Key Concepts

  • Event Loop - Central execution mechanism that schedules and runs async tasks, handles I/O operations, and manages callbacks
  • Coroutines - Functions defined with async def that can be paused and resumed, returning awaitable objects
  • Async/Await Syntax - async def declares a coroutine function; await suspends the current coroutine until the awaitable completes
  • Tasks - Wrapped coroutines scheduled to run concurrently on the event loop using asyncio.create_task()
  • Futures - Low-level awaitable objects representing eventual results of async operations
  • Async Context Managers - Resources managed with async with for proper setup/teardown in async code
  • Async Iterators - Asynchronous iteration with async for over streams or paginated APIs
  • Concurrency Primitives - asyncio.Lock, Semaphore, Queue, Event for coordinating async tasks
  • aiohttp - Async HTTP client/server library for making non-blocking HTTP requests
  • FastAPI - Modern async web framework with automatic OpenAPI generation and dependency injection
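
The coordination primitives listed above can be illustrated with a small producer/consumer pipeline. This is a minimal sketch, not a prescribed design: the names producer, worker, and run_pipeline are hypothetical, and asyncio.sleep(0) stands in for real async work. It shows a bounded asyncio.Queue applying backpressure, queue.join() waiting for all items, and explicit cancellation of idle workers.

```python
import asyncio


async def producer(queue: asyncio.Queue, count: int) -> None:
    """Put `count` integers on the queue; blocks while the queue is full."""
    for item in range(count):
        await queue.put(item)


async def worker(queue: asyncio.Queue, results: list[int]) -> None:
    """Consume items until cancelled by the caller."""
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0)  # Stand-in for real async work
            results.append(item * 2)
        finally:
            queue.task_done()  # Always mark the item done so join() can return


async def run_pipeline(count: int = 5, workers: int = 2) -> list[int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)  # Bounded: applies backpressure
    results: list[int] = []
    worker_tasks = [
        asyncio.create_task(worker(queue, results)) for _ in range(workers)
    ]

    await producer(queue, count)
    await queue.join()  # Wait until every item has been marked done

    # Workers are now idle in queue.get(); cancel them and wait for them to exit
    for task in worker_tasks:
        task.cancel()
    await asyncio.gather(*worker_tasks, return_exceptions=True)
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(run_pipeline()))
```

The queue size and worker count are arbitrary demo values; in practice they would be tuned to the downstream resource being protected.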

Best Practices

  1. Always await coroutines - Never forget the await keyword; a coroutine that is never awaited never runs
  2. Use asyncio.gather() for concurrent operations - Run multiple coroutines concurrently and collect their results in order
  3. Avoid blocking calls - Never use blocking I/O (requests, time.sleep) in async functions; use async alternatives
  4. Set timeouts - Always use asyncio.wait_for() or timeout parameters to prevent hanging operations
  5. Handle task cancellation - Catch asyncio.CancelledError only to clean up resources, then re-raise it
  6. Use async context managers - Ensure resources (connections, files) are properly closed with async with
  7. Limit concurrency with Semaphore - Control concurrent task execution to avoid overwhelming resources
  8. Prefer TaskGroups (Python 3.11+) - Use structured concurrency with async with asyncio.TaskGroup()
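  9. Type hint async functions - Annotate the awaited result directly (async def f() -> ReturnType); use Coroutine[Any, Any, ReturnType] only for values that hold an un-awaited coroutine object
  10. Run blocking or CPU-bound work in an executor - Use loop.run_in_executor() so the event loop stays responsive; pass a ProcessPoolExecutor for CPU-intensive work, since threads share the GIL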
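
Practice 10 is the only item in the list without a code sample elsewhere on this page, so here is a minimal sketch of it. The helper hash_payload and the wrapper hash_many are hypothetical names chosen for illustration. The sketch uses a ThreadPoolExecutor, which keeps the event loop responsive; for work that is truly CPU-bound in pure Python, a ProcessPoolExecutor could be passed to run_in_executor() instead.

```python
import asyncio
import hashlib
from concurrent.futures import ThreadPoolExecutor


def hash_payload(payload: bytes, rounds: int) -> str:
    """Blocking helper (hypothetical): repeated SHA-256 over the payload."""
    digest = payload
    for _ in range(rounds):
        digest = hashlib.sha256(digest).digest()
    return digest.hex()


async def hash_many(payloads: list[bytes], rounds: int = 1000) -> list[str]:
    """Run the blocking helper off the event loop and await all results."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [
            # Positional arguments are forwarded to hash_payload
            loop.run_in_executor(pool, hash_payload, payload, rounds)
            for payload in payloads
        ]
        # gather preserves input order, so results line up with payloads
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(hash_many([b"alpha", b"beta"], rounds=10)))
```

While the worker threads run, other coroutines on the loop continue to make progress, which is the point of the practice.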

Code Examples

Basic Async/Await Patterns

# ✅ GOOD: Proper async function with error handling and timeout
import asyncio
from typing import Any

async def fetch_user_data(user_id: int) -> dict[str, Any]:
    """
    Fetch user data with timeout and error handling.
    
    Args:
        user_id: User identifier
        
    Returns:
        User data dictionary
        
    Raises:
        asyncio.TimeoutError: If request exceeds 5 seconds
        ValueError: If user_id is invalid
    """
    if user_id <= 0:
        raise ValueError(f"Invalid user_id: {user_id}")
    
    async def _fetch() -> dict[str, Any]:
        await asyncio.sleep(0.1)  # Simulate API call
        return {"id": user_id, "name": f"User{user_id}"}
    
    try:
        return await asyncio.wait_for(_fetch(), timeout=5.0)
    except asyncio.TimeoutError:
        raise asyncio.TimeoutError(f"Fetch user {user_id} timed out")

async def main() -> None:
    """Run multiple async operations concurrently."""
    # Concurrent execution with gather
    results = await asyncio.gather(
        fetch_user_data(1),
        fetch_user_data(2),
        fetch_user_data(3),
        return_exceptions=True  # Don't fail all on single error
    )
    
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(f"Success: {result}")

# Run event loop
if __name__ == "__main__":
    asyncio.run(main())

# ❌ BAD: Missing await, no timeout, no error handling
async def fetch_user_data(user_id):
    asyncio.sleep(0.1)  # Missing await - this does nothing!
    return {"id": user_id}

async def main():
    fetch_user_data(1)  # Missing await - coroutine never runs!

Concurrent Task Execution with TaskGroup (Python 3.11+)

# ✅ GOOD: Structured concurrency with TaskGroup
import asyncio
from typing import TypeAlias
from collections.abc import Sequence

UserId: TypeAlias = int
UserData: TypeAlias = dict[str, str | int]

async def process_user(user_id: UserId) -> UserData:
    """Process individual user with simulated delay."""
    await asyncio.sleep(0.1)
    return {"id": user_id, "status": "processed"}

async def process_users_concurrent(
    user_ids: Sequence[UserId]
) -> list[UserData]:
    """
    Process multiple users concurrently using TaskGroup.
    
    TaskGroup provides structured concurrency:
    - Automatically cancels remaining tasks if one fails
    - Ensures all tasks complete before exiting context
    - Collects exceptions and raises them as ExceptionGroup
    
    Args:
        user_ids: User IDs to process
        
    Returns:
        List of processed user data
        
    Raises:
        ExceptionGroup: If any tasks fail
    """
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(process_user(uid))
            for uid in user_ids
        ]
    
    # The async with block exits only after every task has finished
    return [task.result() for task in tasks]

# ❌ BAD: Unstructured concurrency, no error handling
async def process_users_concurrent(user_ids):
    tasks = [asyncio.create_task(process_user(uid)) for uid in user_ids]
    return await asyncio.gather(*tasks)  # Tasks may leak on error

Async Context Managers and Resource Management

# ✅ GOOD: Async context manager for database connection
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

import asyncpg

class DatabasePool:
    """Async database connection pool manager."""
    
    def __init__(self, dsn: str, min_size: int = 5, max_size: int = 20):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self._pool: asyncpg.Pool | None = None
    
    async def connect(self) -> None:
        """Initialize connection pool."""
        self._pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=60
        )
    
    async def disconnect(self) -> None:
        """Close all connections in pool."""
        if self._pool:
            await self._pool.close()
            self._pool = None
    
    @asynccontextmanager
    async def acquire(self) -> AsyncIterator[asyncpg.Connection]:
        """
        Acquire database connection from pool.
        
        Yields:
            Database connection that is automatically returned to pool
            
        Example:
            async with db_pool.acquire() as conn:
                result = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
        """
        if not self._pool:
            raise RuntimeError("Pool not initialized. Call connect() first.")
        
        async with self._pool.acquire() as connection:
            yield connection

# Usage with FastAPI lifespan
from typing import Any

from fastapi import FastAPI, HTTPException

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """
    Manage application lifecycle.
    
    Setup runs on startup, teardown on shutdown.
    """
    # Startup
    db_pool = DatabasePool("postgresql://user:pass@localhost/db")
    await db_pool.connect()
    app.state.db_pool = db_pool
    
    yield  # Application runs
    
    # Shutdown
    await db_pool.disconnect()

app = FastAPI(lifespan=lifespan)

@app.get("/users/{user_id}")
async def get_user(user_id: int) -> dict[str, Any]:
    """Fetch user from database."""
    async with app.state.db_pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT id, name, email FROM users WHERE id = $1",
            user_id
        )
        if not row:
            raise HTTPException(status_code=404, detail="User not found")
        return dict(row)

# ❌ BAD: No resource cleanup, connection leaks
async def get_user(user_id: int):
    conn = await asyncpg.connect("postgresql://...")
    row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
    # Connection never closed!
    return dict(row)

Async HTTP Client with aiohttp

# ✅ GOOD: aiohttp client with retry logic and timeouts
import asyncio
from typing import Any, Final
from collections.abc import Sequence

import aiohttp
from aiohttp import ClientSession, ClientTimeout, ClientError

MAX_RETRIES: Final[int] = 3
BACKOFF_FACTOR: Final[float] = 2.0

class AsyncAPIClient:
    """HTTP client with connection pooling and retry logic."""
    
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self._session: ClientSession | None = None
    
    async def __aenter__(self) -> "AsyncAPIClient":
        """Create session on context enter."""
        timeout = ClientTimeout(total=30, connect=10)
        self._session = ClientSession(
            base_url=self.base_url,
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=timeout,
            raise_for_status=True
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close session on context exit."""
        if self._session:
            await self._session.close()
            self._session = None
    
    async def get_with_retry(
        self,
        path: str,
        *,
        params: dict[str, str] | None = None
    ) -> dict[str, Any]:
        """
        Make GET request with exponential backoff retry.
        
        Args:
            path: API endpoint path (e.g., '/users/123')
            params: Optional query parameters
            
        Returns:
            JSON response as dictionary
            
        Raises:
            aiohttp.ClientError: After all retries exhausted
        """
        if not self._session:
            raise RuntimeError("Client not initialized. Use async with.")
        
        last_exception: Exception | None = None
        
        for attempt in range(MAX_RETRIES):
            try:
                async with self._session.get(path, params=params) as response:
                    return await response.json()
            except ClientError as e:
                last_exception = e
                if attempt < MAX_RETRIES - 1:
                    delay = BACKOFF_FACTOR ** attempt
                    await asyncio.sleep(delay)
                    continue
                raise
        
        raise RuntimeError(f"All retries failed: {last_exception}")

# Concurrent API calls with rate limiting
async def fetch_multiple_users(
    user_ids: Sequence[int],
    client: AsyncAPIClient,
    *,
    max_concurrent: int = 10
) -> list[dict[str, Any]]:
    """
    Fetch multiple users with concurrency limit.
    
    Args:
        user_ids: User IDs to fetch
        client: Initialized API client
        max_concurrent: Maximum concurrent requests
        
    Returns:
        List of user data dictionaries
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch_one(user_id: int) -> dict[str, Any]:
        async with semaphore:
            return await client.get_with_retry(f"/users/{user_id}")
    
    return await asyncio.gather(*[fetch_one(uid) for uid in user_ids])

# Usage
async def main() -> None:
    async with AsyncAPIClient(
        base_url="https://api.example.com",
        api_key="secret"
    ) as client:
        users = await fetch_multiple_users([1, 2, 3, 4, 5], client)
        print(f"Fetched {len(users)} users")

# ❌ BAD: No connection pooling, no retry, no timeout
import requests  # Blocking library in async code!

async def fetch_user(user_id: int):
    response = requests.get(f"https://api.example.com/users/{user_id}")
    return response.json()

FastAPI Async Endpoints with Dependency Injection

# ✅ GOOD: FastAPI async endpoints with proper dependencies
import asyncio
from typing import Annotated
from collections.abc import AsyncIterator

from fastapi import Depends, FastAPI, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

# Models
class UserCreate(BaseModel):
    """User creation request."""
    name: str = Field(..., min_length=1, max_length=100)
    email: str = Field(..., pattern=r"^[\w\.-]+@[\w\.-]+\.\w+$")

class UserResponse(BaseModel):
    """User response."""
    id: int
    name: str
    email: str
    
    model_config = {"from_attributes": True}

# Database
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    echo=False,
    pool_pre_ping=True,
    pool_size=10,
    max_overflow=20
)

AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

async def get_db_session() -> AsyncIterator[AsyncSession]:
    """
    Database session dependency.
    
    Yields:
        Database session that is automatically closed
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# Service layer
class UserService:
    """Business logic for user operations."""
    
    def __init__(self, session: AsyncSession):
        self.session = session
    
    async def create_user(self, user_data: UserCreate) -> UserResponse:
        """Create new user."""
        # Simulate async database operation
        await asyncio.sleep(0.1)
        user = UserResponse(id=1, name=user_data.name, email=user_data.email)
        return user
    
    async def get_user(self, user_id: int) -> UserResponse | None:
        """Get user by ID."""
        await asyncio.sleep(0.1)
        if user_id == 1:
            return UserResponse(id=1, name="John", email="john@example.com")
        return None

async def get_user_service(
    session: Annotated[AsyncSession, Depends(get_db_session)]
) -> UserService:
    """User service dependency."""
    return UserService(session)

# API endpoints
app = FastAPI(title="Async API")

@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(
    user_data: UserCreate,
    service: Annotated[UserService, Depends(get_user_service)]
) -> UserResponse:
    """
    Create a new user.
    
    Args:
        user_data: User creation data
        service: Injected user service
        
    Returns:
        Created user data
    """
    return await service.create_user(user_data)

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    service: Annotated[UserService, Depends(get_user_service)]
) -> UserResponse:
    """
    Get user by ID.
    
    Args:
        user_id: User identifier
        service: Injected user service
        
    Returns:
        User data
        
    Raises:
        HTTPException: 404 if user not found
    """
    user = await service.get_user(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@app.get("/users", response_model=list[UserResponse])
async def list_users(
    # Parameters without defaults must precede those with defaults
    service: Annotated[UserService, Depends(get_user_service)],
    limit: Annotated[int, Query(ge=1, le=100)] = 10,
    offset: Annotated[int, Query(ge=0)] = 0,
) -> list[UserResponse]:
    """
    List users with pagination.
    
    Args:
        service: Injected user service
        limit: Maximum number of users to return
        offset: Number of users to skip
        
    Returns:
        List of users
    """
    # Simulate async query
    await asyncio.sleep(0.1)
    return [
        UserResponse(id=i, name=f"User{i}", email=f"user{i}@example.com")
        for i in range(offset, offset + limit)
    ]

# ❌ BAD: Blocking database calls in async endpoint
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    import psycopg2  # Blocking driver: each call below stalls the event loop
    conn = psycopg2.connect("postgresql://...")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
    return cursor.fetchone()  # Connection is also never closed

Async Iterators and Generators

# ✅ GOOD: Async generator for streaming data
import asyncio
from collections.abc import AsyncIterator

TOTAL_PAGES = 5  # Assumed demo size; a real API would signal its own last page

async def fetch_page(page: int) -> list[dict[str, int]]:
    """Simulate fetching a page of data; returns [] past the last page."""
    await asyncio.sleep(0.1)
    if page >= TOTAL_PAGES:
        return []
    return [{"id": page * 10 + i, "value": i} for i in range(10)]

async def stream_all_records() -> AsyncIterator[dict[str, int]]:
    """
    Stream all records using async generator.
    
    Yields records one at a time without loading all into memory.
    
    Yields:
        Individual record dictionaries
        
    Example:
        async for record in stream_all_records():
            await process_record(record)
    """
    page = 0
    while True:
        records = await fetch_page(page)
        if not records:
            break
        
        for record in records:
            yield record
        
        page += 1

# Usage with async for
async def process_all_records() -> None:
    """Process records as they are streamed."""
    count = 0
    async for record in stream_all_records():
        # Process record without loading all into memory
        print(f"Processing record {record['id']}")
        count += 1
        if count >= 50:  # Limit for demo
            break

# Async comprehension
async def collect_ids() -> list[int]:
    """Collect all IDs using async comprehension."""
    return [
        record["id"]
        async for record in stream_all_records()
    ]

# ❌ BAD: Loading all data into memory before processing
async def process_all_records():
    all_records = []
    page = 0
    while True:
        records = await fetch_page(page)
        if not records:
            break
        all_records.extend(records)  # Memory grows unbounded
        page += 1
    
    for record in all_records:
        await process_record(record)
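
Async generators are the shortest way to write an async iterator, but the underlying protocol is just __aiter__ and __anext__. As a rough sketch of that protocol, the hypothetical PageIterator class below yields a fixed number of made-up pages; asyncio.sleep(0) stands in for a network round trip.

```python
import asyncio


class PageIterator:
    """Async iterator over a fixed number of hypothetical pages."""

    def __init__(self, total_pages: int, page_size: int = 3) -> None:
        self._total_pages = total_pages
        self._page_size = page_size
        self._page = 0

    def __aiter__(self) -> "PageIterator":
        return self

    async def __anext__(self) -> list[int]:
        if self._page >= self._total_pages:
            raise StopAsyncIteration  # Ends the async for loop
        await asyncio.sleep(0)  # Stand-in for a network round trip
        start = self._page * self._page_size
        self._page += 1
        return list(range(start, start + self._page_size))


async def collect_pages(total_pages: int = 2) -> list[list[int]]:
    """Collect every page with an async comprehension."""
    return [page async for page in PageIterator(total_pages)]


if __name__ == "__main__":
    print(asyncio.run(collect_pages()))
```

A class is mainly worth the extra lines when the iterator needs state or methods beyond iteration; otherwise an async generator is usually simpler.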

Anti-Patterns

Forgetting await Keyword

# ❌ Avoid: Coroutine not awaited
async def get_data():
    return {"data": "value"}

async def process():
    result = get_data()  # Returns coroutine object, not result!
    print(result)  # Prints: <coroutine object get_data at 0x...>

# ✅ Fix: Always await coroutines
async def process():
    result = await get_data()
    print(result)  # Prints: {'data': 'value'}

Blocking I/O in Async Functions

# ❌ Avoid: Blocking calls block entire event loop
import time
import requests

async def fetch_data():
    time.sleep(5)  # Blocks event loop for 5 seconds!
    response = requests.get("https://api.example.com")  # Blocking!
    return response.json()

# ✅ Fix: Use async alternatives
import asyncio
import aiohttp

async def fetch_data():
    await asyncio.sleep(5)  # Non-blocking
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as response:
            return await response.json()

Not Handling Task Cancellation

# ❌ Avoid: Resources leak on cancellation
async def long_running_task():
    file = open("data.txt", "w")
    await asyncio.sleep(10)
    file.write("done")
    file.close()  # Never reached if task cancelled

# ✅ Fix: Handle CancelledError and clean up
async def long_running_task():
    file = open("data.txt", "w")
    try:
        await asyncio.sleep(10)
        file.write("done")
    except asyncio.CancelledError:
        file.write("cancelled")
        raise  # Re-raise after cleanup
    finally:
        file.close()
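
To see the cancellation path run end to end, the following sketch cancels a task from the outside and records the order of events. The names worker and cancel_demo are hypothetical, and a list is used in place of a file so the sequence can be inspected.

```python
import asyncio


async def worker(log: list[str]) -> None:
    """Hypothetical task that records its lifecycle in `log`."""
    log.append("started")
    try:
        await asyncio.sleep(10)
        log.append("finished")
    except asyncio.CancelledError:
        log.append("cancelled")
        raise  # Re-raise so the task is actually marked as cancelled
    finally:
        log.append("cleaned up")


async def cancel_demo() -> list[str]:
    log: list[str] = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # Yield once so the task can start
    task.cancel()  # Request cancellation; takes effect at the task's next await
    try:
        await task
    except asyncio.CancelledError:
        pass  # Expected: the awaited task was cancelled
    assert task.cancelled()
    return log


if __name__ == "__main__":
    print(asyncio.run(cancel_demo()))
```

If worker swallowed CancelledError instead of re-raising it, task.cancelled() would be False and the caller would have no way to tell that the cancellation request was honored.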

Creating Tasks Without Storing References

# ❌ Avoid: Task may be garbage collected before completion
async def background_task():
    await asyncio.sleep(1)
    print("Task complete")

async def main():
    asyncio.create_task(background_task())  # No reference stored
    await asyncio.sleep(0.1)  # Task may be garbage collected!

# ✅ Fix: Store task references or use TaskGroup
async def main():
    task = asyncio.create_task(background_task())
    await asyncio.sleep(0.1)
    await task  # Ensure task completes

# Or use TaskGroup (Python 3.11+)
async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(background_task())
    # All tasks guaranteed complete here
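
For fire-and-forget work that should outlive the function that started it, a common approach is to keep strong references in a module-level set and drop each one when its task finishes. The sketch below follows that pattern; spawn and background_tasks are hypothetical names, and the done list exists only to make the result observable.

```python
import asyncio
from collections.abc import Coroutine
from typing import Any

# Strong references keep pending tasks from being garbage collected
background_tasks: set[asyncio.Task] = set()


async def background_task(done: list[str]) -> None:
    await asyncio.sleep(0.01)
    done.append("complete")


def spawn(coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
    """Schedule a coroutine and hold a reference until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)  # Drop reference when done
    return task


async def main() -> list[str]:
    done: list[str] = []
    spawn(background_task(done))
    # On shutdown, wait for whatever is still running
    await asyncio.gather(*background_tasks)
    return done


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Unlike TaskGroup, this pattern does not propagate failures to the caller, so long-lived background tasks usually also need their own error logging.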

Testing Strategies

Testing Async Functions with pytest-asyncio

# tests/test_async.py
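import asyncio
from collections.abc import AsyncIterator

import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient

from myapp.main import app
from myapp.services import UserService

# Mark module for async tests
pytestmark = pytest.mark.asyncio

@pytest_asyncio.fixture
async def async_client() -> AsyncIterator[AsyncClient]:
    """Provide async test client for FastAPI app."""
    # The AsyncClient(app=...) shortcut was deprecated and later removed
    # from httpx; ASGITransport is the explicit replacement.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client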

async def test_create_user_endpoint(async_client: AsyncClient) -> None:
    """Test user creation endpoint."""
    response = await async_client.post(
        "/users",
        json={"name": "John Doe", "email": "john@example.com"}
    )
    assert response.status_code == 201
    data = response.json()
    assert data["name"] == "John Doe"
    assert "id" in data

async def test_concurrent_requests(async_client: AsyncClient) -> None:
    """Test handling multiple concurrent requests."""
    tasks = [
        async_client.get(f"/users/{i}")
        for i in range(1, 11)
    ]
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    
    successful = [r for r in responses if not isinstance(r, Exception)]
    assert len(successful) >= 8  # Most should succeed

@pytest.mark.timeout(5)  # Requires the pytest-timeout plugin; fails the test after 5 seconds
async def test_timeout_handling() -> None:
    """Test that operations respect timeouts."""
    async def slow_operation():
        await asyncio.sleep(10)
    
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(slow_operation(), timeout=1.0)

Mocking Async Functions

# tests/test_mocking.py
from unittest.mock import AsyncMock, patch

import pytest

@pytest.mark.asyncio
async def test_api_client_with_mock() -> None:
    """Test API client with mocked responses."""
    mock_response = {"id": 1, "name": "Test User"}
    
    with patch("aiohttp.ClientSession.get") as mock_get:
        # Configure mock to return async context manager
        mock_get.return_value.__aenter__.return_value.json = AsyncMock(
            return_value=mock_response
        )
        mock_get.return_value.__aenter__.return_value.status = 200
        
        # Your test here
        # result = await client.get_user(1)
        # assert result == mock_response
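
When the code under test receives its client as an argument, patching aiohttp internals is often unnecessary: an AsyncMock can stand in for the client directly. The sketch below assumes a hypothetical function load_display_name that calls a get_with_retry method like the one defined earlier; both the function and the canned response are illustrative.

```python
import asyncio
from typing import Any
from unittest.mock import AsyncMock


async def load_display_name(client: Any, user_id: int) -> str:
    """Hypothetical function under test: calls an async client method."""
    user = await client.get_with_retry(f"/users/{user_id}")
    return user["name"].upper()


async def run_check() -> str:
    client = AsyncMock()
    # Awaiting the mocked method yields the configured return value
    client.get_with_retry = AsyncMock(
        return_value={"id": 1, "name": "Test User"}
    )

    name = await load_display_name(client, 1)

    # Verifies the method was awaited exactly once with this argument
    client.get_with_retry.assert_awaited_once_with("/users/1")
    return name


if __name__ == "__main__":
    print(asyncio.run(run_check()))
```

Inside a pytest-asyncio test, the body of run_check would simply become the test function, with the final return replaced by an assertion.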

Load Testing Async Endpoints

# tests/test_load.py
import asyncio
import time

import pytest
from httpx import AsyncClient

@pytest.mark.asyncio
async def test_concurrent_load() -> None:
    """Test system under concurrent load."""
    async with AsyncClient(base_url="http://localhost:8000") as client:
        start = time.perf_counter()  # Monotonic clock suited to measuring durations
        
        # Simulate 100 concurrent users
        tasks = [
            client.get("/users/1")
            for _ in range(100)
        ]
        
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        
        duration = time.perf_counter() - start
        successful = [r for r in responses if not isinstance(r, Exception)]
        
        print(f"Duration: {duration:.2f}s")
        print(f"Success rate: {len(successful)}/100")
        print(f"Requests/sec: {100/duration:.2f}")
        
        assert len(successful) >= 95  # 95% success rate
        assert duration < 5.0  # Complete within 5 seconds
