Emisión de Streams: Generadores y Controladores

En Dart, un Stream es una secuencia de eventos asíncronos que puedes escuchar. Para manejarlos, existen dos paradigios principales: el declarativo, donde usas funciones generadoras asíncronas para producir valores mediante async* y yield, y el imperativo, donde utilizas un StreamController para inyectar valores manualmente mediante un sink.

Las funciones con async* [disponible desde Dart 2.12] permiten escribir lógica asíncrona que parece síncrona pero que emite valores a lo largo del tiempo. Cuando usas yield, insertas un valor en el flujo de datos y el generador “pausa” su ejecución hasta la próxima llamada. Si necesitas delegar la emisión a otro Stream o Iterable, utilizas yield*, lo que permite “aplanar” secuencias. Un detalle crítico es el back-pressure: si el consumidor de un generador asíncrono es más lento que el productor, el generador se pausará automáticamente en cada await o yield, evitando saturar la memoria.

Por otro lado, cuando el flujo de datos no depende de una lógica secuencial (como un bucle), sino de eventos externos (como un callback de una librería de red o un sensor), necesitas un StreamController. Este objeto actúa como el cerebro del stream: tiene un sink para añadir datos (add) o errores (addError), y un método close() para liberar recursos. Si necesitas que varios suscriptores escuchen el mismo evento, debes usar StreamController.broadcast(), ya que un controlador estándar solo permite un único suscriptor (single-subscription).

También existen métodos de fábrica para casos rápidos: Stream.fromFuture para convertir un Future en un stream de un solo valor, o Stream.periodic para emitir eventos en intervalos de tiempo fijos.

import 'dart:async';

/// Representa un monitor de clima que emite temperaturas y alertas.
class WeatherStation {
  // Usamos broadcast para permitir que múltiples partes del sistema 
  // (ej. un logger y un tablero de control) escuchen las alertas.
  final StreamController<String> _alertController = StreamController<String>.broadcast(
    onListen: () => print('[Station] Un suscriptor se ha unido a las alertas.'),
    onCancel: () => print('[Station] Un suscriptor ha dejado de escuchar alertas.'),
  );

  // Exponemos el stream, pero mantenemos el controlador privado para encapsular la lógica.
  Stream<String> get alerts => _alertController.stream;

  // Usamos async* para crear un stream de temperaturas de forma declarativa.
  Stream<double> get temperatureHistory async* {
    // yield* delega la emisión a otro generador (fase de calentamiento).
    yield* _warmUpSequence();

    for (int i = 0; i < 5; i++) {
      // Simulamos una lectura de sensor que tarda tiempo.
      await Future.delayed(const Duration(seconds: 1));
      
      final temp = 20.0 + i + (DateTime.now().millisecond % 5);
      
      // Emisión de un valor normal.
      yield temp;

      // Si la temperatura es alta, inyectamos un evento manualmente en el sink.
      if (temp > 23.0) {
        _alertController.sink.add('ALERTA: Temperatura crítica detectada: ${temp}°C');
      }
    }
  }

  // Generador asíncrono para la fase inicial de estabilización.
  Stream<double> _warmUpSequence() async* {
    print('[Station] Iniciando fase de calentamiento...');
    yield 0.0; 
    await Future.delayed(const Duration(milliseconds: 500));
    yield 15.0;
  }

  void shutdown() {
    _alertController.close();
    print('[Station] Sistema cerrado.');
  }
}

void main() async {
  final station = WeatherStation();

  // Suscriptor 1: Escucha las alertas (Stream de tipo broadcast).
  final alertSub = station.alerts.listen((msg) => print('  [LOG] $msg'));

  // Suscriptor 2: Escucha el historial de temperaturas.
  final tempSub = station.temperatureHistory.listen((t) {
    print('  [DATA] Temperatura actual: $t°C');
  });

  // Esperamos a que el flujo de la estación termine.
  await Future.delayed(const Duration(seconds: 7));

  // Es vital cancelar las suscripciones para evitar fugas de memoria.
  await alertSub.cancel();
  await tempSub.cancel();
  station.shutdown();
}

Desglose del ejemplo

En WeatherStation, el método temperatureHistory es un generador asíncrono (async*). Fíjate en cómo yield* _warmUpSequence() no solo emite valores, sino que “conecta” el flujo de la estación con los valores de la fase de calentamiento antes de continuar con el bucle for. Si _warmUpSequence fuera un Stream lento, el bucle principal esperaría a que este termine para seguir emitiendo temperaturas.

El uso de _alertController.sink.add(...) es el mecanismo imperativo. A diferencia del yield, que es parte de la lógica del flujo de datos, el sink permite “empujar” información desde cualquier punto (como una condición de error o un evento externo) hacia el Stream de alertas.

Como definimos _alertController como un StreamController.broadcast(), el main pudo suscribirse a station.alerts de forma independiente a la lectura de temperaturas. Si hubiéramos usado un StreamController estándar, el segundo suscriptor en main habría lanzado un error.

El error frecuente

Un error muy común al trabajar con StreamController es intentar suscribirse más de una vez a un controlador que no es broadcast.

final controller = StreamController<int>();
// Primera suscripción: funciona correctamente.
controller.stream.listen((v) => print(v));

// Segunda suscripción: lanzará un StateError.
// "Bad state: Stream has already been listened to."
controller.stream.listen((v) => print(v)); 

Si tu lógica requiere que múltiples objetos escuchen el mismo flujo de eventos, asegúrate siempre de inicializarlo con .broadcast().

69

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio