Streaming HTTP y WebSockets en Python

Cuando haces response = requests.get(url) y accedes a response.content, requests descarga todo el cuerpo antes de darte el control. Para un JSON de 2 KB eso es perfecto. Para un CSV de 4 GB o la respuesta en streaming de un LLM, estás invitando a un MemoryError o simplemente desperdiciando segundos esperando bytes que podrías procesar inmediatamente.

El streaming HTTP resuelve esto cambiando el contrato: en lugar de esperar la respuesta completa, recibes el cuerpo en chunks conforme llegan del socket. La clave es decirle a requests que no consuma el stream automáticamente — eso se logra con stream=True — y luego iterar con iter_content(chunk_size=N). Con httpx en contexto async, el equivalente es aiter_bytes(), que libera el event loop entre chunks en lugar de bloquear el hilo.

Los WebSockets son una bestia diferente: usan HTTP solo para el handshake inicial (el Upgrade: websocket que convierte la conexión TCP en full-duplex), y a partir de ahí el protocolo cambia completamente. Ya no hay request/response — ambos extremos pueden enviar mensajes en cualquier momento. Esto los hace ideales para feeds en tiempo real, chat, y cualquier cosa donde el servidor necesita empujar datos sin que el cliente pregunte.

Cuándo te importa cada uno:
iter_content / aiter_bytes: descarga de archivos, exportaciones grandes, respuestas de LLMs con text/event-stream.
WebSockets: comunicación bidireccional sostenida — el servidor necesita hablar sin que el cliente pida.

Si usas streaming síncrono sin stream=True, requests ya descargó todo antes de que llegues al loop. Si usas WebSockets para algo que es fundamentalmente request/response, estás añadiendo complejidad sin beneficio.

import asyncio
import httpx
import websockets
import json
from pathlib import Path


# ── 1. Streaming síncrono con requests ───────────────────────────────────────
import requests

def download_large_file(url: str, dest: Path) -> int:
    """Descarga un archivo grande escribiendo chunk a chunk. Devuelve bytes totales."""
    total = 0
    # stream=True es la mitad del contrato; sin él requests consume todo en memoria
    with requests.get(url, stream=True, timeout=30) as response:
        response.raise_for_status()
        with dest.open("wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                # iter_content filtra chunks vacíos (keep-alive) cuando chunk_size está fijo
                if chunk:
                    f.write(chunk)
                    total += len(chunk)
    return total


# ── 2. Streaming asíncrono con httpx ─────────────────────────────────────────
async def stream_llm_response(api_url: str, prompt: str) -> str:
    """
    Consume una respuesta Server-Sent Events de un LLM.
    aiter_bytes() cede control al event loop entre cada chunk recibido.
    """
    payload = {"prompt": prompt, "stream": True}
    collected = []

    async with httpx.AsyncClient(timeout=60.0) as client:
        async with client.stream("POST", api_url, json=payload) as response:
            response.raise_for_status()
            async for chunk in response.aiter_bytes(chunk_size=1024):
                # Los LLMs suelen devolver líneas "data: {...}\n\n"
                text = chunk.decode("utf-8", errors="replace")
                for line in text.splitlines():
                    if line.startswith("data: ") and line != "data: [DONE]":
                        try:
                            token = json.loads(line[6:]).get("token", "")
                            print(token, end="", flush=True)
                            collected.append(token)
                        except json.JSONDecodeError:
                            pass  # chunk partido entre dos iteraciones — manejado más abajo

    return "".join(collected)


# ── 3. WebSocket: cliente que envía y recibe ──────────────────────────────────
async def chat_client(ws_url: str, username: str) -> None:
    """
    Cliente de chat mínimo. El servidor puede empujar mensajes
    en cualquier momento sin que el cliente los solicite explícitamente.
    """
    async with websockets.connect(ws_url) as ws:
        # Registro inicial
        await ws.send(json.dumps({"type": "join", "user": username}))

        # Tarea separada para recibir mensajes entrantes sin bloquear el envío
        async def receive_loop() -> None:
            async for raw in ws:  # itera hasta que la conexión se cierra
                msg = json.loads(raw)
                print(f"[{msg['user']}] {msg['text']}")

        receiver = asyncio.create_task(receive_loop())

        messages = ["hola a todos", "¿cómo están?", "hasta luego"]
        for text in messages:
            await ws.send(json.dumps({"type": "message", "user": username, "text": text}))
            await asyncio.sleep(1)

        receiver.cancel()


# ── 4. WebSocket: servidor eco mínimo ────────────────────────────────────────
async def echo_handler(ws: websockets.WebSocketServerProtocol) -> None:
    """
    El handler recibe la conexión ya establecida (handshake hecho por la librería).
    Itera sobre mensajes entrantes hasta que el cliente cierra.
    """
    async for message in ws:
        # Eco simple: devuelve el mismo payload con timestamp
        response = json.dumps({"echo": message, "server": "ok"})
        await ws.send(response)


async def run_echo_server() -> None:
    async with websockets.serve(echo_handler, "localhost", 8765):
        await asyncio.Future()  # corre hasta cancelación externa


# ── Punto de entrada ──────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Demo streaming HTTP (requiere servidor real)
    # asyncio.run(stream_llm_response("http://localhost:11434/api/generate", "Hola"))

    # Demo WebSocket (lanza servidor y cliente en paralelo)
    async def demo():
        server_task = asyncio.create_task(run_echo_server())
        await asyncio.sleep(0.1)  # deja que el servidor arranque
        await chat_client("ws://localhost:8765", "alice")
        server_task.cancel()

    asyncio.run(demo())

Lo que está pasando realmente

El stream=True en requests le dice a urllib3 que no llame a response.read() automáticamente. El objeto response mantiene abierta la conexión TCP hasta que termines de iterar o llames a response.close(). El with del ejemplo lo hace automáticamente — sin ese contexto, puedes filtrar conexiones si saltas del loop con una excepción.

iter_content(chunk_size=8192) no garantiza que cada chunk sea exactamente 8192 bytes: ese es el máximo que leerá del socket en una llamada. En la práctica, el primer chunk puede ser más pequeño si el servidor envía datos poco a poco. El if chunk: filtra los bytes vacíos que requests inserta cuando el servidor mantiene la conexión viva con keep-alive frames.

Con httpx asíncrono, aiter_bytes() es genuinamente no bloqueante: cada await dentro del loop cede al event loop, lo que permite que otras corrutinas progresen mientras esperas datos de red. El equivalente síncrono en httpx (iter_bytes()) bloquea igual que requests — la asincronía importa cuando tienes múltiples streams concurrentes.

En el código WebSocket, fíjate en el patrón asyncio.create_task(receive_loop()) dentro del cliente. Si no separas recepción y envío en tareas distintas, un await ws.recv() bloquea hasta que llega un mensaje, y nunca podrías enviar nada mientras esperas. Dos tareas concurrentes sobre el mismo ws es el patrón estándar para clientes full-duplex.

El servidor usa async for message in ws — cuando el cliente cierra la conexión limpiamente (frame de cierre WebSocket), el iterador termina y el handler retorna. Si el cliente desaparece abruptamente, websockets lanza ConnectionClosedError, que puedes capturar si necesitas limpiar recursos.

Errores que debes conocer

Error: Olvidar stream=True en requests e iterar iter_content de todos modos — la iteración funciona pero ya descargaste todo en memoria.

# ❌ Wrong
response = requests.get(url)  # descargó todo aquí
for chunk in response.iter_content(chunk_size=8192):
    process(chunk)

# ✅ Right
with requests.get(url, stream=True) as response:
    for chunk in response.iter_content(chunk_size=8192):
        process(chunk)

El iter_content sin stream=True simplemente corta response.content en trozos — útil cero, el daño ya está hecho.

Error: Decodificar cada chunk independientemente cuando la respuesta es UTF-8 — un carácter multibyte puede quedar partido entre dos chunks y producir UnicodeDecodeError.

# ❌ Wrong
async for chunk in response.aiter_bytes():
    text = chunk.decode("utf-8")  # explota si el chunk corta un carácter de 3 bytes

# ✅ Right
decoder = codecs.getincrementaldecoder("utf-8")("replace")
async for chunk in response.aiter_bytes():
    text = decoder.decode(chunk)  # buffer interno maneja bytes incompletos

IncrementalDecoder mantiene un buffer interno para los bytes incompletos al final de cada chunk, exactamente para este caso.

Error: Usar await ws.recv() en un loop sin manejar websockets.exceptions.ConnectionClosed — si el servidor cierra la conexión, la excepción mata la tarea silenciosamente si no hay try/except.

# ❌ Wrong
while True:
    msg = await ws.recv()  # lanza ConnectionClosed cuando el servidor cierra

# ✅ Right
try:
    async for msg in ws:  # el iterador maneja ConnectionClosed internamente
        process(msg)
except websockets.exceptions.ConnectionClosedError as e:
    logger.warning("Conexión cerrada inesperadamente: %s", e)

El async for sobre el WebSocket absorbe el cierre limpio y solo propaga cierres con código de error, dándote un punto de control claro.

156

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio