python - Múltiples tuberías en subproceso
pipe subprocess (2)
Estoy tratando de usar Sailfish, que toma varios archivos fastq como argumentos, en una tubería de ruffus.
Ejecuto Sailfish usando el módulo de subproceso en python, pero
<()
en la llamada de subproceso no funciona incluso cuando configuro
shell=True
.
Este es el comando que quiero ejecutar usando python:
sailfish quant [options] -1 <(cat sample1a.fastq sample1b.fastq) -2 <(cat sample2a.fastq sample2b.fastq) -o [output_file]
o (preferiblemente):
sailfish quant [options] -1 <(gunzip sample1a.fastq.gz sample1b.fastq.gz) -2 <(gunzip sample2a.fastq.gz sample2b.fastq.gz) -o [output_file]
Una generalización:
someprogram <(someprocess) <(someprocess)
¿Cómo haría para hacer esto en Python? ¿Es el subproceso el enfoque correcto?
Para emular la sustitución del proceso bash :
#!/usr/bin/env python
from subprocess import check_call
check_call(''someprogram <(someprocess) <(anotherprocess)'',
shell=True, executable=''/bin/bash'')
En Python, podría usar tuberías con nombre:
#!/usr/bin/env python
from subprocess import Popen
with named_pipes(n=2) as paths:
someprogram = Popen([''someprogram''] + paths)
processes = []
for path, command in zip(paths, [''someprocess'', ''anotherprocess'']):
with open(path, ''wb'', 0) as pipe:
processes.append(Popen(command, stdout=pipe, close_fds=True))
for p in [someprogram] + processes:
p.wait()
donde
named_pipes(n)
es:
import os
import shutil
import tempfile
from contextlib import contextmanager
@contextmanager
def named_pipes(n=1):
dirname = tempfile.mkdtemp()
try:
paths = [os.path.join(dirname, ''named_pipe'' + str(i)) for i in range(n)]
for path in paths:
os.mkfifo(path)
yield paths
finally:
shutil.rmtree(dirname)
Otra forma más preferible (no es necesario crear una entrada con nombre en el disco) para implementar la sustitución del proceso bash es usar
/dev/fd/N
nombres de archivo (si están disponibles) como lo
sugiere @Dunes
.
En FreeBSD,
fdescfs(5)
(
/dev/fd/#
) crea entradas para todos los descriptores de archivo abiertos por el proceso
.
Para probar la disponibilidad, ejecute:
$ test -r /dev/fd/3 3</dev/null && echo /dev/fd is available
Si falla
intente vincular
/dev/fd
a
proc(5)
como se hace en algunos Linux:
$ ln -s /proc/self/fd /dev/fd
Aquí está la implementación basada en
/dev/fd
del comando bash
someprogram <(someprocess) <(anotherprocess)
:
#!/usr/bin/env python3
from contextlib import ExitStack
from subprocess import CalledProcessError, Popen, PIPE
def kill(process):
if process.poll() is None: # still running
process.kill()
with ExitStack() as stack: # for proper cleanup
processes = []
for command in [[''someprocess''], [''anotherprocess'']]: # start child processes
processes.append(stack.enter_context(Popen(command, stdout=PIPE)))
stack.callback(kill, processes[-1]) # kill on someprogram exit
fds = [p.stdout.fileno() for p in processes]
someprogram = stack.enter_context(
Popen([''someprogram''] + [''/dev/fd/%d'' % fd for fd in fds], pass_fds=fds))
for p in processes: # close pipes in the parent
p.stdout.close()
# exit stack: wait for processes
if someprogram.returncode != 0: # errors shouldn''t go unnoticed
raise CalledProcessError(someprogram.returncode, someprogram.args)
Nota: en mi máquina Ubuntu, el código del
subprocess
solo funciona en Python 3.4+, a pesar de que
pass_fds
está disponible desde Python 3.2.
Si bien JF Sebastian ha proporcionado una respuesta utilizando canalizaciones con nombre, es posible hacerlo con canalizaciones anónimas.
import shlex
from subprocess import Popen, PIPE
inputcmd0 = "zcat hello.gz" # gzipped file containing "hello"
inputcmd1 = "zcat world.gz" # gzipped file containing "world"
def get_filename(file_):
return "/dev/fd/{}".format(file_.fileno())
def get_stdout_fds(*processes):
return tuple(p.stdout.fileno() for p in processes)
# setup producer processes
inputproc0 = Popen(shlex.split(inputcmd0), stdout=PIPE)
inputproc1 = Popen(shlex.split(inputcmd1), stdout=PIPE)
# setup consumer process
# pass input processes pipes by "filename" eg. /dev/fd/5
cmd = "cat {file0} {file1}".format(file0=get_filename(inputproc0.stdout),
file1=get_filename(inputproc1.stdout))
print("command is:", cmd)
# pass_fds argument tells Popen to let the child process inherit the pipe''s fds
someprogram = Popen(shlex.split(cmd), stdout=PIPE,
pass_fds=get_stdout_fds(inputproc0, inputproc1))
output, error = someprogram.communicate()
for p in [inputproc0, inputproc1, someprogram]:
p.wait()
assert output == b"hello/nworld/n"