set_start_method - Python Process Pool no daemonic?
python pool process (3)
El módulo de multiprocessing tiene una interfaz agradable para usar pools con procesos o hilos. Dependiendo de su caso de uso actual, puede considerar el uso de multiprocessing.pool.ThreadPool
para su grupo externo, lo que dará como resultado subprocesos (que permiten generar procesos desde dentro) en lugar de procesos.
Puede estar limitado por el GIL, pero en mi caso particular (probé ambos) , el tiempo de inicio para los procesos del Pool
externo como se creó here superó con creces la solución con ThreadPool
.
Es realmente fácil intercambiar Processes
por Threads
. Obtenga más información sobre cómo usar una solución ThreadPool
here o here .
¿Sería posible crear un grupo de Python que no sea demoníaco? Quiero que un grupo pueda llamar a una función que tiene otro grupo dentro.
Quiero esto porque los procesos de deamon no pueden crear procesos. Específicamente, causará el error:
AssertionError: daemonic processes are not allowed to have children
Por ejemplo, considere el escenario donde function_a
tiene un grupo que ejecuta function_b
que tiene un grupo que ejecuta function_c
. Esta cadena de funciones fallará, porque function_b
se está ejecutando en un proceso de daemon, y los procesos de daemon no pueden crear procesos.
El problema que encontré fue al tratar de importar elementos globales entre los módulos, lo que hace que la línea ProcessPool () se evalúe varias veces.
globals.py
from processing import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading import ThreadPool
class SingletonMeta(type):
def __new__(cls, name, bases, dict):
dict[''__deepcopy__''] = dict[''__copy__''] = lambda self, *args: self
return super(SingletonMeta, cls).__new__(cls, name, bases, dict)
def __init__(cls, name, bases, dict):
super(SingletonMeta, cls).__init__(name, bases, dict)
cls.instance = None
def __call__(cls,*args,**kw):
if cls.instance is None:
cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
return cls.instance
def __deepcopy__(self, item):
return item.__class__.instance
class Globals(object):
__metaclass__ = SingletonMeta
"""
This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children
The root cause is that importing this file from different modules causes this file to be reevalutated each time,
thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug
"""
def __init__(self):
print "%s::__init__()" % (self.__class__.__name__)
self.shared_manager = Manager()
self.shared_process_pool = ProcessPool()
self.shared_thread_pool = ThreadPool()
self.shared_lock = Lock() # BUG: Windows: global name ''lock'' is not defined | doesn''t affect cygwin
A continuación, importa de forma segura desde otro lugar en tu código
from globals import Globals
Globals().shared_manager
Globals().shared_process_pool
Globals().shared_thread_pool
Globals().shared_lock
La clase multiprocessing.pool.Pool
crea los procesos de trabajo en su método __init__
, los convierte en daemonic y los inicia, y no es posible volver a establecer su atributo de daemon
en False
antes de que se inicien (y luego ya no se permite). Pero puede crear su propia subclase de multiprocesing.pool.Pool
( multiprocessing.Pool
es solo una función de envoltura) y sustituir su propia multiprocessing.Process
, que siempre es no-demoníaco, para ser utilizado en los procesos de trabajo .
Aquí hay un ejemplo completo de cómo hacer esto. Las partes importantes son las dos clases NoDaemonProcess
y MyPool
en la parte superior y para llamar a pool.close()
y pool.join()
en su instancia MyPool
al final.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time
from random import randint
class NoDaemonProcess(multiprocessing.Process):
# make ''daemon'' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
Process = NoDaemonProcess
def sleepawhile(t):
print("Sleeping %i seconds..." % t)
time.sleep(t)
return t
def work(num_procs):
print("Creating %i (daemon) workers and jobs in child." % num_procs)
pool = multiprocessing.Pool(num_procs)
result = pool.map(sleepawhile,
[randint(1, 5) for x in range(num_procs)])
# The following is not really needed, since the (daemon) workers of the
# child''s pool are killed when the child is terminated, but it''s good
# practice to cleanup after ourselves anyway.
pool.close()
pool.join()
return result
def test():
print("Creating 5 (non-daemon) workers and jobs in main process.")
pool = MyPool(5)
result = pool.map(work, [randint(1, 5) for x in range(5)])
pool.close()
pool.join()
print(result)
if __name__ == ''__main__'':
test()