threading set_start_method ejemplos python multiprocessing

set_start_method - python multithreading



Multiproceso de Python y un contador compartido (3)

Clase Counter más rápida sin usar el bloqueo incorporado de Value dos veces

class Counter(object): def __init__(self, initval=0): self.val = multiprocessing.RawValue(''i'', initval) self.lock = multiprocessing.Lock() def increment(self): with self.lock: self.val.value += 1 @property def value(self): return self.val.value

https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.Value https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.RawValue

Tengo problemas con el módulo de multiprocesamiento. Estoy usando un grupo de trabajadores con su método de mapa para cargar datos de muchos archivos y para cada uno de ellos analizo datos con una función personalizada. Cada vez que se procesa un archivo, me gustaría tener un contador actualizado para que pueda hacer un seguimiento de cuántos archivos quedan por procesar. Aquí hay un código de muestra:

def analyze_data( args ): # do something counter += 1 print counter if __name__ == ''__main__'': list_of_files = os.listdir(some_directory) global counter counter = 0 p = Pool() p.map(analyze_data, list_of_files)

No puedo encontrar una solución para esto.


Clase contraria sin el error de condición de carrera:

class Counter(object): def __init__(self): self.val = multiprocessing.Value(''i'', 0) def increment(self, n=1): with self.val.get_lock(): self.val.value += n @property def value(self): return self.val.value


El problema es que la variable de counter no se comparte entre sus procesos: cada proceso por separado crea su propia instancia local y la incrementa.

Consulte esta sección de la documentación para conocer algunas técnicas que puede emplear para compartir estados entre sus procesos. En su caso, es posible que desee compartir una instancia de Value entre sus trabajadores

Aquí hay una versión de trabajo de su ejemplo (con algunos datos de entrada ficticios). Tenga en cuenta que utiliza valores globales que realmente trataría de evitar en la práctica:

from multiprocessing import Pool, Value from time import sleep counter = None def init(args): '''''' store the counter for later use '''''' global counter counter = args def analyze_data(args): '''''' increment the global counter, do something with the input '''''' global counter # += operation is not atomic, so we need to get a lock: with counter.get_lock(): counter.value += 1 print counter.value return args * 10 if __name__ == ''__main__'': #inputs = os.listdir(some_directory) # # initialize a cross-process counter and the input lists # counter = Value(''i'', 0) inputs = [1, 2, 3, 4] # # create the pool of workers, ensuring each one receives the counter # as it starts. # p = Pool(initializer = init, initargs = (counter, )) i = p.map_async(analyze_data, inputs, chunksize = 1) i.wait() print i.get()