Python Asyncio Patterns: From Callbacks to Structured Concurrency

Learn how to write high-performance async Python code using asyncio — covering coroutines, task groups, async context managers, and real-world HTTP client patterns.

By aigrama
#python #asyncio #concurrency #performance

Python’s asyncio library enables cooperative multitasking — a single thread efficiently handles thousands of concurrent I/O-bound operations. This guide progresses from the basics to production patterns.

The Event Loop Model

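Unlike threads, which the OS schedules preemptively, asyncio runs a single-threaded event loop that switches between tasks only at await points. Because one thread runs all the coroutines, they do not contend with each other for the GIL, and switching tasks is much cheaper than an OS thread context switch.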

import asyncio

async def greet(name: str, delay: float) -> str:
    await asyncio.sleep(delay)          # Yields control; event loop runs other tasks
    return f"Hello, {name}!"

async def main() -> None:
    # Run both concurrently: total time ~1s (the longer delay), not 1.8s
    results = await asyncio.gather(
        greet("Alice", 1.0),
        greet("Bob", 0.8),
    )
    print(results)  # ['Hello, Alice!', 'Hello, Bob!']

asyncio.run(main())
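
To make the "switches only at await points" rule concrete, here is a minimal sketch that records the order in which two coroutines run. The names worker, demo, and log are illustrative, and asyncio.sleep(0) is used purely as an explicit yield point.

```python
import asyncio

async def worker(name: str, log: list[str]) -> None:
    log.append(f"{name}: start")
    await asyncio.sleep(0)        # the only point where this coroutine can be suspended
    log.append(f"{name}: end")

async def demo() -> list[str]:
    log: list[str] = []
    # gather schedules both coroutines as tasks on the same event loop
    await asyncio.gather(worker("A", log), worker("B", log))
    return log

order = asyncio.run(demo())
print(order)  # ['A: start', 'B: start', 'A: end', 'B: end']
```

Each worker runs uninterrupted until its await, so the two "start" entries appear before either "end". Without the await, A would run to completion before B started.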

Task Groups (Python 3.11+)

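TaskGroup is the modern, structured-concurrency alternative to gather. If one task raises, the group cancels the remaining sibling tasks, waits for them to finish, and re-raises the failure wrapped in an ExceptionGroup when the async with block exits.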

import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, url: str) -> dict:
    response = await client.get(url, timeout=10.0)
    response.raise_for_status()
    return response.json()

async def fetch_all(urls: list[str]) -> list[dict]:
    async with httpx.AsyncClient() as client:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch(client, url)) for url in urls]

    # Reached only if every task succeeded. If any task raised, the TaskGroup
    # cancelled its siblings and the async with block raised an ExceptionGroup.
    return [t.result() for t in tasks]

Async Context Managers

Any class implementing __aenter__ / __aexit__ works as an async context manager, which suits async resources like database connections. The @asynccontextmanager decorator builds one from an async generator, as in the example below.

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_connection(dsn: str):
    conn = await create_db_connection(dsn)   # hypothetical async connect
    try:
        yield conn
    finally:
        await conn.close()

async def main():
    async with managed_connection("postgresql://localhost/mydb") as conn:
        rows = await conn.fetch("SELECT id, name FROM users LIMIT 10")
        for row in rows:
            print(row["id"], row["name"])
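
For comparison, the class-based form can be sketched as follows. FakeConnection is a hypothetical stand-in for a real async resource; it only flips a flag so the example runs without a database.

```python
import asyncio

class FakeConnection:
    """Hypothetical stand-in for an async resource such as a DB connection."""

    def __init__(self) -> None:
        self.is_open = False

    async def __aenter__(self) -> "FakeConnection":
        await asyncio.sleep(0)    # pretend to connect
        self.is_open = True
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        await asyncio.sleep(0)    # pretend to close
        self.is_open = False
        return False              # do not suppress exceptions from the body

async def use_connection() -> tuple[bool, bool]:
    conn = FakeConnection()
    async with conn as c:
        inside = c.is_open        # True while the block is active
    return inside, conn.is_open   # closed again after the block

inside, after = asyncio.run(use_connection())
print(inside, after)  # True False
```

The decorator form is usually shorter. The class form is handy when the resource object itself should carry state or extra methods.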

Producer / Consumer with asyncio.Queue

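A bounded queue decouples producers from consumers and applies backpressure: put() waits while the queue is full, so a fast producer cannot run arbitrarily far ahead of a slow consumer.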

import asyncio
import random

async def producer(queue: asyncio.Queue[int], n: int) -> None:
    for i in range(n):
        item = random.randint(1, 100)
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)
    # Signal completion with sentinel
    await queue.put(-1)

async def consumer(queue: asyncio.Queue[int]) -> None:
    total = 0
    while True:
        item = await queue.get()
        if item == -1:
            queue.task_done()   # Account for the sentinel too, so a queue.join() would not hang
            break
        total += item
        queue.task_done()
    print(f"Consumer total: {total}")

async def main() -> None:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=5)  # Backpressure at 5 items
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue, 10))
        tg.create_task(consumer(queue))

asyncio.run(main())
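
A common variation replaces the sentinel with queue.join() and a small pool of workers. The sketch below assumes a hypothetical workload (doubling integers); the names worker and run_pool are illustrative.

```python
import asyncio

async def worker(queue: asyncio.Queue, results: list[int]) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)      # simulate I/O-bound work
            results.append(item * 2)
        finally:
            queue.task_done()              # always mark the item as processed

async def run_pool(items: list[int], n_workers: int = 3) -> list[int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    results: list[int] = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(n_workers)]
    for item in items:
        await queue.put(item)              # waits while the queue is full
    await queue.join()                     # returns once every item is marked done
    for w in workers:
        w.cancel()                         # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)

doubled = asyncio.run(run_pool(list(range(6))))
print(doubled)  # [0, 2, 4, 6, 8, 10]
```

Results are sorted because workers finish in a nondeterministic order.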

Common Pitfalls

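import asyncio
import time

# ✗ WRONG: blocks the entire event loop
async def bad():
    time.sleep(2)       # Blocking call inside async function!

# ✓ CORRECT: run blocking calls in a thread pool
async def good():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 2)

# ✗ WRONG: fire-and-forget. Nothing awaits the task, so its exception is only
# logged ("Task exception was never retrieved") when the task is garbage-collected.
# The loop holds only a weak reference, so the task itself may be collected mid-run.
async def fire_and_forget():
    asyncio.create_task(risky_coro())  # risky_coro: placeholder for any coroutine that may raise

# ✓ CORRECT: keep a reference, then await the task or attach a done callback
background_tasks: set[asyncio.Task] = set()

def report_failure(task: asyncio.Task) -> None:
    background_tasks.discard(task)
    # task.exception() itself raises CancelledError on a cancelled task, so check first
    if not task.cancelled() and task.exception() is not None:
        print(f"Error: {task.exception()!r}")

async def safe():
    task = asyncio.create_task(risky_coro())
    background_tasks.add(task)
    task.add_done_callback(report_failure)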
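
The cost of a blocking call can be measured directly. This sketch compares a coroutine that calls a blocking function inline with one that offloads it via asyncio.to_thread (Python 3.9+), a higher-level alternative to run_in_executor. The helper names and the 0.2 s delay are illustrative assumptions.

```python
import asyncio
import time

def blocking_io() -> str:
    time.sleep(0.2)                # stands in for any blocking library call
    return "done"

async def blocking_version() -> str:
    return blocking_io()           # stalls the whole event loop for 0.2 s

async def threaded_version() -> str:
    return await asyncio.to_thread(blocking_io)   # runs in a worker thread

async def elapsed(make_coro) -> float:
    start = time.perf_counter()
    await asyncio.gather(make_coro(), make_coro())   # two calls "at once"
    return time.perf_counter() - start

async def compare() -> tuple[float, float]:
    return await elapsed(blocking_version), await elapsed(threaded_version)

blocked, threaded = asyncio.run(compare())
print(f"inline: {blocked:.2f}s, to_thread: {threaded:.2f}s")
```

The inline version serialises the two calls (about 0.4 s), while the threaded version overlaps them (about 0.2 s). Exact timings depend on the machine.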

Performance Comparison

import asyncio
import time

async def async_benchmark(n: int) -> float:
    start = time.perf_counter()
    await asyncio.gather(*[asyncio.sleep(0.01) for _ in range(n)])
    return time.perf_counter() - start

# 1000 concurrent sleeps of 10 ms each finish in roughly 10-20 ms in total
# (10 ms of waiting plus scheduling overhead); awaiting them one by one would take ~10 s
print(asyncio.run(async_benchmark(1000)))  # e.g. ~0.01-0.02 s, machine-dependent
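
To see both sides of the comparison in one run, a sequential baseline can sit next to the concurrent version. This is a rough sketch with a small n so it finishes quickly. The function names are illustrative and the timings are indicative only.

```python
import asyncio
import time

async def sequential(n: int, delay: float) -> float:
    start = time.perf_counter()
    for _ in range(n):
        await asyncio.sleep(delay)         # each sleep finishes before the next starts
    return time.perf_counter() - start

async def concurrent(n: int, delay: float) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(asyncio.sleep(delay) for _ in range(n)))  # all sleeps overlap
    return time.perf_counter() - start

async def compare(n: int = 20, delay: float = 0.01) -> tuple[float, float]:
    return await sequential(n, delay), await concurrent(n, delay)

seq_time, conc_time = asyncio.run(compare())
print(f"sequential: {seq_time:.3f}s, concurrent: {conc_time:.3f}s")
```

With 20 sleeps of 10 ms, the sequential run takes about 0.2 s and the concurrent run about 0.01 s plus overhead.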

Asyncio shines for I/O-bound workloads. For CPU-bound work, use multiprocessing or ProcessPoolExecutor instead.