Mecanismos Fundamentales para el Procesamiento Paralelo
La concurrencia representa un pilar esencial en la programación moderna, permitiendo que los programas aprovechen múltiples núcleos de procesador para mejorar el rendimiento y la eficiencia. Este capítulo se centra en los mecanismos clásicos de concurrencia en Rust, basados en threads del sistema operativo, que facilitan la ejecución paralela de tareas. Al dominar estos conceptos, se adquiere la capacidad de construir aplicaciones que gestionan recursos compartidos de manera segura y predecible, evitando errores comunes como las condiciones de carrera.
Creación de Threads con std::thread::spawn
La biblioteca estándar de Rust proporciona el módulo std::thread para la gestión de threads nativos del sistema operativo. La función principal para crear un nuevo thread es std::thread::spawn, que toma un cierre como argumento y lo ejecuta en un thread separado. Este cierre debe ser 'static, lo que implica que no puede capturar referencias a valores en el stack del thread padre, ya que el nuevo thread podría sobrevivir al padre.
Por ejemplo, se puede crear un thread simple que imprime un mensaje:
use std::thread;
let handle = thread::spawn(|| {
println!("Hola desde un thread secundario");
});En este caso, spawn devuelve un JoinHandle, que se discute en la siguiente sección. Es crucial destacar que los threads en Rust son joinable por defecto, pero si no se une explícitamente, el thread continuará ejecutándose hasta su finalización, incluso si el thread principal termina. Un detalle sutil es que los pánicos en un thread secundario no propagan al principal; en su lugar, el thread simplemente termina, y el JoinHandle reflejará el pánico al unirse.
Rust impone restricciones de ownership para prevenir accesos concurrentes no seguros. Si se intenta mover un valor al cierre, como un vector, el ownership se transfiere al thread:
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("Vector en thread: {:?}", v);
});Aquí, el uso de move fuerza la transferencia de ownership, asegurando que v no sea accesible desde el thread principal después de la creación. En casos donde se requiere acceso compartido, se deben emplear estructuras como Arc, cubiertas más adelante.
Gestión de Threads con JoinHandle
La función spawn retorna un valor de tipo JoinHandle<T>, donde T es el tipo devuelto por el cierre ejecutado en el thread. Este handle permite al thread principal esperar la finalización del thread secundario mediante el método join, que bloquea hasta que el thread termine y devuelve un Result<T, Box<dyn Any + Send + 'static>>. Si el thread secundario paniquea, join retorna Err con el valor del pánico.
Un ejemplo ilustrativo es el siguiente:
use std::thread;
let handle = thread::spawn(|| {
"Resultado desde thread"
});
match handle.join() {
Ok(result) => println!("Resultado: {}", result),
Err(e) => println!("Pánico en thread: {:?}", e),
}Este mecanismo asegura la sincronización y la recolección de resultados. JoinHandle implementa Drop, lo que significa que si no se llama a join explícitamente, el handle se descarta al salir del scope, pero el thread continúa ejecutándose en segundo plano. Para un control preciso, se recomienda siempre unir threads para evitar fugas de recursos o comportamientos inesperados.
En escenarios con múltiples threads, se pueden almacenar handles en una colección y unirlos en secuencia:
let mut handles = vec![];
for i in 0..5 {
let handle = thread::spawn(move || {
i * 2
});
handles.push(handle);
}
for handle in handles {
if let Ok(result) = handle.join() {
println!("Resultado: {}", result);
}
}Aquí, el bucle asegura que todos los threads completen antes de proceder, destacando la importancia de manejar errores en entornos concurrentes.
Compartición Segura con Arc<Mutex>
Para compartir datos entre threads de manera segura, Rust ofrece Arc (Atomic Reference Counted) combinado con Mutex (Mutual Exclusion). Arc<T> es un puntero inteligente que permite múltiples owners atómicos, adecuado para entornos multithread, mientras que Mutex<T> asegura que solo un thread acceda al valor interno en un momento dado mediante bloqueo.
La creación típica implica envolver el valor en Mutex y luego en Arc:
use std::sync::{Arc, Mutex};
use std::thread;
let data = Arc::new(Mutex::new(0));
let data_clone = Arc::clone(&data);
let handle = thread::spawn(move || {
let mut guard = data_clone.lock().unwrap();
*guard += 1;
});El método lock devuelve un MutexGuard que implementa DerefMut, permitiendo acceso mutable al valor. Es esencial manejar el Result devuelto por lock, ya que puede fallar si el mutex está envenenado (poisoned) debido a un pánico en otro thread mientras sostenía el lock. En la práctica, unwrap se usa en código simple, pero en producción se prefiere un manejo robusto.
Arc incrementa el conteo de referencias atómicamente, y al dropear la última referencia, el valor se libera. Esto contrasta con Rc, que no es thread-safe. Un caso borde surge cuando múltiples threads intentan bloquear el mismo mutex simultáneamente: el primero lo adquiere, y los demás bloquean hasta que se libere, potencialmente causando contención si el lock se mantiene por periodos prolongados.
Para iteraciones múltiples, se clona el Arc para cada thread:
let data = Arc::new(Mutex::new(vec![]));
let mut handles = vec![];
for i in 0..3 {
let data_clone = Arc::clone(&data);
let handle = thread::spawn(move || {
let mut guard = data_clone.lock().unwrap();
guard.push(i);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Datos finales: {:?}", *data.lock().unwrap());Este patrón garantiza que la vec compartida se modifique de forma segura, ilustrando cómo Rust previene data races a nivel de compilador.
Comunicación entre Threads con Canales mpsc
Los canales proporcionan un medio para la comunicación unidireccional entre threads, implementados en el módulo std::sync::mpsc (multiple producer, single consumer). Se crea un par (Sender<T>, Receiver<T>) mediante mpsc::channel, donde múltiples threads pueden enviar mensajes vía clones del Sender, pero solo uno recibe vía el Receiver.
Un ejemplo básico envía un mensaje desde un thread secundario:
use std::sync::mpsc;
use std::thread;
let (tx, rx) = mpsc::channel();
let handle = thread::spawn(move || {
tx.send("Mensaje desde thread").unwrap();
});
let received = rx.recv().unwrap();
println!("Recibido: {}", received);El método send es bloqueante si el canal está lleno (por defecto, los canales son síncronos y no tienen límite fijo, pero bloquean si el receptor no consume). recv bloquea hasta que un mensaje esté disponible, retornando Err si todos los senders se han dropeado. Para canales asíncronos, se usa mpsc::sync_channel con un tamaño de buffer fijo.
En configuraciones con múltiples productores:
let (tx, rx) = mpsc::channel();
let tx_clone = tx.clone();
thread::spawn(move || {
tx_clone.send(1).unwrap();
});
thread::spawn(move || {
tx.send(2).unwrap();
});
for received in rx {
println!("Recibido: {}", received);
}Aquí, el receptor itera sobre el canal hasta que se cierra (cuando todos los senders se dropean). Un detalle sutil es que los canales manejan ownership: los valores enviados se mueven al canal, y el receptor toma ownership al recibirlos. Esto facilita patrones como el procesamiento de tareas en un pool de threads.
Proyecto: Downloader Concurrente de Archivos
Para ilustrar la integración de estos conceptos, se presenta un proyecto completo que descarga archivos concurrentemente desde URLs dadas, utilizando threads para paralelismo, Arc<Mutex> para compartir un contador de progreso, y canales para comunicar resultados al thread principal.
La estructura de carpetas es la siguiente:
downloader/
├── Cargo.toml
└── src/
└── main.rs
Contenido de Cargo.toml:
[package]
name = "downloader"
version = "0.1.0"
edition = "2021"
[dependencies]
reqwest = { version = "0.11", features = ["blocking"] }
Contenido de src/main.rs:
use std::fs::File;
use std::io::Write;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use std::thread;
use reqwest::blocking::get;
fn download_file(url: &str, path: &str, progress: Arc<Mutex<usize>>, tx: mpsc::Sender<String>) {
match get(url) {
Ok(mut response) => {
if let Ok(mut file) = File::create(path) {
let mut buffer = Vec::new();
if response.copy_to(&mut buffer).is_ok() {
if file.write_all(&buffer).is_ok() {
let mut guard = progress.lock().unwrap();
*guard += 1;
tx.send(format!("Descargado: {}", path)).unwrap();
return;
}
}
}
tx.send(format!("Error descargando: {}", url)).unwrap();
}
Err(e) => {
tx.send(format!("Error en solicitud: {} - {}", url, e)).unwrap();
}
}
}
fn main() {
let urls = vec![
("https://example.com/file1.txt", "file1.txt"),
("https://example.com/file2.txt", "file2.txt"),
("https://example.com/file3.txt", "file3.txt"),
];
let total = urls.len();
let progress = Arc::new(Mutex::new(0));
let (tx, rx) = mpsc::channel();
let mut handles = vec![];
for (url, path) in urls {
let progress_clone = Arc::clone(&progress);
let tx_clone = tx.clone();
let url_owned = url.to_string();
let path_owned = path.to_string();
let handle = thread::spawn(move || {
download_file(&url_owned, &path_owned, progress_clone, tx_clone);
});
handles.push(handle);
}
let progress_clone = Arc::clone(&progress);
let monitor_handle = thread::spawn(move || {
loop {
let completed = *progress_clone.lock().unwrap();
println!("Progreso: {}/{}", completed, total);
if completed == total {
break;
}
thread::sleep(std::time::Duration::from_secs(1));
}
});
for message in rx {
println!("{}", message);
if *progress.lock().unwrap() == total {
break;
}
}
for handle in handles {
handle.join().unwrap();
}
monitor_handle.join().unwrap();
println!("Todas las descargas completadas.");
}
Este proyecto demuestra cómo los threads descargan archivos en paralelo, un canal recolecta mensajes de estado, y un mutex compartido tracks el progreso. El thread de monitoreo actualiza el progreso periódicamente, y el principal une todos los handles para asegurar la terminación ordenada. Nota que se requiere la crate reqwest para solicitudes HTTP bloqueantes, y los paths se asumen relativos al directorio de ejecución.
Habiendo explorado los fundamentos de la concurrencia con threads, el siguiente capítulo aborda técnicas avanzadas para el manejo de errores en entornos concurrentes, extendiendo estos principios hacia patrones más robustos de sincronización.