Transformadores de io: Pipe, MultiReader, TeeReader y LimitReader

Cuando trabajas con grandes volúmenes de datos, el error más común es intentar cargarlos por completo en la memoria ([]byte). En sistemas de alta carga, esto dispara el uso de la CPU por la presión sobre el Garbage Collector y puede causar errores de out of memory. La solución no es usar más RAM, sino usar mejores transformadores de flujo.

Estos transformadores son decoradores que implementan la interfaz io.Reader o io.Writer para manipular datos mientras pasan de un punto A a un punto B. En lugar de leer un archivo, guardarlo en un buffer y luego procesarlo, transformas el flujo de forma que el dato se procese “al vuelo”.

io.Pipe crea un canal de comunicación en memoria mediante un *io.PipeReader y un *io.PipeWriter que están sincronizados; lo que escribes en uno, aparece instantáneamente en el otro sin buffers intermedios. io.MultiReader permite tratar múltiples fuentes de datos como si fueran una única secuencia continua, útil para concatenar archivos o fragmentos. io.TeeReader actúa como un “divisor” o “espejo”: cada vez que lees de la fuente original, el dato se duplica automáticamente hacia un io.Writer adicional (ideal para logs o cálculos de checksums). Por último, io.LimitReader actúa como una válvula de seguridad, limitando la cantidad de bytes que se pueden extraer de una fuente para evitar que un cliente malintencionado sature tu memoria.

Si usas mal un io.Pipe en la misma goroutine que intentas leer, bloquearás la ejecución indefinidamente. Si olvidas cerrar el Writer de un pipe, el Reader se quedará esperando un EOF que nunca llegará.

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

func main() {
	// 1. MultiReader: Combinamos dos fragmentos en un único flujo lógico
	parte1 := strings.NewReader("Datos iniciales: ")
	parte2 := strings.NewReader("el resto del mensaje.")
	unido := io.MultiReader(parte1, parte2)

	// 2. Pipe: Conectamos el productor (goroutine) con el consumidor (main)
	// Sin usar un buffer intermedio, optimizando la memoria.
	pr, pw := io.Pipe()

	go func() {
		// Es vital cerrar el writer para que el reader reciba el EOF
		defer pw.Close()
		if _, err := io.Copy(pw, unido); err != nil {
			fmt.Printf("Error en el pipe: %v\n", err)
		}
	}()

	// 3. TeeReader + LimitReader: 
	// Queremos leer del pipe, pero queremos guardar una copia de lo que 
	// leemos en un log (Tee) y asegurarnos de no leer más de 20 bytes (Limit).
	auditLog := &bytes.Buffer{}
	espejo := io.TeeReader(pr, auditLog)
	limitado := io.LimitReader(espejo, 20)

	buf := make([]byte, 100)
	n, err := limitado.Read(buf)
	if err != nil && err != io.EOF {
		panic(err)
	}

	fmt.Printf("Contenido leído: %q\n", string(buf[:n]))
	fmt.Printf("Log de auditoría (Tee): %q\n", auditLog.String())
	
	// El resto de los datos en el pipe se perderán si no se lee, 
	// y el writer se bloqueará si intentara escribir más debido a la naturaleza síncrona de io.Pipe.
}

Análisis del flujo

En el ejemplo, empezamos con io.MultiReader, que toma parte1 y parte2 y los presenta de forma secuencial. El io.Copy dentro de la goroutine toma ese flujo combinado y lo inyecta en el pw (PipeWriter).

Lo más crítico ocurre en la cadena de transformadores: limitado -> espejo -> pr.

Cuando ejecutamos limitado.Read(buf), la llamada desciende por la cadena:
1. limitado le pide bytes a espejo.
2. espejo (TeeReader) le pide bytes a pr.
3. pr (PipeReader) recibe los datos directamente de la goroutine.
4. Aquí ocurre la magia: espejo recibe los datos de pr y, antes de devolverlos a limitado, los escribe en auditLog mediante su Writer.
5. limitado recibe los bytes de espejo, pero solo deja pasar hasta 20 bytes.

Como resultado, auditLog solo contiene los primeros 20 bytes. Si intentaras leer más del limitado, recibirías un EOF aunque el pipe todavía tenga datos, porque io.LimitReader ha cumplido su propósito de restringir el flujo.

El error frecuente

El error más peligroso con io.Pipe es el deadlock por ejecución síncrona.

// ERROR: Esto causará un bloqueo eterno (deadlock)
pr, pw := io.Pipe()
_, err := pw.Write([]byte("hola")) // El programa se queda parado aquí
_, err = pr.Read(make([]byte, 4))   // Nunca llega a ejecutarse

io.Pipe no tiene un buffer interno. La operación Write es bloqueante: el escritor se queda pausado hasta que un lector ejecute una operación de Read. Por eso, en producción, siempre debes escribir en un PipeWriter dentro de una goroutine separada o asegurar que el proceso de lectura ocurra en paralelo.

110

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio