Streams: Secuencias de datos asíncronas en Dart

Un Stream<T> representa una secuencia asíncrona de eventos (datos, errores o una señal de finalización) que se emiten a lo largo del tiempo. Mientras que un Future te entrega un único valor y termina su ejecución, un Stream es un flujo continuo, similar a un canal de agua que puede soltar gotas (datos), enviar una señal de que hubo una obstrucción (error) o simplemente cerrarse. Esta estructura es fundamental porque en el desarrollo de software real, muchas fuentes de datos no son instantáneas: un WebSocket, la lectura de un archivo por fragmentos o la entrada de un sensor no te dan el resultado de golpe, sino que emiten eventos de forma disparada. Debes usar un Stream cuando necesites reaccionar a una serie de eventos que ocurren en un orden específico a través del tiempo. Si intentas manejar un flujo continuo con Future, terminarás bloqueando la ejecución o perdiendo datos; por el contrario, si intentas escuchar dos veces un Stream de suscripción única, el sistema lanzará una excepción.

Existen dos tipos de streams: los de suscripción única (single-subscription), que son el estándar para operaciones de I/O y solo permiten un único oyente, y los de difusión (broadcast), que permiten que múltiples partes de tu código escuchen el mismo flujo simultáneamente mediante el método asBroadcastStream(). Para consumir estos datos, la forma más idiomática es usar await for, que suspende la función actual de forma no bloqueante hasta que llegue el siguiente evento, o el método listen, que te devuelve una StreamSubscription. Esta última es crucial porque te otorga el control manual para pausar (pause), reanudar (resume) o cancelar (cancel) la escucha, evitando así fugas de memoria.

import 'dart:async';

void main() async {
  // Creamos un controlador para gestionar el flujo de datos manualmente
  final controller = StreamController<double>();

  // Definimos una secuencia transformada:
  // 1. Convertimos de Celsius a Fahrenheit
  // 2. Filtramos solo temperaturas que sean mayores a 30°F
  // 3. Usamos distinct() para no repetir el mismo valor consecutivo
  final processedStream = controller.stream
      .map((celsius) => (celsius * 9 / 5) + 32)
      .where((fahrenheit) => fahrenheit > 30)
      .distinct();

  // Consumidor 1: Usamos 'await for' para procesar el stream de forma secuencial.
  // Ejecutamos esto en una función separada para no bloquear el hilo principal.
  final processingTask = consumeWithAwaitFor(processedStream);

  // Consumidor 2: Usamos 'listen' sobre un stream de difusión (broadcast).
  // Esto permite que múltiples suscriptores vean lo mismo.
  final broadcastStream = controller.asBroadcastStream();
  
  final subscription = broadcastStream.listen((val) {
    print('Log de Auditoría (Broadcast): Valor actual en Celsius: $val');
  });

  // Simulamos la llegada de datos de un sensor
  print('--- Iniciando transmisión de datos ---');
  controller.add(25.0); // 77.0°F -> Pasa el filtro
  controller.add(25.0); // 77.0°F -> 'distinct' lo ignora por ser igual al anterior
  controller.add(10.0); // 50.0°F -> Pasa el filtro
  controller.add(0.0);  // 32.0°F -> Pasa el filtro
  controller.add(-10.0); // 14.0°F -> 'where' lo bloquea
  controller.add(30.0); // 86.0°F -> Pasa el filtro
  
  // Esperamos un breve instante para que los eventos asíncronos se procesen
  await Future.delayed(const Duration(milliseconds: 100));

  // Cerramos el controlador para enviar el evento 'done'
  await controller.close();
  await subscription.cancel();
  await processingTask;
}

Future<void> consumeWithAwaitFor(Stream<double> stream) async {
  try {
    await for (final temp in stream) {
      print('Consumidor principal (await for): Temperatura detectada: ${temp.toStringAsFixed(1)}°F');
    }
    print('Consumidor principal: Flujo completado con éxito.');
  } catch (e) {
    print('Consumidor principal: Error detectado: $e');
  }
}

Desglose del ejemplo

En el código anterior, el StreamController actúa como el origen de la verdad; es el encargado de inyectar datos mediante .add(). Fíjate en la cadena de transformaciones aplicada a processedStream. Cuando llamas a .map(), no estás modificando los datos originales, sino creando un nuevo Stream que aplica la lógica de conversión. El método .where() actúa como un filtro de flujo: si un valor no cumple la condición, el evento se descarta y nunca llega al siguiente eslabón de la cadena. distinct() es particularmente eficiente para evitar actualizaciones innecesarias en la UI o en procesos de log, ya que compara el valor actual con el último emitido y solo deja pasar el nuevo si es distinto.

El uso de await for dentro de consumeWithAwaitFor es la forma más limpia de consumir un flujo cuando necesitas que el flujo de ejecución de tu función sea coherente con la llegada de los datos. El runtime de Dart pausa la ejecución de esa función específica en el await for hasta que el StreamController emita algo o se cierre. Por otro lado, el broadcastStream creado con asBroadcastStream() es lo que permite que la suscripción manual con .listen() funcione sin interferir con el await for. Si intentaras usar .listen() dos veces sobre el controller.stream original (que es de suscripción única), el programa lanzaría un error.

El subscription obtenido mediante .listen() es fundamental para la gestión de recursos. Al llamar a subscription.cancel(), le indicamos a la VM que ya no necesitamos escuchar, lo cual libera memoria y evita que el callback de la función se ejecute si el flujo llegara a emitir algo más tarde.

El error frecuente

Si intentas suscribirte dos veces a un stream de suscripción única (el que obtienes por defecto con controller.stream), obtendrás un error en tiempo de ejecución: StateError: Stream has already been listened to.

final stream = controller.stream;

// Primer suscriptor (Correcto)
stream.listen((data) => print(data));

// Segundo suscriptor (Lanza StateError)
stream.listen((data) => print(data)); 

Esto ocurre porque los streams de suscripción única están diseñados para un flujo de datos único y lineal; una vez que se ha establecido un canal de comunicación con un oyente, el stream se marca como “usado” para garantizar la integridad de la secuencia de eventos. Si necesitas que múltiples componentes escuchen el mismo evento, debes transformar ese stream explícitamente usando .asBroadcastStream().

68

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio