`queue.Queue`: comunicación segura entre threads

Cuando varios threads necesitan pasarse trabajo, la tentación natural es usar una lista compartida protegida con un Lock. Funciona, pero estás reinventando la rueda con más superficie de error. queue.Queue encapsula exactamente ese patrón —lista + lock interno + condiciones de sincronización— y lo hace con una API diseñada para el caso de uso productor-consumidor.

La distinción clave frente a multiprocessing.Queue es el modelo de memoria: los threads comparten el mismo heap, así que queue.Queue puede pasar objetos Python por referencia sin serialización. No hay pickling, no hay pipes del sistema operativo, todo ocurre dentro del mismo proceso. El lock interno es un threading.Lock estándar que protege las operaciones sobre la deque interna, y dos threading.Condition adicionales gestionan los estados de “llena” y “vacía”.

put() encola un item; si la cola tiene maxsize > 0 y está llena, bloquea hasta que haya espacio. get() desencola; si está vacía, bloquea hasta que llegue algo. Ambos aceptan block=False o un timeout cuando no quieres esperar indefinidamente.

El par task_done() / join() cubre un caso que los locks sueltos no resuelven fácilmente: el productor necesita saber no solo que encoló todos los items, sino que los consumidores los procesaron. join() bloquea al productor hasta que cada item recibido con get() haya sido marcado con task_done(). Si olvidas llamar task_done() después de procesar, join() espera eternamente —ese es el error más frecuente.

Cuando el desorden de una lista compartida + lock empieza a crecer, los deadlocks aparecen porque el lock queda adquirido en un branch que nadie anticipó. Queue elimina esa posibilidad: nunca expongas el lock, deja que la cola lo gestione.

import threading
import queue
import time
import random

NUM_WORKERS = 4
SENTINEL = None  # señal de cierre para cada worker


def producer(q: queue.Queue, items: list[str]) -> None:
    for item in items:
        q.put(item)
        print(f"[producer] encolado: {item}")
    # un sentinel por cada worker para cerrarlos limpiamente
    for _ in range(NUM_WORKERS):
        q.put(SENTINEL)


def worker(worker_id: int, q: queue.Queue) -> None:
    while True:
        item = q.get()  # bloquea si la cola está vacía
        if item is SENTINEL:
            q.task_done()  # sentinel también cuenta como tarea
            break
        try:
            # simula trabajo variable; en producción aquí va la lógica real
            time.sleep(random.uniform(0.05, 0.2))
            print(f"[worker-{worker_id}] procesado: {item}")
        finally:
            # task_done() SIEMPRE debe llamarse, incluso si falla el procesado
            q.task_done()


def main() -> None:
    work_items = [f"tarea-{i}" for i in range(12)]
    q: queue.Queue[str | None] = queue.Queue(maxsize=6)  # backpressure al productor

    workers = [
        threading.Thread(target=worker, args=(i, q), daemon=True)
        for i in range(NUM_WORKERS)
    ]
    for t in workers:
        t.start()

    # el productor corre en el thread principal
    producer(q, work_items)

    # bloquea hasta que task_done() haya sido llamado para cada put()
    q.join()
    print("Todas las tareas procesadas.")


if __name__ == "__main__":
    main()

Lo que hace cada decisión

maxsize=6 introduce backpressure: si los workers van lentos y hay seis items pendientes, put() bloquea al productor. Esto evita que la cola crezca sin límite cuando el trabajo llega más rápido de lo que se procesa —sin maxsize, con un productor rápido y workers lentos, acabarías consumiendo toda la memoria disponible antes de que nadie se diera cuenta.

Los sentinels (None por worker) son el patrón idiomático para cerrar workers de forma limpia sin variables de estado compartidas. Cada worker consume exactamente un sentinel y sale. El número de sentinels debe coincidir con el número de workers; si hay menos, algunos threads quedan bloqueados en get() para siempre. Si hay más, los workers extras salen antes de que les llegue trabajo real.

El try/finally alrededor del procesado garantiza que task_done() se llame incluso si la tarea lanza una excepción. Sin el finally, una tarea que falla deja el contador interno desfasado y join() nunca retorna. Es el equivalente a lock.release() dentro de un finally —la misma disciplina, encapsulada.

daemon=True en los threads es una decisión de ciclo de vida: si el programa principal termina antes de que los workers acaben (por ejemplo, si join() se interrumpe), los threads daemon mueren con el proceso en lugar de bloquearlo. Para workers de fondo que no gestionan recursos críticos, es el comportamiento correcto.

q.join() en el thread principal espera a que el contador interno llegue a cero. Ese contador sube con cada put() y baja con cada task_done(). No tiene nada que ver con si los threads siguen vivos; es puramente contable. Por eso los sentinels también necesitan su task_done(): fueron encolados con put(), así que suman al contador aunque su “procesado” sea simplemente salir del loop.

Errores que debes conocer

Error: olvidar task_done() cuando el procesado falla, haciendo que join() bloquee indefinidamente.

# ❌ Wrong
def worker_bad(q: queue.Queue) -> None:
    while True:
        item = q.get()
        if item is None:
            break
        process(item)  # si lanza, task_done() nunca se llama
        q.task_done()

# ✅ Right
def worker_good(q: queue.Queue) -> None:
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            break
        try:
            process(item)
        finally:
            q.task_done()

task_done() dentro del finally garantiza que el contador baje siempre, independientemente de si process() tuvo éxito o no.


Error: usar queue.Queue con get(block=False) en un busy-loop, quemando CPU en lugar de dejar que el scheduler bloquee el thread.

# ❌ Wrong
while True:
    try:
        item = q.get(block=False)
        process(item)
    except queue.Empty:
        continue  # 100% CPU esperando trabajo

# ✅ Right
while True:
    item = q.get()  # bloquea y cede el GIL mientras espera
    process(item)
    q.task_done()

get() bloqueante libera el GIL mientras espera, permitiendo que otros threads trabajen; el busy-loop retiene el GIL y degrada el rendimiento de toda la aplicación.


Error: usar LifoQueue (o PriorityQueue) sin entender que cambian el orden de consumo y pueden provocar starvation si los items más antiguos nunca alcanzan suficiente prioridad.

# ❌ Wrong — items de baja prioridad nunca se procesan si el productor
# sigue añadiendo items de alta prioridad
q: queue.PriorityQueue = queue.PriorityQueue()
q.put((1, "urgente"))
q.put((10, "rutina"))
q.put((1, "urgente2"))  # "rutina" queda enterrada indefinidamente

# ✅ Right — incluir un número de secuencia como desempate garantiza progreso
import itertools
counter = itertools.count()
q.put((1, next(counter), "urgente"))
q.put((10, next(counter), "rutina"))
q.put((1, next(counter), "urgente2"))  # "rutina" procesada en orden FIFO dentro de su prioridad

La tupla de tres elementos garantiza que, dentro de la misma prioridad, el orden de llegada se respeta, evitando que items de prioridad baja esperen para siempre.

139

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio