Cuando escribes una función async def y metes dentro una llamada a time.sleep(1), no estás pausando esa coroutine mientras las demás siguen trabajando. Estás bloqueando el thread completo del event loop durante un segundo entero. Ninguna otra coroutine avanza. Ningún callback se ejecuta. El event loop queda congelado.
Esto sucede porque asyncio corre por defecto en un único thread. El event loop es un bucle cooperativo: avanza cuando las coroutines ceden el control voluntariamente con await. Una llamada síncrona bloqueante nunca cede ese control. El thread se queda pegado en esa llamada y el event loop no puede hacer absolutamente nada hasta que termina.
time.sleep es el ejemplo canónico, pero el problema es idéntico con requests.get(), operaciones de fichero con el módulo open estándar, consultas a bases de datos con drivers síncronos, o cualquier cómputo intensivo de CPU. Si la llamada no tiene await delante, no coopera.
El momento en que esto importa es exactamente cuando más lo necesitas: un servidor web manejando cientos de conexiones concurrentes, un scraper que lanza decenas de peticiones en paralelo, o cualquier sistema donde la promesa de asyncio es gestionar I/O sin bloquear. Un solo requests.get escondido en tu stack destruye esa promesa.
La solución correcta tiene dos caminos. Si hay una librería con soporte nativo async (httpx, aiofiles, asyncpg), úsala y listo. Si tienes código bloqueante que no puedes reemplazar —una librería legacy, código de terceros, o cómputo CPU-bound— usa asyncio.to_thread() para ejecutarlo en un thread pool separado, dejando el event loop libre.
import asyncio
import time
import httpx
from concurrent.futures import ThreadPoolExecutor
import requests # librería síncrona, para demostrar el problema
# ── Simulación de trabajo bloqueante de terceros ──────────────────────────────
def fetch_sync(url: str) -> str:
"""Función síncrona bloqueante que no podemos modificar."""
response = requests.get(url, timeout=5)
return response.text[:80]
def cpu_heavy(n: int) -> int:
"""Cómputo que bloquea el thread."""
total = 0
for i in range(n):
total += i * i
return total
# ── Versión incorrecta: bloquea el event loop ─────────────────────────────────
async def wrong_approach():
print("[wrong] inicio")
time.sleep(2) # ❌ bloquea el event loop completo durante 2 s
result = requests.get("https://httpbin.org/get", timeout=5) # ❌ igual
print("[wrong] fin")
# ── Versión correcta: libera el event loop ────────────────────────────────────
async def fetch_async(url: str) -> str:
"""I/O de red con librería async nativa."""
async with httpx.AsyncClient() as client:
response = await client.get(url) # cede el control mientras espera
return response.text[:80]
async def offload_blocking(url: str) -> str:
"""Código bloqueante que no podemos tocar, ejecutado en thread pool."""
# asyncio.to_thread lanza fetch_sync en un ThreadPoolExecutor interno
# y devuelve una coroutine que podemos awaitar normalmente.
result = await asyncio.to_thread(fetch_sync, url)
return result
async def offload_cpu(n: int) -> int:
"""Cómputo CPU-bound fuera del thread del event loop."""
return await asyncio.to_thread(cpu_heavy, n)
# ── run_in_executor: la API de nivel más bajo ─────────────────────────────────
async def offload_with_executor(url: str) -> str:
"""
Equivalente a asyncio.to_thread pero con control explícito del executor.
Útil cuando necesitas configurar el pool (tamaño máximo, ThreadPoolExecutor
personalizado, o incluso ProcessPoolExecutor para paralelismo real de CPU).
"""
loop = asyncio.get_running_loop()
# None → usa el executor por defecto del loop (ThreadPoolExecutor interno)
result = await loop.run_in_executor(None, fetch_sync, url)
return result
async def with_custom_executor(n: int) -> int:
loop = asyncio.get_running_loop()
# Pool dedicado con límite de threads explícito
with ThreadPoolExecutor(max_workers=4) as pool:
result = await loop.run_in_executor(pool, cpu_heavy, n)
return result
# ── Demo: la diferencia en concurrencia real ──────────────────────────────────
async def ticker():
"""Coroutine que imprime cada 0.5 s; si el event loop se bloquea, enmudece."""
for i in range(6):
print(f" tick {i}")
await asyncio.sleep(0.5)
async def main():
url = "https://httpbin.org/delay/1" # endpoint que tarda ~1 s en responder
print("── Demo con asyncio.to_thread ──")
# ticker y offload_blocking corren de verdad en paralelo:
# ticker sigue imprimiendo mientras fetch_sync espera en su thread.
results = await asyncio.gather(
ticker(),
offload_blocking(url),
offload_blocking(url), # dos peticiones síncronas en paralelo
)
print(f"Resultado: {results[1][:40]}...")
print("\n── Demo con httpx (nativo async) ──")
results = await asyncio.gather(
ticker(),
fetch_async(url),
fetch_async(url),
)
print(f"Resultado: {results[1][:40]}...")
if __name__ == "__main__":
asyncio.run(main())
Qué está pasando realmente aquí
La demo con ticker() es la prueba de fuego. Si lanzas ticker y una operación bloqueante juntos con asyncio.gather, los ticks deberían aparecer cada medio segundo sin importar lo que haga la otra tarea. Si el event loop se bloquea, el ticker enmudece mientras dura el bloqueo.
asyncio.to_thread(fetch_sync, url) no hace magia: toma la función síncrona y la envía al ThreadPoolExecutor interno que el event loop mantiene. Mientras ese thread ejecuta requests.get, el event loop sigue corriendo en su propio thread y puede atender al ticker, resolver otras coroutines, etc. El await delante simplemente dice “cuando ese thread termine, devuélveme el resultado”.
loop.run_in_executor(None, fn, args) es exactamente lo mismo bajo el capó, pero más verboso y con una ventaja: puedes pasar un Executor propio. Esto importa cuando tienes cómputo CPU-bound real y quieres un ProcessPoolExecutor —porque los threads en Python siguen sujetos al GIL y no paralelizan CPU de verdad. Para I/O bloqueante, los threads son suficientes porque el GIL se libera durante la syscall.
La diferencia entre asyncio.to_thread y run_in_executor es solo azúcar sintáctica y nivel de abstracción. asyncio.to_thread está disponible desde Python 3.9 y es la forma idiomática moderna. run_in_executor existe desde antes y te da control fino sobre el pool.
La regla que no tiene excepciones: dentro de código async, toda operación que pueda tardar debe o tener await con una librería async nativa, o estar en un thread/proceso separado. No hay término medio.
Errores que debes conocer
Error: usar time.sleep en lugar de asyncio.sleep por hábito, pensando que en contexto async se comporta de forma cooperativa.
# ❌ Wrong
async def wait_a_bit():
time.sleep(1) # bloquea el thread; el event loop muere durante 1 s
# ✅ Right
async def wait_a_bit():
await asyncio.sleep(1) # cede el control; el event loop sigue vivo
asyncio.sleep es la versión cooperativa: registra un callback para cuando expire el timer y devuelve el control al event loop inmediatamente.
Error: pasar un método de instancia a asyncio.to_thread con lambdas o argumentos mal formados, perdiendo el contexto de self.
# ❌ Wrong await asyncio.to_thread(lambda: self.blocking_method(arg)) # las lambdas funcionan, pero ocultan errores y no pasan bien las excepciones # ✅ Right await asyncio.to_thread(self.blocking_method, arg) # to_thread acepta callable + args por separado, igual que ThreadPoolExecutor.submit
Pasar el callable y sus argumentos por separado permite que asyncio.to_thread propague correctamente las excepciones y haga el wrapping interno de forma limpia.
Error: usar asyncio.to_thread para cómputo CPU-bound intensivo y esperar paralelismo real, sin recordar el GIL.
# ❌ Wrong — para CPU pura, los threads no ayudan por el GIL
result = await asyncio.to_thread(matrix_multiplication, big_matrix)
# ✅ Right — ProcessPoolExecutor sí escapa el GIL
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, matrix_multiplication, big_matrix)
Para I/O bloqueante los threads son la solución correcta; para CPU pura necesitas procesos separados que tengan su propio intérprete y su propio GIL.
N° 152