sirve que para examples ejemplos python ipc pipe subprocess blocking

python - que - subprocess popen()



bloques-enviar entrada a la tubería de subproceso de python (11)

Estoy probando subprocesos con Pipelines. Soy consciente de que puedo hacer lo que hacen los programas a continuación en Python directamente, pero ese no es el punto. Solo quiero probar la tubería para saber cómo usarla.

Mi sistema es Linux Ubuntu 9.04 con el pitón predeterminado 2.6.

Empecé con este ejemplo de documentación .

from subprocess import Popen, PIPE p1 = Popen(["grep", "-v", "not"], stdout=PIPE) p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE) output = p2.communicate()[0] print output

Eso funciona, pero como el stdin p1 no se está redirigiendo, tengo que escribir cosas en la terminal para alimentar la tubería. Cuando escribo ^D cierre stdin, obtengo el resultado que quiero.

Sin embargo, quiero enviar datos a la tubería utilizando una variable de cadena de python. Primero intenté escribir en stdin:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE) p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE) p1.stdin.write(''test/n'') output = p2.communicate()[0] # blocks forever here

No funcionó Intenté usar p2.stdout.read() lugar en la última línea, pero también bloquea. p1.stdin.flush() y p1.stdin.close() pero tampoco funcionó. Entonces me moví para comunicarme:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE) p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE) p1.communicate(''test/n'') # blocks forever here output = p2.communicate()[0]

Entonces eso no es todo.

Noté que ejecutar un único proceso (como p1 arriba, eliminar p2 ) funciona perfectamente. Y pasar un identificador de archivo a p1 ( stdin=open(...) ) también funciona. Entonces el problema es:

¿Es posible pasar datos a una canalización de 2 o más subprocesos en python, sin bloqueo? Por qué no?

Soy consciente de que podría ejecutar un caparazón y ejecutar la tubería en el caparazón, pero eso no es lo que quiero.

ACTUALIZACIÓN 1 : siguiendo la sugerencia de Aaron Digulla a continuación, ahora estoy tratando de usar hilos para que funcione.

Primero, intenté ejecutar p1.communicate en un hilo.

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE) p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE) t = threading.Thread(target=p1.communicate, args=(''some data/n'',)) t.start() output = p2.communicate()[0] # blocks forever here

De acuerdo, no funcionó. Intenté otras combinaciones como cambiarlo a .write() y también p2.read() . Nada. Ahora probemos el enfoque opuesto:

def get_output(subp): output = subp.communicate()[0] # blocks on thread print ''GOT:'', output p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE) p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE) t = threading.Thread(target=get_output, args=(p2,)) t.start() p1.communicate(''data/n'') # blocks here. t.join()

el código termina bloqueando en alguna parte. Ya sea en el hilo generado, o en el hilo principal, o en ambos. Entonces no funcionó. Si sabe cómo hacerlo funcionar, sería más fácil si puede proporcionar un código de trabajo. Estoy intentando aquí.

ACTUALIZACIÓN 2

Paul Du Bois respondió a continuación con algo de información, así que hice más pruebas. He leído todo el subprocess.py module y cómo funciona. Así que traté de aplicar exactamente eso al código.

Estoy en Linux, pero como estaba probando con hilos, mi primer enfoque fue replicar el código de enhebrado de Windows exacto visto en el método de communicate() subprocess.py , pero para dos procesos en lugar de uno. Aquí está la lista completa de lo que probé:

import os from subprocess import Popen, PIPE import threading def get_output(fobj, buffer): while True: chunk = fobj.read() # BLOCKS HERE if not chunk: break buffer.append(chunk) p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE) p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE) b = [] # create a buffer t = threading.Thread(target=get_output, args=(p2.stdout, b)) t.start() # start reading thread for x in xrange(100000): p1.stdin.write(''hello world/n'') # write data p1.stdin.flush() p1.stdin.close() # close input... t.join()

Bien. No funcionó. Incluso después de p1.stdin.close() , p2.stdout.read() sigue bloqueando.

Luego probé el código posix en subprocess.py :

import os from subprocess import Popen, PIPE import select p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE) p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE) numwrites = 100000 to_read = [p2.stdout] to_write = [p1.stdin] b = [] # create buffer while to_read or to_write: read_now, write_now, xlist = select.select(to_read, to_write, []) if read_now: data = os.read(p2.stdout.fileno(), 1024) if not data: p2.stdout.close() to_read = [] else: b.append(data) if write_now: if numwrites > 0: numwrites -= 1 p1.stdin.write(''hello world!/n''); p1.stdin.flush() else: p1.stdin.close() to_write = [] print b

También bloquea en select.select() . Al difundir las print , descubrí esto:

  • La lectura está funcionando. El código lee muchas veces durante la ejecución.
  • Escribir también está funcionando. Los datos se escriben en p1.stdin .
  • Al final de numwrites , se llama p1.stdin.close() .
  • Cuando select() comienza a bloquear, solo to_read tiene algo, p2.stdout . to_write ya está vacío.
  • os.read() llamada a os.read() siempre devuelve algo, por lo que p2.stdout.close() nunca se llama.

Conclusión de ambas pruebas : cerrar el stdin del primer proceso en la tubería ( grep en el ejemplo) no lo hace volcar su salida de buffer a la siguiente y morir.

¿No hay forma de hacerlo funcionar?

PD: No quiero usar un archivo temporal, ya he probado con archivos y sé que funciona. Y no quiero usar Windows.


Trabajando con archivos grandes

Se deben aplicar dos principios de manera uniforme al trabajar con archivos de gran tamaño en Python.

  1. Como cualquier rutina IO puede bloquearse, debemos mantener cada etapa de la canalización en una secuencia o proceso diferente . Usamos hilos en este ejemplo, pero los subprocesos le permiten evitar el GIL.
  2. Debemos usar lecturas y escrituras incrementales para que no esperemos EOF antes de comenzar a avanzar.

Una alternativa es usar IO no bloqueante, aunque esto es engorroso en Python estándar. Consulte gevent para una biblioteca de threading liviana que implementa la API de IO síncrona utilizando primitivas sin bloqueo.

Código de ejemplo

Construiremos una tubería tonta que sea aproximadamente

{cat /usr/share/dict/words} | grep -v not / | {upcase, filtered tee to stderr} | cut -c 1-10 / | {translate ''E'' to ''3''} | grep K | grep Z | {downcase}

donde cada etapa entre llaves {} se implementa en Python, mientras que los demás usan programas externos estándar. TL; DR: Vea esta esencia .

Comenzamos con las importaciones esperadas.

#!/usr/bin/env python from subprocess import Popen, PIPE import sys, threading

Etapas Python de la tubería

Todas, excepto la última etapa implementada por Python de la canalización, deben ir en un hilo para que su IO no bloquee a las demás. En su lugar, podrían ejecutarse en subprocesos de Python si desea que se ejecuten en paralelo (evite el GIL).

def writer(output): for line in open(''/usr/share/dict/words''): output.write(line) output.close() def filter(input, output): for line in input: if ''k'' in line and ''z'' in line: # Selective ''tee'' sys.stderr.write(''### '' + line) output.write(line.upper()) output.close() def leeter(input, output): for line in input: output.write(line.replace(''E'', ''3'')) output.close()

Cada uno de estos debe ser puesto en su propio hilo, lo cual haremos usando esta función de conveniencia.

def spawn(func, **kwargs): t = threading.Thread(target=func, kwargs=kwargs) t.start() return t

Crear la tubería

Cree las etapas externas usando las etapas Popen y Python usando spawn . El argumento bufsize=-1 dice que use el buffering del sistema por defecto (generalmente 4 kib). En general, esto es más rápido que el almacenamiento en búfer predeterminado (sin búfer) o de línea, pero querrá un búfer de línea si desea supervisar visualmente el resultado sin demoras.

grepv = Popen([''grep'',''-v'',''not''], stdin=PIPE, stdout=PIPE, bufsize=-1) cut = Popen([''cut'',''-c'',''1-10''], stdin=PIPE, stdout=PIPE, bufsize=-1) grepk = Popen([''grep'', ''K''], stdin=PIPE, stdout=PIPE, bufsize=-1) grepz = Popen([''grep'', ''Z''], stdin=grepk.stdout, stdout=PIPE, bufsize=-1) twriter = spawn(writer, output=grepv.stdin) tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin) tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

Conduce la tubería

Montado como se indica arriba, todos los buffers en la tubería se llenarán, pero dado que nadie está leyendo desde el final ( grepz.stdout ), todos bloquearán. Podríamos leer todo el asunto en una sola llamada a grepz.stdout.read() , pero eso usaría mucha memoria para archivos grandes. En cambio, leemos incrementalmente .

for line in grepz.stdout: sys.stdout.write(line.lower())

Los hilos y procesos se limpian una vez que llegan a EOF . Podemos limpiar explícitamente usando

for t in [twriter, tfilter, tleeter]: t.join() for p in [grepv, cut, grepk, grepz]: p.wait()

Python-2.6 y versiones anteriores

Internamente, subprocess.Popen Llamadas de subprocess.Popen fork , configura los descriptores de archivos de pipa y llama a exec . El proceso secundario de fork tiene copias de todos los descriptores de archivos en el proceso principal, y ambas copias deberán cerrarse antes de que el lector correspondiente obtenga EOF . Esto se puede solucionar cerrando manualmente las canalizaciones (ya sea por close_fds=True o un argumento preexec_fn adecuado para subprocess.Popen ) o estableciendo el indicador FD_CLOEXEC para que exec cierre automáticamente el descriptor de archivo. Esta bandera se establece automáticamente en Python-2.7 y posterior, ver issue12786 . Podemos obtener el comportamiento de Python-2.7 en versiones anteriores de Python llamando

p._set_cloexec_flags(p.stdin)

antes de pasar p.stdin como argumento a un subsiguiente subprocess.Popen . subprocess.Popen .


Creo que puedes estar examinando el problema equivocado. Ciertamente, como Aaron dice que si intentas ser un productor al principio de una tubería y un consumidor del final de la tubería, es fácil entrar en una situación de estancamiento. Este es el problema que comunica () resuelve.

comunicar () no es exactamente correcto para usted, ya que stdin y stdout están en diferentes subprocesos; pero si echas un vistazo a la implementación en subprocess.py verás que hace exactamente lo que Aaron sugirió.

Una vez que vea que se comunican tanto las lecturas como las escrituras, verá que en su segundo intento, la función de comunicación () compite con p2 para la salida de p1:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE) p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE) # ... p1.communicate(''data/n'') # reads from p1.stdout, as does p2

Me estoy ejecutando en win32, que definitivamente tiene diferentes características de E / S y almacenamiento en búfer, pero esto funciona para mí:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE) p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE) t = threading.Thread(target=get_output, args=(p2,)) t.start() p1.stdin.write(''hello world/n'' * 100000) p1.stdin.close() t.join()

Sintonicé el tamaño de la entrada para producir un interbloqueo al usar un p2.read () sin rosca ingenuo

También puede tratar de almacenar en un archivo, por ejemplo,

fd, _ = tempfile.mkstemp() os.write(fd, ''hello world/r/n'' * 100000) os.lseek(fd, 0, os.SEEK_SET) p1 = Popen(["grep", "-v", "not"], stdin=fd, stdout=PIPE) p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE) print p2.stdout.read()

Eso también funciona para mí sin interbloqueos.


Debe hacer esto en varios hilos. De lo contrario, terminará en una situación donde no puede enviar datos: child p1 no leerá su entrada ya que p2 no lee la salida de p1 porque no lee la salida de p2.

Entonces necesitas un hilo de fondo que lea lo que p2 escribe. Eso permitirá que p2 continúe después de escribir algunos datos en la tubería, para que pueda leer la siguiente línea de entrada desde p1, lo que nuevamente le permite a p1 procesar los datos que le envía.

Alternativamente, puede enviar los datos a p1 con un hilo de fondo y leer los resultados de p2 en el hilo principal. Pero cualquiera de los lados debe ser un hilo.


Descubrí cómo hacerlo.

No se trata de hilos, y no de select ().

Cuando ejecuto el primer proceso ( grep ), crea dos descriptores de archivos de bajo nivel, uno para cada tubería. Llamemos a esos a y b .

Cuando ejecuto el segundo proceso, b pasa a cut sdtin . Pero hay un valor predeterminado de muerte cerebral en Popen - close_fds=False .

El efecto de eso es que el cut también hereda a . Entonces, grep no puede morir incluso si cierro a , porque stdin todavía está abierto en el proceso de cut (el cut ignora).

El siguiente código ahora funciona perfectamente.

from subprocess import Popen, PIPE p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE) p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True) p1.stdin.write(''Hello World/n'') p1.stdin.close() result = p2.stdout.read() assert result == "Hello Worl/n"

close_fds=True DEBERÍA SER EL PREDETERMINADO en los sistemas Unix. En Windows, cierra todos los fds, por lo que evita las tuberías.

EDITAR:

PD: Para personas con un problema similar al leer esta respuesta: como dijo pooryorick en un comentario, eso también podría bloquear si los datos escritos en p1.stdin son más grandes que los almacenamientos intermedios. En ese caso, debe dividir los datos en fragmentos más pequeños y usar select.select() para saber cuándo leer / escribir. El código en la pregunta debería dar una pista sobre cómo implementar eso.

EDIT2: Encontré otra solución, con más ayuda de pooryorick: en lugar de usar close_fds=True y cerrar TODOS los fds, uno podría cerrar el fd s que pertenece al primer proceso, al ejecutar el segundo, y funcionará. El cierre debe hacerse en el niño, por lo que la función preexec_fn de Popen es muy útil para hacer eso. Al ejecutar p2 puedes hacer:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)


En uno de los comentarios anteriores, le pedí a nosklo que publicara algún código para respaldar sus afirmaciones sobre select.select o para votar mis respuestas que había votado anteriormente. Él respondió con el siguiente código:

from subprocess import Popen, PIPE import select p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE) p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True) data_to_write = 100000 * ''hello world/n'' to_read = [p2.stdout] to_write = [p1.stdin] b = [] # create buffer written = 0 while to_read or to_write: read_now, write_now, xlist = select.select(to_read, to_write, []) if read_now: data = p2.stdout.read(1024) if not data: p2.stdout.close() to_read = [] else: b.append(data) if write_now: if written < len(data_to_write): part = data_to_write[written:written+1024] written += len(part) p1.stdin.write(part); p1.stdin.flush() else: p1.stdin.close() to_write = [] print b

Un problema con este script es que adivina el tamaño / naturaleza de los búferes de canal del sistema. El script experimentaría menos fallas si pudiera eliminar números mágicos como 1024.

El gran problema es que este código de script solo funciona de manera consistente con la combinación correcta de entrada de datos y programas externos. grep y cut funcionan con líneas, por lo que sus búferes internos se comportan de forma un poco diferente. Si utilizamos un comando más genérico como "cat" y escribimos bits de datos más pequeños en la tubería, la condición de carrera fatal aparecerá con mayor frecuencia:

from subprocess import Popen, PIPE import select import time p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE) p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE, close_fds=True) data_to_write = ''hello world/n'' to_read = [p2.stdout] to_write = [p1.stdin] b = [] # create buffer written = 0 while to_read or to_write: time.sleep(1) read_now, write_now, xlist = select.select(to_read, to_write, []) if read_now: print ''I am reading now!'' data = p2.stdout.read(1024) if not data: p1.stdout.close() to_read = [] else: b.append(data) if write_now: print ''I am writing now!'' if written < len(data_to_write): part = data_to_write[written:written+1024] written += len(part) p1.stdin.write(part); p1.stdin.flush() else: print ''closing file'' p1.stdin.close() to_write = [] print b

En este caso, se manifestarán dos resultados diferentes:

write, write, close file, read -> success write, read -> hang

Así que, de nuevo, desafío a nosklo ya sea a un código postal que muestre el uso de select.select para manejar entradas arbitrarias y almacenamiento en búfer de canalización desde un único hilo, o para modificar mis respuestas.

En pocas palabras: no intente manipular ambos extremos de una tubería de un solo hilo. No vale la pena. Vea la pypi.python.org/pypi/pipeline/0.1 para un buen ejemplo de bajo nivel de cómo hacer esto correctamente.


Hay tres trucos principales para hacer que las tuberías funcionen como se esperaba

  1. Asegúrese de que cada extremo de la tubería se use en un subproceso / proceso diferente (algunos de los ejemplos cerca de la parte superior tienen este problema).

  2. cerrar explícitamente el extremo no utilizado de la tubería en cada proceso

  3. tratar con el almacenamiento en búfer deshabilitándolo (opción Python -u), usando pty, o simplemente llenando el búfer con algo que no afectará los datos (tal vez ''/ n'', pero lo que sea que encaje).

Los ejemplos en el módulo "pipeline" de Python (soy el autor) se ajustan exactamente a su escenario y hacen que los pasos de bajo nivel sean bastante claros.

http://pypi.python.org/pypi/pipeline/

Más recientemente, utilicé el módulo de subproceso como parte de un patrón productor-procesador-consumidor-controlador:

http://www.darkarchive.org/w/Pub/PythonInteract

Este ejemplo trata sobre stdin almacenado sin recurrir al uso de pty, y también ilustra qué extremos de tubería deben estar cerrados en donde. Prefiero los procesos a enhebrar, pero el principio es el mismo. Además, ilustra la sincronización de colas para alimentar al productor y recopilar resultados del consumidor, y cómo cerrarlos limpiamente (tenga cuidado con los centinelas insertados en las colas). Este patrón permite que se genere nueva entrada en función de la producción reciente, lo que permite el descubrimiento recursivo y el procesamiento.


La solución ofrecida por Nosklo se romperá rápidamente si se escriben demasiados datos en el extremo receptor de la tubería:

from subprocess import Popen, PIPE p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE) p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True) p1.stdin.write(''Hello World/n'' * 20000) p1.stdin.close() result = p2.stdout.read() assert result == "Hello Worl/n"

Si este script no se cuelga en su máquina, simplemente aumente "20000" a algo que exceda el tamaño de los buffers de tubería de su sistema operativo.

Esto se debe a que el sistema operativo está almacenando la entrada en "grep", pero una vez que ese búfer está lleno, la llamada p1.stdin.write se bloqueará hasta que algo se lea desde p2.stdout . En los escenarios de juguetes, puede obtener la forma de escribir / leer desde un tubo en el mismo proceso, pero en el uso normal, es necesario escribir desde un hilo / proceso y leer desde un hilo / proceso separado. Esto es cierto para subprocess.popen, os.pipe, os.popen *, etc.

Otro giro es que a veces se desea seguir alimentando la tubería con elementos generados a partir de la salida anterior de la misma tubería. La solución es hacer que tanto el alimentador de tubería como el lector de tubería sean asíncronos al programa hombre, e implementar dos colas: una entre el programa principal y el alimentador de tubería y otra entre el programa principal y el lector de tubería. http://www.darkarchive.org/w/Pub/PythonInteract es un ejemplo de eso.

