python pipe subprocess named-pipes

python - Múltiples tuberías en subproceso



pipe subprocess (2)

Estoy tratando de usar Sailfish, que toma varios archivos fastq como argumentos, en una tubería de ruffus. Ejecuto Sailfish usando el módulo de subproceso en python, pero <() en la llamada de subproceso no funciona incluso cuando configuro shell=True .

Este es el comando que quiero ejecutar usando python:

sailfish quant [options] -1 <(cat sample1a.fastq sample1b.fastq) -2 <(cat sample2a.fastq sample2b.fastq) -o [output_file]

o (preferiblemente):

sailfish quant [options] -1 <(gunzip sample1a.fastq.gz sample1b.fastq.gz) -2 <(gunzip sample2a.fastq.gz sample2b.fastq.gz) -o [output_file]

Una generalización:

someprogram <(someprocess) <(someprocess)

¿Cómo haría para hacer esto en Python? ¿Es el subproceso el enfoque correcto?


Para emular la sustitución del proceso bash :

#!/usr/bin/env python from subprocess import check_call check_call(''someprogram <(someprocess) <(anotherprocess)'', shell=True, executable=''/bin/bash'')

En Python, podría usar tuberías con nombre:

#!/usr/bin/env python from subprocess import Popen with named_pipes(n=2) as paths: someprogram = Popen([''someprogram''] + paths) processes = [] for path, command in zip(paths, [''someprocess'', ''anotherprocess'']): with open(path, ''wb'', 0) as pipe: processes.append(Popen(command, stdout=pipe, close_fds=True)) for p in [someprogram] + processes: p.wait()

donde named_pipes(n) es:

import os import shutil import tempfile from contextlib import contextmanager @contextmanager def named_pipes(n=1): dirname = tempfile.mkdtemp() try: paths = [os.path.join(dirname, ''named_pipe'' + str(i)) for i in range(n)] for path in paths: os.mkfifo(path) yield paths finally: shutil.rmtree(dirname)

Otra forma más preferible (no es necesario crear una entrada con nombre en el disco) para implementar la sustitución del proceso bash es usar /dev/fd/N nombres de archivo (si están disponibles) como lo sugiere @Dunes . En FreeBSD, fdescfs(5) ( /dev/fd/# ) crea entradas para todos los descriptores de archivo abiertos por el proceso . Para probar la disponibilidad, ejecute:

$ test -r /dev/fd/3 3</dev/null && echo /dev/fd is available

Si falla intente vincular /dev/fd a proc(5) como se hace en algunos Linux:

$ ln -s /proc/self/fd /dev/fd

Aquí está la implementación basada en /dev/fd del comando bash someprogram <(someprocess) <(anotherprocess) :

#!/usr/bin/env python3 from contextlib import ExitStack from subprocess import CalledProcessError, Popen, PIPE def kill(process): if process.poll() is None: # still running process.kill() with ExitStack() as stack: # for proper cleanup processes = [] for command in [[''someprocess''], [''anotherprocess'']]: # start child processes processes.append(stack.enter_context(Popen(command, stdout=PIPE))) stack.callback(kill, processes[-1]) # kill on someprogram exit fds = [p.stdout.fileno() for p in processes] someprogram = stack.enter_context( Popen([''someprogram''] + [''/dev/fd/%d'' % fd for fd in fds], pass_fds=fds)) for p in processes: # close pipes in the parent p.stdout.close() # exit stack: wait for processes if someprogram.returncode != 0: # errors shouldn''t go unnoticed raise CalledProcessError(someprogram.returncode, someprogram.args)

Nota: en mi máquina Ubuntu, el código del subprocess solo funciona en Python 3.4+, a pesar de que pass_fds está disponible desde Python 3.2.


Si bien JF Sebastian ha proporcionado una respuesta utilizando canalizaciones con nombre, es posible hacerlo con canalizaciones anónimas.

import shlex from subprocess import Popen, PIPE inputcmd0 = "zcat hello.gz" # gzipped file containing "hello" inputcmd1 = "zcat world.gz" # gzipped file containing "world" def get_filename(file_): return "/dev/fd/{}".format(file_.fileno()) def get_stdout_fds(*processes): return tuple(p.stdout.fileno() for p in processes) # setup producer processes inputproc0 = Popen(shlex.split(inputcmd0), stdout=PIPE) inputproc1 = Popen(shlex.split(inputcmd1), stdout=PIPE) # setup consumer process # pass input processes pipes by "filename" eg. /dev/fd/5 cmd = "cat {file0} {file1}".format(file0=get_filename(inputproc0.stdout), file1=get_filename(inputproc1.stdout)) print("command is:", cmd) # pass_fds argument tells Popen to let the child process inherit the pipe''s fds someprogram = Popen(shlex.split(cmd), stdout=PIPE, pass_fds=get_stdout_fds(inputproc0, inputproc1)) output, error = someprogram.communicate() for p in [inputproc0, inputproc1, someprogram]: p.wait() assert output == b"hello/nworld/n"