Asynchronous Python
Python concurrency v1.0.0
Asynchronous programming in Python enables efficient I/O-bound operations through non-blocking code execution. Using the asyncio framework and async/await syntax, applications can handle thousands of concurrent connections without thread overhead. This skill covers coroutine patterns, event loop management, async HTTP clients with aiohttp, and building high-performance APIs with FastAPI. Mastering async Python is essential for building scalable web services, real-time applications, and I/O-intensive workloads.
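The core mechanics can be shown in a minimal sketch (the `greet` coroutine and names here are illustrative, not from any real API): two coroutines awaited together with `asyncio.gather()` run concurrently, so the total wall time is roughly one sleep, not two.

```python
import asyncio

async def greet(name: str) -> str:
    # Suspension point: yields control to the event loop without blocking it
    await asyncio.sleep(0.01)
    return f"Hello, {name}"

async def main() -> list[str]:
    # Both coroutines are in flight at once; gather preserves argument order
    return await asyncio.gather(greet("Ada"), greet("Alan"))

if __name__ == "__main__":
    print(asyncio.run(main()))
```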
Key Concepts
- Event Loop - Central execution mechanism that schedules and runs async tasks, handles I/O operations, and manages callbacks
- Coroutines - Functions defined with `async def` that can be paused and resumed, returning awaitable objects
- Async/Await Syntax - `async` declares coroutines; `await` suspends execution until the awaitable completes
- Tasks - Wrapped coroutines scheduled to run concurrently on the event loop using `asyncio.create_task()`
- Futures - Low-level awaitable objects representing eventual results of async operations
- Async Context Managers - Resources managed with `async with` for proper setup/teardown in async code
- Async Iterators - Asynchronous iteration with `async for` over streams or paginated APIs
- Concurrency Primitives - `asyncio.Lock`, `Semaphore`, `Queue`, `Event` for coordinating async tasks
- aiohttp - Async HTTP client/server library for making non-blocking HTTP requests
- FastAPI - Modern async web framework with automatic OpenAPI generation and dependency injection
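Most of these concepts appear in the examples below, but the coordination primitives deserve a small illustration of their own. This sketch (the `producer`/`consumer` names are made up for the demo) passes items between two tasks through a bounded `asyncio.Queue`, with a sentinel value signalling completion:

```python
import asyncio

async def producer(queue: asyncio.Queue[int], n: int) -> None:
    for i in range(n):
        await queue.put(i)   # Suspends only this task when the queue is full
    await queue.put(-1)      # Sentinel: tell the consumer to stop

async def consumer(queue: asyncio.Queue[int], results: list[int]) -> None:
    while True:
        item = await queue.get()
        if item == -1:
            break
        results.append(item)

async def main() -> list[int]:
    # maxsize=2 applies backpressure: the producer cannot run far ahead
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=2)
    results: list[int] = []
    await asyncio.gather(producer(queue, 5), consumer(queue, results))
    return results
```

The bounded queue is the design choice worth noting: it forces producer and consumer to interleave instead of letting memory grow unbounded.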
Best Practices
- Always await coroutines - Never forget the `await` keyword; unawaited coroutines never execute
- Use `asyncio.gather()` for concurrent operations - Run multiple coroutines concurrently and collect results
- Avoid blocking calls - Never use blocking I/O (`requests`, `time.sleep`) in async functions; use async alternatives
- Set timeouts - Always use `asyncio.wait_for()` or timeout parameters to prevent hanging operations
- Handle task cancellation - Properly catch `asyncio.CancelledError` and clean up resources
- Use async context managers - Ensure resources (connections, files) are properly closed with `async with`
- Limit concurrency with Semaphore - Control concurrent task execution to avoid overwhelming resources
- Prefer TaskGroups (Python 3.11+) - Use structured concurrency with `async with asyncio.TaskGroup()`
- Type hint async functions - Annotate return types as `Coroutine` or use `-> ReturnType` directly
- Run CPU-bound work in executor - Use `loop.run_in_executor()` for blocking CPU-intensive operations
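The last practice is worth a concrete sketch. `asyncio.to_thread()` (Python 3.9+) is a convenience wrapper over `loop.run_in_executor()` using the default thread pool; `expensive_hash` below is a made-up stand-in for any blocking CPU-bound call:

```python
import asyncio
import hashlib

def expensive_hash(data: bytes, rounds: int) -> str:
    # CPU-bound: calling this directly in a coroutine would stall the event loop
    digest = data
    for _ in range(rounds):
        digest = hashlib.sha256(digest).digest()
    return digest.hex()

async def main() -> str:
    # Runs in a worker thread; the event loop stays free to serve other tasks
    return await asyncio.to_thread(expensive_hash, b"payload", 10_000)
```

Note that threads keep the loop responsive but do not add CPU parallelism under the GIL; for genuinely heavy computation, passing a `concurrent.futures.ProcessPoolExecutor` to `loop.run_in_executor()` is the usual escape hatch.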
Code Examples
Basic Async/Await Patterns
```python
# ✅ GOOD: Proper async function with error handling and timeout
import asyncio
from typing import Any

async def fetch_user_data(user_id: int) -> dict[str, Any]:
    """
    Fetch user data with timeout and error handling.

    Args:
        user_id: User identifier

    Returns:
        User data dictionary

    Raises:
        asyncio.TimeoutError: If request exceeds 5 seconds
        ValueError: If user_id is invalid
    """
    if user_id <= 0:
        raise ValueError(f"Invalid user_id: {user_id}")

    async def _fetch() -> dict[str, Any]:
        await asyncio.sleep(0.1)  # Simulate API call
        return {"id": user_id, "name": f"User{user_id}"}

    try:
        return await asyncio.wait_for(_fetch(), timeout=5.0)
    except asyncio.TimeoutError:
        raise asyncio.TimeoutError(f"Fetch user {user_id} timed out")

async def main() -> None:
    """Run multiple async operations concurrently."""
    # Concurrent execution with gather
    results = await asyncio.gather(
        fetch_user_data(1),
        fetch_user_data(2),
        fetch_user_data(3),
        return_exceptions=True  # Don't fail all on single error
    )
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")
        else:
            print(f"Success: {result}")

# Run event loop
if __name__ == "__main__":
    asyncio.run(main())

# ❌ BAD: Missing await, no timeout, no error handling
async def fetch_user_data(user_id):
    asyncio.sleep(0.1)  # Missing await - this does nothing!
    return {"id": user_id}

async def main():
    fetch_user_data(1)  # Missing await - coroutine never runs!
```
Concurrent Task Execution with TaskGroup (Python 3.11+)
```python
# ✅ GOOD: Structured concurrency with TaskGroup
import asyncio
from typing import TypeAlias
from collections.abc import Sequence

UserId: TypeAlias = int
UserData: TypeAlias = dict[str, str | int]

async def process_user(user_id: UserId) -> UserData:
    """Process individual user with simulated delay."""
    await asyncio.sleep(0.1)
    return {"id": user_id, "status": "processed"}

async def process_users_concurrent(
    user_ids: Sequence[UserId]
) -> list[UserData]:
    """
    Process multiple users concurrently using TaskGroup.

    TaskGroup provides structured concurrency:
    - Automatically cancels remaining tasks if one fails
    - Ensures all tasks complete before exiting context
    - Collects exceptions and raises them as ExceptionGroup

    Args:
        user_ids: User IDs to process

    Returns:
        List of processed user data

    Raises:
        ExceptionGroup: If any tasks fail
    """
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(process_user(uid))
            for uid in user_ids
        ]
    # Tasks are guaranteed complete here
    return [task.result() for task in tasks]

# ❌ BAD: Unstructured concurrency, no error handling
async def process_users_concurrent(user_ids):
    tasks = [asyncio.create_task(process_user(uid)) for uid in user_ids]
    return await asyncio.gather(*tasks)  # Tasks may leak on error
```
Async Context Managers and Resource Management
```python
# ✅ GOOD: Async context manager for database connection
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator

import asyncpg

class DatabasePool:
    """Async database connection pool manager."""

    def __init__(self, dsn: str, min_size: int = 5, max_size: int = 20):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self._pool: asyncpg.Pool | None = None

    async def connect(self) -> None:
        """Initialize connection pool."""
        self._pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=60
        )

    async def disconnect(self) -> None:
        """Close all connections in pool."""
        if self._pool:
            await self._pool.close()
            self._pool = None

    @asynccontextmanager
    async def acquire(self) -> AsyncIterator[asyncpg.Connection]:
        """
        Acquire database connection from pool.

        Yields:
            Database connection that is automatically returned to pool

        Example:
            async with db_pool.acquire() as conn:
                result = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
        """
        if not self._pool:
            raise RuntimeError("Pool not initialized. Call connect() first.")
        async with self._pool.acquire() as connection:
            yield connection

# Usage with FastAPI lifespan
from typing import Any
from fastapi import FastAPI, HTTPException

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """
    Manage application lifecycle.

    Setup runs on startup, teardown on shutdown.
    """
    # Startup
    db_pool = DatabasePool("postgresql://user:pass@localhost/db")
    await db_pool.connect()
    app.state.db_pool = db_pool
    yield  # Application runs
    # Shutdown
    await db_pool.disconnect()

app = FastAPI(lifespan=lifespan)

@app.get("/users/{user_id}")
async def get_user(user_id: int) -> dict[str, Any]:
    """Fetch user from database."""
    async with app.state.db_pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT id, name, email FROM users WHERE id = $1",
            user_id
        )
    if not row:
        raise HTTPException(status_code=404, detail="User not found")
    return dict(row)

# ❌ BAD: No resource cleanup, connection leaks
async def get_user(user_id: int):
    conn = await asyncpg.connect("postgresql://...")
    row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
    # Connection never closed!
    return dict(row)
```
Async HTTP Client with aiohttp
```python
# ✅ GOOD: aiohttp client with retry logic and timeouts
import asyncio
from typing import Any, Final
from collections.abc import Sequence

from aiohttp import ClientSession, ClientTimeout, ClientError

MAX_RETRIES: Final[int] = 3
BACKOFF_FACTOR: Final[float] = 2.0

class AsyncAPIClient:
    """HTTP client with connection pooling and retry logic."""

    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self._session: ClientSession | None = None

    async def __aenter__(self) -> "AsyncAPIClient":
        """Create session on context enter."""
        timeout = ClientTimeout(total=30, connect=10)
        self._session = ClientSession(
            base_url=self.base_url,
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=timeout,
            raise_for_status=True
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close session on context exit."""
        if self._session:
            await self._session.close()
            self._session = None

    async def get_with_retry(
        self,
        path: str,
        *,
        params: dict[str, str] | None = None
    ) -> dict[str, Any]:
        """
        Make GET request with exponential backoff retry.

        Args:
            path: API endpoint path (e.g., '/users/123')
            params: Optional query parameters

        Returns:
            JSON response as dictionary

        Raises:
            aiohttp.ClientError: After all retries exhausted
        """
        if not self._session:
            raise RuntimeError("Client not initialized. Use async with.")

        last_exception: Exception | None = None
        for attempt in range(MAX_RETRIES):
            try:
                async with self._session.get(path, params=params) as response:
                    return await response.json()
            except ClientError as e:
                last_exception = e
                if attempt < MAX_RETRIES - 1:
                    delay = BACKOFF_FACTOR ** attempt
                    await asyncio.sleep(delay)
                    continue
                raise
        raise RuntimeError(f"All retries failed: {last_exception}")

# Concurrent API calls with rate limiting
async def fetch_multiple_users(
    user_ids: Sequence[int],
    client: AsyncAPIClient,
    *,
    max_concurrent: int = 10
) -> list[dict[str, Any]]:
    """
    Fetch multiple users with concurrency limit.

    Args:
        user_ids: User IDs to fetch
        client: Initialized API client
        max_concurrent: Maximum concurrent requests

    Returns:
        List of user data dictionaries
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(user_id: int) -> dict[str, Any]:
        async with semaphore:
            return await client.get_with_retry(f"/users/{user_id}")

    return await asyncio.gather(*[fetch_one(uid) for uid in user_ids])

# Usage
async def main() -> None:
    async with AsyncAPIClient(
        base_url="https://api.example.com",
        api_key="secret"
    ) as client:
        users = await fetch_multiple_users([1, 2, 3, 4, 5], client)
        print(f"Fetched {len(users)} users")

# ❌ BAD: No connection pooling, no retry, no timeout
import requests  # Blocking library in async code!

async def fetch_user(user_id: int):
    response = requests.get(f"https://api.example.com/users/{user_id}")
    return response.json()
```
FastAPI Async Endpoints with Dependency Injection
```python
# ✅ GOOD: FastAPI async endpoints with proper dependencies
import asyncio
from typing import Annotated
from collections.abc import AsyncIterator

from fastapi import Depends, FastAPI, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

# Models
class UserCreate(BaseModel):
    """User creation request."""
    name: str = Field(..., min_length=1, max_length=100)
    email: str = Field(..., pattern=r"^[\w\.-]+@[\w\.-]+\.\w+$")

class UserResponse(BaseModel):
    """User response."""
    id: int
    name: str
    email: str

    model_config = {"from_attributes": True}

# Database
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    echo=False,
    pool_pre_ping=True,
    pool_size=10,
    max_overflow=20
)

# async_sessionmaker (SQLAlchemy 2.0) replaces sessionmaker(class_=AsyncSession)
AsyncSessionLocal = async_sessionmaker(
    engine,
    expire_on_commit=False
)

async def get_db_session() -> AsyncIterator[AsyncSession]:
    """
    Database session dependency.

    Yields:
        Database session that is automatically closed
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# Service layer
class UserService:
    """Business logic for user operations."""

    def __init__(self, session: AsyncSession):
        self.session = session

    async def create_user(self, user_data: UserCreate) -> UserResponse:
        """Create new user."""
        # Simulate async database operation
        await asyncio.sleep(0.1)
        return UserResponse(id=1, name=user_data.name, email=user_data.email)

    async def get_user(self, user_id: int) -> UserResponse | None:
        """Get user by ID."""
        await asyncio.sleep(0.1)
        if user_id == 1:
            return UserResponse(id=1, name="John", email="john@example.com")
        return None

async def get_user_service(
    session: Annotated[AsyncSession, Depends(get_db_session)]
) -> UserService:
    """User service dependency."""
    return UserService(session)

# API endpoints
app = FastAPI(title="Async API")

@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(
    user_data: UserCreate,
    service: Annotated[UserService, Depends(get_user_service)]
) -> UserResponse:
    """
    Create a new user.

    Args:
        user_data: User creation data
        service: Injected user service

    Returns:
        Created user data
    """
    return await service.create_user(user_data)

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    service: Annotated[UserService, Depends(get_user_service)]
) -> UserResponse:
    """
    Get user by ID.

    Args:
        user_id: User identifier
        service: Injected user service

    Returns:
        User data

    Raises:
        HTTPException: 404 if user not found
    """
    user = await service.get_user(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@app.get("/users", response_model=list[UserResponse])
async def list_users(
    # Parameters without defaults must precede the defaulted query params
    service: Annotated[UserService, Depends(get_user_service)],
    limit: Annotated[int, Query(ge=1, le=100)] = 10,
    offset: Annotated[int, Query(ge=0)] = 0,
) -> list[UserResponse]:
    """
    List users with pagination.

    Args:
        service: Injected user service
        limit: Maximum number of users to return
        offset: Number of users to skip

    Returns:
        List of users
    """
    # Simulate async query
    await asyncio.sleep(0.1)
    return [
        UserResponse(id=i, name=f"User{i}", email=f"user{i}@example.com")
        for i in range(offset, offset + limit)
    ]

# ❌ BAD: Blocking database calls in async endpoint
@app.get("/users/{user_id}")
def get_user(user_id: int):  # Not async!
    import psycopg2  # Blocking library
    conn = psycopg2.connect("postgresql://...")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
    return cursor.fetchone()
```
Async Iterators and Generators
```python
# ✅ GOOD: Async generator for streaming data
import asyncio
from collections.abc import AsyncIterator

async def fetch_page(page: int) -> list[dict[str, int]]:
    """Simulate fetching a page of data; an empty list marks the end."""
    await asyncio.sleep(0.1)
    if page >= 5:  # Simulate running out of data after five pages
        return []
    return [{"id": page * 10 + i, "value": i} for i in range(10)]

async def stream_all_records() -> AsyncIterator[dict[str, int]]:
    """
    Stream all records using async generator.

    Yields records one at a time without loading all into memory.

    Yields:
        Individual record dictionaries

    Example:
        async for record in stream_all_records():
            await process_record(record)
    """
    page = 0
    while True:
        records = await fetch_page(page)
        if not records:
            break
        for record in records:
            yield record
        page += 1

# Usage with async for
async def process_all_records() -> None:
    """Process records as they are streamed."""
    count = 0
    async for record in stream_all_records():
        # Process record without loading all into memory
        print(f"Processing record {record['id']}")
        count += 1
        if count >= 50:  # Limit for demo
            break

# Async comprehension
async def collect_ids() -> list[int]:
    """Collect all IDs using async comprehension."""
    return [
        record["id"]
        async for record in stream_all_records()
    ]

# ❌ BAD: Loading all data into memory before processing
async def process_all_records():
    all_records = []
    page = 0
    while True:
        records = await fetch_page(page)
        if not records:
            break
        all_records.extend(records)  # Memory grows unbounded
        page += 1
    for record in all_records:
        await process_record(record)
```
Anti-Patterns
Forgetting await Keyword
```python
# ❌ Avoid: Coroutine not awaited
async def get_data():
    return {"data": "value"}

async def process():
    result = get_data()  # Returns coroutine object, not result!
    print(result)  # Prints: <coroutine object get_data at 0x...>

# ✅ Fix: Always await coroutines
async def process():
    result = await get_data()
    print(result)  # Prints: {'data': 'value'}
```
Blocking I/O in Async Functions
```python
# ❌ Avoid: Blocking calls block entire event loop
import time
import requests

async def fetch_data():
    time.sleep(5)  # Blocks event loop for 5 seconds!
    response = requests.get("https://api.example.com")  # Blocking!
    return response.json()

# ✅ Fix: Use async alternatives
import asyncio
import aiohttp

async def fetch_data():
    await asyncio.sleep(5)  # Non-blocking
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as response:
            return await response.json()
```
Not Handling Task Cancellation
```python
# ❌ Avoid: Resources leak on cancellation
async def long_running_task():
    file = open("data.txt", "w")
    await asyncio.sleep(10)
    file.write("done")
    file.close()  # Never reached if task cancelled

# ✅ Fix: Handle CancelledError and clean up
async def long_running_task():
    file = open("data.txt", "w")
    try:
        await asyncio.sleep(10)
        file.write("done")
    except asyncio.CancelledError:
        file.write("cancelled")
        raise  # Re-raise after cleanup
    finally:
        file.close()
```
Creating Tasks Without Storing References
```python
# ❌ Avoid: Task garbage collected before completion
async def background_task():
    await asyncio.sleep(1)
    print("Task complete")

async def main():
    asyncio.create_task(background_task())  # No reference stored
    await asyncio.sleep(0.1)  # Task may be garbage collected!

# ✅ Fix: Store task references or use TaskGroup
async def main():
    task = asyncio.create_task(background_task())
    await asyncio.sleep(0.1)
    await task  # Ensure task completes

# Or use TaskGroup (Python 3.11+)
async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(background_task())
    # All tasks guaranteed complete here
```
Testing Strategies
Testing Async Functions with pytest-asyncio
```python
# tests/test_async.py
import asyncio
from collections.abc import AsyncIterator

import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient

from myapp.main import app

# Mark module for async tests
pytestmark = pytest.mark.asyncio

@pytest_asyncio.fixture
async def async_client() -> AsyncIterator[AsyncClient]:
    """Provide async test client for FastAPI app."""
    # httpx >= 0.27 takes the ASGI app via a transport, not app=
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client

async def test_create_user_endpoint(async_client: AsyncClient) -> None:
    """Test user creation endpoint."""
    response = await async_client.post(
        "/users",
        json={"name": "John Doe", "email": "john@example.com"}
    )
    assert response.status_code == 201
    data = response.json()
    assert data["name"] == "John Doe"
    assert "id" in data

async def test_concurrent_requests(async_client: AsyncClient) -> None:
    """Test handling multiple concurrent requests."""
    tasks = [
        async_client.get(f"/users/{i}")
        for i in range(1, 11)
    ]
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    successful = [r for r in responses if not isinstance(r, Exception)]
    assert len(successful) >= 8  # Most should succeed

@pytest.mark.timeout(5)  # Fail if test takes longer than 5 seconds
async def test_timeout_handling() -> None:
    """Test that operations respect timeouts."""
    async def slow_operation():
        await asyncio.sleep(10)

    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(slow_operation(), timeout=1.0)
```
Mocking Async Functions
```python
# tests/test_mocking.py
from unittest.mock import AsyncMock, patch

import pytest

@pytest.mark.asyncio
async def test_api_client_with_mock() -> None:
    """Test API client with mocked responses."""
    mock_response = {"id": 1, "name": "Test User"}

    with patch("aiohttp.ClientSession.get") as mock_get:
        # Configure mock to return async context manager
        mock_get.return_value.__aenter__.return_value.json = AsyncMock(
            return_value=mock_response
        )
        mock_get.return_value.__aenter__.return_value.status = 200

        # Your test here
        # result = await client.get_user(1)
        # assert result == mock_response
```
Load Testing Async Endpoints
```python
# tests/test_load.py
import asyncio
import time

import pytest
from httpx import AsyncClient

@pytest.mark.asyncio
async def test_concurrent_load() -> None:
    """Test system under concurrent load."""
    async with AsyncClient(base_url="http://localhost:8000") as client:
        start = time.time()

        # Simulate 100 concurrent users
        tasks = [
            client.get("/users/1")
            for _ in range(100)
        ]
        responses = await asyncio.gather(*tasks, return_exceptions=True)

        duration = time.time() - start
        successful = [r for r in responses if not isinstance(r, Exception)]

        print(f"Duration: {duration:.2f}s")
        print(f"Success rate: {len(successful)}/100")
        print(f"Requests/sec: {100/duration:.2f}")

        assert len(successful) >= 95  # 95% success rate
        assert duration < 5.0  # Complete within 5 seconds
```
References
- asyncio Documentation
- Python async/await Tutorial
- aiohttp Documentation
- FastAPI Async Documentation
- PEP 492 - Coroutines with async/await
- asyncpg PostgreSQL Driver
- pytest-asyncio Plugin
- HTTPX Async Client
Related Skills
- python-best-practices.md - Type hints for async functions
- python-testing.md - Testing async code with pytest-asyncio
- python-performance.md - When to use async vs multiprocessing
- pythonic-patterns.md - Async context managers and generators