Sincronización entre threads: Lock, RLock, Event y Condition

Cuando varios threads comparten estado mutable, el problema no es la concurrencia en sí —es la condición de carrera: dos threads leyendo y escribiendo la misma memoria sin coordinación. Python te da varias primitivas en threading para resolverlo, cada una diseñada para un patrón distinto. Usarlas bien es la diferencia entre código correcto y bugs que solo aparecen en producción bajo carga.

La raíz del problema: por qué necesitas sincronización

El GIL de CPython protege el intérprete, pero no protege tus datos. Una operación como counter += 1 se compila a varias instrucciones bytecode: LOAD, BINARY_ADD, STORE. El scheduler puede interrumpir un thread entre cualquiera de esas instrucciones. El resultado: pérdida silenciosa de incrementos. Eso es exactamente lo que Lock viene a resolver.

Un Lock es la primitiva más simple: un semáforo binario. Solo un thread puede adquirirlo (acquire()) a la vez; cualquier otro que intente hacerlo bloquea hasta que el primero llame release(). Usarlo siempre con el context manager (with lock:) garantiza que el release ocurra aunque haya una excepción.

Un RLock (reentrant lock) es un Lock que recuerda qué thread lo adquirió y permite que ese mismo thread lo adquiera de nuevo sin bloquearse. El lock se libera completamente solo cuando el número de acquire() iguala al de release(). Sin esto, una función que toma el lock y luego llama a otra función que también lo toma produciría un deadlock inmediato.

Un Event es el mecanismo más limpio para señalar entre threads: un flag booleano con semántica de espera. Un thread llama event.wait() y duerme hasta que otro llame event.set(). No hay estado compartido que proteger, solo una señal.

Un Condition combina un lock con notificación. Es el bloque de construcción del patrón producer-consumer: un thread espera hasta que cierta condición sea verdadera (condition.wait()), y otro la notifica cuando cambia el estado (condition.notify()). La clave es que wait() libera el lock internamente mientras espera y lo re-adquiere antes de retornar —eso evita el busy-waiting.

Si no eliges la primitiva correcta, los síntomas típicos son deadlocks (todos los threads bloqueados para siempre) o race conditions (datos corruptos de forma intermitente). Ambos son extremadamente difíciles de reproducir y depurar.

Código principal

import threading
import time
import random
from collections import deque


# ── Lock: contador compartido sin race conditions ──────────────────────
class SafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:          # acquire + release automático
            self._value += 1

    @property
    def value(self):
        with self._lock:
            return self._value


# ── RLock: árbol de nodos con operaciones recursivas ──────────────────
class TreeNode:
    def __init__(self, value):
        self.value = value
        self.children: list["TreeNode"] = []
        self._lock = threading.RLock()  # el mismo thread puede adquirirlo N veces

    def add_child(self, node: "TreeNode"):
        with self._lock:
            self.children.append(node)

    def sum_recursive(self) -> int:
        with self._lock:            # re-entra al lock en cada nivel del árbol
            total = self.value
            for child in self.children:
                total += child.sum_recursive()
        return total


# ── Event: señal de inicio entre threads ─────────────────────────────
def worker_with_event(name: str, start_event: threading.Event):
    print(f"[{name}] esperando señal...")
    start_event.wait()             # bloquea sin consumir CPU
    print(f"[{name}] señal recibida, trabajando")
    time.sleep(random.uniform(0.1, 0.3))
    print(f"[{name}] terminado")


# ── Condition: producer-consumer clásico ─────────────────────────────
BUFFER_SIZE = 3

def producer(queue: deque, cond: threading.Condition, items: int):
    for i in range(items):
        with cond:
            # espera mientras el buffer esté lleno
            while len(queue) >= BUFFER_SIZE:
                cond.wait()
            queue.append(i)
            print(f"[producer] produjo {i}, buffer={list(queue)}")
            cond.notify_all()      # notifica a consumers que hay datos
        time.sleep(0.05)


def consumer(name: str, queue: deque, cond: threading.Condition, items: int):
    consumed = 0
    while consumed < items:
        with cond:
            # espera mientras el buffer esté vacío
            while not queue:
                cond.wait()
            item = queue.popleft()
            consumed += 1
            print(f"[{name}] consumió {item}, buffer={list(queue)}")
            cond.notify_all()      # notifica al producer que hay espacio


def main():
    print("=== SafeCounter ===")
    counter = SafeCounter()
    threads = [threading.Thread(target=lambda: [counter.increment() for _ in range(1000)])
               for _ in range(5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"Esperado: 5000, Obtenido: {counter.value}")

    print("\n=== RLock recursivo ===")
    root = TreeNode(1)
    child1 = TreeNode(2)
    child2 = TreeNode(3)
    child1.add_child(TreeNode(4))
    root.add_child(child1)
    root.add_child(child2)
    print(f"Suma del árbol: {root.sum_recursive()}")  # 1+2+3+4 = 10

    print("\n=== Event ===")
    start = threading.Event()
    workers = [threading.Thread(target=worker_with_event, args=(f"w{i}", start))
               for i in range(3)]
    for w in workers:
        w.start()
    time.sleep(0.2)
    print("[main] disparando evento")
    start.set()                    # desbloquea todos los workers simultáneamente
    for w in workers:
        w.join()

    print("\n=== Condition / Producer-Consumer ===")
    queue: deque = deque()
    cond = threading.Condition()
    total_items = 10
    p = threading.Thread(target=producer, args=(queue, cond, total_items))
    c1 = threading.Thread(target=consumer, args=("c1", queue, cond, total_items // 2))
    c2 = threading.Thread(target=consumer, args=("c2", queue, cond, total_items // 2))
    for t in [p, c1, c2]:
        t.start()
    for t in [p, c1, c2]:
        t.join()


if __name__ == "__main__":
    main()

Desglose del código

SafeCounter muestra el patrón fundamental: el lock vive junto al dato que protege, no en algún lugar global. Ese acoplamiento es intencional —hace imposible acceder a _value sin pasar por el lock si respetas la interfaz pública. La propiedad value también lo toma porque una lectura sin lock sigue siendo una carrera.

TreeNode.sum_recursive ilustra exactamente por qué existe RLock. Cuando sum_recursive baja un nivel en el árbol y llama a sí misma, el thread ya posee el lock del nodo padre. Si fuera un Lock normal, el intento de adquirirlo de nuevo bloquearía indefinidamente. El RLock mantiene un contador interno de adquisiciones (_count) y solo libera realmente el lock cuando ese contador llega a cero.

El patrón Event es ideal cuando necesitas un disparo de pistola: todos los workers esperan, tú llamas set(), y todos arrancan. Fíjate que set() es permanente —cualquier thread que llame wait() después del set() retornará inmediatamente. Si necesitas reusar el evento, llama clear() primero.

El Condition es donde está la complejidad real. El bucle while not queue: cond.wait() no es accidental —es un patrón obligatorio. Cuando wait() regresa, el lock ya está re-adquirido, pero eso no garantiza que la condición siga siendo verdadera: otro thread puede haberse adelantado. Usar if en lugar de while aquí es un bug clásico conocido como spurious wakeup. El notify_all() en lugar de notify() es una elección conservadora que evita el problema de notificar al thread equivocado cuando hay múltiples consumers con condiciones distintas.

Errores que debes conocer

Error: usar Lock en código recursivo o en dos métodos que se llaman entre sí desde el mismo thread.

# ❌ Wrong
lock = threading.Lock()

def outer():
    with lock:
        inner()          # este thread ya tiene el lock; inner() bloqueará para siempre

def inner():
    with lock:           # deadlock: el mismo thread intenta adquirir un Lock ya adquirido
        pass

# ✅ Right
lock = threading.RLock()

def outer():
    with lock:
        inner()          # el RLock permite la re-entrada del mismo thread

def inner():
    with lock:
        pass

Cambia Lock por RLock cuando el mismo thread necesita adquirir el lock más de una vez en la misma pila de llamadas.


Error: adquirir múltiples locks en orden distinto en threads distintos, creando un deadlock circular.

# ❌ Wrong
lock_a = threading.Lock()
lock_b = threading.Lock()

def thread_1():
    with lock_a:
        time.sleep(0.01)
        with lock_b:     # espera lock_b, que thread_2 tiene
            pass

def thread_2():
    with lock_b:
        time.sleep(0.01)
        with lock_a:     # espera lock_a, que thread_1 tiene → deadlock
            pass

# ✅ Right
def thread_1():
    with lock_a:         # ambos threads adquieren en el mismo orden: a → b
        with lock_b:
            pass

def thread_2():
    with lock_a:
        with lock_b:
            pass

El orden consistente de adquisición es la única regla de prevención de deadlocks que funciona sin necesidad de herramientas externas.


Error: usar if en lugar de while al esperar en un Condition, ignorando spurious wakeups y race conditions entre waiters.

# ❌ Wrong
with cond:
    if not queue:        # si dos consumers despiertan, el segundo encontrará queue vacío
        cond.wait()
    item = queue.popleft()  # IndexError o dato incorrecto

# ✅ Right
with cond:
    while not queue:     # re-evalúa la condición cada vez que el thread despierta
        cond.wait()
    item = queue.popleft()

El while convierte el wait en un bucle de re-verificación que tolera tanto wakeups espurios del SO como contención entre múltiples consumers.

138

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio