Observabilidad en Python: logs, métricas y trazas unificadas

Cuando un servicio falla en producción a las 3 AM, la diferencia entre resolverlo en diez minutos o en dos horas está casi siempre en la calidad de la instrumentación. Observabilidad no es lo mismo que monitoring: monitoring te dice que algo está roto, observabilidad te permite entender por qué sin tocar el sistema en vivo.

Los tres pilares son complementarios y no sustituibles entre sí. Los logs son eventos discretos con contexto (“el usuario 42 intentó pagar”); las métricas son valores numéricos que evolucionan en el tiempo (latencia p99, tasa de errores, RPS); los traces son el recorrido completo de una request a través de múltiples servicios, mostrando qué llamó a qué y cuánto tardó cada paso. Puedes tener los tres y seguir sin observabilidad real si no están correlacionados. El secreto está en el trace ID: un identificador único que viaja con cada request y que, cuando aparece en logs, métricas y spans a la vez, convierte tres flujos de datos independientes en una narrativa coherente.

OpenTelemetry (OTel) es el estándar de instrumentación agnóstico al vendor que absorbe los dos proyectos anteriores (OpenCensus y OpenTracing). Su apuesta es separar la API de instrumentación del backend de almacenamiento: instrumentas una vez, eliges después si exportas a Jaeger, Grafana Tempo, Datadog o cualquier otro colector compatible con OTLP. Para métricas, prometheus_client sigue siendo el path más directo cuando ya tienes Prometheus en tu stack, y OTel puede coexistir con él sin fricciones.

El costo real de instrumentación no es el overhead en CPU (que existe pero es medible y típicamente < 2% en trazas con sampling al 10-20%), sino el costo cognitivo de mantener la instrumentación actualizada cuando la lógica de negocio cambia. Si añades un span pero no lo actualizas cuando refactorizas la función, el trace miente, y un trace que miente es peor que no tener ninguno.

# requirements:
# opentelemetry-api==1.24.0
# opentelemetry-sdk==1.24.0
# opentelemetry-exporter-otlp-proto-grpc==1.24.0
# opentelemetry-instrumentation-fastapi==0.45b0
# prometheus-client==0.20.0
# fastapi==0.111.0
# uvicorn==0.29.0

import logging
import time
import random
import structlog
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request, HTTPException
from prometheus_client import Counter, Histogram, make_asgi_app, REGISTRY
from prometheus_client import multiprocess, CollectorRegistry

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.trace import Status, StatusCode
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

# ── Configuración de OpenTelemetry ────────────────────────────────────────────

def setup_tracing(service_name: str, otlp_endpoint: str = "http://localhost:4317") -> trace.Tracer:
    resource = Resource.create({"service.name": service_name, "service.version": "1.0.0"})
    provider = TracerProvider(resource=resource)

    exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
    # BatchSpanProcessor es no-bloqueante: acumula spans y los envía en lotes.
    # SimpleSpanProcessor (síncrono) solo para desarrollo/debug.
    provider.add_span_processor(BatchSpanProcessor(exporter))

    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)

# ── Métricas Prometheus ───────────────────────────────────────────────────────

REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total de requests HTTP",
    # labelnames define la cardinalidad; nunca uses valores de alta cardinalidad
    # (e.g., user_id) como label: destruyen el rendimiento de Prometheus.
    labelnames=["method", "endpoint", "status_code"],
)

REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "Latencia de requests HTTP",
    labelnames=["method", "endpoint"],
    # Buckets calibrados para APIs web típicas; ajusta según tu SLO.
    buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5],
)

EXTERNAL_CALL_ERRORS = Counter(
    "external_call_errors_total",
    "Errores en llamadas a servicios externos",
    labelnames=["service", "operation"],
)

# ── Logger estructurado con contexto de trace ─────────────────────────────────

structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        # JSONRenderer hace que los logs sean parseables por Loki, Elasticsearch, etc.
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    logger_factory=structlog.PrintLoggerFactory(),
)

def get_logger_with_trace_context(name: str) -> structlog.BoundLogger:
    """Inyecta trace_id y span_id del span activo en cada línea de log."""
    span = trace.get_current_span()
    ctx = span.get_span_context()

    bound_fields = {"logger": name}
    if ctx.is_valid:
        # trace_id en formato hex de 32 chars es el estándar W3C Trace Context.
        bound_fields["trace_id"] = format(ctx.trace_id, "032x")
        bound_fields["span_id"] = format(ctx.span_id, "016x")

    return structlog.get_logger().bind(**bound_fields)


# ── Simulación de llamada a servicio externo ──────────────────────────────────

tracer: trace.Tracer  # inicializado en lifespan

