Async workers y task queues: Celery, RQ y ARQ

Cuando un endpoint recibe una petición y necesita enviar un email de bienvenida, redimensionar una imagen o consultar una API externa que tarda tres segundos, tienes un problema: HTTP no espera. El cliente espera una respuesta, el servidor web tiene un número limitado de workers, y bloquear uno de ellos en una tarea lenta es exactamente la receta para que tu aplicación se caiga bajo carga.

La solución es separar trabajo de respuesta: el endpoint encola la tarea y responde inmediatamente con un 202 Accepted (o un ID de tarea), y un proceso completamente separado —el worker— consume esa cola y ejecuta el trabajo real. Esto no es solo una optimización de rendimiento; es un cambio arquitectónico que desacopla la latencia del procesamiento de la latencia percibida por el usuario.

El mecanismo central es siempre el mismo: un broker (Redis o RabbitMQ) actúa como intermediario. El servidor web publica un mensaje serializado en una cola; el worker lo lee, deserializa, ejecuta la función y opcionalmente publica el resultado en un result backend (también Redis, normalmente). El worker es un proceso Python ordinario que corre en un loop infinito esperando mensajes.

Dónde difieren Celery, RQ y ARQ es en la complejidad de configuración, el modelo de concurrencia y los trade-offs de madurez versus modernidad.

Celery lleva más de quince años en producción. Soporta múltiples brokers, task routing avanzado, retry policies, beat scheduler para tareas periódicas, canvas para componer workflows (chains, groups, chords), y monitoring con Flower. Esa madurez tiene un precio: la configuración puede ser verbosa, los import-time side effects causan bugs sutiles, y debuggear un deadlock en un chord es una tarde de tu vida.

RQ (Redis Queue) elimina todo eso a cambio de simplicidad. Requiere Redis y nada más, las tareas son funciones Python normales, y la configuración cabe en diez líneas. El trade-off es obvio: nada de multiple brokers, nada de canvas, nada de beat nativo.

ARQ es la opción cuando tu stack ya es async. Está diseñada para corrutinas async def, corre en un event loop en lugar de threads o procesos, y encaja naturalmente con FastAPI o cualquier aplicación basada en asyncio. Si usas RQ con código async, terminas con asyncio.run() dentro del worker, que es exactamente lo que no quieres.

La pregunta sobre cuándo no necesitas nada de esto merece respuesta directa: si tu tarea tarda menos de un segundo y puede fallar sin consecuencias para el usuario, BackgroundTasks de FastAPI o asyncio.gather son suficientes. La complejidad operacional de un task queue —broker en infraestructura, workers desplegados, monitoring— solo se justifica cuando necesitas durabilidad (la tarea sobrevive a un restart), retry automático, o separación de recursos entre el servidor web y el procesamiento pesado.

# requirements: celery[redis], rq, arq, redis, httpx, Pillow

# ─── CELERY ──────────────────────────────────────────────────────────────────
# celery_app.py
from celery import Celery

app = Celery(
    "workers",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",  # distinto DB para resultados
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_acks_late=True,   # el worker hace ACK después de ejecutar, no al recibir
    worker_prefetch_multiplier=1,  # evita que un worker acapare todas las tareas
)


@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_image(self, image_path: str, user_id: int) -> dict:
    """bind=True da acceso a self para poder llamar retry()."""
    try:
        from PIL import Image
        img = Image.open(image_path)
        thumbnail = img.resize((200, 200))
        output_path = image_path.replace("original", "thumb")
        thumbnail.save(output_path)
        return {"status": "ok", "path": output_path, "user_id": user_id}
    except Exception as exc:
        # reintenta con backoff exponencial; si se agota, propaga la excepción
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)


# Para encolar desde el servidor web (FastAPI, Django, Flask, lo que sea):
# result = process_image.delay("media/original/foto.jpg", user_id=42)
# result.id  → UUID que puedes devolver al cliente para polling

# Iniciar el worker:
# celery -A celery_app worker --loglevel=info --concurrency=4


# ─── RQ ──────────────────────────────────────────────────────────────────────
# rq_tasks.py
import httpx


def fetch_and_store(url: str, record_id: int) -> dict:
    """Función Python normal: RQ no exige decoradores."""
    response = httpx.get(url, timeout=30)
    response.raise_for_status()
    data = response.json()
    # aquí guardarías en DB; simplificado para el ejemplo
    return {"record_id": record_id, "fetched": len(data)}


# rq_enqueue.py
from redis import Redis
from rq import Queue
from rq_tasks import fetch_and_store

redis_conn = Redis()
q = Queue(connection=redis_conn)

# job tiene .id, .status, .result disponibles para polling posterior
job = q.enqueue(
    fetch_and_store,
    "https://jsonplaceholder.typicode.com/posts",
    42,
    job_timeout=120,   # mata la tarea si supera 2 minutos
    retry=3,           # RQ >= 1.10
)
print(f"Job encolado: {job.id}")

# Iniciar el worker:
# rq worker --with-scheduler


# ─── ARQ ─────────────────────────────────────────────────────────────────────
# arq_workers.py
import asyncio
import httpx
from arq import create_pool
from arq.connections import RedisSettings


