Cuando separas trabajo en procesos distintos, el siguiente problema es inmediato: ¿cómo se hablan? Cada proceso tiene su propio espacio de direcciones —el kernel garantiza eso—, así que no puedes compartir un objeto Python como si nada. Necesitas un canal explícito, y la elección del canal tiene consecuencias de rendimiento que no son triviales.
multiprocessing.Queue no es queue.Queue con otro nombre. La versión de threading vive en el heap del proceso y usa un Lock interno. La versión de multiprocessing construye sobre un Pipe (que a su vez usa sockets o pipes del OS dependiendo de la plataforma) y arranca un hilo feeder interno que serializa cada ítem con pickle antes de escribirlo en el descriptor de fichero. Ese hilo existe dentro del proceso productor; el consumidor hace pickle.loads en el otro lado. El costo de pickle es inevitable y acumulativo: objetos grandes o frecuentes lo convierten en el cuello de botella.
multiprocessing.Pipe() es más bajo en la pila: te da dos Connection objects conectados directamente, sin hilo feeder. La serialización sigue siendo pickle, pero el overhead de sincronización es menor porque es comunicación punto a punto sin cola intermedia. El problema es exactamente ese: punto a punto. Dos extremos, no más.
multiprocessing.shared_memory (disponible desde 3.8) toma una ruta completamente diferente: pide al kernel un segmento de memoria compartida al que varios procesos mapean con mmap. No hay serialización porque no hay copia —todos los procesos leen y escriben en las mismas páginas físicas. Eso es transformador para arrays numéricos grandes, pero exige que gestiones la sincronización tú mismo y que los datos sean contiguos (numpy arrays, ctypes arrays). Para tipos simples, multiprocessing.Value y multiprocessing.Array envuelven esa memoria con un Lock ya integrado.
El error de diseño más común es elegir Queue para mover arrays de varios megabytes en un bucle tight. Cada vuelta serializa, copia al pipe, deserializa. Con shared memory, esa misma operación se convierte en escribir bytes en una dirección.
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import time
import struct
# ── 1. Queue: múltiples productores / consumidores ──────────────────────
def producer(q: mp.Queue, n: int) -> None:
for i in range(n):
q.put({"id": i, "value": i ** 2})
q.put(None) # sentinel para que el consumidor sepa que terminó
def consumer(q: mp.Queue) -> None:
while True:
item = q.get()
if item is None:
break
# procesa normalmente; el pickle ya ocurrió en el otro lado
# ── 2. Pipe: comunicación bidireccional punto a punto ───────────────────
def worker_pipe(conn: mp.connection.Connection) -> None:
while True:
try:
msg = conn.recv() # bloquea hasta recibir
except EOFError:
break
if msg == "STOP":
conn.close()
break
conn.send(msg * 2) # responde al padre
# ── 3. shared_memory con numpy: sin pickle, sin copia ───────────────────
ARRAY_SIZE = 1_000_000 # floats de 64 bits → 8 MB
def fill_shared(shm_name: str, size: int) -> None:
shm = SharedMemory(name=shm_name)
arr = np.ndarray((size,), dtype=np.float64, buffer=shm.buf)
arr[:] = np.arange(size, dtype=np.float64)
shm.close() # cierra el mapeo local; no destruye el segmento
def reduce_shared(shm_name: str, size: int, result: mp.Value) -> None:
shm = SharedMemory(name=shm_name)
arr = np.ndarray((size,), dtype=np.float64, buffer=shm.buf)
result.value = float(arr.sum())
shm.close()
# ── 4. Value y Array: tipos simples con Lock incluido ───────────────────
def increment_counter(counter: mp.Value, n: int) -> None:
for _ in range(n):
with counter.get_lock(): # el lock es reentrante dentro del proceso
counter.value += 1
# ── Demostración ─────────────────────────────────────────────────────────
def demo_queue() -> None:
q: mp.Queue = mp.Queue(maxsize=100)
p = mp.Process(target=producer, args=(q, 5))
c = mp.Process(target=consumer, args=(q,))
p.start(); c.start()
p.join(); c.join()
print("Queue demo completado")
def demo_pipe() -> None:
parent_conn, child_conn = mp.Pipe(duplex=True)
w = mp.Process(target=worker_pipe, args=(child_conn,))
w.start()
child_conn.close() # el padre cierra su copia del extremo hijo
for val in [3, 7, 21]:
parent_conn.send(val)
print(f"Pipe: envié {val}, recibí {parent_conn.recv()}")
parent_conn.send("STOP")
w.join()
parent_conn.close()
def demo_shared_memory() -> None:
shm = SharedMemory(create=True, size=ARRAY_SIZE * np.dtype(np.float64).itemsize)
result = mp.Value("d", 0.0) # 'd' → double en ctypes
filler = mp.Process(target=fill_shared, args=(shm.name, ARRAY_SIZE))
reducer = mp.Process(target=reduce_shared, args=(shm.name, ARRAY_SIZE, result))
filler.start(); filler.join() # espera llenado antes de reducir
reducer.start(); reducer.join()
expected = ARRAY_SIZE * (ARRAY_SIZE - 1) / 2
print(f"Shared memory: suma={result.value:.0f}, esperado={expected:.0f}")
shm.close()
shm.unlink() # destruye el segmento del OS; DEBES llamarlo
def demo_value() -> None:
counter = mp.Value("i", 0) # 'i' → int en ctypes
procs = [mp.Process(target=increment_counter, args=(counter, 1000))
for _ in range(4)]
for p in procs: p.start()
for p in procs: p.join()
print(f"Value: contador final = {counter.value}") # debe ser 4000
if __name__ == "__main__":
demo_queue()
demo_pipe()
demo_shared_memory()
demo_value()
Lo que hace cada decisión de diseño
Queue con maxsize: sin límite, el hilo feeder puede acumular miles de ítems serializados en memoria del productor si el consumidor es lento. maxsize=100 aplica backpressure: q.put() bloquea cuando está llena, lo que convierte el canal en un mecanismo de control de flujo natural.
child_conn.close() en el padre después de Pipe: esto no es capricho. Cuando el proceso hijo cierra su extremo, el padre necesita recibir EOFError para saber que no hay más datos. Si el padre mantiene una copia abierta del extremo hijo, el OS ve que ese descriptor sigue vivo y nunca señaliza EOF. El close() es semántica de ownership, no limpieza opcional.
SharedMemory(create=True) y shm.unlink(): el segmento existe en el sistema de ficheros virtual del kernel (/dev/shm en Linux) hasta que alguien llama a unlink(). Si el proceso muere sin llamarlo, el segmento queda en el OS hasta el siguiente reinicio. close() solo cierra el mapeo local del proceso actual; unlink() destruye el segmento globalmente. En producción, maneja esto con try/finally o un gestor de contexto.
np.ndarray(..., buffer=shm.buf): numpy usa el buffer directamente —cero copias. shm.buf es un memoryview sobre el mmap. El array no tiene datos propios; es una vista. Por eso shm.close() invalida el array: libera el mapeo, y cualquier acceso posterior es comportamiento indefinido a nivel de OS.
mp.Value("d", 0.0) y get_lock(): el type code "d" es un ctypes.c_double. El lock que viene incluido es válido entre procesos, pero counter.value += 1 sin get_lock() es una race condition: la lectura, el incremento y la escritura son tres operaciones separadas en el nivel de bytecode. El lock hace esas tres operaciones atómicas desde la perspectiva de otros procesos.
La razón por la que fill_shared termina antes de que reduce_shared empiece (join explícito entre los dos procesos) es que no hay sincronización implícita en la memoria compartida. Sin esa barrera, el reductor podría leer el array a medias. Para pipelines más complejos, necesitarás mp.Event, mp.Semaphore o barriers explícitos.
Errores que debes conocer
Error: llamar shm.unlink() en cada proceso que accede al segmento, no solo en el creador.
# ❌ Wrong
def worker(shm_name):
shm = SharedMemory(name=shm_name)
# ... usa el segmento ...
shm.close()
shm.unlink() # destruye el segmento para TODOS los otros procesos también
# ✅ Right
def worker(shm_name):
shm = SharedMemory(name=shm_name)
# ... usa el segmento ...
shm.close() # solo cierra el mapeo local
# el creador es el responsable de llamar unlink()
Solo el proceso creador debería llamar unlink(), idealmente en un finally. Los demás solo hacen close().
Error: usar mp.Queue para transferir arrays numpy grandes en un bucle de alta frecuencia.
# ❌ Wrong
def worker(q):
while True:
arr = q.get() # pickle.loads de 8 MB en cada iteración
process(arr)
# ✅ Right — SharedMemory + un índice/señal pequeña por Queue
def worker(shm_name, size, signal_q):
shm = SharedMemory(name=shm_name)
arr = np.ndarray((size,), dtype=np.float64, buffer=shm.buf)
while True:
idx = signal_q.get() # solo transfiere un entero por pickle
if idx is None:
break
process(arr[idx]) # lee directamente de memoria compartida
El Queue sigue siendo útil para coordinación (señales pequeñas); los datos grandes viven en el segmento compartido y no cruzan ninguna frontera de serialización.
Error: no cerrar el extremo del Pipe que no usa el proceso actual.
# ❌ Wrong parent_conn, child_conn = mp.Pipe() w = mp.Process(target=worker, args=(child_conn,)) w.start() # parent_conn sigue abierto en el hijo también (heredado por fork) # el hijo nunca recibe EOF aunque el padre cierre parent_conn # ✅ Right w.start() child_conn.close() # padre cierra su copia del extremo hijo, inmediatamente tras fork
En sistemas que usan fork, el proceso hijo hereda todos los descriptores abiertos. Cerrar explícitamente el extremo que no corresponde es lo que hace que EOF se propague correctamente.
N° 144