El subproceso es un buen modelo de conveniencia, pero debido a que oculta los detalles de las llamadas os.popen y os.fork que hace bajo el capó, a veces puede ser más difícil de manejar que las llamadas de bajo nivel que utiliza. Por esta razón, el subproceso no es una buena forma de aprender cómo funcionan realmente las tuberías entre procesos.


Respondiendo a la afirmación de nosklo (ver otros comentarios a esta pregunta) que no se puede hacer sin close_fds=True :

close_fds=True solo es necesario si ha dejado abiertos otros descriptores de archivos. Al abrir varios procesos secundarios, siempre es bueno hacer un seguimiento de los archivos abiertos que pueden heredarse, y cerrar explícitamente los que no sean necesarios:

from subprocess import Popen, PIPE p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE) p1.stdin.write(''Hello World/n'') p1.stdin.close() p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE) result = p2.stdout.read() assert result == "Hello Worl/n"

close_fds predeterminado en False porque el subproceso prefiere confiar en el programa de llamadas para saber qué está haciendo con los descriptores de archivos abiertos, y solo proporciona a quien llama una opción fácil de cerrarlos todos si eso es lo que quiere hacer.

Pero el problema real es que los amortiguadores de tuberías te morderán por todos menos por ejemplos de juguetes. Como ya dije en mis otras respuestas a esta pregunta, la regla de oro es no abrir el lector y el escritor en el mismo proceso / hilo. Cualquiera que quiera usar el módulo de subproceso para la comunicación bidireccional estaría bien servido para estudiar os.pipe y os.fork, primero. De hecho, no son tan difíciles de usar si tienes un pypi.python.org/pypi/pipeline/0.1 para mirar.


¡Es mucho más simple de lo que piensas!

import sys from subprocess import Popen, PIPE # Pipe the command here. It will read from stdin. # So cat a file, to stdin, like (cat myfile | ./this.py), # or type on terminal and hit control+d when done, etc # No need to handle this yourself, that''s why we have shell''s! p = Popen("grep -v not | cut -c 1-10", shell=True, stdout=PIPE) nextData = None while True: nextData = p.stdout.read() if nextData in (b'''', ''''): break sys.stdout.write ( nextData.decode(''utf-8'') ) p.wait()

Este código está escrito para python 3.6 y funciona con python 2.7.

Úselo como:

cat README.md | python ./example.py

o

python example.py < README.md

Para canalizar el contenido de "README.md" a este programa.

Pero ... en este punto, ¿por qué no simplemente usar "cat" directamente, y canalizar la salida como desee? me gusta:

cat filename | grep -v not | cut -c 1-10

escrito en la consola también hará el trabajo. Personalmente, solo usaría la opción de código si estuviera procesando el resultado, de lo contrario, un script de shell sería más fácil de mantener y conservar.

Simplemente, usa el caparazón para hacer las tuberías por ti. En uno, por el otro. Eso es lo que hará GRANDE al hacer, gestionar procesos y gestionar cadenas de entrada y salida de un solo ancho. Algunos lo llamarían la mejor característica no interactiva de la caparazón ...



Aquí hay un ejemplo del uso de Popen junto con os.fork para lograr lo mismo. En lugar de usarlo close_fds, solo cierra las tuberías en los lugares correctos. Mucho más simple que tratar de usar select.select, y aprovecha al máximo los búferes de tuberías del sistema.

from subprocess import Popen, PIPE import os import sys p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE) pid = os.fork() if pid: #parent p1.stdin.close() p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE) data = p2.stdout.read() sys.stdout.write(data) p2.stdout.close() else: #child data_to_write = ''hello world/n'' * 100000 p1.stdin.write(data_to_write) p1.stdin.close()