Comunicación asíncrona con std::future y std::promise

En C++, la comunicación entre hilos no se limita a compartir variables protegidas por mutexes; existe un mecanismo de alto nivel para trasladar valores o excepciones a través de un estado compartido (shared state). Un std::promise<T> actúa como el extremo del productor, un canal de un solo uso (one-shot) donde puedes “prometer” un valor o una excepción. Su contraparte, std::future<T>, es el extremo del consumidor que permite recuperar dicho valor. Esta abstracción es fundamental cuando necesitas ejecutar una tarea pesada en un hilo secundario y necesitas su resultado en el hilo principal sin gestionar manualmente la sincronización de baja forma.

El funcionamiento se basa en la existencia de un objeto de estado compartido que sobrevive mientras exista al menos un std::future o un std::promise que lo referencien. Cuando llamas a promise.set_value(val), el estado se marca como “listo” y el hilo que esté bloqueado en future.get() se desbloquea para recibir el dato. Si prefieres no gestionar hilos manualmente, std::async [C++11] proporciona una forma de alto nivel para lanzar tareas, devolviéndote un std::future directamente. Por otro lado, std::packaged_task [C++11] permite envolver cualquier callable (función, lambda, functor) para que su retorno sea automáticamente capturado por un future, permitiendo mover la tarea a un hilo distinto. Si necesitas que múltiples hilos esperen el mismo resultado, utilizas std::shared_future [C++11], que permite múltiples llamadas a get() y es copiable, a diferencia del std::future estándar, que es una semántica de movimiento (move-only).

Debes usar estas herramientas cuando el flujo de ejecución de tu programa dependa de resultados que no están disponibles inmediatamente, como operaciones de I/O, cálculos matemáticos intensivos o llamadas a APIs externas. Si intentas llamar a get() sobre un std::future que aún no ha recibido un valor, el hilo actual se bloqueará (o se ejecutará la tarea si es lazy mediante std::launch::deferred [C++11]). El error más crítico ocurre si un std::promise se destruye sin haber seteado un valor o una excepción; en ese caso, el std::future asociado lanzará una std::future_error con el código std::future_errc::broken_promise.

#include <iostream>
#include <thread>
#include <future>
#include <chrono>
#include <vector>
#include <string>
#include <stdexcept>

// Una función de trabajo pesada que puede lanzar excepciones
int realizar_calculo_complejo(int factor) {
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    if (factor <= 0) {
        throw std::runtime_error("Error crítico: El factor debe ser positivo.");
    }
    return 42 * factor;
}

int main() {
    // 1. Uso de std::async para una tarea rápida y directa.
    // std::launch::async garantiza que se ejecute en un nuevo hilo.
    std::future<int> f_async = std::async(std::launch::async, realizar_calculo_complejo, 2);

    // 2. Uso de std::promise para comunicación manual entre hilos.
    std::promise<std::string> p_status;
    std::future<std::string> f_status = p_status.get_future();

    std::thread t_worker([p = std::move(p_status)]() mutable {
        // Simulamos un proceso que informa su estado
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        p.set_value("Procesamiento de estado completado");
    });

    // 3. Uso de std::packaged_task para encapsular una tarea y moverla.
    std::packaged_task<int(int)> task(realizar_calculo_complejo);
    std::future<int> f_task = task.get_future();
    std::thread t_task(std::move(task), 10);

    // 4. Uso de std::shared_future para permitir múltiples consumidores.
    // Convertimos el future original en uno compartido.
    std::shared_future<int> sf_async = f_async.share();

    try {
        // Consumo con timeout usando wait_for
        if (f_status.wait_for(std::chrono::seconds(1)) == std::future_status::ready) {
            std::cout << "Mensaje del trabajador: " << f_status.get() << "\n";
        }

        // Los shared_future pueden ser consultados por varios hilos
        std::cout << "Resultado de async (vía shared_future): " << sf_async.get() << "\n";
        std::cout << "Resultado de task: " << f_task.get() << "\n";
        std::cout << "Re-consulta de async (vía shared_future): " << sf_async.get() << "\n";

    } catch (const std::exception& e) {
        std::cerr << "Excepción detectada en el hilo principal: " << e.what() << "\n";
    }

    if (t_worker.joinable()) t_worker.join();
    if (t_task.joinable()) t_task.join();

    return 0;
}

Desglose del código

En el ejemplo, empezamos con f_async, un std::future obtenido mediante std::async. Al usar std::launch::async, el sistema operativo lanza un hilo nuevo para ejecutar realizar_calculo_complejo.

Para la comunicación manual, creamos p_status (un std::promise<std::string>). Es vital notar que p se mueve (std::move) dentro de la lambda del hilo t_worker. Esto es necesario porque std::promise no es copiable, solo movible, para asegurar que solo haya un único productor para ese estado compartido. Si la función dentro del hilo hubiera lanzado una excepción, usaríamos p.set_exception(std::current_exception()) para propagarla al hilo principal.

std::packaged_task<int(int)> task envuelve la función y permite que, tras ejecutarla, el resultado se deposite automáticamente en su std::future asociado (f_task). Al pasar task a t_task mediante std::move, el objeto de la tarea se transfiere al hilo de ejecución.

Finalmente, transformamos f_async en un std::shared_future mediante el método .share(). Esto es lo que permite que llamemos a .get() dos veces sobre el mismo resultado sin lanzar una excepción; el std::shared_future permite que el estado compartido permanezca válido para múltiples lectores.

El error frecuente

Un error muy común es intentar llamar a .get() más de una vez sobre un std::future estándar. A diferencia de std::shared_future, el std::future está diseñado para una semántica de transferencia de propiedad.

std::future<int> f = std::async(std::launch::async, [](){ return 42; });
int val1 = f.get(); // Correcto: el estado se consume
// int val2 = f.get(); // ERROR: Lanza std::future_error (invalid_argument)

Este error es detectado en tiempo de ejecución y lanzará una excepción. Si no capturas la excepción, el programa abortará. Si el error proviene de un std::promise que se destruyó sin setear un valor (un “broken promise”), el error es igualmente fatal. Ambos comportamientos suelen detectarse con AddressSanitizer en entornos de pruebas si se gestionan mal los tiempos de vida de los objetos en hilos.

101

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio