Transformaciones de Streams con StreamTransformer

Un StreamTransformer<S, T> es un objeto que encapsula la lógica necesaria para convertir un Stream<S> en un Stream<T>. A diferencia de un simple map, que transforma cada elemento de forma aislada, el transformador permite manejar estados internos y lógica compleja de flujo. Esto es posible porque el método bind recibe el stream original y le permite aplicar una lógica de “unión” (binding), donde puedes controlar cuándo y cómo se añaden datos al nuevo sink.

Esta arquitectura existe para separar la fuente de datos de su interpretación. En lugar de saturar la lógica de negocio con transformaciones de bajo nivel, creas componentes reutilizables. Debes usarlo cuando necesites implementar patrones como throttling, debouncing, o, lo más común en backend, el parseo de protocolos binarios donde los datos llegan fragmentados en trozos de bytes que no necesariamente coinciden con los límites de un mensaje completo. Si implementas un transformador y gestionas mal el ciclo de vida del sink o no manejas los errores, el stream puede quedar “colgado” (un stream que nunca emite onDone) o perderás datos críticos que quedaron atrapados en un buffer interno.

import 'dart:convert';
import 'dart:async';

/// Un transformador que convierte fragmentos de bytes (List<int>)
/// en strings completos basados en un delimitador.
class DelimiterParser extends StreamTransformer<List<int>, String> {
  final List<int> _delimiter;

  DelimiterParser(String delimiter) : _delimiter = ascii.encode(delimiter);

  @override
  Stream<String> bind(Stream<List<int>> stream) async* {
    final List<int> buffer = [];

    // 'await for' nos permite iterar sobre los eventos del stream original.
    await for (final chunk in stream) {
      buffer.addAll(chunk);

      // Buscamos el delimitador en el buffer acumulado.
      int delimiterIndex;
      while (
        (delimiterIndex = _findDelimiter(buffer)) != -1
      ) {
        // Extraemos el mensaje completo antes del delimitador.
        final messageBytes = buffer.sublist(0, delimiterIndex);
        
        // Limpiamos el buffer para la siguiente iteración.
        buffer.removeRange(0, delimiterIndex + _delimiter.length);

        // Emitimos el mensaje decodificado.
        yield utf8.decode(messageBytes);
      }
    }
    // Nota: Si el buffer tiene datos al terminar, se pierden si no se gestionan aquí.
  }

  int _findDelimiter(List<int> buffer) {
    for (int i = 0; i <= buffer.length - _delimiter.length; i++) {
      bool match = true;
      for (int j = 0; j < _delimiter.length; j++) {
        if (buffer[i + j] != _delimiter[j]) {
          match = false;
          break;
        }
      }
      if (match) return i;
    }
    return -1;
  }
}

void main() async {
  final controller = StreamController<List<int>>();

  // Composición de transformadores (Pipeline)
  // 1. Parseamos bytes a String con DelimiterParser.
  // 2. Transformamos a mayúsculas con map.
  // 3. Filtramos vacíos con where.
  final pipeline = controller.stream
      .transform(DelimiterParser('\n'))
      .map((data) => data.toUpperCase())
      .where((data) => data.isNotEmpty);

  final subscription = pipeline.listen(
    (data) => print('Mensaje procesado: $data'),
    onDone: () => print('Stream cerrado.'),
    onError: (e) => print('Error en el pipeline: $e'),
  );

  // Simulamos la llegada de datos fragmentados por red.
  final sink = controller.sink as Sink<List<int>>;

  sink.add(utf8.encode('Hola '));
  sink.add(utf8.encode('Mundo'));
  sink.add(utf8.encode('\n')); // Completa el primer mensaje
  
  sink.add(utf8.encode('Dart'));
  sink.add(utf8.encode(' es'));
  sink.add(utf8.encode(' increíble\n')); // Completa el segundo mensaje

  await controller.close();
  await subscription.asFuture();
}

Análisis del pipeline

En el ejemplo, hemos implementado DelimiterParser extendiendo StreamTransformer. El corazón de la lógica reside en bind, que utiliza un generador async*. Esto es fundamental: async* permite que el transformador funcione como un iterador asíncrono, donde cada yield empuja un nuevo elemento al siguiente eslabón de la cadena sin necesidad de gestionar manualmente un StreamController interno.

Cuando ejecutamos controller.stream.transform(DelimiterParser('\n')), creamos una conexión donde la salida de controller alimenta el await for del DelimiterParser. Observa cómo el buffer mantiene el estado entre cada chunk. Si recibimos "Hola ", el buffer lo guarda; cuando llega "Mundo\n", el while detecta el salto de línea, decodifica el mensaje completo y lo libera mediante yield.

La potencia real aparece en la composición: transform(...).map(...).where(...). Cada transform devuelve un nuevo Stream, creando un pipeline donde cada etapa es una unidad lógica independiente. El compilador AOT de Dart optimiza estas cadenas de transformaciones, permitiendo que el flujo de datos sea altamente eficiente al minimizar la creación de objetos intermedios innecesarios.

El error frecuente

Un error crítico al implementar StreamTransformer con buffers es el desperdicio de datos al cerrar el stream.

Si el Stream de origen se cierra (vía onDone) pero el último mensaje no contenía el delimitador (por ejemplo, el cliente envió "Mensaje" pero nunca envió el \n), el while en el bind terminará sin haber hecho yield de ese último fragmento. Para solucionar esto en implementaciones de producción, debes verificar si el buffer contiene datos después de salir del await for y emitirlos antes de finalizar el generador.

70

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio