The asyncio event loop doesn't forgive what the GIL used to hide. These five patterns compile cleanly, pass tests on a single machine, and corrupt data under real load. Here's where they live and what to do instead.
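Calling asyncio.create_task() without storing the returned Task or awaiting it leaves the task detached. If it raises, the exception is stored on the task and never reaches the caller; the only trace is a "Task exception was never retrieved" message logged when the task is garbage-collected. The event loop also keeps only a weak reference to each task, so an unreferenced task can be collected before it finishes. Your code thinks everything succeeded (the webhook fired, the email sent, the background job ran), but the work died inside a task nobody is watching.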
import asyncio

# Before: three tasks created, none stored or awaited
async def process_order(order_id: int):
    asyncio.create_task(send_confirmation_email(order_id))
    asyncio.create_task(update_inventory(order_id))
    asyncio.create_task(log_analytics(order_id))
    return {"status": "processed"}

# After: the handler waits for all three
async def process_order(order_id: int):
    # Await all; fail if any fails
    await asyncio.gather(
        send_confirmation_email(order_id),
        update_inventory(order_id),
        log_analytics(order_id),
    )
    return {"status": "processed"}
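If you need true fire-and-forget and can't await, keep a reference to the task and attach a done callback that reports failures:
_background_tasks = set()  # strong references; the loop only keeps weak ones

def _report_failure(task: asyncio.Task) -> None:
    _background_tasks.discard(task)
    if not task.cancelled() and task.exception():  # exception() raises if cancelled
        print(task.exception())

async def process_order(order_id: int):
    task = asyncio.create_task(send_confirmation_email(order_id))
    _background_tasks.add(task)
    task.add_done_callback(_report_failure)
    return {"status": "processed"}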
The code looks correct. create_task isn't a red flag — it looks like "run this concurrently." Tests don't catch it because the happy path works fine: the email sends, the inventory updates. The failure only shows up when something throws — maybe a transient network issue, maybe a null reference in a rarely-hit code path. Under load, with real failures, exceptions silently disappear and nobody knows. The tests never failed, so it shipped.
CodeSight flags any asyncio.create_task() call where the returned Task object is not stored or awaited. Fire-and-forget tasks with potential exception paths are called out with a severity rating and suggested fix.
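Python's GIL serializes bytecode between threads, but it has nothing to say about coroutines, which share one thread and hand off control at every await. Only one coroutine runs at a time, so a single dict or list operation is never torn in half. What breaks is everything that spans an await: a value read before the await can be stale by the time it is written back, and a module-level dict, list, or object is shared by every task and every concurrent request that touches it. Data corrupts silently: entries from one request show up in another, counts go wrong, and state becomes inconsistent with no exception thrown.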
results = {}  # module-level: shared by every call to fetch_batch

async def fetch_user(uid):
    user = await db.fetchrow(
        'SELECT * FROM users WHERE id = $1', uid)
    results[uid] = user

async def fetch_batch(uids: list):
    # Every coroutine, in every concurrent batch,
    # writes to the same dict
    await asyncio.gather(
        *[fetch_user(uid) for uid in uids]
    )
    return results  # may include other callers' entries

async def fetch_user(uid: int) -> dict:
    user = await db.fetchrow(
        'SELECT * FROM users WHERE id = $1', uid)
    return {uid: user}  # Return, don't mutate shared state

async def fetch_batch(uids: list) -> dict:
    # Each task returns its result
    # No shared mutation
    raw_results = await asyncio.gather(
        *[fetch_user(uid) for uid in uids]
    )
    return {k: v for d in raw_results for k, v in d.items()}
The pattern looks normal. Define a dict, populate it inside tasks, return it. Tests call fetch_batch one at a time, so the module-level dict only ever holds one batch and the output looks right. Under load, batches overlap at their await points: one caller's return value includes another caller's users, stale entries from earlier requests never leave, and the dict grows without bound. The bug is intermittent, load-dependent, and hard to reproduce in unit tests. Production sees it when it's too late.
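The other way shared state goes wrong is a read-modify-write that straddles an await. The sketch below is a contrived illustration: `unsafe_increment`, `safe_increment`, and the `state` dict are hypothetical names, and `asyncio.sleep(0)` stands in for any real I/O call. It shows the lost update and one possible fix using `asyncio.Lock`.

```python
import asyncio


async def unsafe_increment(state: dict) -> None:
    current = state["count"]      # read
    await asyncio.sleep(0)        # other coroutines run here and read the same value
    state["count"] = current + 1  # write back a stale value


async def safe_increment(state: dict, lock: asyncio.Lock) -> None:
    # The lock keeps the read and the write together across the await.
    async with lock:
        current = state["count"]
        await asyncio.sleep(0)
        state["count"] = current + 1


async def run_demo(n: int = 10) -> tuple[int, int]:
    unsafe = {"count": 0}
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(n)))

    safe = {"count": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(n)))

    return unsafe["count"], safe["count"]
```

With ten coroutines, the unsafe counter ends at 1 because every coroutine read 0 before any of them wrote, while the locked counter reaches 10. Returning values instead of mutating shared state, as in the rewritten fetch_batch, avoids needing the lock at all.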
CodeSight detects shared mutable state (module-level dicts/lists/objects) being mutated inside async functions that are passed to gather, create_task, or wait. The AST analysis flags writes to non-local mutable state inside coroutines with concurrency hints.
time.sleep(), requests.get(), and synchronous database drivers block the entire event loop. While one coroutine sits inside a blocking call, no other coroutine on that loop can run. A single time.sleep(5) freezes every request being handled by that worker for five seconds. Under load, the event loop stalls and requests queue up behind it.
async def fetch_weather(city: str):
    # Blocks ALL coroutines on this worker
    # for the full 3 seconds
    time.sleep(3)
    return requests.get(f"https://api.example/{city}")

async def fetch_weather(city: str):
    # Only this coroutine waits
    # Others run freely
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://api.example/{city}"
        ) as resp:
            return await resp.json()
The same problem hits with synchronous database libraries inside async functions:
import psycopg2  # synchronous driver

async def get_user(user_id: int):
    conn = psycopg2.connect(DATABASE_URL)
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM users WHERE id = %s', (user_id,))
    return cursor.fetchone()
    # Blocks the entire event loop on every call

import asyncpg  # async driver

async def get_user(user_id: int):
    conn = await asyncpg.connect(DATABASE_URL)
    try:
        return await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1', user_id
        )
    finally:
        await conn.close()
The function is declared async, so it must be async, right? The blocking call is at the top — visible on first glance to anyone who knows to look for it. But most reviewers are reading for logic, not I/O architecture. The function returns the right data, the tests pass, and in a unit test the 3-second delay doesn't break anything. Production with 100 concurrent requests sees every single one blocked for 3 seconds. Throughput collapses.
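When no async-native library exists for a dependency, one option is to push the blocking call onto a worker thread with `asyncio.to_thread` (Python 3.9+). The sketch below is an illustration, not a benchmark: `blocking_io`, `ticker`, and `count_ticks` are hypothetical names, and `time.sleep` stands in for any synchronous call. The ticker counts how often the event loop gets to run while the "request" is in flight.

```python
import asyncio
import time


def blocking_io(seconds: float) -> str:
    # Stand-in for a synchronous call such as requests.get or a sync DB driver.
    time.sleep(seconds)
    return "done"


async def ticker(stop: asyncio.Event) -> int:
    """Count how many times the event loop lets this coroutine run."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def count_ticks(offload: bool) -> int:
    stop = asyncio.Event()
    tick_task = asyncio.create_task(ticker(stop))
    await asyncio.sleep(0)  # let the ticker start

    if offload:
        # Runs in a worker thread; the event loop stays responsive.
        await asyncio.to_thread(blocking_io, 0.2)
    else:
        # Runs on the event loop thread; nothing else is scheduled meanwhile.
        blocking_io(0.2)

    stop.set()
    return await tick_task
```

Calling `asyncio.run(count_ticks(offload=False))` leaves the ticker almost no chance to run, while `offload=True` lets it tick throughout the 0.2 seconds. A thread pool is a stopgap with its own limits, so an async-native driver remains the better fix where one exists.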
CodeSight flags known blocking calls inside async functions: time.sleep, requests.get/post, urllib, psycopg2, pymysql, and other synchronous I/O. It also flags the absence of async-native equivalents where they exist (aiohttp, asyncpg, aiomysql, httpx).
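Static checks can be complemented at runtime: asyncio's debug mode logs a warning on the "asyncio" logger whenever a single callback or task step runs longer than `loop.slow_callback_duration` (0.1 seconds by default). The following is a rough sketch of capturing those warnings in a test; `ListHandler`, `handler_with_blocking_call`, and `find_slow_callbacks` are hypothetical names.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects formatted log messages in memory."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def handler_with_blocking_call() -> None:
    time.sleep(0.2)  # blocks the loop for longer than the 0.1 s threshold


def find_slow_callbacks() -> list[str]:
    capture = ListHandler()
    asyncio_logger = logging.getLogger("asyncio")
    asyncio_logger.addHandler(capture)
    try:
        # debug=True turns on slow-callback reporting for this run.
        asyncio.run(handler_with_blocking_call(), debug=True)
    finally:
        asyncio_logger.removeHandler(capture)
    # Slow-callback warnings start with "Executing" and include the duration.
    return [m for m in capture.messages if m.startswith("Executing")]
```

Debug mode adds overhead, so it is better suited to test suites and staging than to production workers.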
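Opening an aiohttp.ClientSession() or calling asyncpg.connect() inside every request handler is the subtle one. The async with block does close the session when it exits, so nothing leaks in the strict sense, but every session owns its own connection pool. A session per request means a new pool, a new TCP and TLS handshake, and a teardown for every call, with no keep-alive reuse. Under load, sockets and file descriptors pile up faster than they are released, and database connections opened per request run into the server's connection limit. New requests queue behind the churn. The result is not a crash, just a slow, creeping stall that looks like a performance problem but isn't fixable by scaling.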
async def fetch_data(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()
    # The session is closed on exit, but its pool dies with it:
    # no connection is ever reused
    # Under load: 1000 requests = 1000 sessions
    # = 1000 fresh connections and handshakes

# Create session once, reuse it
_client_session: aiohttp.ClientSession | None = None

async def get_session():
    global _client_session
    if _client_session is None:
        _client_session = aiohttp.ClientSession()
    return _client_session

async def fetch_data(url: str):
    session = await get_session()
    async with session.get(url) as resp:
        return await resp.json()
    # session stays open, pooled internally
    # call session.close() on app shutdown
Or better: wrap acquisition and cleanup in an async context manager, the same construct FastAPI's lifespan handler is built on:
from contextlib import asynccontextmanager

@asynccontextmanager
async def get_db():
    conn = await asyncpg.connect(DATABASE_URL)
    try:
        yield conn
    finally:
        await conn.close()
        # Always closes, even on exception

# Usage:
async def handler(uid: int):
    async with get_db() as conn:
        user = await conn.fetchrow('SELECT * FROM users WHERE id = $1', uid)
        return user
The async with block looks correct, and for cleanup it is: __aexit__ runs as soon as the block exits, with no help from the garbage collector. What it cannot do is make a per-request session cheap. Every call builds a connector, opens a fresh connection, and tears both down again, so the service looks slow rather than broken. In tests, one request per test creates one session, closes it, and the test runs fine. In production, 50 concurrent requests per second means dozens of sessions open at once, each with its own sockets and each taking time to close. File descriptors and upstream connection limits run out long before anyone suspects the HTTP client.
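The shape of an application-scoped client can be sketched without any web framework. In the example below, `FakeSession` is a hypothetical stand-in for a pooled client such as aiohttp.ClientSession, and `lifespan` and the `state` dict are illustrative names rather than a specific framework's API. It shows one session created at startup, shared by 100 concurrent calls, and closed once at shutdown.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Stand-in for a pooled HTTP client; counts how many were created."""

    opened = 0

    def __init__(self) -> None:
        FakeSession.opened += 1
        self.closed = False

    async def get(self, url: str) -> dict:
        await asyncio.sleep(0)  # pretend to do network I/O
        return {"url": url}

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def lifespan(state: dict):
    # Startup: create the shared session once.
    state["session"] = FakeSession()
    try:
        yield state
    finally:
        # Shutdown: close it exactly once, even if the app body raised.
        await state["session"].close()


async def fetch_data(state: dict, url: str) -> dict:
    # Handlers borrow the shared session instead of creating their own.
    return await state["session"].get(url)


async def main() -> tuple[int, int, bool]:
    FakeSession.opened = 0
    state: dict = {}
    async with lifespan(state):
        results = await asyncio.gather(
            *(fetch_data(state, f"https://api.example/{i}") for i in range(100))
        )
        session = state["session"]
    return len(results), FakeSession.opened, session.closed
```

Here `asyncio.run(main())` handles 100 requests with a single session, and the session is closed by the time the context manager exits. With a real client the same structure applies, with the session stored on whatever application state object the framework provides.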
CodeSight identifies aiohttp.ClientSession() creation inside request handlers without a lifespan-scoped singleton pattern. It flags asyncpg.connect() without a finally: await conn.close() guard, and flags any context manager that creates a network resource without a clear close pattern.
asyncio.gather(return_exceptions=True) converts every raised exception into a return value. What looks like a "make everything resilient" pattern is actually a silent error suppressor. Errors disappear into the result list — if nobody inspects each element to check if it's an exception, the failure is invisible. The code completes "successfully" with wrong or missing data.
async def fetch_all_products():
    results = await asyncio.gather(
        fetch_product(1),
        fetch_product(2),
        fetch_product(3),
        return_exceptions=True  # Errors become return values
    )
    # If product 2's API is down,
    # results[1] = ConnectionError("Connection refused")
    # No exception raised
    return results
    # Caller sees a list, doesn't know it's partial

async def fetch_all_products():
    results = await asyncio.gather(
        fetch_product(1),
        fetch_product(2),
        fetch_product(3),
        return_exceptions=True
    )
    errors = [r for r in results if isinstance(r, Exception)]
    if errors:
        raise ExceptionGroup("fetch_all_products failed", errors)
    return results
Python 3.11 introduced ExceptionGroup together with asyncio.TaskGroup. gather() itself never raises an ExceptionGroup, but a TaskGroup does, so failures can be surfaced cleanly with try ... except ExceptionGroup. Unlike gather with return_exceptions=True, a TaskGroup cancels the remaining tasks as soon as one fails.
async def fetch_all_products():
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_product(i)) for i in (1, 2, 3)]
    except ExceptionGroup as eg:
        for exc in eg.exceptions:
            logger.error("Product fetch failed: %s", exc)
        raise  # Surface to caller, don't silently swallow
    return [task.result() for task in tasks]
The return_exceptions=True flag looks intentional: "we're handling errors gracefully." The code runs without crashing, tests pass, CI is green. Nobody checks the element types of the result list because the test fixtures all succeed. In production, one dependency fails, the exception object is silently placed in the result list, and downstream code iterates over the results expecting valid product objects, then fails on type errors or missing fields far from the original failure. The original error context is buried. Debugging is painful.
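When partial results are acceptable, the inspection step can be made explicit so callers never receive exception objects mixed in with data. This is a minimal sketch under that assumption: `split_results` is a hypothetical helper, and `fetch_product` is a stub in which product 2 always fails.

```python
import asyncio


def split_results(keys, results):
    """Separate gather(return_exceptions=True) output into successes and failures."""
    ok, failed = {}, {}
    for key, result in zip(keys, results):
        if isinstance(result, BaseException):
            failed[key] = result
        else:
            ok[key] = result
    return ok, failed


async def fetch_product(product_id: int) -> dict:
    # Stub: product 2 simulates a dependency that is down.
    await asyncio.sleep(0)
    if product_id == 2:
        raise ConnectionError("Connection refused")
    return {"id": product_id}


async def fetch_all_products(ids: list[int]):
    results = await asyncio.gather(
        *(fetch_product(i) for i in ids),
        return_exceptions=True,
    )
    # gather preserves input order, so results line up with ids.
    return split_results(ids, results)
```

The caller gets two dicts keyed by product id and has to decide what a failure means, instead of discovering a ConnectionError where a product was expected.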
CodeSight flags asyncio.gather with return_exceptions=True where the result is consumed without type-checking each element. It also flags cases where the result is passed directly to another function without inspecting for exception instances first.
Fire-and-forget task detection, blocking call flagging, unclosed context manager alerts, exception-swallowing patterns. Every Python PR in 30 seconds, before it merges.