`subprocess.Popen` para streaming y control de procesos

Cuando subprocess.run() te queda corto, es porque necesitas algo que run() no puede darte: control granular sobre el tiempo de vida del proceso. subprocess.Popen es la API de bajo nivel sobre la que run() está construida. No bloquea mientras espera, no acumula todo el output en memoria antes de que lo veas, y te deja decidir exactamente cuándo leer, cuándo escribir, cuándo esperar y cuándo matar.

La diferencia fundamental es de modelo mental: run() es “ejecuta y dame el resultado cuando termine”. Popen es “arranca el proceso y dame un objeto con el que puedo interactuar mientras vive”.

Cuando declaras Popen(..., stdout=subprocess.PIPE), Python conecta el stdout del proceso hijo a un file descriptor que tú controlas desde el lado Python. Eso significa que puedes leer de él línea a línea con readline() a medida que el proceso produce output, sin esperar a que termine. Útil para procesos de larga duración: compiladores, scrapers, pipelines de datos, servidores que imprimen progreso.

El riesgo de hacerlo mal es real. Si lees de stdout y stderr en pipes separados de forma ingenua, puedes caer en deadlock: el proceso hijo llena el buffer de stderr, se bloquea esperando que alguien lo lea, tú estás bloqueado en stdout.read(), y ninguno avanza. communicate() existe precisamente para evitar eso cuando quieres el output completo de una vez, porque drena ambos pipes en threads internos.

shlex.split() merece mención especial: convierte "ffmpeg -i input.mp4 -c:v libx264 output.mp4" en la lista que Popen espera, respetando comillas y escapes como lo haría un shell POSIX, sin invocar un shell real. Más seguro y portátil que hacer split(" ") a mano.

import subprocess
import shlex
import sys
import time

def stream_long_process(command: str, input_data: bytes | None = None) -> int:
    """
    Ejecuta un comando mostrando su output en tiempo real.
    Devuelve el código de salida del proceso.
    """
    args = shlex.split(command)

    proc = subprocess.Popen(
        args,
        stdin=subprocess.PIPE if input_data else None,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    # Si hay input, enviarlo ahora para que el proceso pueda leerlo.
    # No usamos communicate() aquí porque queremos leer stdout en streaming.
    if input_data:
        proc.stdin.write(input_data)
        proc.stdin.close()  # señal de EOF al proceso hijo

    print("[streaming stdout]")
    for line in proc.stdout:
        # proc.stdout es un iterable que bloquea hasta que llega cada línea
        print(line.decode().rstrip())

    # proc.stdout llegó a EOF; el proceso probablemente ya terminó,
    # pero llamamos a wait() para asegurar que el OS libere el PID.
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()  # recolectar el zombie tras kill()

    # Capturar stderr completo solo después de que el proceso terminó
    stderr_output = proc.stderr.read()
    if stderr_output:
        print("[stderr]", stderr_output.decode(), file=sys.stderr)

    return proc.returncode


def run_with_timeout(command: str, timeout: int = 30) -> tuple[bytes, bytes, int]:
    """
    Ejecuta un comando con timeout estricto.
    Devuelve (stdout, stderr, returncode).
    """
    args = shlex.split(command)
    proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    try:
        stdout, stderr = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        stdout, stderr = proc.communicate()  # drenar pipes tras kill()
        print(f"Proceso terminado a la fuerza (timeout {timeout}s)", file=sys.stderr)

    return stdout, stderr, proc.returncode


def monitor_process(command: str) -> None:
    """
    Demuestra poll() para supervisión no bloqueante.
    """
    args = shlex.split(command)
    proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)

    while True:
        exit_code = proc.poll()  # None si sigue corriendo; int si terminó
        if exit_code is not None:
            print(f"Proceso terminado con código {exit_code}")
            break

        # Aquí podrías hacer otro trabajo mientras el proceso corre
        line = proc.stdout.readline()
        if line:
            print("→", line.decode().rstrip())
        else:
            time.sleep(0.1)


if __name__ == "__main__":
    # Streaming: ver output de un proceso largo línea a línea
    code = stream_long_process("ping -c 4 localhost")
    print(f"Exit code: {code}")

    print("---")

    # communicate() con timeout
    out, err, rc = run_with_timeout("ls -la /tmp", timeout=10)
    print(out.decode())

Lo que está pasando en cada decisión

El patrón for line in proc.stdout en stream_long_process funciona porque el file object de stdout es un iterable que bloquea en cada iteración hasta que el proceso escribe una línea completa o cierra el pipe. No estás polling activamente; el OS te despierta cuando hay datos. Eso es eficiente y simple a la vez.

Fíjate en el proc.stdin.close() después de escribir el input. Muchos programas leen stdin hasta EOF antes de empezar a procesar. Sin esa llamada, el proceso hijo puede quedarse bloqueado esperando más input que nunca va a llegar, mientras tú estás bloqueado en proc.stdout.readline(). Es el deadlock más fácil de no ver venir.

communicate() en run_with_timeout es la elección correcta cuando no necesitas streaming: drena stdout y stderr concurrentemente en threads internos, evitando el deadlock de buffers. El timeout le dice cuántos segundos esperar antes de lanzar TimeoutExpired. Importante: después de proc.kill(), hay que llamar communicate() de nuevo para drenar lo que quedó en los pipes y liberar recursos correctamente; si no lo haces, el proceso queda en estado zombie.

poll() en monitor_process es la herramienta para supervisión no bloqueante. Devuelve None mientras el proceso sigue vivo y el returncode en cuanto termina. Úsalo cuando necesitas seguir haciendo trabajo en Python mientras vigilas si el hijo terminó, en lugar de bloquearte en wait().

La distinción entre proc.terminate() y proc.kill() es la diferencia entre SIGTERM y SIGKILL en Unix. terminate() le pide al proceso que se cierre limpiamente (puede ignorarlo); kill() es irrevocable. La secuencia correcta para procesos que no responden es terminate() → espera un momento → si poll() sigue siendo Nonekill().

Errores que debes conocer

Error: Llamar proc.stdout.read() y proc.stderr.read() en secuencia cuando ambos son PIPE. El proceso hijo llena el buffer de stderr y se bloquea; tú estás atascado en stdout.read() esperando EOF que nunca llega.

# ❌ Wrong
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out = proc.stdout.read()   # bloquea aquí si stderr se llena
err = proc.stderr.read()

# ✅ Right
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()  # drena ambos concurrentemente

communicate() usa threads internos para leer stdout y stderr en paralelo, eliminando la condición de deadlock.


Error: Hacer streaming de stdout línea a línea mientras stderr=subprocess.PIPE también está abierto y el proceso escribe en él. Mismo problema: el buffer de stderr puede llenarse mientras tú lees stdout.

# ❌ Wrong
proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
for line in proc.stdout:
    process(line)

# ✅ Right
proc = subprocess.Popen(
    args,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,  # mezcla stderr en stdout
    # o: stderr=subprocess.DEVNULL si no te importa
)
for line in proc.stdout:
    process(line)

stderr=subprocess.STDOUT redirige stderr al mismo pipe que stdout, así lees ambos en el mismo loop sin riesgo de deadlock.


Error: Olvidar proc.wait() (o proc.communicate()) después de que el proceso termina, dejando un proceso zombie en la tabla de procesos.

# ❌ Wrong
proc = subprocess.Popen(args)
proc.kill()
# proceso muerto pero no recolectado

# ✅ Right
proc = subprocess.Popen(args)
proc.kill()
proc.wait()  # libera el PID y los recursos del OS

Hasta que el padre llama a wait(), el proceso hijo permanece en la tabla de procesos como zombie consumiendo una entrada de PID; en procesos de larga duración esto puede agotar el límite del sistema.

196

Dejar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

Scroll al inicio