ejemplos - multitarea en python
¿Cómo generar procesos secundarios paralelos en un sistema multiprocesador? (4)
Aquí está la solución que encontré, basada en los comentarios de Nadia y Jim. No estoy seguro de si es la mejor manera, pero funciona. El script secundario original al que se llama debe ser un script de shell porque necesito usar algunas aplicaciones de terceros, incluido Matlab. Así que tuve que sacarlo de Python y codificarlo en bash.
import sys
import os
import multiprocessing
import subprocess
def work(staname):
print ''Processing station:'',staname
print ''Parent process:'', os.getppid()
print ''Process id:'', os.getpid()
cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
return subprocess.call(cmd, shell=False)
if __name__ == ''__main__'':
my_list = [ ''XYZ'', ''ABC'', ''NYU'' ]
my_list.sort()
print my_list
# Get the number of processors available
num_processes = multiprocessing.cpu_count()
threads = []
len_stas = len(my_list)
print "+++ Number of stations to process: %s" % (len_stas)
# run until all the threads are done, and there is no data left
for list_item in my_list:
# if we aren''t using all the processors AND there is still data left to
# compute, then spawn another thread
if( len(threads) < num_processes ):
p = multiprocessing.Process(target=work,args=[list_item])
p.start()
print p, p.is_alive()
threads.append(p)
else:
for thread in threads:
if not thread.is_alive():
threads.remove(thread)
¿Parece esto una solución razonable? Intenté usar el formato de bucle while de Jim, pero mi script simplemente no devolvió nada. No estoy seguro de por qué sería eso. Aquí está la salida cuando ejecuto el script con el bucle ''while'' de Jim reemplazando el bucle ''for'':
hostname{me}2% controller.py
[''ABC'', ''NYU'', ''XYZ'']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%
Cuando lo ejecuto con el bucle ''for'', obtengo algo más significativo:
hostname{me}6% controller.py
[''ABC'', ''NYU'', ''XYZ'']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%
Así que esto funciona, y estoy feliz. Sin embargo, todavía no entiendo por qué no puedo usar el bucle de estilo ''while'' de Jim en lugar del bucle ''for'' que estoy usando. Gracias por toda la ayuda. Estoy impresionado con la amplitud de know-stackoverflow.
Tengo un script de Python que quiero usar como controlador para otro script de Python. Tengo un servidor con 64 procesadores, por lo que quiero generar hasta 64 procesos secundarios de este segundo script de Python. El script hijo se llama:
$ python create_graphs.py --name=NAME
donde NOMBRE es algo como XYZ, ABC, NYU, etc.
En mi script del controlador principal recupero la variable de nombre de una lista:
my_list = [ ''XYZ'', ''ABC'', ''NYU'' ]
Entonces, mi pregunta es, ¿cuál es la mejor manera de generar estos procesos como niños? Quiero limitar el número de niños a 64 a la vez, así que necesito hacer un seguimiento del estado (si el proceso hijo ha finalizado o no) para poder mantener a toda la generación en funcionamiento.
Estudié el uso del paquete de subproceso, pero lo rechacé porque solo genera un hijo a la vez. Finalmente encontré el paquete multiprocesador, pero admito que me siento abrumado por la documentación de todos los subprocesos y subprocesos.
En este momento, mi script usa subprocess.call
para generar solo un hijo a la vez y se ve así:
#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process
my_list = [ ''XYZ'', ''ABC'', ''NYU'' ]
if __name__ == ''__main__'':
processors = multiprocessing.cpu_count()
for i in range(len(my_list)):
if( i < processors ):
cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
child = subprocess.call( cmd, shell=False )
Realmente quiero engendrar 64 niños a la vez. En otras preguntas de stackoverflow vi a gente usando la cola, pero parece que eso crea un impacto en el rendimiento.
Definitivamente usaría multiprocessing lugar de lanzar mi propia solución usando subproceso.
Lo que está buscando es la clase de grupo de procesos en multiprocesamiento.
import multiprocessing
import subprocess
def work(cmd):
return subprocess.call(cmd, shell=False)
if __name__ == ''__main__'':
count = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=count)
print pool.map(work, [''ls''] * count)
Y aquí hay un ejemplo de cálculo para que sea más fácil de entender. Lo siguiente dividirá 10000 tareas en N procesos donde N es el recuento de CPU. Tenga en cuenta que estoy pasando Ninguno como el número de procesos. Esto hará que la clase Pool use cpu_count para el número de procesos ( reference )
import multiprocessing
import subprocess
def calculate(value):
return value * 10
if __name__ == ''__main__'':
pool = multiprocessing.Pool(None)
tasks = range(10000)
results = []
r = pool.map_async(calculate, tasks, callback=results.append)
r.wait() # Wait on the results
print results
No creo que necesites una cola a menos que tengas la intención de sacar datos de las aplicaciones (lo cual, si quieres datos, creo que podría ser más fácil agregarlos a una base de datos)
pero intente esto en tamaño
ponga el contenido de su script create_graphs.py en una función llamada "create_graphs"
import threading
from create_graphs import create_graphs
num_processes = 64
my_list = [ ''XYZ'', ''ABC'', ''NYU'' ]
threads = []
# run until all the threads are done, and there is no data left
while threads or my_list:
# if we aren''t using all the processors AND there is still data left to
# compute, then spawn another thread
if (len(threads) < num_processes) and my_list:
t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
t.setDaemon(True)
t.start()
threads.append(t)
# in the case that we have the maximum number of threads check if any of them
# are done. (also do this when we run out of data, until all the threads are done)
else:
for thread in threads:
if not thread.isAlive():
threads.remove(thread)
Sé que esto resultará en 1 menos subprocesos que procesadores, lo que probablemente sea bueno, deja un procesador para administrar los subprocesos, el disco de E / S y otras cosas que suceden en la computadora. Si decides utilizar el último núcleo, solo tienes que añadir uno.
edición : creo que puedo haber malinterpretado el propósito de my_list. No necesita my_list
para realizar un seguimiento de los subprocesos (ya que todos los elementos de la lista de threads
referencia a ellos). Pero esta es una buena forma de alimentar la entrada de procesos, o incluso mejor: usar una función de generador;)
El propósito de my_list
y threads
my_list
contiene los datos que necesita procesar en su función
threads
es solo una lista de los hilos actualmente en ejecución
el bucle while hace dos cosas, inicia nuevos subprocesos para procesar los datos y comprueba si se ha ejecutado algún subproceso.
Entonces, siempre que tenga (a) más datos para procesar o (b) subprocesos que no hayan terminado de ejecutarse ... desea que el programa continúe ejecutándose. Una vez que ambas listas estén vacías, se evaluarán como False
y el bucle while saldrá