def call_inventory_service(product_id: str) -> dict:
    """
    Función que llama a un servicio externo. Ilustra cómo crear spans hijos
    y cómo registrar errores de forma que el backend de trazas los resalte.
    """
    log = get_logger_with_trace_context("inventory_client")

    with tracer.start_as_current_span(
        "inventory.check_stock",
        # SpanKind.CLIENT marca que este proceso es el caller en una llamada remota.
        kind=trace.SpanKind.CLIENT,
    ) as span:
        span.set_attribute("inventory.product_id", product_id)
        span.set_attribute("rpc.system", "http")

        log.info("checking_inventory", product_id=product_id)

        # Simulación de latencia variable y fallos ocasionales
        time.sleep(random.uniform(0.01, 0.08))

        if random.random() < 0.15:  # 15% de error simulado
            EXTERNAL_CALL_ERRORS.labels(
                service="inventory", operation="check_stock"
            ).inc()
            # record_exception captura el stack trace dentro del span.
            err = ConnectionError("Inventory service timeout")
            span.record_exception(err)
            span.set_status(Status(StatusCode.ERROR, "upstream timeout"))
            log.error("inventory_call_failed", product_id=product_id, error=str(err))
            raise HTTPException(status_code=503, detail="Inventory unavailable")

        result = {"product_id": product_id, "stock": random.randint(0, 100)}
        span.set_attribute("inventory.stock", result["stock"])
        log.info("inventory_checked", product_id=product_id, stock=result["stock"])
        return result


# ── Aplicación FastAPI ────────────────────────────────────────────────────────

@asynccontextmanager
async def lifespan(app: FastAPI):
    global tracer
    tracer = setup_tracing("order-service")
    yield
    # En producción harías flush del BatchSpanProcessor aquí para no perder
    # spans en vuelo durante un shutdown graceful.
    trace.get_tracer_provider().shutdown()  # type: ignore[attr-defined]

app = FastAPI(lifespan=lifespan)

# FastAPIInstrumentor intercepta cada request y crea spans automáticamente,
# propagando el contexto W3C Trace Context desde headers entrantes.
FastAPIInstrumentor.instrument_app(app)

# Monta el endpoint /metrics como sub-aplicación ASGI; no pasa por el
# middleware de autenticación de FastAPI —asegúrate de protegerlo en producción.
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)


@app.middleware("http")
async def observability_middleware(request: Request, call_next):
    """
    Middleware que registra métricas de latencia y conteo por endpoint.
    OTel ya creó el span; aquí solo añadimos las métricas Prometheus.
    """
    start = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start

    endpoint = request.url.path
    method = request.method
    status = str(response.status_code)

    REQUEST_COUNT.labels(method=method, endpoint=endpoint, status_code=status).inc()
    REQUEST_LATENCY.labels(method=method, endpoint=endpoint).observe(duration)

    return response


@app.get("/orders/{order_id}")
async def get_order(order_id: str, product_id: str = "SKU-001"):
    log = get_logger_with_trace_context("order_handler")

    # Recuperamos el span creado por FastAPIInstrumentor y le añadimos atributos
    # de dominio. Los atributos de negocio son lo que distingue un trace útil
    # de uno genérico.
    current_span = trace.get_current_span()
    current_span.set_attribute("order.id", order_id)
    current_span.set_attribute("order.product_id", product_id)

    log.info("order_request_started", order_id=order_id, product_id=product_id)

    inventory = call_inventory_service(product_id)

    if inventory["stock"] == 0:
        current_span.set_attribute("order.result", "out_of_stock")
        log.warning("order_rejected_no_stock", order_id=order_id)
        raise HTTPException(status_code=409, detail="Out of stock")

    current_span.set_attribute("order.result", "accepted")
    log.info("order_processed", order_id=order_id, stock_remaining=inventory["stock"])

    return {
        "order_id": order_id,
        "product_id": product_id,
        "status": "accepted",
        "stock_remaining": inventory["stock"],
    }

Qué hace cada decisión y por qué importa

La separación entre span raíz y span hijo es lo más importante del ejemplo. FastAPIInstrumentor crea automáticamente el span raíz para cada request HTTP, propaga el contexto de trazas que venga en los headers traceparent (estándar W3C), y cierra el span al terminar. call_inventory_service crea un span hijo con start_as_current_span: esto establece una relación padre-hijo en el trace, y cualquier backend compatible (Jaeger, Tempo) te mostrará el timeline anidado de cuánto tardó cada parte.

La correlación log-trace ocurre en get_logger_with_trace_context. Cuando llamas a trace.get_current_span(), OTel devuelve el span activo en el contexto de ejecución actual (usa contextvars internamente, por lo que funciona correctamente con async). Extraes trace_id y span_id y los inyectas como campos en el logger estructurado. El resultado es que cada línea de log tiene el mismo trace_id que el span correspondiente. En Grafana puedes configurar una “derived field” en Loki que convierta ese campo en un link directo al trace en Tempo.

La cardinalidad de labels en Prometheus es un tema que destruye infraestructuras. REQUEST_COUNT usa endpoint como label, lo cual es razonable si tus rutas son /orders/{order_id} (FastAPI las normaliza así). Si usaras el valor concreto del order_id, tendrías una serie temporal distinta por cada orden —millones de series, Prometheus explota en memoria. Por eso el comentario en EXTERNAL_CALL_ERRORS advierte explícitamente: nunca IDs de usuario, tokens, ni UUIDs como labels.

BatchSpanProcessor vs SimpleSpanProcessor: el batch es asíncrono y acumula spans en un buffer en memoria antes de exportarlos. Tiene latencia de exportación (configurable, por defecto 5 segundos) pero no bloquea el path crítico de tu request. SimpleSpanProcessor exporta de forma síncrona en cada span, lo que añade la latencia de red al tiempo de respuesta de tu API —útil para ver spans en tiempo real durante desarrollo, pero nunca en producción.

span.record_exception + span.set_status(ERROR): estas dos llamadas son distintas y complementarias. record_exception adjunta el stack trace como un evento dentro del span. set_status(ERROR) marca el span como fallido, lo que hace que los backends lo resalten en rojo y que las alertas basadas en error rate del trace funcionen. Si solo haces una de las dos, pierdes información.

Errores que debes conocer

Error: usar atributos de alta cardinalidad como span.set_attribute("user.id", user_id) en el span raíz y luego intentar agregar métricas sobre ellos usando ese atributo como dimensión.

# ❌ Wrong: exportar atributos de alta cardinalidad como labels de métricas
REQUEST_COUNT = Counter("requests_total", "...", labelnames=["user_id"])

@app.middleware("http")
async def bad_middleware(request: Request, call_next):
    user_id = request.headers.get("X-User-ID", "anonymous")
    response = await call_next(request)
    REQUEST_COUNT.labels(user_id=user_id).inc()  # una serie por usuario
    return response

# ✅ Right: alta cardinalidad en spans (OTel los indexa por trace_id),
# baja cardinalidad en métricas Prometheus
REQUEST_COUNT = Counter("requests_total", "...", labelnames=["endpoint", "status_code"])

@app.middleware("http")
async def good_middleware(request: Request, call_next):
    response = await call_next(request)
    # el user_id va al span, donde tiene sentido buscarlo por trace_id
    trace.get_current_span().set_attribute("user.id", request.headers.get("X-User-ID", ""))
    REQUEST_COUNT.labels(endpoint=request.url.path, status_code=response.status_code).inc()
    return response

Prometheus está diseñado para métricas de baja cardinalidad; los traces están diseñados para datos de alta cardinalidad. Usa cada herramienta para lo que fue diseñada.


Error: no llamar a shutdown() en el TracerProvider al terminar el proceso, lo que provoca pérdida de los últimos spans que todavía están en el buffer del BatchSpanProcessor.

# ❌ Wrong: el proceso termina sin vaciar el buffer
app = FastAPI()
tracer = setup_tracing("my-service")
# Al hacer Ctrl+C o SIGTERM, los últimos N spans se pierden silenciosamente

# ✅ Right: lifespan garantiza el shutdown graceful
@asynccontextmanager
async def lifespan(app: FastAPI):
    global tracer
    tracer = setup_tracing("my-service")
    yield
    trace.get_tracer_provider().shutdown()  # type: ignore[attr-defined]

app = FastAPI(lifespan=lifespan)

En despliegues con rolling updates o canary deployments, esto es especialmente importante porque los pods se reemplazan frecuentemente y perderías trazas de los últimos segundos de vida de cada instancia.


Error: llamar a get_logger_with_trace_context fuera del contexto de un span activo en código async, cuando la ejecución ya salió del scope del with tracer.start_as_current_span(...).

# ❌ Wrong: log fuera del scope del span — trace_id estará vacío
async def process_background_job(order_id: str):
    with tracer.start_as_current_span("background.process") as span:
        result = do_work(order_id)
    # El span ya cerró; get_current_span() devuelve un NonRecordingSpan
    log = get_logger_with_trace_context("job")
    log.info("job_done", order_id=order_id)  # sin trace_id en el log

# ✅ Right: log dentro del scope
async def process_background_job(order_id: str):
    with tracer.start_as_current_span("background.process") as span:
        result = do_work(order_id)
        log = get_logger_with_trace_context("job")
        log.info("job_done", order_id=order_id)  # trace_id presente

contextvars en Python es scoped: fuera del with, el contexto de OTel ya no tiene un span activo válido, y ctx.is_valid devuelve False, por lo que get_logger_with_trace_context simplemente no inyecta los campos de traza.

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio