Cuando lanzas un Pool.map o metes algo en una Queue, Python no comparte memoria — serializa los datos con pickle y los reconstruye al otro lado. Ese “al otro lado” puede ser un proceso hijo en la misma máquina o un worker remoto; lo que importa es que pickle es el contrato de transporte, y ese contrato tiene reglas estrictas que, si las ignoras, te explotan en producción de formas confusas.
El mecanismo es directo: cuando llamas pool.map(fn, iterable), el proceso principal serializa fn y cada elemento del iterable con pickle.dumps, los envía por un pipe interno, y el worker los reconstruye con pickle.loads. Si cualquier pieza de ese proceso falla, obtienes un PicklingError o, peor, un AttributeError críptico. El diseño existe porque procesos separados tienen espacios de memoria completamente distintos — no hay otra forma de mover datos entre ellos sin una frontera de serialización explícita.
La regla de oro: solo lo que tiene una referencia importable desde el módulo puede cruzar la frontera. Pickle no serializa bytecode — serializa nombres. Para una función, serializa su módulo y su nombre cualificado (__module__ + __qualname__). Al deserializar, el worker hace import modulo; getattr(modulo, nombre). Si esa ruta no existe en el worker, falla. Por eso las funciones definidas a nivel de módulo funcionan, y las lambdas, funciones locales y closures no — no tienen una ruta importable estable.
Lo que definitivamente no sobrevive pickle: conexiones a bases de datos, file handles, sockets, locks de threading, iteradores con estado interno, y cualquier objeto cuya identidad depende del proceso que lo creó. Si intentas pasarlos, el error puede ser inmediato o, más traicionero, puede serializar silenciosamente y fallar al deserializar.
# worker_tasks.py ← debe ser importable como módulo independiente
import os
import time
from multiprocessing import Pool, Queue, Process
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import ctypes
import multiprocessing as mp
# ✅ Función a nivel de módulo: pickle serializa "worker_tasks.compute_heavy"
def compute_heavy(item: tuple[int, float]) -> float:
idx, value = item
# Simulamos trabajo intensivo con el GIL liberado (operación pura en C)
return sum(value ** 0.5 for _ in range(10_000))
# ✅ Clase con estado picklable: todos sus atributos deben serializarse
class Transformer:
def __init__(self, scale: float, offset: float):
self.scale = scale
self.offset = offset
# __call__ hace la instancia usable como función en Pool.map
def __call__(self, x: float) -> float:
return x * self.scale + self.offset
# ✅ Alternativa para estado compartido de solo lectura: initializer
_shared_config: dict = {}
def init_worker(config: dict) -> None:
"""
Se ejecuta UNA vez por worker al arrancar el pool.
Evita serializar `config` en cada tarea individual.
"""
global _shared_config
_shared_config = config
def task_with_config(x: float) -> float:
# Lee del estado local del worker, no hay pickle en cada llamada
return x * _shared_config["multiplier"]
# ✅ Shared memory para arrays grandes: cruzamos solo el nombre del segmento
def fill_shared_array(args: tuple[str, int, int]) -> None:
shm_name, start, end = args
shm = SharedMemory(name=shm_name)
arr = np.ndarray((100,), dtype=np.float64, buffer=shm.buf)
for i in range(start, end):
arr[i] = i * 2.0
shm.close() # no unlink — el creador es responsable de destruirlo
def demonstrate_patterns() -> None:
data = [(i, float(i)) for i in range(20)]
# --- Patrón 1: función de módulo ---
with Pool(processes=4) as pool:
results = pool.map(compute_heavy, data)
print(f"compute_heavy resultados (primeros 3): {results[:3]}")
# --- Patrón 2: instancia callable ---
transformer = Transformer(scale=2.5, offset=1.0)
with Pool(processes=4) as pool:
transformed = pool.map(transformer, [x for _, x in data])
print(f"Transformer resultados (primeros 3): {transformed[:3]}")
# --- Patrón 3: initializer para config compartida ---
config = {"multiplier": 3.14}
with Pool(processes=4, initializer=init_worker, initargs=(config,)) as pool:
configured = pool.map(task_with_config, [float(i) for i in range(10)])
print(f"Con config compartida (primeros 3): {configured[:3]}")
# --- Patrón 4: shared_memory para arrays numpy grandes ---
shm = SharedMemory(create=True, size=100 * np.float64().itemsize)
arr = np.ndarray((100,), dtype=np.float64, buffer=shm.buf)
arr[:] = 0.0 # inicializar
segments = [(shm.name, i * 25, (i + 1) * 25) for i in range(4)]
with Pool(processes=4) as pool:
pool.map(fill_shared_array, segments)
print(f"Shared memory array (primeros 5): {arr[:5]}")
shm.close()
shm.unlink() # destruir el segmento — solo el creador debe hacer esto
if __name__ == "__main__":
demonstrate_patterns()
Qué está pasando realmente en cada patrón
compute_heavy como función de módulo funciona porque pickle escribe literalmente el string "worker_tasks.compute_heavy". El worker importa worker_tasks y busca ese atributo. Si mueves la función a un if __name__ == "__main__":, deja de ser importable y falla.
Transformer como callable resuelve el problema clásico de “necesito pasar estado a mis workers”. La instancia se pickle entera — sus atributos scale y offset son floats, perfectamente serializables. El truco es __call__: Pool.map solo necesita algo que sea invocable con un argumento, y una instancia con __call__ cumple ese contrato. Esto es superior a una closure porque la ruta pickle de la instancia es estable.
El patrón initializer es el que más gente ignora y el que más duele ignorar. Si tienes configuración pesada — un modelo de ML cargado, un diccionario grande, una conexión — y la serializas en cada llamada a pool.map, estás pagando el coste de pickle en cada tarea. Con initializer, pagas una sola vez al arrancar el pool, y cada worker tiene su propia copia local sin roundtrip de serialización.
SharedMemory cambia completamente el modelo: en vez de mover datos, compartimos un segmento de memoria del sistema operativo identificado por nombre. Lo que cruzamos la frontera de proceso es solo ese string. El array numpy de 100 elementos podría ser de 100 millones y el coste de pickle seguiría siendo el coste de serializar un string corto. La regla crítica: el proceso que llama create=True es el dueño y el único que debe llamar unlink(). Si un worker llama unlink(), destruyes el segmento para todos.
Errores que debes conocer
Error: pasar una lambda a Pool.map — las lambdas son funciones anónimas sin nombre cualificado importable, pickle no puede encontrarlas en el módulo.
# ❌ Wrong
with Pool() as pool:
results = pool.map(lambda x: x * 2, range(10))
# AttributeError: Can't pickle local object '<lambda>'
# ✅ Right
def double(x: int) -> int:
return x * 2
with Pool() as pool:
results = pool.map(double, range(10))
La función de módulo tiene __qualname__ == "double", que pickle puede resolver; la lambda tiene __qualname__ == "<lambda>", que no corresponde a ningún atributo importable.
Error: intentar pasar un objeto con recursos del sistema operativo — las conexiones de base de datos, sockets y locks se crean en el proceso padre y no pueden reconstituirse en el hijo.
# ❌ Wrong
import sqlite3
conn = sqlite3.connect("data.db")
with Pool() as pool:
pool.map(lambda x: conn.execute(f"SELECT {x}"), range(10))
# TypeError: cannot pickle 'sqlite3.Connection' object
# ✅ Right
def query_db(x: int) -> list:
# Cada worker crea su propia conexión — conexiones no son thread/process-safe
conn = sqlite3.connect("data.db")
result = conn.execute(f"SELECT {x}").fetchall()
conn.close()
return result
with Pool() as pool:
results = pool.map(query_db, range(10))
Cada worker crea su propio recurso del sistema operativo en su propio espacio de proceso; es el único modelo correcto con recursos que tienen afinidad de proceso.
Error: definir la función worker dentro de if __name__ == "__main__" o dentro de otra función — aunque Python no lanza error inmediatamente, el worker no puede importarla.
# ❌ Wrong
if __name__ == "__main__":
def process_item(x): # __qualname__ es resolvible localmente, no remotamente
return x ** 2
with Pool() as pool:
pool.map(process_item, range(10))
# ✅ Right
# process_item definida a nivel de módulo, FUERA del bloque if __name__
def process_item(x: int) -> int:
return x ** 2
if __name__ == "__main__":
with Pool() as pool:
pool.map(process_item, range(10))
El if __name__ == "__main__" protege la creación del pool (correcto), pero la función worker debe vivir al nivel del módulo para ser importable por los workers hijos.
Si llegás a un caso donde realmente necesitás pasar algo que no es picklable y ninguno de estos patrones aplica, cloudpickle (usado por joblib y dask) puede serializar closures y funciones locales instrumentando el bytecode directamente — pero es una dependencia adicional con su propio modelo mental.
N° 146