gRPC es un framework RPC (Remote Procedure Call) que usa Protocol Buffers como formato de serialización y HTTP/2 como transporte. La diferencia con REST no es filosófica sino estructural: defines un contrato en un archivo .proto, compilas ese contrato en código Python tipado, y tanto el cliente como el servidor operan sobre ese contrato. No hay ambigüedad sobre campos opcionales, no hay application/json que parsear a mano, no hay discusiones sobre si debe ser POST /activate o PUT /users/{id}/status.
Lo que hace que gRPC sea especialmente valioso en microservicios no es solo la eficiencia de Protobuf frente a JSON (aunque es real, entre 3x y 10x en tamaño y velocidad de serialización), sino la generación de código. El archivo .proto actúa como IDL (Interface Definition Language): es la única fuente de verdad y de ella se derivan clientes y servidores en cualquier lenguaje soportado. Si cambias un campo y no regeneras el cliente, el compilador te avisa antes de que llegues a producción.
El modelo de comunicación es más rico que REST: gRPC soporta cuatro patrones de streaming. El unario es el equivalente a una llamada HTTP normal (request → response). El server streaming permite que el servidor devuelva múltiples mensajes para una sola petición. El client streaming invierte esa relación. Y el bidireccional abre un canal full-duplex donde ambos lados envían mensajes independientemente. HTTP/1.1 simplemente no puede expresar eso sin workarounds.
Cuándo no usar gRPC: cuando el cliente es un navegador sin grpc-web, cuando necesitas que un humano pueda leer y depurar el tráfico con curl, o cuando el equipo es muy pequeño y el overhead de compilar .proto supera los beneficios. Para APIs públicas orientadas a terceros, REST sigue ganando por conveniencia de consumo. Para comunicación interna entre servicios propios, gRPC es casi siempre la elección correcta.
Lo que rompe si lo usas mal: el error más costoso es versionar proto files descuidadamente. Reutilizar un número de campo con un tipo diferente corrompe silenciosamente los mensajes porque Protobuf serializa por número, no por nombre. También es fácil abusar del ThreadPoolExecutor sin medir latencia real y estrellarte con un pool exhausto bajo carga.
Ejemplo completo: servicio de inventario con streaming y interceptor
Instala las dependencias necesarias:
pip install grpcio grpcio-tools
1. Definición del contrato (inventory.proto)
syntax = "proto3";
package inventory;
service InventoryService {
// Unario: consulta de un producto
rpc GetProduct(ProductRequest) returns (ProductResponse);
// Server streaming: stream de actualizaciones de stock
rpc WatchStock(ProductRequest) returns (stream StockUpdate);
// Client streaming: carga masiva de productos
rpc BulkUpload(stream ProductData) returns (UploadSummary);
// Bidireccional: sincronización en tiempo real
rpc Sync(stream SyncEvent) returns (stream SyncAck);
}
message ProductRequest {
string product_id = 1;
}
message ProductResponse {
string product_id = 1;
string name = 2;
int32 stock = 3;
float price = 4;
}
message StockUpdate {
string product_id = 1;
int32 new_stock = 2;
string timestamp = 3;
}
message ProductData {
string product_id = 1;
string name = 2;
int32 stock = 3;
float price = 4;
}
message UploadSummary {
int32 processed = 1;
int32 failed = 2;
}
message SyncEvent {
string product_id = 1;
string event_type = 2; // "UPDATE" | "DELETE"
int32 stock = 3;
}
message SyncAck {
string product_id = 1;
bool accepted = 2;
}
2. Generación del código Python
python -m grpc_tools.protoc \ -I. \ --python_out=. \ --grpc_python_out=. \ inventory.proto
Esto genera inventory_pb2.py (clases de mensajes) e inventory_pb2_grpc.py (stubs de cliente y esqueleto de servidor).
3. Implementación completa (server.py)
from __future__ import annotations
import time
import logging
import threading
from concurrent import futures
from datetime import datetime, timezone
from typing import Generator, Iterator
import grpc
from grpc import ServicerContext
import inventory_pb2 as pb
import inventory_pb2_grpc as pb_grpc
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
# Base de datos en memoria para el ejemplo
_PRODUCTS: dict[str, dict] = {
"SKU-001": {"name": "Widget Pro", "stock": 42, "price": 9.99},
"SKU-002": {"name": "Gadget Max", "stock": 7, "price": 49.99},
}
_LOCK = threading.Lock()
# ── Interceptor transversal ───────────────────────────────────────────────────
class LoggingInterceptor(grpc.ServerInterceptor):
"""Registra método, duración y código de estado para cada llamada RPC."""
def intercept_service(self, continuation, handler_call_details):
method = handler_call_details.method
start = time.perf_counter()
# continuation devuelve el handler real; lo envolvemos
handler = continuation(handler_call_details)
if handler is None:
return handler
def timed_unary(request, context: ServicerContext):
try:
response = handler.unary_unary(request, context)
logger.info("%s OK (%.3fs)", method, time.perf_counter() - start)
return response
except Exception as exc:
logger.error("%s ERROR: %s", method, exc)
raise
# Solo envolvemos unary_unary por brevedad; en producción envuelves los cuatro
if handler.unary_unary:
return grpc.unary_unary_rpc_method_handler(
timed_unary,
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
)
return handler
# ── Servicer ─────────────────────────────────────────────────────────────────
class InventoryServicer(pb_grpc.InventoryServiceServicer):
def GetProduct(
self, request: pb.ProductRequest, context: ServicerContext
) -> pb.ProductResponse:
"""Unario: devuelve un producto o aborta con NOT_FOUND."""
with _LOCK:
product = _PRODUCTS.get(request.product_id)
if product is None:
# set_code + set_details es el equivalente a un 404 con body
context.set_code(grpc.StatusCode.NOT_FOUND)
context.set_details(f"Product {request.product_id!r} not found")
return pb.ProductResponse()
return pb.ProductResponse(
product_id=request.product_id,
name=product["name"],
stock=product["stock"],
price=product["price"],
)
def WatchStock(
self, request: pb.ProductRequest, context: ServicerContext
) -> Generator[pb.StockUpdate, None, None]:
"""Server streaming: emite actualizaciones de stock cada segundo."""
product_id = request.product_id
# context.is_active() permite detectar si el cliente canceló el stream
while context.is_active():
with _LOCK:
product = _PRODUCTS.get(product_id)
if product is None:
context.set_code(grpc.StatusCode.NOT_FOUND)
return
yield pb.StockUpdate(
product_id=product_id,
new_stock=product["stock"],
timestamp=datetime.now(timezone.utc).isoformat(),
)
time.sleep(1.0)
def BulkUpload(
self,
request_iterator: Iterator[pb.ProductData],
context: ServicerContext,
) -> pb.UploadSummary:
"""Client streaming: recibe un stream de productos y devuelve un resumen."""
processed = failed = 0
for item in request_iterator:
if not item.product_id or item.price < 0:
failed += 1
continue
with _LOCK:
_PRODUCTS[item.product_id] = {
"name": item.name,
"stock": item.stock,
"price": item.price,
}
processed += 1
return pb.UploadSummary(processed=processed, failed=failed)
def Sync(
self,
request_iterator: Iterator[pb.SyncEvent],
context: ServicerContext,
) -> Generator[pb.SyncAck, None, None]:
"""Bidireccional: procesa eventos y emite acks en tiempo real."""
for event in request_iterator:
accepted = False
with _LOCK:
if event.event_type == "UPDATE" and event.product_id in _PRODUCTS:
_PRODUCTS[event.product_id]["stock"] = event.stock
accepted = True
elif event.event_type == "DELETE" and event.product_id in _PRODUCTS:
del _PRODUCTS[event.product_id]
accepted = True
yield pb.SyncAck(product_id=event.product_id, accepted=accepted)
# ── Arranque del servidor ─────────────────────────────────────────────────────
def serve() -> None:
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
interceptors=[LoggingInterceptor()],
)
pb_grpc.add_InventoryServiceServicer_to_server(InventoryServicer(), server)
# En producción usa grpc.ssl_server_credentials() con certificados reales
server.add_insecure_port("[::]:50051")
server.start()
logger.info("gRPC server listening on port 50051")
server.wait_for_termination()
if __name__ == "__main__":
serve()
4. Cliente (client.py)
from __future__ import annotations
import grpc
import inventory_pb2 as pb
import inventory_pb2_grpc as pb_grpc
def run() -> None:
# insecure_channel es solo para desarrollo; en producción usa ssl_channel_credentials
with grpc.insecure_channel("localhost:50051") as channel:
stub = pb_grpc.InventoryServiceStub(channel)
# ── Unario ───────────────────────────────────────────────────────────
try:
product = stub.GetProduct(pb.ProductRequest(product_id="SKU-001"))
print(f"[Unario] {product.name} – stock: {product.stock}")
except grpc.RpcError as exc:
# exc.code() devuelve el StatusCode; nunca uses str(exc) para comparar
print(f"[Unario] Error {exc.code()}: {exc.details()}")
# ── Server streaming ─────────────────────────────────────────────────
print("[Server streaming] Watching SKU-002 for 3 seconds...")
stream = stub.WatchStock(pb.ProductRequest(product_id="SKU-002"))
for i, update in enumerate(stream):
print(f" stock={update.new_stock} at {update.timestamp}")
if i >= 2:
# cancel() envía una señal de cancelación al servidor;
# el servidor la detecta con context.is_active()
stream.cancel()
break
# ── Client streaming ─────────────────────────────────────────────────
def product_batch():
for pid, name, stock, price in [
("SKU-003", "New Widget", 100, 5.99),
("SKU-004", "Super Gadget", 50, 99.00),
("", "Invalid", 0, -1.0), # fallará validación
]:
yield pb.ProductData(
product_id=pid, name=name, stock=stock, price=price
)
summary = stub.BulkUpload(product_batch())
print(f"[Client streaming] processed={summary.processed} failed={summary.failed}")
# ── Bidireccional ─────────────────────────────────────────────────────
events = [
pb.SyncEvent(product_id="SKU-001", event_type="UPDATE", stock=100),
pb.SyncEvent(product_id="SKU-999", event_type="UPDATE", stock=5),
]
for ack in stub.Sync(iter(events)):
print(f"[Bidireccional] {ack.product_id} accepted={ack.accepted}")
if __name__ == "__main__":
run()
Lo que importa de cada decisión
El contrato .proto es código, no documentación. Los números de campo (= 1, = 2) son los identificadores reales de serialización. Protobuf nunca los escribe en el wire; un campo name = 2 serializado como int32 será deserializado como int32 por cualquier cliente aunque lo renombres a label. Esto es lo que hace que Protobuf sea retrocompatible con campos nuevos opcionales, pero también es la trampa: si reutilizas el número 2 para un campo de tipo diferente, obtienes corrupción silenciosa en tiempo de ejecución.
El ThreadPoolExecutor limita la concurrencia real. gRPC en Python todavía no tiene soporte nativo para asyncio en la misma madurez que las bindings de Go o Java (aunque grpc.aio existe). Con max_workers=10 tienes 10 threads simultáneos. Si cada request tarda 200ms esperando base de datos, tu throughput máximo es 50 RPS por instancia antes de que la cola empiece a crecer. Dimensiona esto en función de tu latencia P95, no de tu instinto.
context.is_active() en server streaming es obligatorio. Si el cliente cancela un stream y el servidor sigue generando eventos, esos eventos se descartan silenciosamente pero el thread del servidor sigue consumiendo recursos hasta que el bucle termina por sí solo. En flujos infinitos como WatchStock, esto es una fuga de goroutines… perdón, de threads.
El interceptor usa grpc.ServerInterceptor, que en la librería Python actual opera sobre handlers, no sobre métodos directamente. El patrón correcto es wrapping del handler devuelto por continuation. Fíjate en que solo hemos envuelto unary_unary en el ejemplo; en producción necesitas los cuatro casos (unary_unary, unary_stream, stream_unary, stream_stream) o usas una librería como grpc-interceptor que abstrae eso.
grpc.RpcError es la excepción base del cliente. Implementa code() y details(), que te dan el StatusCode y el mensaje de error del servidor respectivamente. Nunca hagas str(exc) para comparar el tipo de error; usa exc.code() == grpc.StatusCode.NOT_FOUND.
Errores que debes conocer
Error: Reutilizar un número de campo proto para un tipo diferente al evolucionar el schema, causando corrupción silenciosa en deserialización.
# ❌ Wrong — campo 3 era int32 stock, ahora se reutiliza para otra cosa
message ProductResponse {
string product_id = 1;
string name = 2;
string category = 3; // antes era: int32 stock = 3;
}
# ✅ Right — reserva el número antiguo y usa uno nuevo
message ProductResponse {
reserved 3; // protoc emitirá error si alguien lo usa
reserved "stock";
string product_id = 1;
string name = 2;
string category = 4;
}
La directiva reserved convierte lo que sería corrupción silenciosa en un error de compilación.
Error: Ignorar context.is_active() en streaming del servidor, dejando threads bloqueados cuando el cliente desconecta.
# ❌ Wrong — bucle infinito que ignora cancelación del cliente
def WatchStock(self, request, context):
while True:
yield get_update(request.product_id)
time.sleep(1.0)
# ✅ Right — respeta el ciclo de vida del stream
def WatchStock(self, request, context):
while context.is_active():
yield get_update(request.product_id)
time.sleep(1.0)
Cuando el cliente cancela, context.is_active() pasa a False en la siguiente iteración y el thread se libera limpiamente.
Error: Crear un channel por request en lugar de reutilizarlo, destruyendo el beneficio de multiplexación de HTTP/2.
# ❌ Wrong — nuevo channel (y nuevo TCP handshake + TLS) por cada llamada
def get_product(product_id: str) -> pb.ProductResponse:
with grpc.insecure_channel("localhost:50051") as channel:
return pb_grpc.InventoryServiceStub(channel).GetProduct(
pb.ProductRequest(product_id=product_id)
)
# ✅ Right — channel compartido como singleton o inyectado como dependencia
_CHANNEL = grpc.insecure_channel("localhost:50051")
_STUB = pb_grpc.InventoryServiceStub(_CHANNEL)
def get_product(product_id: str) -> pb.ProductResponse:
return _STUB.GetProduct(pb.ProductRequest(product_id=product_id))
Un channel gRPC es thread-safe y gestiona internamente un pool de conexiones HTTP/2 con multiplexación; crear uno por request anula esa arquitectura y degrada la latencia a niveles peores que REST.