Un pipeline es un patrón de diseño donde un flujo de datos atraviesa una serie de etapas (stages) conectadas por channels. Cada stage es una unidad de procesamiento independiente que opera en su propia goroutine, consumiendo de un canal upstream, realizando una transformación o proceso, y enviando el resultado a un canal downstream. Este modelo permite procesar flujos de datos (streams) de forma concurrente, donde cada etapa puede trabajar en un elemento diferente del flujo al mismo tiempo.
El diseño se basa en la composición de funciones que actúan como transformadores. El sistema aprovecha el backpressure natural de Go: si un stage downstream es más lento que el upstream, los canales se llenarán y los stages anteriores se bloquearán automáticamente en la operación de envío, evitando que la memoria se sature con datos que no se pueden procesar. Es la forma ideal de construir sistemas de procesamiento de datos de alto rendimiento, pero requiere un control estricto del ciclo de vida. Si no gestionas correctamente la cancelación mediante context.Context o no cierras los canales adecuadamente, las goroutines se quedarán bloqueadas esperando datos o esperando a que alguien los lea, provocando una fuga de goroutines (goroutine leak) que eventualmente agotará los recursos del sistema.
Para implementar esto de forma robusta en producción, no basta con lanzar goroutines; necesitas una forma de propagar errores y señales de terminación a través de todo el pipeline. Aquí es donde entra en juego golang.org/x/sync/errgroup, que permite coordinar un grupo de goroutines y asegurar que, si una etapa falla, todas las demás se cancelen inmediatamente.
package main
import (
"context"
"errors"
"fmt"
"os"
"time"
"golang.org/x/sync/errgroup"
)
// Result encapsula el valor o el error que viaja por el pipeline.
// Es vital pasar errores a través de los canales en pipelines complejos.
type Result struct {
Value int
Err error
}
// generator convierte una fuente de datos (un slice) en un flujo de enteros.
func generator(ctx context.Context, numbers []int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range numbers {
select {
case <-ctx.Done():
return
case out <- n:
}
}
}()
return out
}
// transformer realiza una operación pesada (simulada) sobre cada número.
// Aquí implementamos un patrón de worker pool dentro de un stage.
func transformer(ctx context.Context, in <-chan int) <-chan Result {
out := make(chan Result)
// Usamos un errgroup para gestionar los workers de este stage específico.
g, ctx := errgroup.WithContext(ctx)
// Lanzamos 3 workers para paralelizar la transformación.
for i := 0; i < 3; i++ {
g.Go(func() error {
for n := range in {
// Simulamos trabajo intensivo de CPU o latencia de red.
time.Sleep(50 * time.Millisecond)
// Simulamos un error ante un valor específico para probar la cancelación.
if n == 13 {
return errors.New("número de la mala suerte detectado")
}
select {
case <-ctx.Done():
return ctx.Err()
case out <- Result{Value: n * n}:
}
}
return nil
})
}
// Una goroutine de limpieza para cerrar el canal cuando todos los workers terminen.
go func() {
_ = g.Wait()
close(out)
}()
return out
}
func main() {
// Contexto base para la ejecución del pipeline.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
nums := []int{2, 4, 6, 13, 18, 20} // El 13 disparará el error.
// Construcción del pipeline (composición de stages).
// generator -> transformer -> sink (main)
source := generator(ctx, nums)
results := transformer(ctx, source)
// El sink (consumidor final) procesa los resultados.
for res := range results {
if res.Err != nil {
fmt.Fprintf(os.Stderr, "Pipeline fallido: %v\n", res.Err)
return
}
fmt.Printf("Resultado procesado: %d\n", res.Value)
}
fmt.Println("Pipeline completado con éxito")
}
Análisis del pipeline
En el código anterior, hemos construido una cadena de transformación. El generator es el origen; su única responsabilidad es alimentar el canal out y cerrarlo mediante defer close(out) cuando no hay más datos. Es fundamental que use select con ctx.Done() al enviar; sin esto, si el pipeline se detiene por un error en una etapa posterior, el generator se quedaría bloqueado para siempre intentando enviar el último elemento.
El transformer es el núcleo de la concurrencia en este ejemplo. En lugar de procesar uno a uno, implementa un patrón de worker pool interno usando errgroup.Group. Al pasarle el ctx del pipeline a errgroup.WithContext(ctx), creamos una jerarquía de cancelación: si cualquiera de los 3 workers devuelve un error (como ocurre con el 13), el errgroup cancela el contexto. Esto hace que las demás goroutines en este stage y en todos los stages anteriores que escuchen <-ctx.Done() reciban la señal de salida inmediatamente.
La función g.Wait() se ejecuta en una goroutine separada para evitar bloquear el flujo principal del stage. Su función es esperar a que todos los workers terminen antes de cerrar el canal out, garantizando que no haya envíos en un canal cerrado, lo cual causaría un panic.
El error frecuente
Un error clásico al construir pipelines es olvidar la escucha del contexto durante el envío en canales no bufferizados. Si un stage tiene un consumidor que decide dejar de leer (por ejemplo, porque detectó un error o se acabó su tiempo de espera), pero el productor no está escuchando ctx.Done(), el productor se quedará bloqueado en la línea out <- n indefinidamente.
// ERROR: Goroutine que se queda bloqueada para siempre (leak)
func leakyStage(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
// Si 'out' ya no tiene quien lo lea (porque el sink falló),
// esta goroutine se queda colgada aquí eternamente.
out <- n
}
}()
return out
}
Para evitar esto, siempre debes envolver tus envíos en un bloque select que incluya el canal de cancelación del contexto.
N° 139