python - new - ¿Cómo pasar la instancia multiprocessing.Pool a la función de devolución de llamada apply_async?
python new process (3)
Aquí está mi programa de factorización principal, agregué una función de devolución de llamada en pool.apply_async(findK, args=(N,begin,end))
, un mensaje prime factorization is over
cuando la factorización principal finaliza, la operación está bien.
import math
import multiprocessing
def findK(N,begin,end):
for k in range(begin,end):
if N% k == 0:
print(N,"=" ,k ,"*", N/k)
return True
return False
def prompt(result):
if result:
print("prime factorization is over")
def mainFun(N,process_num):
pool = multiprocessing.Pool(process_num)
for i in range(process_num):
if i ==0 :
begin =2
else:
begin = int(math.sqrt(N)/process_num*i)+1
end = int(math.sqrt(N)/process_num*(i+1))
pool.apply_async(findK, args=(N,begin,end) , callback = prompt)
pool.close()
pool.join()
if __name__ == "__main__":
N = 684568031001583853
process_num = 16
mainFun(N,process_num)
Ahora quiero cambiar la función de devolución de llamada en apply_async, para cambiar el indicador en una función de apagado para eliminar todos los demás procesos.
def prompt(result):
if result:
pool.terminate()
La instancia de la agrupación no se define en el alcance de solicitud o se pasa a solicitud
pool.terminate()
no puede funcionar en la función de solicitud.
¿Cómo pasar la instancia multiprocessing.Pool a apply_async''callback?
(Lo he hecho en formato de clase, solo para agregar un método de clase y llamar self.pool.terminate puede matar todos los demás procesos, ¿cómo hacer el trabajo en formato de función?)
si no se configura el grupo como variable global, ¿se puede pasar al grupo a la función de devolución de llamada?
Es necesario que la pool
termine en el entorno del sistema. Una posibilidad es mover el pool
al ámbito global (aunque esto no es realmente la mejor práctica). Esto parece funcionar:
import math
import multiprocessing
pool = None
def findK(N,begin,end):
for k in range(begin,end):
if N% k == 0:
print(N,"=" ,k ,"*", N/k)
return True
return False
def prompt(result):
if result:
print("prime factorization is over")
pool.terminate()
def mainFun(N,process_num):
global pool
pool = multiprocessing.Pool(process_num)
for i in range(process_num):
if i ==0 :
begin =2
else:
begin = int(math.sqrt(N)/process_num*i)+1
end = int(math.sqrt(N)/process_num*(i+1))
pool.apply_async(findK, args=(N,begin,end) , callback = prompt)
pool.close()
pool.join()
if __name__ == "__main__":
N = 684568031001583853
process_num = 16
mainFun(N,process_num)
Pasar argumentos adicionales a la función de devolución de llamada no es compatible. Sin embargo, tienes muchas formas elegantes de solucionarlo.
Puede encapsular su lógica de grupo en un objeto:
class Executor:
def __init__(self, process_num):
self.pool = multiprocessing.Pool(process_num)
def prompt(self, result):
if result:
print("prime factorization is over")
self.pool.terminate()
def schedule(self, function, args):
self.pool.apply_async(function, args=args, callback=self.prompt)
def wait(self):
self.pool.close()
self.pool.join()
def main(N,process_num):
executor = Executor(process_num)
for i in range(process_num):
...
executor.schedule(findK, (N,begin,end))
executor.wait()
O puede usar la implementación concurrent.futures.Executor que devuelve un objeto Future
. Simplemente agregue el grupo al objeto Future
antes de configurar la devolución de llamada.
def prompt(future):
if future.result():
print("prime factorization is over")
future.pool_executor.shutdown(wait=False)
def main(N,process_num):
executor = concurrent.futures.ProcessPoolExecutor(max_workers=process_num)
for i in range(process_num):
...
future = executor.submit(findK, N,begin,end)
future.pool_executor = executor
future.add_done_callback(prompt)
Simplemente puede definir una función de close
local como una devolución de llamada:
import math
import multiprocessing
def findK(N, begin, end):
for k in range(begin, end):
if N % k == 0:
print(N, "=", k, "*", N / k)
return True
return False
def mainFun(N, process_num):
pool = multiprocessing.Pool(process_num)
def close(result):
if result:
print("prime factorization is over")
pool.terminate()
for i in range(process_num):
if i == 0:
begin = 2
else:
begin = int(math.sqrt(N) / process_num * i) + 1
end = int(math.sqrt(N) / process_num * (i + 1))
pool.apply_async(findK, args=(N, begin, end), callback=close)
pool.close()
pool.join()
if __name__ == "__main__":
N = 684568031001583853
process_num = 16
mainFun(N, process_num)
También puede utilizar una función partial
de functool
, con
import functools
def close_pool(pool, results):
if result:
pool.terminate()
def mainFun(N, process_num):
pool = multiprocessing.Pool(process_num)
close = funtools.partial(close_pool, pool)
....