Cuando un servicio dependiente (downstream) empieza a fallar o a responder con una latencia excesiva, el sistema que lo consume suele enfrentar un dilema: ¿esperar el timeout y agotar sus propios recursos o fallar de inmediato? Si no aplicas estrategias de resiliencia, una lentitud en un servicio externo puede propagar una cascada de fallos que termine bloqueando todas las goroutines de tu propia aplicación, causando un colapso total.
Para evitar esto, aplicamos tres mecanismos fundamentales. El Circuit Breaker actúa como un interruptor de seguridad; cuando detecta que un servicio está fallando sistemáticamente, “abre” el circuito y rechaza todas las llamadas inmediatamente sin siquiera intentar la conexión, permitiendo que el servicio externo se recupere. El Bulkhead (mamparo) consiste en aislar recursos (como pools de conexiones o límites de goroutines) para que el fallo o la saturación de una tarea específica no afecte a las demás. Finalmente, el Backpressure es el mecanismo de control de flujo: cuando un productor genera datos más rápido de lo que un consumidor puede procesarlos, el sistema debe señalizar al productor para que reduzca su tasa o, en su defecto, rechazar la carga sobrante para evitar una explosión de memoria.
Si implementas mal el Circuit Breaker, podrías rechazar peticiones legítimas durante picos de latencia transitorios. Si fallas en el Bulkhead, una sola consulta pesada podría consumir todo el pool de conexiones de la base de datos, dejando al resto de la API sin recursos. Si ignoras el Backpressure y confías en canales sin límite o colas infinitas, tu proceso terminará con un out of memory (OOM) cuando la demanda supere la capacidad de procesamiento.
package main
import (
"errors"
"fmt"
"sync"
"time"
)
// --- CIRCUIT BREAKER ---
type State int
const (
Closed State = iota
Open
HalfOpen
)
// CircuitBreaker implementa una máquina de estados para evitar llamadas a servicios fallidos.
type CircuitBreaker struct {
mu sync.RWMutex
state State
failures int
threshold int
timeout time.Duration
nextAllowed time.Time
}
func NewCircuitBreaker(threshold int, timeout time.Duration) *CircuitBreaker {
return &CircuitBreaker{threshold: threshold, timeout: timeout}
}
func (cb *CircuitBreaker) Execute(fn func() error) error {
cb.mu.Lock()
if cb.state == Open {
if time.Now().After(cb.nextAllowed) {
cb.state = HalfOpen
} else {
cb.mu.Unlock()
return errors.New("circuit breaker is open")
}
}
cb.mu.Unlock()
err := fn()
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failures++
if cb.failures >= cb.threshold {
cb.state = Open
cb.nextAllowed = time.Now().Add(cb.timeout)
}
return err
}
// Si la llamada fue exitosa y estábamos en HalfOpen, cerramos el circuito
if cb.state == HalfOpen {
cb.state = Closed
cb.failures = 0
}
return nil
}
// --- BULKHEAD ---
// Bulkhead limita la concurrencia de un tipo específico de tarea.
type Bulkhead struct {
semaphore chan struct{}
}
func NewBulkhead(maxConcurrent int) *Bulkhead {
return &Bulkhead{semaphore: make(chan struct{}, maxConcurrent)}
}
func (b *Bulkhead) Execute(fn func()) error {
select {
case b.semaphore <- struct{}{}:
defer func() { <-b.semaphore }()
fn()
return nil
default:
return errors.New("bulkhead capacity reached")
}
}
// --- SIMULACIÓN DEL SISTEMA ---
type ExternalService struct {
fail bool
}
func (s *ExternalService) Call() error {
if s.fail {
return errors.New("downstream error")
}
time.Sleep(50 * time.Millisecond) // Simula latencia
return nil
}
func main() {
service := &ExternalService{fail: false}
cb := NewCircuitBreaker(3, 2*time.Second)
bh := NewBulkhead(2) // Solo 2 tareas pesadas simultáneas
// Canal con capacidad limitada para Backpressure
// Si el canal se llena, rechazamos la carga inmediatamente.
workQueue := make(chan int, 5)
// Worker que procesa tareas (Consumer)
go func() {
for job := range workQueue {
// Aplicamos Bulkhead para proteger recursos de tareas pesadas
err := bh.Execute(func() {
// Aplicamos Circuit Breaker para la llamada externa
err := cb.Execute(func() error {
return service.Call()
})
if err != nil {
fmt.Printf("[Worker] Error procesando job %d: %v\n", job, err)
} else {
fmt.Printf("[Worker] Job %d completado con éxito\n", job)
}
})
if err != nil {
fmt.Printf("[Worker] Bulkhead rechazó job %d: %v\n", job, err)
}
}
}()
// Simulador de tráfico (Producer)
for i := 1; i <= 20; i++ {
// BACKPRESSURE: Intentar enviar al canal con un select no bloqueante
select {
case workQueue <- i:
fmt.Printf("[Producer] Enviado job %d\n", i)
default:
fmt.Printf("[Producer] Backpressure: Rechazando job %d (cola llena)\n", i)
}
// Simular que el servicio externo empieza a fallar a mitad del proceso
if i == 10 {
fmt.Println("!!! El servicio externo empezó a fallar !!!")
service.fail = true
}
// El servicio se recupera después de un tiempo
if i == 17 {
fmt.Println("!!! El servicio externo se recuperó !!!")
service.fail = false
}
time.Sleep(100 * time.Millisecond)
}
close(workQueue)
time.Sleep(3 * time.Second) // Esperar a que terminen los workers
}
Desglose de la implementación
El código implementa un pipeline de procesamiento de tareas que utiliza los tres patrones para proteger la integridad del proceso.
En el CircuitBreaker, la lógica de Execute gestiona la transición de estados. Cuando el número de failures alcanza el threshold, el estado cambia a Open y se establece un nextAllowed. Durante este tiempo, cualquier llamada a Execute retorna un error inmediatamente sin ejecutar el callback fn, lo que ahorra tiempo de espera y recursos de CPU. Si el tiempo ha pasado, entramos en HalfOpen, donde una única llamada exitosa resetea el contador y vuelve al estado Closed.
El Bulkhead utiliza un patrón de semáforo basado en un chan struct{} con capacidad limitada. En el main, el worker intenta adquirir un “slot” mediante b.semaphore <- struct{}{}. Si el canal está lleno, el select entra en el caso default, retornando un error de capacidad. Esto evita que un pico de tareas pesadas consuma toda la memoria o bloquee el procesamiento de otros tipos de mensajes.
El Backpressure se observa en el Producer. En lugar de hacer un envío bloqueante workQueue <- i, utilizamos un select con un default. Si el buffer del canal (tamaño 5) está lleno, el productor no se queda esperando (lo que causaría un bloqueo en el hilo de entrada/HTTP), sino que rechaza la carga inmediatamente.
El error frecuente
Un error crítico en Go es confundir un canal con capacidad limitada con un sistema de mensajería infinito.
// ERROR: Backpressure inexistente (Uso de canales sin buffer o con buffer muy grande)
ch := make(chan Job, 1000000) // Esto no es backpressure, es una bomba de tiempo
// ERROR: Bloqueo de goroutines (Falta de select en el productor)
func Producer(ch chan Job) {
for {
job := getJob()
ch <- job // Si el consumer es lento, esto bloquea la goroutine del productor infinitamente
}
}
En un sistema de alta carga, si el productor bloquea en el canal, estarás creando miles de goroutines en estado de espera que consumen memoria y stack. Si usas un buffer gigantesco, simplemente estás posponiendo el colapso por OOM. El backpressure real requiere que el productor sea consciente de la saturación (usando select con default o un context.Context con timeout) para poder responder con un error 429 Too Many Requests o similar.
N° 220