Patrones de mensajería con NATS, Kafka y RabbitMQ en Go

La comunicación entre microservicios depende de la elección del broker adecuado según el compromiso entre latencia, durabilidad y complejidad de enrutamiento. NATS es la opción de baja latencia para mensajería interna rápida; utiliza el concepto de Queue Groups para distribuir mensajes entre instancias de un mismo servicio de forma sencilla, pero su modo estándar es efímero (sin persistencia). Si necesitas Event Sourcing o pipelines de datos masivos donde sea vital re-procesar eventos pasados, Kafka es el estándar: su arquitectura basada en un log de append-only y la gestión de offsets por partición permite una durabilidad extrema y replay de datos. Por otro lado, RabbitMQ brilla cuando la lógica de enrutamiento es compleja (usando exchanges y bindings para dirigir mensajes a colas específicas según reglas de routing), siendo ideal para flujos de trabajo con garantías de entrega granular.

El problema surge cuando la implementación en Go no respeta el ciclo de vida de los procesos distribuidos. Si usas Kafka, debes gestionar la asignación de particiones; si usas NATS, debes manejar la reconexión; y en cualquiera de ellos, si no gestionas correctamente el contexto y los grupos de error, terminarás con goroutines huérfanas o procesos que no cierran limpiamente cuando el orquestador (como Kubernetes) envía una señal de terminación.

package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"golang.org/x/sync/errgroup"
)

// Message representa un mensaje genérico de nuestro sistema distribuido.
type Message struct {
	ID        string
	Payload   string
	Partition int
}

// Consumer define la interfaz para cualquier cliente de mensajería (NATS, Kafka o RabbitMQ).
// Implementar esto permite que el código de negocio sea agnóstico al broker.
type Consumer interface {
	// Fetch recupera el siguiente mensaje. Debe respetar el contexto.
	Fetch(ctx context.Context) (*Message, error)
	// Close cierra la conexión de forma limpia.
	Close() error
}

// MockKafkaConsumer simula un consumidor de Kafka que lee de una partición específica.
type MockKafkaConsumer struct {
	partition int
	data      chan *Message
}

func (m *MockKafkaConsumer) Fetch(ctx context.Context) (*Message, error) {
	select {
	case msg, ok := <-m.data:
		if !ok {
			return nil, errors.New("canal de datos cerrado")
		}
		return msg, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func (m *MockKafkaConsumer) Close() error {
	close(m.data)
	return nil
}

func main() {
	// El contexto es el mecanismo principal para propagar la señal de apagado.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// errgroup coordina la ejecución de múltiples goroutines y captura el primer error.
	g, ctx := errgroup.WithContext(ctx)

	// Simulamos un canal de mensajes que vendría de un broker real.
	msgChan := make(chan *Message)
	
	// Inicializamos un pool de consumidores (por ejemplo, uno por cada partición de Kafka).
	numPartitions := 3
	consumers := make([]*MockKafkaConsumer, numPartitions)

	for i := 0; i < numPartitions; i++ {
		consumers[i] = &MockKafkaConsumer{
			partition: i,
			data:      msgChan,
		}
	}

	// Lanzamos el worker pool.
	for i := 0; i < numPartitions; i++ {
		partitionID := i
		consumer := consumers[i]

		g.Go(func() error {
			fmt.Printf("[Worker] Iniciando consumidor en partición %d\n", partitionID)
			for {
				msg, err := consumer.Fetch(ctx)
				if err != nil {
					// Si el error es por cancelación de contexto, es un shutdown normal.
					if errors.Is(err, context.Canceled) {
						fmt.Printf("[Worker %d] Shutdown controlado\n", partitionID)
						return nil
					}
					return fmt.Errorf("error en worker %d: %w", partitionID, err)
				}

				// Simulación de procesamiento de lógica de negocio.
				fmt.Printf("[Worker %d] Procesando mensaje %s (Partición: %d)\n", partitionID, msg.ID, msg.Partition)
				time.Sleep(500 * time.Millisecond) 
			}
		})
	}

	// Goroutine para simular la llegada de mensajes desde el broker.
	g.Go(func() error {
		defer close(msgChan)
		for i := 0; i < 10; i++ {
			select {
			case <-ctx.Done():
				return nil
			case <-time.After(300 * time.Millisecond):
				msgChan <- &Message{
					ID:        fmt.Sprintf("msg-%d", i),
					Payload:   "datos de carga",
					Partition: i % numPartitions,
				}
			}
		}
		return nil
	})

	// Esperamos a que todos los workers terminen o uno falle.
	if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		fmt.Printf("Error fatal en el sistema: %v\n", err)
		os.Exit(1)
	}

	fmt.Println("Todos los servicios se han detenido correctamente.")
}

Desglose de la implementación

La estructura se basa en la separación de preocupaciones mediante la interfaz Consumer. En un escenario real, MockKafkaConsumer sería reemplazado por un cliente de segmentio/kafka-go o nats.io/nats.go.

  1. Gestión de Ciclo de Vida: Utilizamos signal.NotifyContext para capturar SIGTERM. Esto es crítico en entornos de contenedores. Cuando recibimos la señal, el ctx.Done() se cierra, lo que propaga la señal de salida a todos los consumidores.
  2. El patrón errgroup: En lugar de lanzar goroutines sueltas con go func(), usamos g.Go. Esto nos permite capturar errores de cualquier worker. Si un consumidor de una partición de Kafka falla (por ejemplo, por un error de red persistente), errgroup cancela el contexto de todos los demás workers y el proceso principal recibe el error para reportarlo al orquestador.
  3. El loop de consumo: Fíjate en el select dentro de Fetch. Es vital que la lectura del broker sea sensible al ctx.Done(). Si el método Fetch fuera una llamada de red bloqueante que no acepta context.Context, la goroutine se quedaría colgada incluso después de que la aplicación intente cerrarse, provocando un zombie process.
  4. Concurrencia por partición: El bucle for i := 0; i < numPartitions; i++ ilustra cómo escalar horizontalmente el procesamiento. En Kafka, esto garantiza que no haya dos consumidores leyendo la misma partición, evitando colisiones de orden de mensajes.

El error frecuente

Un error clásico es no manejar la cancelación del contexto dentro de un bucle de consumo que utiliza canales, especialmente cuando el canal de datos no es el que se cierra, sino el que se lee.

// ERROR: Este worker nunca termina si msgChan no se cierra, 
// aunque el context sea cancelado, porque Fetch podría bloquearse.
func badWorker(ctx context.Context, consumer Consumer) error {
    for {
        // Si Fetch no implementa select con ctx.Done(), 
        // el worker se queda bloqueado aquí para siempre si no hay mensajes.
        msg, err := consumer.Fetch(ctx) 
        if err != nil {
            return err
        }
        process(msg)
    }
}

Si Fetch es una implementación síncrona que no escucha el ctx.Done(), la goroutine queda bloqueada en la llamada de red. Siempre asegura que tus métodos de cliente de mensajería sean context-aware.

232

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio