set_start_method - python multithreading
Multiproceso de Python y un contador compartido (3)
Clase Counter más rápida sin usar el bloqueo incorporado de Value dos veces
class Counter(object):
def __init__(self, initval=0):
self.val = multiprocessing.RawValue(''i'', initval)
self.lock = multiprocessing.Lock()
def increment(self):
with self.lock:
self.val.value += 1
@property
def value(self):
return self.val.value
https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.Value https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.RawValue
Tengo problemas con el módulo de multiprocesamiento. Estoy usando un grupo de trabajadores con su método de mapa para cargar datos de muchos archivos y para cada uno de ellos analizo datos con una función personalizada. Cada vez que se procesa un archivo, me gustaría tener un contador actualizado para que pueda hacer un seguimiento de cuántos archivos quedan por procesar. Aquí hay un código de muestra:
def analyze_data( args ):
# do something
counter += 1
print counter
if __name__ == ''__main__'':
list_of_files = os.listdir(some_directory)
global counter
counter = 0
p = Pool()
p.map(analyze_data, list_of_files)
No puedo encontrar una solución para esto.
Clase contraria sin el error de condición de carrera:
class Counter(object):
def __init__(self):
self.val = multiprocessing.Value(''i'', 0)
def increment(self, n=1):
with self.val.get_lock():
self.val.value += n
@property
def value(self):
return self.val.value
El problema es que la variable de counter
no se comparte entre sus procesos: cada proceso por separado crea su propia instancia local y la incrementa.
Consulte esta sección de la documentación para conocer algunas técnicas que puede emplear para compartir estados entre sus procesos. En su caso, es posible que desee compartir una instancia de Value
entre sus trabajadores
Aquí hay una versión de trabajo de su ejemplo (con algunos datos de entrada ficticios). Tenga en cuenta que utiliza valores globales que realmente trataría de evitar en la práctica:
from multiprocessing import Pool, Value
from time import sleep
counter = None
def init(args):
'''''' store the counter for later use ''''''
global counter
counter = args
def analyze_data(args):
'''''' increment the global counter, do something with the input ''''''
global counter
# += operation is not atomic, so we need to get a lock:
with counter.get_lock():
counter.value += 1
print counter.value
return args * 10
if __name__ == ''__main__'':
#inputs = os.listdir(some_directory)
#
# initialize a cross-process counter and the input lists
#
counter = Value(''i'', 0)
inputs = [1, 2, 3, 4]
#
# create the pool of workers, ensuring each one receives the counter
# as it starts.
#
p = Pool(initializer = init, initargs = (counter, ))
i = p.map_async(analyze_data, inputs, chunksize = 1)
i.wait()
print i.get()