Manejar threads a mano en Python es perfectamente posible, pero implica crear cada Thread, hacer join() de cada uno, propagar excepciones manualmente y gestionar el ciclo de vida del pool tú mismo. ThreadPoolExecutor del módulo concurrent.futures encapsula todo eso en una API que separa claramente dos responsabilidades: qué quieres ejecutar y cómo se gestionan los recursos para ejecutarlo.
El executor mantiene un pool de threads reutilizables. Cuando le envías trabajo, asigna ese trabajo al primer thread libre del pool en lugar de crear y destruir un thread por cada tarea — operación costosa tanto en tiempo como en memoria. El parámetro max_workers controla el tamaño del pool; si no lo especificas, Python usa un valor por defecto basado en los CPUs disponibles multiplicado por 5 (pensado para I/O-bound work).
Cada llamada a submit() retorna inmediatamente un objeto Future. Ese future es un handle al resultado eventual de la tarea: puede estar pendiente, completado con valor, o completado con excepción. No hay resultado todavía cuando submit() retorna — solo una promesa de que habrá uno. Cuando llamas future.result(), ahí sí bloqueas hasta que el thread termine. Si el thread lanzó una excepción, result() la re-lanza en el thread llamante, lo que significa que no pierdes errores silenciosamente como suele pasar con threads manuales.
Usarlo como context manager con with es el patrón idiomático: al salir del bloque with, el executor llama a shutdown(wait=True) internamente, que espera a que todos los futures pendientes terminen antes de continuar. Nada de join() dispersos por el código.
¿Cuándo preferirlo sobre threads manuales? Casi siempre que el trabajo sea I/O-bound (requests HTTP, lecturas de disco, consultas a base de datos) y quieras un nivel de concurrencia fijo. Los threads manuales tienen sentido cuando necesitas control muy fino sobre el ciclo de vida del thread o comunicación entre threads via Event o Queue de manera compleja. Para el 80% de los casos de paralelismo I/O, el executor es la herramienta correcta.
Lo que rompe si lo usas mal: llamar future.result() dentro del mismo thread que envió el trabajo, dentro del context manager, sin entender el orden, puede crear deadlocks si el número de tareas excede max_workers y cada tarea espera a otra tarea pendiente. También, executor.map() consume el iterable completo de inmediato — si es enorme, carga todo en memoria antes de empezar.
import time
import concurrent.futures
from typing import NamedTuple
# Simula una llamada a una API externa con latencia variable
def fetch_user_data(user_id: int) -> dict:
time.sleep(0.1 * (user_id % 3 + 1)) # latencia simulada: 0.1s, 0.2s o 0.3s
if user_id == 7:
raise ValueError(f"User {user_id} no existe en el sistema")
return {"id": user_id, "name": f"User_{user_id}", "score": user_id * 10}
class FetchResult(NamedTuple):
user_id: int
data: dict | None
error: str | None
def fetch_all_users(user_ids: list[int], max_workers: int = 5) -> list[FetchResult]:
results: list[FetchResult] = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# submit() retorna inmediatamente; guardamos el future junto al id
# para poder asociar resultado con la solicitud original
future_to_id = {
executor.submit(fetch_user_data, uid): uid
for uid in user_ids
}
# as_completed() itera sobre futures en el orden en que terminan,
# no en el orden en que fueron enviados — útil para ver progreso
for future in concurrent.futures.as_completed(future_to_id):
uid = future_to_id[future]
try:
data = future.result() # bloquea hasta tener el valor (ya listo aquí)
results.append(FetchResult(user_id=uid, data=data, error=None))
except ValueError as exc:
# La excepción del thread se re-lanza aquí, en el thread principal
results.append(FetchResult(user_id=uid, data=None, error=str(exc)))
# Al salir del `with`, todos los futures ya terminaron
return results
def fetch_scores_only(user_ids: list[int]) -> list[int]:
"""
Cuando solo necesitas aplicar una función a una colección y recoger
los resultados, map() es más conciso que submit() + as_completed().
map() preserva el orden del iterable original.
"""
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
# map() lanza todas las tareas en paralelo y retorna un iterador
# que bloquea elemento a elemento en orden de entrada, no de compleción
scores = list(executor.map(
lambda uid: fetch_user_data(uid)["score"],
user_ids,
))
return scores
if __name__ == "__main__":
ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
start = time.perf_counter()
all_results = fetch_all_users(ids)
elapsed = time.perf_counter() - start
print(f"Fetched {len(ids)} users en {elapsed:.2f}s\n")
for r in sorted(all_results, key=lambda x: x.user_id):
if r.error:
print(f" [ERROR] user {r.user_id}: {r.error}")
else:
print(f" [OK] user {r.user_id}: score={r.data['score']}")
print("\n--- Solo scores via map() ---")
safe_ids = [uid for uid in ids if uid != 7]
print(fetch_scores_only(safe_ids))
Lo que hace cada decisión
El diccionario future_to_id es el patrón estándar para asociar futures con metadatos. submit() retorna un objeto Future hashable, así que usarlo como clave de dict es idiomático y evita tener que mantener listas paralelas.
as_completed() merece atención especial: itera sobre los futures en orden de finalización, no de envío. Eso significa que si el usuario 5 termina antes que el usuario 1, lo procesas antes. Es útil cuando quieres mostrar progreso o procesar resultados cuanto antes. Contrasta con executor.map(), que preserva el orden del iterable de entrada aunque internamente ejecute todo en paralelo — pagas por esa garantía de orden en la iteración.
El bloque try/except alrededor de future.result() es fundamental. Sin él, una excepción en cualquier thread se pierde silenciosamente si nadie llama a result(). El executor no imprime nada, no detiene el programa, simplemente guarda la excepción en el future esperando que alguien la recoja.
La función fetch_scores_only muestra cuándo map() es preferible a submit() + as_completed(): cuando la operación es homogénea, el orden importa y no necesitas manejar errores individualmente. Si cualquier tarea lanza una excepción, map() la propaga cuando el iterador llega a ese elemento — no hay forma de capturar errores parciales sin iterar con try/except dentro del list().
Errores que debes conocer
Error: usar executor.map() cuando algunas tareas pueden fallar y quieres resultados parciales — al primer error, la iteración se interrumpe y pierdes los resultados siguientes.
# ❌ Wrong
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(fetch_user_data, [1, 7, 3]))
# lanza ValueError en el item 7 y nunca ves el resultado de 3
# ✅ Right
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(fetch_user_data, uid): uid for uid in [1, 7, 3]}
for future in concurrent.futures.as_completed(futures):
try:
data = future.result()
except ValueError as exc:
print(f"Error: {exc}") # continúa con los demás
Cambiar a submit() + as_completed() con try/except por future permite procesar todos los resultados independientemente de los fallos individuales.
Error: llamar future.result(timeout=None) en el thread principal antes de que el executor tenga threads libres para procesar las tareas pendientes, creando un deadlock.
# ❌ Wrong — si max_workers=1 y submit() llena el único thread,
# result() bloquea el main thread que nunca liberará el executor
with ThreadPoolExecutor(max_workers=1) as executor:
f1 = executor.submit(time.sleep, 5)
f2 = executor.submit(time.sleep, 5)
f1.result() # bloquea; f2 nunca empieza hasta que f1 termine (no hay deadlock aquí
# per se, pero sí serialización total — pierdes la concurrencia)
# ✅ Right — recolecta todos los futures primero, luego espera resultados
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(time.sleep, 5) for _ in range(2)]
# Al salir del with, ambos ya terminaron; o bien itera sobre futures aquí
Dimensionar max_workers correctamente según el número de tareas concurrentes esperadas evita tanto la serialización accidental como el desperdicio de recursos.
N° 140