Cuando haces response = requests.get(url) y accedes a response.content, requests descarga todo el cuerpo antes de darte el control. Para un JSON de 2 KB eso es perfecto. Para un CSV de 4 GB o la respuesta en streaming de un LLM, estás invitando a un MemoryError o simplemente desperdiciando segundos esperando bytes que podrías procesar inmediatamente.
El streaming HTTP resuelve esto cambiando el contrato: en lugar de esperar la respuesta completa, recibes el cuerpo en chunks conforme llegan del socket. La clave es decirle a requests que no consuma el stream automáticamente — eso se logra con stream=True — y luego iterar con iter_content(chunk_size=N). Con httpx en contexto async, el equivalente es aiter_bytes(), que libera el event loop entre chunks en lugar de bloquear el hilo.
Los WebSockets son una bestia diferente: usan HTTP solo para el handshake inicial (el Upgrade: websocket que convierte la conexión TCP en full-duplex), y a partir de ahí el protocolo cambia completamente. Ya no hay request/response — ambos extremos pueden enviar mensajes en cualquier momento. Esto los hace ideales para feeds en tiempo real, chat, y cualquier cosa donde el servidor necesita empujar datos sin que el cliente pregunte.
Cuándo te importa cada uno:
– iter_content / aiter_bytes: descarga de archivos, exportaciones grandes, respuestas de LLMs con text/event-stream.
– WebSockets: comunicación bidireccional sostenida — el servidor necesita hablar sin que el cliente pida.
Si usas streaming síncrono sin stream=True, requests ya descargó todo antes de que llegues al loop. Si usas WebSockets para algo que es fundamentalmente request/response, estás añadiendo complejidad sin beneficio.
import asyncio
import httpx
import websockets
import json
from pathlib import Path
# ── 1. Streaming síncrono con requests ───────────────────────────────────────
import requests
def download_large_file(url: str, dest: Path) -> int:
"""Descarga un archivo grande escribiendo chunk a chunk. Devuelve bytes totales."""
total = 0
# stream=True es la mitad del contrato; sin él requests consume todo en memoria
with requests.get(url, stream=True, timeout=30) as response:
response.raise_for_status()
with dest.open("wb") as f:
for chunk in response.iter_content(chunk_size=8192):
# iter_content filtra chunks vacíos (keep-alive) cuando chunk_size está fijo
if chunk:
f.write(chunk)
total += len(chunk)
return total
# ── 2. Streaming asíncrono con httpx ─────────────────────────────────────────
async def stream_llm_response(api_url: str, prompt: str) -> str:
"""
Consume una respuesta Server-Sent Events de un LLM.
aiter_bytes() cede control al event loop entre cada chunk recibido.
"""
payload = {"prompt": prompt, "stream": True}
collected = []
async with httpx.AsyncClient(timeout=60.0) as client:
async with client.stream("POST", api_url, json=payload) as response:
response.raise_for_status()
async for chunk in response.aiter_bytes(chunk_size=1024):
# Los LLMs suelen devolver líneas "data: {...}\n\n"
text = chunk.decode("utf-8", errors="replace")
for line in text.splitlines():
if line.startswith("data: ") and line != "data: [DONE]":
try:
token = json.loads(line[6:]).get("token", "")
print(token, end="", flush=True)
collected.append(token)
except json.JSONDecodeError:
pass # chunk partido entre dos iteraciones — manejado más abajo
return "".join(collected)
# ── 3. WebSocket: cliente que envía y recibe ──────────────────────────────────
async def chat_client(ws_url: str, username: str) -> None:
"""
Cliente de chat mínimo. El servidor puede empujar mensajes
en cualquier momento sin que el cliente los solicite explícitamente.
"""
async with websockets.connect(ws_url) as ws:
# Registro inicial
await ws.send(json.dumps({"type": "join", "user": username}))
# Tarea separada para recibir mensajes entrantes sin bloquear el envío
async def receive_loop() -> None:
async for raw in ws: # itera hasta que la conexión se cierra
msg = json.loads(raw)
print(f"[{msg['user']}] {msg['text']}")
receiver = asyncio.create_task(receive_loop())
messages = ["hola a todos", "¿cómo están?", "hasta luego"]
for text in messages:
await ws.send(json.dumps({"type": "message", "user": username, "text": text}))
await asyncio.sleep(1)
receiver.cancel()
# ── 4. WebSocket: servidor eco mínimo ────────────────────────────────────────
async def echo_handler(ws: websockets.WebSocketServerProtocol) -> None:
"""
El handler recibe la conexión ya establecida (handshake hecho por la librería).
Itera sobre mensajes entrantes hasta que el cliente cierra.
"""
async for message in ws:
# Eco simple: devuelve el mismo payload con timestamp
response = json.dumps({"echo": message, "server": "ok"})
await ws.send(response)
async def run_echo_server() -> None:
async with websockets.serve(echo_handler, "localhost", 8765):
await asyncio.Future() # corre hasta cancelación externa
# ── Punto de entrada ──────────────────────────────────────────────────────────
if __name__ == "__main__":
# Demo streaming HTTP (requiere servidor real)
# asyncio.run(stream_llm_response("http://localhost:11434/api/generate", "Hola"))
# Demo WebSocket (lanza servidor y cliente en paralelo)
async def demo():
server_task = asyncio.create_task(run_echo_server())
await asyncio.sleep(0.1) # deja que el servidor arranque
await chat_client("ws://localhost:8765", "alice")
server_task.cancel()
asyncio.run(demo())
Lo que está pasando realmente
El stream=True en requests le dice a urllib3 que no llame a response.read() automáticamente. El objeto response mantiene abierta la conexión TCP hasta que termines de iterar o llames a response.close(). El with del ejemplo lo hace automáticamente — sin ese contexto, puedes filtrar conexiones si saltas del loop con una excepción.
iter_content(chunk_size=8192) no garantiza que cada chunk sea exactamente 8192 bytes: ese es el máximo que leerá del socket en una llamada. En la práctica, el primer chunk puede ser más pequeño si el servidor envía datos poco a poco. El if chunk: filtra los bytes vacíos que requests inserta cuando el servidor mantiene la conexión viva con keep-alive frames.
Con httpx asíncrono, aiter_bytes() es genuinamente no bloqueante: cada await dentro del loop cede al event loop, lo que permite que otras corrutinas progresen mientras esperas datos de red. El equivalente síncrono en httpx (iter_bytes()) bloquea igual que requests — la asincronía importa cuando tienes múltiples streams concurrentes.
En el código WebSocket, fíjate en el patrón asyncio.create_task(receive_loop()) dentro del cliente. Si no separas recepción y envío en tareas distintas, un await ws.recv() bloquea hasta que llega un mensaje, y nunca podrías enviar nada mientras esperas. Dos tareas concurrentes sobre el mismo ws es el patrón estándar para clientes full-duplex.
El servidor usa async for message in ws — cuando el cliente cierra la conexión limpiamente (frame de cierre WebSocket), el iterador termina y el handler retorna. Si el cliente desaparece abruptamente, websockets lanza ConnectionClosedError, que puedes capturar si necesitas limpiar recursos.
Errores que debes conocer
Error: Olvidar stream=True en requests e iterar iter_content de todos modos — la iteración funciona pero ya descargaste todo en memoria.
# ❌ Wrong
response = requests.get(url) # descargó todo aquí
for chunk in response.iter_content(chunk_size=8192):
process(chunk)
# ✅ Right
with requests.get(url, stream=True) as response:
for chunk in response.iter_content(chunk_size=8192):
process(chunk)
El iter_content sin stream=True simplemente corta response.content en trozos — útil cero, el daño ya está hecho.
Error: Decodificar cada chunk independientemente cuando la respuesta es UTF-8 — un carácter multibyte puede quedar partido entre dos chunks y producir UnicodeDecodeError.
# ❌ Wrong
async for chunk in response.aiter_bytes():
text = chunk.decode("utf-8") # explota si el chunk corta un carácter de 3 bytes
# ✅ Right
decoder = codecs.getincrementaldecoder("utf-8")("replace")
async for chunk in response.aiter_bytes():
text = decoder.decode(chunk) # buffer interno maneja bytes incompletos
IncrementalDecoder mantiene un buffer interno para los bytes incompletos al final de cada chunk, exactamente para este caso.
Error: Usar await ws.recv() en un loop sin manejar websockets.exceptions.ConnectionClosed — si el servidor cierra la conexión, la excepción mata la tarea silenciosamente si no hay try/except.
# ❌ Wrong
while True:
msg = await ws.recv() # lanza ConnectionClosed cuando el servidor cierra
# ✅ Right
try:
async for msg in ws: # el iterador maneja ConnectionClosed internamente
process(msg)
except websockets.exceptions.ConnectionClosedError as e:
logger.warning("Conexión cerrada inesperadamente: %s", e)
El async for sobre el WebSocket absorbe el cierre limpio y solo propaga cierres con código de error, dándote un punto de control claro.
N° 156