Concurrencia avanzada: Isolates, Semáforos y la gestión de tareas

Para dominar el modelo de ejecución de Dart, es crucial entender que, aunque el event loop maneja la asincronía mediante Future e Isolate, la gestión de recursos requiere patrones más complejos. Un Future es una representación de un valor que estará disponible en el futuro, operando dentro del mismo hilo de ejecución (event loop), lo que significa que no ofrece paralelismo real para tareas intensivas de CPU, sino solo no-bloqueo para I/O. Por el contrario, un Isolate [disponible desde Dart 1.0] es un aislamiento de memoria completo; cada uno tiene su propio heap y su propio event loop, permitiendo un paralelismo real en múltiples núcleos de la CPU.

Si necesitas realizar un cálculo matemático pesado, usar solo Future congelará la ejecución de tu programa principal; ahí es donde entra el Isolate. Sin embargo, crear un Isolate es costoso en términos de memoria y tiempo de inicialización. El patrón de Worker Pool soluciona esto manteniendo un número fijo de Isolates vivos que procesan una cola de tareas. Por otro lado, si tienes 1,000 peticiones HTTP, lanzar 1,000 Futures simultáneamente puede saturar tu red o tu memoria; aquí aplicamos un Semáforo para limitar la concurrencia.

Si implementas mal la comunicación entre Isolates o no manejas los errores en tareas “fire-and-forget”, podrías terminar con procesos zombis o errores silenciosos que nunca llegan al log principal.

import 'dart:async';
import 'dart:isolate';
import 'dart:math';

// 1. Semáforo para limitar la concurrencia de Futures
class Semaphore {
  int _permits;
  final List<Completer<void>> _waiters = [];

  Semaphore(this._permits);

  Future<void> acquire() async {
    if (_permits > 0) {
      _permits--;
      return;
    }
    final completer = Completer<void>();
    _waiters.add(completer);
    return completer.future;
  }

  void release() {
    if (_waiters.isNotEmpty) {
      _waiters.removeAt(0).complete();
    } else {
      _permits++;
    }
  }
}

// 2. Worker Pool para tareas intensivas (CPU)
class WorkerPool {
  final int workerCount;
  final List<SendPort> _ports = [];
  final ReceivePort _mainReceivePort = ReceivePort();

  WorkerPool(this.workerCount);

  Future<void> init() async {
    for (int i = 0; i < workerCount; i++) {
      final workerPort = await Isolate.spawn(_workerEntry, _mainReceivePort.sendPort);
      // El worker envía su SendPort de vuelta
      final SendPort workerSendPort = await Completer<SendPort>().asFuture().then((_) {
        // Este es un truco para esperar la respuesta del worker vía un ReceivePort temporal
        return _mainReceivePort.listen((message) {
          if (message is SendPort) (Completer<SendPort>().asFuture().then((_) {})); // Simplificado para el ejemplo
        });
      });
      // Nota: En producción, usaríamos una arquitectura de mensajería más robusta.
      // Para este ejemplo, simplificamos la comunicación.
    }
  }

  // Implementación simplificada del entry point para el ejemplo
  static void _workerEntry(SendPort mainSendPort) {
    final workerReceivePort = ReceivePort();
    mainSendPort.send(workerReceivePort.sendPort);

    workerReceivePort.listen((message) {
      if (message is Map<String, dynamic>) {
        final int id = message['id'];
        final int data = message['data'];
        // Simulación de cálculo pesado
        final result = data * data; 
        mainSendPort.send({'id': id, 'result': result});
      }
    });
  }
}

// 3. Wrapper para APIs con Callbacks (Promisificación)
class LegacyService {
  void fetchData(String query, void Function(String) onSuccess, void Function(Exception) onError) {
    // Simula una API antigua basada en callbacks
    Timer(const Duration(milliseconds: 500), () {
      if (query == "error") {
        onError(Exception("Fallo en LegacyService"));
      } else {
        onSuccess("Resultado de $query");
      }
    });
  }
}

void main() async {
  final semaphore = Semaphore(2); // Máximo 2 tareas concurrentes
  final legacy = LegacyService();

  print('--- Iniciando ejecución avanzada ---\n');

  // A. Patrón Completer: Promisificar el LegacyService
  final futureResult = Completer<String>();
  legacy.fetchData("Dart", 
    (data) => futureResult.complete(data), 
    (err) => futureResult.completeError(err)
  );
  print('Resultado Promisificado: ${await futureResult.value}');

  // B. Patrón Timeout y EagerError
  final tasks = [
    Future.delayed(const Duration(seconds: 1), () => "Task 1"),
    Future.delayed(const Duration(seconds: 2), () => throw Exception("Task 2 falló")),
    Future.delayed(const Duration(seconds: 3), () => "Task 3"),
  ];

  try {
    // eagerError: true hace que falle inmediatamente si cualquiera falla
    await Future.wait(tasks, eagerError: true);
  } on Object catch (e) {
    print('EagerError detectado: $e');
  }

  // C. Semáforo + Limitación de concurrencia en I/O
  print('\nEjecutando tareas con semáforo (concurrencia limitada a 2)...');
  final results = await Future.wait(List.generate(5, (i) async {
    await semaphore.acquire();
    try {
      print('  [I/O] Iniciando tarea $i');
      await Future.delayed(const Duration(milliseconds: 800));
      return 'Resultado $i';
    } finally {
      semaphore.release();
    }
  }));
  print('Resultados I/O: $results');

  // D. Fire and Forget (Isolate para logging)
  // No esperamos este isolate, se lanza y sigue el flujo.
  Isolate.spawn((String msg) {
    // En un caso real, este isolate enviaría logs a un servidor
    print('  [Background] Log de analytics: $msg');
  }, 'Evento crítico completado');

  // E. Ejemplo de Worker (Simulando la lógica de un Pool)
  print('\nEjecutando cálculo pesado en Isolate...');
  final responsePort = ReceivePort();
  await Isolate.spawn((SendPort sp) {
    final rx = ReceivePort();
    sp.send(rx.sendPort);
    rx.listen((msg) {
      if (msg is Map) sp.send(msg);
    });
    // Simulamos recepción de trabajo
    // (En un pool real, esto vendría de una cola)
  }, responsePort.sendPort);

  print('--- Fin de ejecución ---\n');
}

Análisis del patrón de ejecución

En el ejemplo anterior, hemos orquestado varios niveles de abstracción:

  1. Semaphore: A diferencia de un simple Future.wait, el semáforo controla el flujo de entrada. Cuando llamas a acquire(), si no hay permisos, el Completer se guarda en una lista de espera (_waiters). Esto evita el desbordamiento de recursos al asegurar que solo $N$ procesos asíncronos estén activos en un momento dado, manteniendo el consumo de memoria bajo control.
  2. Completer: Se utiliza para convertir el estilo de programación de callbacks (común en librerías de C++ o APIs antiguas de JS) en el modelo moderno de Future. El Completer actúa como el puente: tú tienes el control manual de cuándo se considera que la operación ha terminado mediante .complete() o .completeError().
  3. Future.wait con eagerError: true: Por defecto, Future.wait espera a que todas las tareas terminen (o fallen) antes de devolver el error. Con eagerError: true, el primer error que ocurra dispara la excepción al llamador inmediatamente. Esto es vital en pipelines de validación donde, si el primer paso falla, no tiene sentido esperar el resto de las validaciones costosas.
  4. Isolates y Paralelismo: En el ejemplo del Isolate.spawn, la tarea se desplaza a un hilo del sistema operativo distinto. La comunicación se hace mediante SendPort y ReceivePort. Es crucial recordar que los objetos enviados a través de estos puertos se copian (o se mueven mediante transferencia de memoria en casos muy específicos), lo que significa que no hay acceso a la memoria compartida.

El error frecuente

Un error común al implementar el patrón “fire and forget” con Isolate.spawn es no gestionar las excepciones dentro del Isolate hijo. Si el código dentro del spawn lanza una excepción que no es capturada internamente, el Isolate muere silenciosamente sin enviar un mensaje al proceso principal.

// MAL: El error en el isolate no se propaga y es difícil de trackear
Isolate.spawn((_) {
  throw Exception("Error catastrófico"); 
}, null);

// BIEN: Capturar errores y enviarlos mediante un SendPort
Isolate.spawn((SendPort sp) {
  try {
    throw Exception("Error controlado");
  } catch (e) {
    sp.send(e);
  }
}, mySendPort);

Si no implementas este mecanismo de reporte de errores, el Isolate desaparecerá del runtime sin dejar rastro en tus logs de errores principales, complicando enormemente el debugging en producción.

117

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio