Cuando trabajas con recursos asincrónicos —conexiones de base de datos, sockets, sesiones HTTP— necesitas garantizar que el ciclo de vida del recurso respete el event loop. El problema con los context managers síncronos (__enter__ / __exit__) es que no pueden usar await internamente, así que si abrir o cerrar un recurso requiere I/O, estás bloqueando el loop. Ahí entran __aenter__ y __aexit__.
Un async context manager es cualquier objeto que implementa __aenter__ y __aexit__ como corrutinas. Cuando escribes async with conn:, Python hace await conn.__aenter__() al entrar y await conn.__aexit__(...) al salir —con o sin excepción. Eso significa que adquirir un lock de base de datos, establecer un handshake TLS, o liberar una conexión al pool puede ser I/O real, no llamadas síncronas que bloqueen.
La contraparte es el async iterator: cualquier objeto con __aiter__ (que devuelve el propio iterador, igual que en el protocolo síncrono) y __anext__ como corrutina. Con async for item in source: Python ejecuta await source.__anext__() en cada iteración, deteniendo el bucle cuando se lanza StopAsyncIteration. Esto es lo que hace posible procesar un stream de red sin cargar todo en memoria: cada chunk llega cuando el servidor lo envía, y el loop sigue libre mientras tanto.
El punto crítico de diseño: si usas un context manager síncrono para envolver un recurso asíncrono, el __exit__ no puede hacer await conn.close(). Tienes que llamar conn.close() sin await (si existe esa variante) o ignorar el error de cierre silenciosamente. Eso es un resource leak esperando a ocurrir.
Para no tener que escribir una clase entera cada vez, contextlib.asynccontextmanager convierte una función generadora asíncrona en un context manager. La función hace yield exactamente una vez: lo que hay antes es el __aenter__, lo que hay después es el __aexit__. Los async generators —async def con yield— pueden usar await en su cuerpo, lo que los hace perfectos para producir datos que requieren I/O entre cada elemento.
import asyncio
import contextlib
from typing import AsyncIterator, AsyncGenerator
import aiohttp # pip install aiohttp
# ── 1. Async context manager manual ─────────────────────────────────────
class DatabaseConnection:
def __init__(self, dsn: str) -> None:
self.dsn = dsn
self._conn = None
async def __aenter__(self) -> "DatabaseConnection":
# Simula una conexión que requiere I/O real
await asyncio.sleep(0.01)
self._conn = {"dsn": self.dsn, "open": True}
print(f"[DB] Conexión abierta: {self.dsn}")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
await asyncio.sleep(0.005) # flush / handshake de cierre
if self._conn:
self._conn["open"] = False
print(f"[DB] Conexión cerrada (exc={exc_type})")
return False # no suprime excepciones
async def fetch(self, query: str) -> list[dict]:
await asyncio.sleep(0.02) # simula round-trip a la BD
return [{"id": 1, "query": query}]
# ── 2. Async context manager con @asynccontextmanager ────────────────────
@contextlib.asynccontextmanager
async def managed_connection(dsn: str) -> AsyncIterator[DatabaseConnection]:
conn = DatabaseConnection(dsn)
await conn.__aenter__()
try:
yield conn
except Exception:
# Aquí podrías registrar la excepción antes de propagar
raise
finally:
# finally garantiza el cierre aunque haya excepción
await conn.__aexit__(None, None, None)
# ── 3. Async iterator manual ─────────────────────────────────────────────
class PaginatedQuery:
"""Itera páginas de resultados haciendo una petición por página."""
def __init__(self, base_url: str, pages: int) -> None:
self.base_url = base_url
self.pages = pages
self._page = 0
def __aiter__(self) -> "PaginatedQuery":
return self
async def __anext__(self) -> dict:
if self._page >= self.pages:
raise StopAsyncIteration
await asyncio.sleep(0.01) # simula latencia de red
result = {"page": self._page, "data": f"row_{self._page}"}
self._page += 1
return result
# ── 4. Async generator (forma idiomática del punto 3) ────────────────────
async def paginated_rows(base_url: str, pages: int) -> AsyncGenerator[dict, None]:
for page in range(pages):
await asyncio.sleep(0.01)
yield {"page": page, "data": f"row_{page}"}
# ── 5. Streaming HTTP con async for ──────────────────────────────────────
async def download_large_file(url: str, dest: str) -> int:
total = 0
# aiohttp.ClientSession es en sí un async context manager
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
with open(dest, "wb") as f:
# response.content es un AsyncStreamReader (async iterator)
async for chunk in response.content.iter_chunked(64 * 1024):
f.write(chunk)
total += len(chunk)
return total
# ── Demo principal ────────────────────────────────────────────────────────
async def main() -> None:
# Context manager manual
async with DatabaseConnection("postgresql://localhost/mydb") as db:
rows = await db.fetch("SELECT * FROM users")
print(f"Resultado: {rows}")
print()
# Context manager con decorator
async with managed_connection("postgresql://localhost/mydb") as db:
rows = await db.fetch("SELECT count(*) FROM orders")
print(f"Resultado: {rows}")
print()
# Async iterator manual
print("Async iterator manual:")
async for row in PaginatedQuery("https://api.example.com", pages=3):
print(f" {row}")
print()
# Async generator
print("Async generator:")
async for row in paginated_rows("https://api.example.com", pages=3):
print(f" {row}")
if __name__ == "__main__":
asyncio.run(main())
Recorrido por las decisiones de diseño
El context manager manual (DatabaseConnection) muestra la anatomía completa: __aenter__ devuelve self para que el bloque as db: tenga acceso al objeto, y __aexit__ recibe la tripleta de excepción estándar. El return False es intencional —retornar True suprimiría cualquier excepción que ocurra dentro del bloque, algo que rara vez quieres hacer con conexiones.
@asynccontextmanager es el atajo para cuando no necesitas una clase entera. La función es un generador asíncrono que hace yield exactamente una vez. El try/finally en el bloque del decorator es obligatorio: si omites el finally y ocurre una excepción antes del yield, la conexión nunca se cierra. El decorator transforma ese generador en un objeto que implementa __aenter__ / __aexit__ por ti.
PaginatedQuery como async iterator explícito ilustra que __aiter__ simplemente devuelve self —igual que en el protocolo síncrono— y que la señal de parada es StopAsyncIteration, no StopIteration. Si lanzaras StopIteration dentro de una corrutina, Python la envuelve en RuntimeError desde 3.7 precisamente para evitar confusiones con el protocolo síncrono.
El async generator en paginated_rows hace exactamente lo mismo con un tercio del código. Cada yield suspende la corrutina y entrega el valor al consumidor del async for. El await asyncio.sleep antes del yield es el punto clave: mientras esperamos la respuesta de red, el event loop puede procesar otras corrutinas. Sin async for, tendrías que cargar todas las páginas en memoria antes de procesar la primera.
El ejemplo de streaming HTTP con aiohttp muestra por qué esto importa en producción. response.content.iter_chunked(64 * 1024) es un async iterator que produce chunks de 64 KB conforme llegan del socket. Descargando un archivo de 2 GB, la huella de memoria es siempre ~64 KB, no 2 GB. aiohttp.ClientSession y session.get(url) son ambos async context managers: el primero gestiona el pool de conexiones TCP, el segundo el ciclo de vida de la petición individual.
Errores que debes conocer
Error: usar contextlib.contextmanager (síncrono) para envolver código async, lo que fuerza a llamar operaciones con asyncio.run() dentro del __exit__ o simplemente ignorarlas.
import contextlib
import asyncio
# ❌ Wrong
@contextlib.contextmanager
def bad_db_ctx(dsn):
conn = SomeAsyncDB(dsn)
asyncio.run(conn.connect()) # crea un loop anidado → RuntimeError en 3.10+
try:
yield conn
finally:
asyncio.run(conn.close()) # puede no ejecutarse si el loop ya cerró
# ✅ Right
@contextlib.asynccontextmanager
async def good_db_ctx(dsn):
conn = SomeAsyncDB(dsn)
await conn.connect()
try:
yield conn
finally:
await conn.close()
La versión correcta usa asynccontextmanager y await directamente, sin crear loops anidados ni condiciones de carrera en el cierre.
Error: consumir un async generator con un for síncrono en lugar de async for, silenciando el problema porque el generador simplemente no se ejecuta.
async def fetch_rows():
for i in range(5):
await asyncio.sleep(0)
yield i
async def process():
# ❌ Wrong — itera sobre el objeto generador, no sobre sus valores
for row in fetch_rows(): # TypeError: 'async_generator' object is not iterable
print(row)
# ✅ Right
async for row in fetch_rows():
print(row)
fetch_rows() devuelve un async_generator, que implementa __aiter__ / __anext__ pero no __iter__ / __next__; intentar iterarlo síncronamente lanza TypeError en tiempo de ejecución.
Error: no manejar StopAsyncIteration en __anext__ correctamente cuando hay lógica de reintentos, lo que puede resultar en un iterador que nunca termina o que salta el último elemento.
class BrokenIter:
def __init__(self): self.n = 0
def __aiter__(self): return self
async def __anext__(self):
# ❌ Wrong — atrapa StopAsyncIteration y continúa indefinidamente
try:
if self.n >= 3:
raise StopAsyncIteration
self.n += 1
return self.n
except Exception: # demasiado amplio: también atrapa StopAsyncIteration
return -1 # loop infinito
class FixedIter:
def __init__(self): self.n = 0
def __aiter__(self): return self
async def __anext__(self):
# ✅ Right — StopAsyncIteration no debe quedar atrapada por handlers genéricos
if self.n >= 3:
raise StopAsyncIteration
self.n += 1
return self.n
StopAsyncIteration hereda de Exception en Python, así que un except Exception la captura —el async for nunca la ve y el bucle no termina.
N° 151