Bases de datos con SQLx
Integración asíncrona con bases de datos relacionales
En el contexto de aplicaciones Rust que requieren persistencia de datos, SQLx emerge como una biblioteca asincrónica que facilita la interacción con bases de datos SQL sin el overhead de un ORM completo. Este capítulo explora las herramientas esenciales de SQLx para ejecutar consultas, gestionar conexiones eficientes, manejar migraciones y controlar transacciones, preparando el terreno para aplicaciones robustas y escalables. Su importancia radica en la capacidad de SQLx para combinar seguridad de tipos con rendimiento asincrónico, permitiendo a los desarrolladores de Rust integrar datos persistentes de manera directa y eficiente.
Ejecución de consultas con sqlx::query!
La macro sqlx::query! representa una de las características centrales de SQLx, ya que permite la ejecución de consultas SQL con verificación estática de tipos en tiempo de compilación. Esta macro analiza la consulta en el momento de la compilación, asegurando que los tipos de los parámetros de entrada y los resultados de salida coincidan con el esquema de la base de datos, lo que reduce errores en tiempo de ejecución. Para utilizarla, es necesario configurar una conexión a la base de datos, típicamente mediante un pool de conexiones, aunque en ejemplos aislados se puede emplear una conexión única.
Considérese una consulta simple para insertar un registro en una tabla de tareas. La macro requiere que se especifique la cadena de conexión a la base de datos a través de la variable de entorno DATABASE_URL durante la compilación, lo que habilita la verificación estática. Un ejemplo mínimo ilustra su uso:
use sqlx::postgres::PgPool;
async fn insert_task(pool: &PgPool, description: &str) -> Result<(), sqlx::Error> {
sqlx::query!(
"INSERT INTO tasks (description) VALUES ($1)",
description
)
.execute(pool)
.await?;
Ok(())
}
Aquí, la macro query! genera un tipo anónimo que encapsula la consulta, verificando que $1 corresponda a un tipo compatible con &str. En casos borde, si la consulta no coincide con el esquema (por ejemplo, un campo inexistente), el compilador fallará, previniendo errores sutiles. Comparado con bibliotecas en otros lenguajes como Python’s SQLAlchemy, donde la verificación es dinámica, SQLx impone rigurosidad estática, alineándose con la filosofía de seguridad de Rust. Para consultas que devuelven filas, la macro soporta mapeo directo a structs:
#[derive(sqlx::FromRow)]
struct Task {
id: i32,
description: String,
}
async fn get_task(pool: &PgPool, id: i32) -> Result<Task, sqlx::Error> {
let task = sqlx::query_as!(
Task,
"SELECT id, description FROM tasks WHERE id = $1",
id
)
.fetch_one(pool)
.await?;
Ok(task)
}
La derivación FromRow asegura que los campos del struct coincidan con las columnas seleccionadas. Detalles sutiles incluyen el manejo de tipos nulos: si un campo puede ser NULL, debe usarse Option<T> en el struct para evitar pánicos en tiempo de ejecución. Esta aproximación minimiza el boilerplate mientras mantiene la integridad de tipos.
Gestión de pools de conexiones
Los pools de conexiones en SQLx optimizan el rendimiento en entornos asincrónicos al reutilizar conexiones existentes en lugar de establecer nuevas para cada operación. La estructura Pool (específica por backend, como PgPool para PostgreSQL o MySqlPool para MySQL) se crea a partir de una URL de conexión y gestiona un conjunto configurable de conexiones mínimas y máximas. Esto es crucial para aplicaciones de alto tráfico, donde el overhead de conexión puede convertirse en un cuello de botella.
La creación de un pool se realiza de manera asincrónica, permitiendo su integración en runtime como Tokio. Un ejemplo aislado demuestra la inicialización:
use sqlx::{Pool, Postgres};
use sqlx::postgres::PgPoolOptions;
async fn create_pool() -> Result<Pool<Postgres>, sqlx::Error> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect("postgres://user:pass@localhost/db")
.await?;
Ok(pool)
}
Reglas formales dictan que el pool debe ser clonable y compartible entre tareas asincrónicas, ya que implementa Clone y Send. En configuraciones avanzadas, se puede ajustar el tiempo de vida de conexiones inactivas con .idle_timeout(Some(Duration::from_secs(30))), previniendo fugas de recursos. Comparado con pools en Java’s HikariCP, SQLx integra nativamente con el modelo asincrónico de Rust, evitando bloqueos síncronos. Para MySQL, el proceso es análogo, sustituyendo Postgres por MySql y ajustando la URL соответственно.
Un aspecto sutil es el manejo de errores de conexión: si el pool no puede adquirir una conexión dentro del tiempo de espera configurado (por defecto, 30 segundos), retornará sqlx::Error::PoolTimedOut. Esto resalta la necesidad de monitoreo en producción, donde pools subdimensionados pueden llevar a degradación de rendimiento.
34.3 Migraciones con sqlx-cli
La herramienta de línea de comandos sqlx-cli facilita la gestión de migraciones de esquema, permitiendo versionar cambios en la base de datos de manera reproducible. Instalada vía cargo install sqlx-cli, opera independientemente del código de la aplicación, generando scripts SQL en un directorio migrations/ y aplicándolos secuencialmente. Esto asegura consistencia entre entornos de desarrollo, pruebas y producción.
Para inicializar migraciones, se ejecuta sqlx migrate new <nombre>, lo que crea un par de archivos .up.sql y .down.sql con timestamps. Por ejemplo, una migración para crear una tabla de tareas:
-- migrations/20230101120000_create_tasks.up.sql
CREATE TABLE tasks (
id SERIAL PRIMARY KEY,
description TEXT NOT NULL,
completed BOOLEAN DEFAULT FALSE
);
La aplicación se realiza con sqlx migrate run, que verifica una tabla interna _sqlx_migrations para rastrear versiones aplicadas. Casos borde incluyen conflictos de migraciones concurrentes, resueltos mediante locks en la tabla de metadatos. A diferencia de herramientas como Alembic en Python, sqlx-cli no infiere migraciones automáticamente, requiriendo scripts manuales para mayor control y simplicidad.
La reversión se logra con sqlx migrate revert, ejecutando el script .down.sql correspondiente. En entornos con múltiples backends, sqlx-cli soporta PostgreSQL y MySQL, ajustando sintaxis internamente cuando es necesario. Un detalle sutil es la integración con DATABASE_URL: si no se especifica, se hereda del entorno, facilitando flujos CI/CD.
Control de transacciones
Las transacciones en SQLx permiten agrupar operaciones atómicas, asegurando que todas se completen con éxito o ninguna, mediante la estructura Transaction. Iniciada desde un pool o conexión, una transacción se gestiona con métodos como begin, commit y rollback, integrándose perfectamente en bloques asincrónicos.
Un ejemplo aislado ilustra una transacción para actualizar múltiples registros:
use sqlx::postgres::{PgPool, PgTransaction};
async fn update_tasks(pool: &PgPool, ids: Vec<i32>, completed: bool) -> Result<(), sqlx::Error> {
let mut tx: PgTransaction = pool.begin().await?;
for id in ids {
sqlx::query!(
"UPDATE tasks SET completed = $1 WHERE id = $2",
completed, id
)
.execute(&mut tx)
.await?;
}
tx.commit().await?;
Ok(())
}
Aquí, begin adquiere una conexión del pool y la envuelve en una transacción. Reglas formales exigen que todas las operaciones dentro de la transacción usen la referencia mutable &mut tx, evitando mezclas con el pool original. En casos de error, rollback se invoca explícitamente o implícitamente al dropear la transacción sin commit.
Comparado con transacciones en Go’s database/sql, SQLx soporta nesting limitado mediante savepoints, accesibles con tx.savepoint("nombre"). Detalles sutiles incluyen el aislamiento: por defecto, sigue el nivel de la base de datos (e.g., READ COMMITTED en PostgreSQL), pero se puede ajustar con consultas SQL previas al begin. Para MySQL, el comportamiento es similar, con consideraciones para motores como InnoDB que soportan transacciones ACID.
Proyecto completo: API REST de tareas persistente con Postgres/MySQL
Este proyecto implementa una API REST simple para gestionar tareas, utilizando SQLx con PostgreSQL (o MySQL como alternativa). La estructura de carpetas sigue convenciones estándar de Cargo:
tareas-api/
├── Cargo.toml
├── migrations/
│ ├── 20240101000001_create_tasks.up.sql
│ └── 20240101000001_create_tasks.down.sql
├── src/
│ ├── main.rs
│ └── models.rs
Contenido de Cargo.toml:
[package]
name = "tareas-api"
version = "0.1.0"
edition = "2021"
[dependencies]
axum = "0.6"
sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "postgres", "macros"] }
# Cambiar a "mysql" para MySQL
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Migración en migrations/20240101000001_create_tasks.up.sql (para PostgreSQL; ajustar para MySQL eliminando SERIAL y usando AUTO_INCREMENT):
CREATE TABLE tasks (
id SERIAL PRIMARY KEY,
description TEXT NOT NULL,
completed BOOLEAN DEFAULT FALSE
);
migrations/20240101000001_create_tasks.down.sql:
DROP TABLE tasks;
En src/models.rs:
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
#[derive(Serialize, FromRow)]
pub struct Task {
pub id: i32,
pub description: String,
pub completed: bool,
}
#[derive(Deserialize)]
pub struct CreateTask {
pub description: String,
}
En src/main.rs:
use axum::{
extract::{Path, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post, put},
Json, Router,
};
use sqlx::{PgPool, Pool, Postgres};
use std::net::SocketAddr;
use std::sync::Arc;
use crate::models::{CreateTask, Task};
mod models;
async fn create_pool() -> Pool<Postgres> {
// Para MySQL: use sqlx::MySqlPool; y ajustar URL
PgPool::connect("postgres://user:pass@localhost/tareas_db")
.await
.expect("Failed to connect to database")
}
async fn get_tasks(State(pool): State<Arc<PgPool>>) -> impl IntoResponse {
let tasks = sqlx::query_as!(Task, "SELECT id, description, completed FROM tasks")
.fetch_all(&**pool)
.await
.unwrap();
(StatusCode::OK, Json(tasks))
}
async fn create_task(State(pool): State<Arc<PgPool>>, Json(payload): Json<CreateTask>) -> impl IntoResponse {
sqlx::query!(
"INSERT INTO tasks (description) VALUES ($1)",
payload.description
)
.execute(&**pool)
.await
.unwrap();
StatusCode::CREATED
}
async fn complete_task(State(pool): State<Arc<PgPool>>, Path(id): Path<i32>) -> impl IntoResponse {
let mut tx = pool.begin().await.unwrap();
sqlx::query!("UPDATE tasks SET completed = TRUE WHERE id = $1", id)
.execute(&mut tx)
.await
.unwrap();
tx.commit().await.unwrap();
StatusCode::OK
}
#[tokio::main]
async fn main() {
let pool = Arc::new(create_pool().await);
let app = Router::new()
.route("/tasks", get(get_tasks).post(create_task))
.route("/tasks/:id/complete", put(complete_task))
.with_state(pool);
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
Para compilar y ejecutar: asegúrese de que sqlx-cli esté instalado, ejecute sqlx migrate run con DATABASE_URL configurada, luego cargo run. La API expone endpoints para listar/crear tareas y marcar como completadas, demostrando pools, queries y transacciones en un contexto práctico. Para MySQL, reemplace postgres por mysql en dependencias y código, ajustando la URL y migraciones según sea necesario.
Habiendo establecido las bases para interactuar con bases de datos relacionales mediante SQLx, el siguiente capítulo profundizará en patrones avanzados de concurrencia asincrónica, extendiendo estas técnicas a escenarios distribuidos.