Implementación de Fan-out y Fan-in en Go

Cuando necesitas procesar un volumen masivo de datos de forma eficiente, no puedes limitarte a un hilo de ejecución secuencial. El procesamiento paralelo en Go se apoya en dos patrones fundamentales que permiten escalar la carga de trabajo: Fan-out y Fan-in.

El Fan-out ocurre cuando múltiples goroutines leen de un mismo canal de entrada, distribuyendo el trabajo entre ellas para procesarlo en paralelo. Es la clave para aprovechar todos los núcleos de tu CPU o para manejar múltiples llamadas de red simultáneas. Por su parte, el Fan-in es el proceso inverso: consiste en consolidar los resultados provenientes de múltiples canales de entrada en un único canal de salida, permitiendo que el consumidor final reciba un flujo continuo y ordenado de resultados.

Estos patrones funcionan gracias al modelo de comunicación de Go (CSP), donde el canal actúa como el mediador que sincroniza el flujo de datos. Sin embargo, la gestión del ciclo de vida es crítica. Si implementas un fanIn mal diseñado, podrías dejar goroutines colgadas en memoria (goroutine leaks) o causar un deadlock si el canal de salida se cierra antes de tiempo o nunca se cierra. Debes usar este patrón cuando tengas tareas independientes que puedan ejecutarse sin compartir estado y necesites centralizar su salida. Si fallas en la coordinación del cierre de canales, tu programa simplemente se quedará bloqueado esperando datos que nunca llegarán o fallará con un pánico al intentar escribir en un canal cerrado.

Para un control profesional de la concurrencia, si necesitas limitar cuántas goroutines se ejecutan simultáneamente (por ejemplo, para no saturar una base de datos), lo ideal es usar errgroup.Group con SetLimit [disponible desde Go 1.20] en lugar de lanzar goroutines de forma indiscriminada.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Job representa la unidad de trabajo.
type Job struct {
	ID    int
	Value int
}

// Result representa el resultado de un procesamiento.
type Result struct {
	JobID int
	Output string
	Err   error
}

// producer genera una serie de tareas y las envía a un canal.
func producer(ctx context.Context, n int) <-chan Job {
	out := make(chan Job)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			select {
			case <-ctx.Done():
				return
			case out <- Job{ID: i, Value: i * 10}:
			}
		}
	}()
	return out
}

// worker procesa trabajos desde un canal de entrada y los envía a su propio canal de salida.
// Esto es lo que permite el efecto Fan-out cuando varios workers escuchan al mismo canal.
func worker(ctx context.Context, id int, jobs <-chan Job) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for job := range jobs {
			// Simulamos una tarea intensiva de CPU o I/O
			time.Sleep(100 * time.Millisecond)
			
			res := Result{JobID: job.ID, Output: fmt.Sprintf("Worker %d procesó %d", id, job.Value)}
			
			select {
			case <-ctx.Done():
				return
			case out <- res:
			}
		}
	}()
	return out
}

// fanIn consolida múltiples canales de resultados en un único canal de salida.
func fanIn(ctx context.Context, channels ...<-chan Result) <-chan Result {
	out := make(chan Result)
	var wg sync.WaitGroup

	// Lanzamos una goroutine de multiplexación por cada canal de entrada.
	for _, c := range channels {
		wg.Add(1)
		go func(ch <-chan Result) {
			defer wg.Done()
			for r := range ch {
				select {
				case <-ctx.Done():
					return
				case out <- r:
				}
			}
		}(c)
	}

	// Esta goroutine es crucial: espera a que todos los multiplexores terminen
	// para cerrar el canal de salida final.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// 1. Generamos los datos (Source)
	jobsCh := producer(ctx, 10)

	// 2. Fan-out: Distribuimos el trabajo en 3 workers.
	// Cada worker recibe el mismo canal 'jobsCh' pero Go garantiza que 
	// cada job sea entregado a UN SOLO worker.
	worker1 := worker(ctx, 1, jobsCh)
	worker2 := worker(ctx, 2, jobsCh)
	worker3 := worker(ctx, 3, jobsCh)

	// 3. Fan-in: Consolidamos los 3 canales de resultados en uno solo.
	results := fanIn(ctx, worker1, worker2, worker3)

	// 4. Consumo final de los resultados consolidado.
	for res := range results {
		if res.Err != nil {
			fmt.Printf("Error en job %d: %v\n", res.JobID, res.Err)
			continue
		}
		fmt.Println(res.Output)
	}
}

Desglose del patrón

En el código anterior, la arquitectura sigue un flujo lineal de datos pero con ramificaciones en la ejecución.

Primero, el producer crea un canal de Job y lo cierra cuando termina de emitir datos, lo cual es vital para que los workers sepan que deben terminar.

En la sección de Fan-out, observa cómo pasamos el mismo jobsCh a tres llamadas de worker. No es un error: en Go, cuando múltiples goroutines leen de un mismo canal, el runtime asegura que cada mensaje sea entregado a una y solo una de las goroutines. Esto distribuye la carga automáticamente. Cada worker devuelve su propio canal de Result, creando múltiples flujos de datos independientes.

El corazón del Fan-in es la función fanIn. Aquí es donde aplicamos la lógica de multiplexación:
1. Usamos un sync.WaitGroup para rastrear cuántos canales de entrada estamos vigilando.
2. Lanzamos una goroutine por cada canal (for _, c := range channels). Estas goroutines actúan como “puentes”, leyendo de un canal de origen y escribiendo en el canal out.
3. La llamada a wg.Add(1) dentro del bucle es fundamental para asegurar que el contador sea correcto antes de lanzar las goroutines.
4. La decisión más crítica es la goroutine que ejecuta wg.Wait(). Si llamaras a wg.Wait() en el cuerpo principal de fanIn, bloquearías la función y nunca retornararías el canal out, causando un deadlock inmediato. Al lanzarlo en su propia goroutine, permitimos que fanIn retorne el canal out mientras el proceso de espera ocurre en segundo plano.

El error frecuente

Un error clásico al implementar fanIn es cerrar el canal de salida (out) dentro de una de las goroutines de multiplexación o intentar cerrarlo en la función principal sin esperar a las demás.

// ERROR: Esto causará un panic: "send on closed channel" 
// o un deadlock si se cierra prematuramente.
for _, c := range channels {
    go func(ch <-chan Result) {
        for r := range ch {
            out <- r
        }
        close(out) // <--- ERROR: Se cierra el canal compartido demasiado pronto
    }(c)
}

Si una goroutine cierra out mientras otra todavía está intentando escribir en él, el programa colapsará con un panic. La única forma segura de cerrar un canal multiplexado es mediante la combinación de un sync.WaitGroup y una goroutine dedicada exclusivamente a esperar la finalización de todos los productores antes de ejecutar el close(out).

140

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio