parallel new manager lock how current python function class callback multiprocessing

python - new - ¿Cómo pasar la instancia multiprocessing.Pool a la función de devolución de llamada apply_async?



python new process (3)

Aquí está mi programa de factorización principal, agregué una función de devolución de llamada en pool.apply_async(findK, args=(N,begin,end)) , un mensaje prime factorization is over cuando la factorización principal finaliza, la operación está bien.

import math import multiprocessing def findK(N,begin,end): for k in range(begin,end): if N% k == 0: print(N,"=" ,k ,"*", N/k) return True return False def prompt(result): if result: print("prime factorization is over") def mainFun(N,process_num): pool = multiprocessing.Pool(process_num) for i in range(process_num): if i ==0 : begin =2 else: begin = int(math.sqrt(N)/process_num*i)+1 end = int(math.sqrt(N)/process_num*(i+1)) pool.apply_async(findK, args=(N,begin,end) , callback = prompt) pool.close() pool.join() if __name__ == "__main__": N = 684568031001583853 process_num = 16 mainFun(N,process_num)

Ahora quiero cambiar la función de devolución de llamada en apply_async, para cambiar el indicador en una función de apagado para eliminar todos los demás procesos.

def prompt(result): if result: pool.terminate()

La instancia de la agrupación no se define en el alcance de solicitud o se pasa a solicitud
pool.terminate() no puede funcionar en la función de solicitud.
¿Cómo pasar la instancia multiprocessing.Pool a apply_async''callback?
(Lo he hecho en formato de clase, solo para agregar un método de clase y llamar self.pool.terminate puede matar todos los demás procesos, ¿cómo hacer el trabajo en formato de función?)

si no se configura el grupo como variable global, ¿se puede pasar al grupo a la función de devolución de llamada?


Es necesario que la pool termine en el entorno del sistema. Una posibilidad es mover el pool al ámbito global (aunque esto no es realmente la mejor práctica). Esto parece funcionar:

import math import multiprocessing pool = None def findK(N,begin,end): for k in range(begin,end): if N% k == 0: print(N,"=" ,k ,"*", N/k) return True return False def prompt(result): if result: print("prime factorization is over") pool.terminate() def mainFun(N,process_num): global pool pool = multiprocessing.Pool(process_num) for i in range(process_num): if i ==0 : begin =2 else: begin = int(math.sqrt(N)/process_num*i)+1 end = int(math.sqrt(N)/process_num*(i+1)) pool.apply_async(findK, args=(N,begin,end) , callback = prompt) pool.close() pool.join() if __name__ == "__main__": N = 684568031001583853 process_num = 16 mainFun(N,process_num)


Pasar argumentos adicionales a la función de devolución de llamada no es compatible. Sin embargo, tienes muchas formas elegantes de solucionarlo.

Puede encapsular su lógica de grupo en un objeto:

class Executor: def __init__(self, process_num): self.pool = multiprocessing.Pool(process_num) def prompt(self, result): if result: print("prime factorization is over") self.pool.terminate() def schedule(self, function, args): self.pool.apply_async(function, args=args, callback=self.prompt) def wait(self): self.pool.close() self.pool.join() def main(N,process_num): executor = Executor(process_num) for i in range(process_num): ... executor.schedule(findK, (N,begin,end)) executor.wait()

O puede usar la implementación concurrent.futures.Executor que devuelve un objeto Future . Simplemente agregue el grupo al objeto Future antes de configurar la devolución de llamada.

def prompt(future): if future.result(): print("prime factorization is over") future.pool_executor.shutdown(wait=False) def main(N,process_num): executor = concurrent.futures.ProcessPoolExecutor(max_workers=process_num) for i in range(process_num): ... future = executor.submit(findK, N,begin,end) future.pool_executor = executor future.add_done_callback(prompt)


Simplemente puede definir una función de close local como una devolución de llamada:

import math import multiprocessing def findK(N, begin, end): for k in range(begin, end): if N % k == 0: print(N, "=", k, "*", N / k) return True return False def mainFun(N, process_num): pool = multiprocessing.Pool(process_num) def close(result): if result: print("prime factorization is over") pool.terminate() for i in range(process_num): if i == 0: begin = 2 else: begin = int(math.sqrt(N) / process_num * i) + 1 end = int(math.sqrt(N) / process_num * (i + 1)) pool.apply_async(findK, args=(N, begin, end), callback=close) pool.close() pool.join() if __name__ == "__main__": N = 684568031001583853 process_num = 16 mainFun(N, process_num)

También puede utilizar una función partial de functool , con

import functools def close_pool(pool, results): if result: pool.terminate() def mainFun(N, process_num): pool = multiprocessing.Pool(process_num) close = funtools.partial(close_pool, pool) ....