paralelizar multiprocesamiento python multiprocessing

paralelizar - multiprocesamiento en python



Uso del módulo de multiprocesamiento de Python para ejecutar ejecuciones simultáneas e independientes del modelo SEAWAT/MODFLOW (2)

Esta es mi manera de mantener el mínimo número de subprocesos en la memoria. Es una combinación de módulos de subprocesos y multiprocesamiento. Puede ser inusual que otras técnicas, como los miembros respetados que han explicado anteriormente, PERO puedan valer mucho. En aras de la explicación, me arriesgo a rastrear un mínimo de 5 sitios web a la vez.

asi que aqui esta:-

#importing dependencies. from multiprocessing import Process from threading import Thread import threading # Crawler function def crawler(domain): # define crawler technique here. output.write(scrapeddata + "/n") pass

La siguiente es la función threadController. Esta función controlará el flujo de hilos a la memoria principal. Seguirá activando los subprocesos para mantener el límite "mínimo" de threadNum, es decir. 5. Tampoco se cerrará hasta que todos los subprocesos activos (acitveCount) hayan finalizado.

Mantendrá un mínimo de subprocesos de la función threadNum (5) startProcess (estos subprocesos eventualmente iniciarán los Procesos desde la lista de procesos mientras se unen a ellos con un tiempo de 60 segundos). Después de iniciar el threadController, habría 2 hilos que no están incluidos en el límite anterior de 5, es decir. El hilo principal y el hilo threadController en sí. por eso se ha utilizado threading.activeCount ()! = 2.

def threadController(): print "Thread count before child thread starts is:-", threading.activeCount(), len(processList) # staring first thread. This will make the activeCount=3 Thread(target = startProcess).start() # loop while thread List is not empty OR active threads have not finished up. while len(processList) != 0 or threading.activeCount() != 2: if (threading.activeCount() < (threadNum + 2) and # if count of active threads are less than the Minimum AND len(processList) != 0): # processList is not empty Thread(target = startProcess).start() # This line would start startThreads function as a seperate thread **

La función startProcess, como un hilo separado, iniciaría los procesos desde la lista de procesos. El propósito de esta función (** comenzó como un hilo diferente) es que se convertiría en un hilo padre para los Procesos. Por lo tanto, cuando se unirá a ellos con un tiempo de espera de 60 segundos, esto detendrá el subproceso startProcess para avanzar, pero esto no detendrá la ejecución del threadController. De esta manera, threadController funcionará según sea necesario.

def startProcess(): pr = processList.pop(0) pr.start() pr.join(60.00) # joining the thread with time out of 60 seconds as a float. if __name__ == ''__main__'': # a file holding a list of domains domains = open("Domains.txt", "r").read().split("/n") output = open("test.txt", "a") processList = [] # thread list threadNum = 5 # number of thread initiated processes to be run at one time # making process List for r in range(0, len(domains), 1): domain = domains[r].strip() p = Process(target = crawler, args = (domain,)) processList.append(p) # making a list of performer threads. # starting the threadController as a seperate thread. mt = Thread(target = threadController) mt.start() mt.join() # won''t let go next until threadController thread finishes. output.close() print "Done"

Además de mantener un número mínimo de subprocesos en la memoria, mi objetivo era tener también algo que pudiera evitar subprocesos atascados o procesos en la memoria. Hice esto usando la función de tiempo de espera. Mis disculpas por cualquier error de escritura.

Espero que esta construcción ayude a cualquiera en este mundo. Saludos, Vikas Gautam

Estoy tratando de completar 100 ejecuciones del modelo en mi máquina con Windows 7 de 64 bits y 8 procesadores. Me gustaría ejecutar 7 instancias del modelo simultáneamente para disminuir el tiempo total de ejecución (aproximadamente 9,5 minutos por ejecución del modelo). He visto varios hilos relacionados con el módulo de multiprocesamiento de Python, pero todavía me falta algo.

Utilizando el módulo multiprocesamiento.

¿Cómo generar procesos secundarios paralelos en un sistema multiprocesador?

Cola de multiprocesamiento de Python

Mi proceso

Tengo 100 conjuntos de parámetros diferentes que me gustaría ejecutar a través de SEAWAT / MODFLOW para comparar los resultados. He creado previamente los archivos de entrada del modelo para cada ejecución del modelo y los he almacenado en sus propios directorios. Lo que me gustaría poder hacer es tener 7 modelos funcionando a la vez hasta que se hayan completado todas las realizaciones. No es necesario que haya comunicación entre procesos o visualización de resultados. Hasta ahora solo he podido generar los modelos de forma secuencial:

import os,subprocess import multiprocessing as mp ws = r''D:/Data/Users/jbellino/Project/stJohnsDeepening/model/xsec_a'' files = [] for f in os.listdir(ws + r''/fieldgen/reals''): if f.endswith(''.npy''): files.append(f) ## def work(cmd): ## return subprocess.call(cmd, shell=False) def run(f,def_param=ws): real = f.split(''_'')[2].split(''.'')[0] print ''Realization %s'' % real mf2k = r''c:/modflow/mf2k.1_19/bin/mf2k.exe '' mf2k5 = r''c:/modflow/MF2005_1_8/bin/mf2005.exe '' seawatV4 = r''c:/modflow/swt_v4_00_04/exe/swt_v4.exe '' seawatV4x64 = r''c:/modflow/swt_v4_00_04/exe/swt_v4x64.exe '' exe = seawatV4x64 swt_nam = ws + r''/reals/real%s/ss/ss.nam_swt'' % real os.system( exe + swt_nam ) if __name__ == ''__main__'': p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes tasks = range(len(files)) results = [] for f in files: r = p.map_async(run(f), tasks, callback=results.append)

Cambié el if __name__ == ''main'': a lo siguiente con la esperanza de que solucione la falta de paralelismo que siento que el for loop está impartiendo en el script anterior. Sin embargo, el modelo no puede ejecutarse (sin error de Python):

if __name__ == ''__main__'': p = mp.Pool(processes=mp.cpu_count()-1) #-leave 1 processor available for system and other processes p.map_async(run,((files[f],) for f in range(len(files))))

Cualquier ayuda es muy apreciada!

EDITAR 26/03/2012 13:31 EST

Al utilizar el método "Manual Pool" en la respuesta de @JF Sebastian, obtengo una ejecución paralela de mi .exe externo. Las realizaciones del modelo se invocan en lotes de 8 a la vez, pero no espera a que esas 8 ejecuciones se completen antes de llamar al siguiente lote y así sucesivamente:

from __future__ import print_function import os,subprocess,sys import multiprocessing as mp from Queue import Queue from threading import Thread def run(f,ws): real = f.split(''_'')[-1].split(''.'')[0] print(''Realization %s'' % real) seawatV4x64 = r''c:/modflow/swt_v4_00_04/exe/swt_v4x64.exe '' swt_nam = ws + r''/reals/real%s/ss/ss.nam_swt'' % real subprocess.check_call([seawatV4x64, swt_nam]) def worker(queue): """Process files from the queue.""" for args in iter(queue.get, None): try: run(*args) except Exception as e: # catch exceptions to avoid exiting the # thread prematurely print(''%r failed: %s'' % (args, e,), file=sys.stderr) def main(): # populate files ws = r''D:/Data/Users/jbellino/Project/stJohnsDeepening/model/xsec_a'' wdir = os.path.join(ws, r''fieldgen/reals'') q = Queue() for f in os.listdir(wdir): if f.endswith(''.npy''): q.put_nowait((os.path.join(wdir, f), ws)) # start threads threads = [Thread(target=worker, args=(q,)) for _ in range(8)] for t in threads: t.daemon = True # threads die if the program dies t.start() for _ in threads: q.put_nowait(None) # signal no more files for t in threads: t.join() # wait for completion if __name__ == ''__main__'': mp.freeze_support() # optional if the program is not frozen main()

No hay error de rastreo está disponible. La función run() cumple con su deber cuando se le solicita un archivo de realización de un solo modelo, como ocurre con los archivos múltiples. La única diferencia es que con varios archivos, se llama veces len(files) , aunque cada una de las instancias se cierra de inmediato y solo se permite que una ejecución del modelo finalice en el momento en que el script salga correctamente (código de salida 0).

Agregar algunas declaraciones de impresión a main() revela cierta información sobre el recuento de subprocesos activos, así como el estado de los subprocesos (tenga en cuenta que esta es una prueba en solo 8 de los archivos de realización para hacer que la captura de pantalla sea más manejable; Sin embargo, el comportamiento continúa donde se generan y mueren inmediatamente, excepto uno:

def main(): # populate files ws = r''D:/Data/Users/jbellino/Project/stJohnsDeepening/model/xsec_a'' wdir = os.path.join(ws, r''fieldgen/test'') q = Queue() for f in os.listdir(wdir): if f.endswith(''.npy''): q.put_nowait((os.path.join(wdir, f), ws)) # start threads threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count())] for t in threads: t.daemon = True # threads die if the program dies t.start() print(''Active Count a'',threading.activeCount()) for _ in threads: print(_) q.put_nowait(None) # signal no more files for t in threads: print(t) t.join() # wait for completion print(''Active Count b'',threading.activeCount())

** La línea que dice " D://Data//Users... " es la información de error que se produce cuando detengo manualmente el modelo para que no se ejecute. Una vez que detengo la ejecución del modelo, se informan las líneas de estado de subproceso restantes y se cierra el script.

EDITAR 26/03/2012 16:24 EST

SEAWAT permite la ejecución concurrente como lo he hecho en el pasado, generando instancias manualmente utilizando iPython y ejecutando desde cada carpeta de archivos modelo. Esta vez, estoy lanzando todas las ejecuciones del modelo desde una única ubicación, a saber, el directorio donde reside mi script. Parece que el culpable puede estar en la forma en que SEAWAT está guardando parte de la salida. Cuando se ejecuta SEAWAT, inmediatamente crea archivos pertenecientes a la ejecución del modelo. Uno de estos archivos no se está guardando en el directorio en el que se encuentra la realización del modelo, sino en el directorio superior donde se encuentra el script. Esto impide que cualquier subproceso posterior guarde el mismo nombre de archivo en la misma ubicación (lo que todos quieren hacer, ya que estos nombres de archivo son genéricos y no específicos para cada realización). Las ventanas de SEAWAT no se mantuvieron abiertas el tiempo suficiente para que yo pudiera leer o incluso ver que había un mensaje de error. Solo me di cuenta de esto cuando volví e intenté ejecutar el código con iPython, que muestra directamente la impresión de SEAWAT en lugar de abrir un Nueva ventana para ejecutar el programa.

Estoy aceptando la respuesta de @JF Sebastian, ya que es probable que una vez que resuelva este problema del modelo ejecutable, el código de subprocesamiento que me ha proporcionado me llevará a donde necesito estar.

CÓDIGO FINAL

Se agregó el argumento cwd en subprocess.check_call para iniciar cada instancia de SEAWAT en su propio directorio. Muy clave

from __future__ import print_function import os,subprocess,sys import multiprocessing as mp from Queue import Queue from threading import Thread import threading def run(f,ws): real = f.split(''_'')[-1].split(''.'')[0] print(''Realization %s'' % real) seawatV4x64 = r''c:/modflow/swt_v4_00_04/exe/swt_v4x64.exe '' cwd = ws + r''/reals/real%s/ss'' % real swt_nam = ws + r''/reals/real%s/ss/ss.nam_swt'' % real subprocess.check_call([seawatV4x64, swt_nam],cwd=cwd) def worker(queue): """Process files from the queue.""" for args in iter(queue.get, None): try: run(*args) except Exception as e: # catch exceptions to avoid exiting the # thread prematurely print(''%r failed: %s'' % (args, e,), file=sys.stderr) def main(): # populate files ws = r''D:/Data/Users/jbellino/Project/stJohnsDeepening/model/xsec_a'' wdir = os.path.join(ws, r''fieldgen/reals'') q = Queue() for f in os.listdir(wdir): if f.endswith(''.npy''): q.put_nowait((os.path.join(wdir, f), ws)) # start threads threads = [Thread(target=worker, args=(q,)) for _ in range(mp.cpu_count()-1)] for t in threads: t.daemon = True # threads die if the program dies t.start() for _ in threads: q.put_nowait(None) # signal no more files for t in threads: t.join() # wait for completion if __name__ == ''__main__'': mp.freeze_support() # optional if the program is not frozen main()


No veo ningún cálculo en el código de Python. Si solo necesita ejecutar varios programas externos en paralelo, es suficiente usar el subprocess para ejecutar los programas y el módulo de threading para mantener una cantidad constante de procesos en ejecución, pero el código más simple es el uso de multiprocessing.Pool .

#!/usr/bin/env python import os import multiprocessing as mp def run(filename_def_param): filename, def_param = filename_def_param # unpack arguments ... # call external program on `filename` def safe_run(*args, **kwargs): """Call run(), catch exceptions.""" try: run(*args, **kwargs) except Exception as e: print("error: %s run(*%r, **%r)" % (e, args, kwargs)) def main(): # populate files ws = r''D:/Data/Users/jbellino/Project/stJohnsDeepening/model/xsec_a'' workdir = os.path.join(ws, r''fieldgen/reals'') files = ((os.path.join(workdir, f), ws) for f in os.listdir(workdir) if f.endswith(''.npy'')) # start processes pool = mp.Pool() # use all available CPUs pool.map(safe_run, files) if __name__=="__main__": mp.freeze_support() # optional if the program is not frozen main()

Si hay muchos archivos, entonces pool.map() podría reemplazarse por for _ in pool.imap_unordered(safe_run, files): pass .

También hay mutiprocessing.dummy.Pool que proporciona la misma interfaz que multiprocessing.Pool pero utiliza subprocesos en lugar de procesos que podrían ser más apropiados en este caso.

No necesitas mantener algunas CPU libres. Simplemente use un comando que inicie sus ejecutables con una prioridad baja (en Linux es un nice programa).

Ejemplo de ThreadPoolExecutor

concurrent.futures.ThreadPoolExecutor sería a la vez simple y suficiente, pero requiere la dependencia de terceros en Python 2.x (está en el stdlib desde Python 3.2).

#!/usr/bin/env python import os import concurrent.futures def run(filename, def_param): ... # call external program on `filename` # populate files ws = r''D:/Data/Users/jbellino/Project/stJohnsDeepening/model/xsec_a'' wdir = os.path.join(ws, r''fieldgen/reals'') files = (os.path.join(wdir, f) for f in os.listdir(wdir) if f.endswith(''.npy'')) # start threads with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: future_to_file = dict((executor.submit(run, f, ws), f) for f in files) for future in concurrent.futures.as_completed(future_to_file): f = future_to_file[future] if future.exception() is not None: print(''%r generated an exception: %s'' % (f, future.exception())) # run() doesn''t return anything so `future.result()` is always `None`

O si ignoramos las excepciones generadas por run() :

from itertools import repeat ... # the same # start threads with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: executor.map(run, files, repeat(ws)) # run() doesn''t return anything so `map()` results can be ignored

Solución de subprocess + threading (agrupación manual)

#!/usr/bin/env python from __future__ import print_function import os import subprocess import sys from Queue import Queue from threading import Thread def run(filename, def_param): ... # define exe, swt_nam subprocess.check_call([exe, swt_nam]) # run external program def worker(queue): """Process files from the queue.""" for args in iter(queue.get, None): try: run(*args) except Exception as e: # catch exceptions to avoid exiting the # thread prematurely print(''%r failed: %s'' % (args, e,), file=sys.stderr) # start threads q = Queue() threads = [Thread(target=worker, args=(q,)) for _ in range(8)] for t in threads: t.daemon = True # threads die if the program dies t.start() # populate files ws = r''D:/Data/Users/jbellino/Project/stJohnsDeepening/model/xsec_a'' wdir = os.path.join(ws, r''fieldgen/reals'') for f in os.listdir(wdir): if f.endswith(''.npy''): q.put_nowait((os.path.join(wdir, f), ws)) for _ in threads: q.put_nowait(None) # signal no more files for t in threads: t.join() # wait for completion