Multiprocessing en Python: paralelismo real esquivando el GIL

El GIL (Global Interpreter Lock) es un mutex que el intérprete CPython mantiene para proteger el estado interno del runtime. Solo un thread puede ejecutar bytecode Python en cualquier momento dado — aunque tengas 16 núcleos y 16 threads activos. Para trabajo I/O-bound esto no importa demasiado porque el GIL se libera durante operaciones bloqueantes. Para trabajo CPU-bound es un techo de cristal: más threads no significa más velocidad.

multiprocessing resuelve esto de forma directa y sin magia: crea procesos separados del sistema operativo. Cada proceso tiene su propio intérprete CPython, su propio heap, y — crucialmente — su propio GIL independiente. El sistema operativo puede distribuir esos procesos en núcleos distintos y ejecutarlos en paralelo de verdad.

El diseño implica un trade-off que debes internalizar antes de escribir una sola línea. Crear un proceso implica hacer un fork() o spawn() del proceso actual, inicializar un intérprete Python nuevo, y cargar los módulos necesarios. Eso cuesta entre 100 y 1000 veces más que crear un thread. Además, los procesos no comparten memoria — cualquier dato que pase entre ellos debe serializarse con pickle, cruzar una pipe o socket, y deserializarse en el otro extremo.

La pregunta práctica es: ¿cuánto trabajo va a hacer cada proceso? Si la tarea dura 50 ms y el overhead de crear el proceso es 30 ms, el beneficio es marginal. Si la tarea dura 10 segundos, el overhead desaparece en el ruido. multiprocessing vale la pena cuando el trabajo por unidad es lo suficientemente pesado como para amortizar el costo de arranque y el costo de serialización de los datos.

Lo que rompe este cálculo: tareas muy cortas en volumen alto, objetos no-picklables (locks, conexiones de base de datos, file handles), y asumir que la memoria se comparte cuando no se comparte — mutaciones a variables globales en un proceso hijo no afectan al padre ni a los otros hijos.

import multiprocessing as mp
import time
import math


def is_prime(n: int) -> bool:
    """CPU-bound pura: sin I/O, sin liberar el GIL."""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(math.isqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True


def count_primes_in_range(start: int, end: int) -> int:
    """Cada proceso ejecuta esto de forma completamente independiente."""
    return sum(1 for n in range(start, end) if is_prime(n))


def split_range(total_end: int, num_chunks: int) -> list[tuple[int, int]]:
    chunk_size = total_end // num_chunks
    chunks = []
    for i in range(num_chunks):
        start = i * chunk_size
        end = (i + 1) * chunk_size if i < num_chunks - 1 else total_end
        chunks.append((start, end))
    return chunks


def run_sequential(limit: int) -> tuple[int, float]:
    t0 = time.perf_counter()
    result = count_primes_in_range(0, limit)
    return result, time.perf_counter() - t0


def run_parallel(limit: int, num_workers: int) -> tuple[int, float]:
    chunks = split_range(limit, num_workers)

    t0 = time.perf_counter()
    # Pool gestiona el ciclo de vida de los procesos; starmap serializa
    # automáticamente los argumentos y resultados con pickle.
    with mp.Pool(processes=num_workers) as pool:
        partial_counts = pool.starmap(count_primes_in_range, chunks)

    total = sum(partial_counts)
    return total, time.perf_counter() - t0


if __name__ == "__main__":
    # El guard __main__ es obligatorio en plataformas que usan spawn
    # (Windows, macOS por defecto desde Python 3.8).
    # Sin él, cada proceso hijo importa el módulo y vuelve a ejecutar
    # este bloque, creando un fork bomb.

    LIMIT = 2_000_000
    WORKERS = mp.cpu_count()

    print(f"Núcleos disponibles: {WORKERS}")
    print(f"Buscando primos hasta {LIMIT:,}\n")

    seq_count, seq_time = run_sequential(LIMIT)
    print(f"Secuencial : {seq_count:,} primos en {seq_time:.3f}s")

    par_count, par_time = run_parallel(LIMIT, WORKERS)
    print(f"Paralelo   : {par_count:,} primos en {par_time:.3f}s")

    # Ambos deben dar el mismo resultado; si no, hay un bug en split_range.
    assert seq_count == par_count, "Los resultados no coinciden"

    speedup = seq_time / par_time
    print(f"\nSpeedup: {speedup:.2f}x con {WORKERS} procesos")
    print(f"Eficiencia: {speedup / WORKERS * 100:.1f}% por núcleo")

Lo que este código ilustra

count_primes_in_range es puro trabajo de CPU — un bucle en Python puro sin ninguna llamada que libere el GIL. Con threads, ejecutarlos en paralelo no haría nada porque el GIL impide que dos threads ejecuten este bytecode al mismo tiempo. Con procesos, cada uno tiene su propio intérprete y puede quemar un núcleo completo independientemente.

El with mp.Pool(...) as pool es el patrón correcto de gestión de recursos. Pool mantiene un pool de procesos pre-creados — no crea y destruye un proceso por cada llamada a starmap. Esto amortiza el costo de arranque: pagas la creación de WORKERS procesos una vez, y luego los reutilizas para todas las tareas. Al salir del with, pool.terminate() y pool.join() se llaman automáticamente.

starmap recibe una lista de tuplas y las desempaqueta como argumentos posicionales — equivale a llamar count_primes_in_range(*chunk) para cada chunk. Los argumentos viajan del proceso padre a los hijos serializados con pickle, y los resultados (enteros simples en este caso) regresan por el mismo mecanismo. El costo de serialización aquí es insignificante porque tanto los argumentos como los resultados son integers pequeños; si estuvieras pasando DataFrames de 500 MB, la historia cambiaría radicalmente.

La función split_range divide el trabajo en partes aproximadamente iguales. Esto importa porque si un chunk es diez veces más grande que otro, el proceso que lo recibió seguirá trabajando mientras los otros están ociosos — el tiempo total lo dicta el proceso más lento. Para trabajo con distribución desigual, Pool.imap_unordered con chunks pequeños balancea mejor la carga dinámicamente.

El speedup real que verás en producción raramente es lineal. Con 8 núcleos, 5-6x es un resultado sólido. El resto se pierde en overhead de comunicación, en el hecho de que los núcleos comparten caché y memoria bandwidth, y en que el sistema operativo tiene otros procesos que atender.

Errores que debes conocer

Error: Usar multiprocessing sin el guard if __name__ == "__main__" en sistemas con método de inicio spawn.

# ❌ Wrong
import multiprocessing as mp

def worker(x):
    return x * x

pool = mp.Pool(4)  # En Windows esto se ejecuta en cada proceso hijo también
results = pool.map(worker, range(10))
# ✅ Right
import multiprocessing as mp

def worker(x):
    return x * x

if __name__ == "__main__":
    with mp.Pool(4) as pool:
        results = pool.map(worker, range(10))
    print(results)

El guard asegura que solo el proceso padre ejecuta el código de arranque; los hijos importan el módulo pero no entran al bloque __main__.


Error: Intentar pasar objetos no-picklables como argumentos o resultados.

# ❌ Wrong
import multiprocessing as mp
import threading

lock = threading.Lock()  # No es picklable

def worker(shared_lock):
    with shared_lock:
        print("trabajando")

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        pool.map(worker, [lock, lock])  # PicklingError en tiempo de ejecución
# ✅ Right
import multiprocessing as mp

def worker(_):
    # Cada proceso trabaja de forma independiente; no hay estado compartido.
    print(f"trabajando en {mp.current_process().name}")

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        pool.map(worker, range(2))

Si necesitas coordinación entre procesos, usa mp.Lock(), mp.Queue() o mp.Manager() — primitivas diseñadas específicamente para cruzar fronteras de proceso.


Error: Aplicar multiprocessing a tareas demasiado cortas donde el overhead domina.

# ❌ Wrong — overhead de proceso >> trabajo útil
import multiprocessing as mp

def trivial(x):
    return x + 1  # Microsegundos de trabajo

if __name__ == "__main__":
    with mp.Pool(8) as pool:
        # El overhead de pickle + IPC por cada número destruye el rendimiento
        results = pool.map(trivial, range(100_000))
# ✅ Right — agrupa el trabajo para amortizar el overhead
import multiprocessing as mp

def process_chunk(chunk: list[int]) -> list[int]:
    return [x + 1 for x in chunk]  # Muchos elementos por viaje de IPC

def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

if __name__ == "__main__":
    data = list(range(100_000))
    chunk_size = len(data) // mp.cpu_count()
    with mp.Pool() as pool:
        nested = pool.map(process_chunk, list(chunks(data, chunk_size)))
    results = [x for sub in nested for x in sub]

Enviar lotes reduce el número de viajes de serialización de 100,000 a cpu_count(), que es donde está la diferencia entre más lento que secuencial y genuinamente más rápido.

142

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio