python - guia - qgis manual
¿La forma correcta de limitar el número máximo de subprocesos que se ejecutan a la vez? (5)
He visto que lo más comúnmente escrito como:
threads = [threading.Thread(target=f) for _ in range(8)]
for thread in threads:
thread.start()
...
for thread in threads:
thread.join()
Si desea mantener un grupo de tamaño fijo de subprocesos en ejecución que procesan tareas de corta duración que pedir un nuevo trabajo, considere una solución basada en colas, como " Cómo esperar hasta que solo se termine el primer subproceso en Python ".
Me gustaría crear un programa que ejecute varios subprocesos ligeros, pero que se limite a un número constante y predefinido de tareas simultáneas en ejecución, como esta (pero sin riesgo de condición de carrera):
import threading
def f(arg):
global running
running += 1
print("Spawned a thread. running=%s, arg=%s" % (running, arg))
for i in range(100000):
pass
running -= 1
print("Done")
running = 0
while True:
if running < 8:
arg = get_task()
threading.Thread(target=f, args=[arg]).start()
¿Cuál es la forma más segura / rápida de implementar esto?
Para aplicar la limitación en la creación de subprocesos , siga este ejemplo (realmente funciona):
import threading
import time
def some_process(thread_num):
count = 0
while count < 5:
time.sleep(0.5)
count += 1
print "%s: %s" % (thread_num, time.ctime(time.time()))
print ''number of alive threads:{}''.format(threading.active_count())
def create_thread():
try:
for i in range(1, 555): # trying to spawn 555 threads.
thread = threading.Thread(target=some_process, args=(i,))
thread.start()
if threading.active_count() == 100: # set maximum threads.
thread.join()
print threading.active_count() # number of alive threads.
except Exception as e:
print "Error: unable to start thread {}".format(e)
if __name__ == ''__main__'':
create_thread()
O:
Otra forma de configurar un comprobador de número de hilo mutex / lock como el siguiente ejemplo:
import threading
import time
def some_process(thread_num):
count = 0
while count < 5:
time.sleep(0.5)
count += 1
# print "%s: %s" % (thread_num, time.ctime(time.time()))
print ''number of alive threads:{}''.format(threading.active_count())
def create_thread2(number_of_desire_thread ):
try:
for i in range(1, 555):
thread = threading.Thread(target=some_process, args=(i,)).start()
while number_of_desire_thread <= threading.active_count():
''''''mutex for avoiding to additional thread creation.''''''
pass
print ''unlock''
print threading.active_count() # number of alive threads.
except Exception as e:
print "Error: unable to start thread {}".format(e)
if __name__ == ''__main__'':
create_thread2(100)
Parece que desea implementar el patrón productor / consumidor con ocho trabajadores. Python tiene una clase de Queue
para este propósito, y es seguro para subprocesos.
Cada trabajador debe llamar a get()
en la cola para recuperar una tarea. Esta llamada se bloqueará si no hay tareas disponibles, lo que hará que el trabajador quede inactivo hasta que haya una disponible. Luego, el trabajador debe ejecutar la tarea y finalmente llamar a task_done()
en la cola.
Usted pondría las tareas en la cola llamando a put()
en la cola.
Desde el hilo principal, puede llamar a join()
en la cola para esperar hasta que todas las tareas pendientes se hayan completado.
Este enfoque tiene la ventaja de que no está creando y destruyendo subprocesos, lo cual es costoso. Los subprocesos de trabajo se ejecutarán de forma continua, pero estarán inactivos cuando no haya tareas en la cola, sin tiempo de CPU.
(La página de documentación vinculada tiene un ejemplo de este mismo patrón.)
Sería mucho más fácil implementar esto como un grupo de subprocesos o ejecutor, usando multiprocessing.dummy.Pool
, o concurrent.futures.ThreadPoolExecutor
(o, si usa Python 2.x, los futures
backport). Por ejemplo:
import concurrent
def f(arg):
print("Started a task. running=%s, arg=%s" % (running, arg))
for i in range(100000):
pass
print("Done")
with concurrent.futures.ThreadPoolExecutor(8) as executor:
while True:
arg = get_task()
executor.submit(f, arg)
Por supuesto, si puede cambiar el get_task
modelo de get_task
a un get_tasks
modelo de get_tasks
que, por ejemplo, produce tareas de una en una, esto es aún más simple:
with concurrent.futures.ThreadPoolExecutor(8) as executor:
for arg in get_tasks():
executor.submit(f, arg)
Cuando te quedas sin tareas (por ejemplo, get_task
genera una excepción, o get_tasks
ejecuta en seco), esto le indicará automáticamente al ejecutor que se detenga después de que drene la cola, espere a que se detenga y limpie todo.
semaphore es un tipo de datos variable o abstracto que se utiliza para controlar el acceso a un recurso común mediante múltiples procesos en un sistema concurrente, como un sistema operativo de multiprogramación; Esto puede ayudarte aquí.
threadLimiter = threading.BoundedSemaphore(maximumNumberOfThreads)
class MyThread(threading.Thread):
def run(self):
threadLimiter.acquire()
try:
self.Executemycode()
finally:
threadLimiter.release()
def Executemycode(self):
print(" Hello World!")
# <your code here>
De esta manera, puede limitar fácilmente el número de subprocesos que se ejecutarán simultáneamente durante la ejecución del programa. Variable, ''maximumNumberOfThreads'' se puede usar para definir un límite superior en el valor máximo de hilos.