# Las tareas son corrutinas normales; el primer argumento es siempre el contexto
async def send_welcome_email(ctx: dict, user_email: str, username: str) -> str:
    """ctx contiene la sesión de Redis y cualquier recurso del startup."""
    http_client: httpx.AsyncClient = ctx["http_client"]
    # llamada real a un servicio de email
    response = await http_client.post(
        "https://api.mailservice.example/send",
        json={"to": user_email, "template": "welcome", "name": username},
        timeout=10,
    )
    response.raise_for_status()
    return f"email enviado a {user_email}"


class WorkerSettings:
    """ARQ descubre la configuración a través de esta clase."""
    functions = [send_welcome_email]
    redis_settings = RedisSettings(host="localhost", port=6379)
    max_jobs = 10

    @staticmethod
    async def on_startup(ctx: dict):
        # el cliente HTTP se crea una vez por worker, no por tarea
        ctx["http_client"] = httpx.AsyncClient()

    @staticmethod
    async def on_shutdown(ctx: dict):
        await ctx["http_client"].aclose()


# arq_enqueue.py
async def enqueue_welcome():
    redis = await create_pool(RedisSettings())
    job = await redis.enqueue_job(
        "send_welcome_email",
        "ana@ejemplo.com",
        "Ana",
    )
    print(f"Job: {job.job_id}")
    await redis.aclose()


if __name__ == "__main__":
    asyncio.run(enqueue_welcome())

# Iniciar el worker:
# arq arq_workers.WorkerSettings

Qué significa cada decisión

En Celery, task_acks_late=True junto con worker_prefetch_multiplier=1 cambia la semántica de durabilidad. Por defecto, Celery hace ACK al broker en el momento de recibir la tarea; si el worker muere durante la ejecución, la tarea se pierde. Con acks_late, el ACK ocurre solo cuando la función termina (o falla definitivamente), de modo que un worker caído devuelve la tarea a la cola. prefetch_multiplier=1 evita que el worker reserve más tareas de las que puede ejecutar simultáneamente, lo que sería problemático con tareas largas en un broker con visibilidad timeout.

El patrón bind=True + self.retry() con countdown=2 ** self.request.retries implementa backoff exponencial sin dependencias externas: primer retry en 1 s, segundo en 2 s, tercero en 4 s. Llamar raise self.retry(exc=exc) en lugar de simplemente self.retry(exc=exc) es obligatorio para que Celery propague la excepción correctamente en el estado de la tarea.

En RQ, la ausencia de decoradores es intencionada: cualquier función importable es una tarea. Esto tiene un corolario importante —la función debe ser importable en el proceso worker, así que no puedes encolar lambdas ni closures locales. job_timeout no es opcional en producción: sin él, una tarea colgada bloquea un worker para siempre.

En ARQ, el patrón on_startup/on_shutdown es donde vive la gestión de recursos. Crear un httpx.AsyncClient por tarea es caro y además viola el principio de reutilización de conexiones. El contexto ctx es un diccionario compartido por todas las tareas que corren en el mismo worker, así que cualquier recurso que pongas ahí debe ser thread-safe (irrelevante en asyncio puro) y correctamente cerrado en on_shutdown. Fíjate que ARQ identifica las tareas por nombre de función como string "send_welcome_email" —si renombras la función sin migrar los jobs pendientes en Redis, esos jobs fallarán silenciosamente.

La elección entre las tres herramientas se reduce a dos ejes: ¿tu código es sync o async? ¿Necesitas features avanzadas como scheduling, routing o workflows compuestos? Si es async y no necesitas features avanzadas, ARQ. Si es sync y quieres simplicidad, RQ. Si necesitas scheduling, beat, routing por prioridad o ya tienes un equipo familiarizado con él, Celery —pero configúralo con cuidado desde el principio porque cambiar acks_late o el serializer en producción con tareas en vuelo es complicado.

Errores que debes conocer

Error: Importar modelos ORM dentro de la definición del módulo de tareas en lugar de dentro de la función, causando que el worker intente conectarse a la base de datos en el momento del import.

# ❌ Wrong
from myapp.models import User  # se ejecuta cuando Celery importa el módulo

@app.task
def notify_user(user_id: int):
    user = User.objects.get(id=user_id)
    ...

# ✅ Right
@app.task
def notify_user(user_id: int):
    from myapp.models import User  # import diferido: ocurre dentro del worker, post-setup
    user = User.objects.get(id=user_id)
    ...

El import diferido garantiza que la conexión a la base de datos se establece después de que el worker haya completado su inicialización, no durante la carga del módulo.


Error: Encolar objetos no serializables (instancias ORM, objetos complejos) en lugar de pasar identificadores primitivos.

# ❌ Wrong
user = User.objects.get(id=42)
process_image.delay(user)  # falla o serializa estado stale

# ✅ Right
process_image.delay(user_id=42)  # el worker recupera el objeto fresco

Los workers son procesos separados; pasar IDs en lugar de objetos es la única forma de garantizar que el worker trabaja con datos consistentes y que la serialización no falla.


Error: En ARQ, encolar una tarea con el nombre incorrecto porque la función fue renombrada o movida sin actualizar el string de referencia.

# ❌ Wrong
await redis.enqueue_job("send_email", user_email)  # función renombrada a send_welcome_email

# ✅ Right
from arq_workers import send_welcome_email
await redis.enqueue_job(send_welcome_email.__name__, user_email)

Usar función.__name__ en lugar de un string literal hace que el refactoring rompa el código en compile-time (en cuanto el import falla) en lugar de en runtime silencioso.

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio