ejemplos - multitarea en python
¿Grupo de subprocesos similar al grupo de multiprocesamiento? (9)
Acabo de descubrir que en realidad hay una interfaz de grupo basada en subprocesos en el módulo de multiprocessing
, sin embargo, está oculta en cierta medida y no está bien documentada.
Se puede importar vía
from multiprocessing.pool import ThreadPool
Se implementa utilizando una clase de proceso ficticia que envuelve un hilo de python. Esta clase de proceso basada en subprocesos se puede encontrar en multiprocessing.dummy
que se menciona brevemente en los multiprocessing.dummy . Este módulo ficticio supuestamente proporciona toda la interfaz de multiprocesamiento basada en subprocesos.
¿Hay una clase de grupo para subprocesos de trabajo, similar a la clase de grupo del módulo de multiprocesamiento?
Me gusta, por ejemplo, la manera fácil de paralelizar una función de mapa.
def long_running_func(p):
c_func_no_gil(p)
p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))
sin embargo, me gustaría hacerlo sin la sobrecarga de crear nuevos procesos.
Sé sobre el GIL. Sin embargo, en mi caso de uso, la función será una función C unida a IO para la cual la envoltura python liberará el GIL antes de la llamada a la función real.
¿Tengo que escribir mi propio grupo de subprocesos?
Aquí está el resultado que finalmente termine usando. Es una versión modificada de las clases por dgorissen arriba.
Archivo: threadpool.py
from queue import Queue, Empty
import threading
from threading import Thread
class Worker(Thread):
_TIMEOUT = 2
""" Thread executing tasks from a given tasks queue. Thread is signalable,
to exit
"""
def __init__(self, tasks, th_num):
Thread.__init__(self)
self.tasks = tasks
self.daemon, self.th_num = True, th_num
self.done = threading.Event()
self.start()
def run(self):
while not self.done.is_set():
try:
func, args, kwargs = self.tasks.get(block=True,
timeout=self._TIMEOUT)
try:
func(*args, **kwargs)
except Exception as e:
print(e)
finally:
self.tasks.task_done()
except Empty as e:
pass
return
def signal_exit(self):
""" Signal to thread to exit """
self.done.set()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads, tasks=[]):
self.tasks = Queue(num_threads)
self.workers = []
self.done = False
self._init_workers(num_threads)
for task in tasks:
self.tasks.put(task)
def _init_workers(self, num_threads):
for i in range(num_threads):
self.workers.append(Worker(self.tasks, i))
def add_task(self, func, *args, **kwargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kwargs))
def _close_all_threads(self):
""" Signal all threads to exit and lose the references to them """
for workr in self.workers:
workr.signal_exit()
self.workers = []
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
def __del__(self):
self._close_all_threads()
def create_task(func, *args, **kwargs):
return (func, args, kwargs)
Usar la piscina
from random import randrange
from time import sleep
delays = [randrange(1, 10) for i in range(30)]
def wait_delay(d):
print(''sleeping for (%d)sec'' % d)
sleep(d)
pool = ThreadPool(20)
for i, d in enumerate(delays):
pool.add_task(wait_delay, d)
pool.wait_completion()
Aquí hay algo que parece prometedor en el libro de cocina de Python:
Receta 576519: grupo de subprocesos con la misma API que (multi) processing.Pool (Python)
En Python 3 puedes usar concurrent.futures.ThreadPoolExecutor
, es decir:
executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)
Consulte la docs para obtener más información y ejemplos.
Hola, para usar el grupo de hilos en Python puedes usar esta biblioteca:
from multiprocessing.dummy import Pool as ThreadPool
y luego para usar, a esta biblioteca le gusta eso:
pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results
Los subprocesos son el número de subprocesos que desea y las tareas son una lista de tareas que la mayoría se asigna al servicio.
La sobrecarga de crear los nuevos procesos es mínima, especialmente cuando solo son 4 de ellos. Dudo que este sea un punto caliente de rendimiento de su aplicación. Manténgalo simple, optimice dónde tiene que hacerlo y hacia dónde apuntan los resultados.
No hay construido en el grupo basado en subproceso. Sin embargo, puede ser muy rápido implementar una cola de productor / consumidor con la clase Queue
.
Desde: https://docs.python.org/2/library/queue.html
from threading import Thread
from Queue import Queue
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = True
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are done
Para algo muy simple y ligero (ligeramente modificado desde here ):
from Queue import Queue
from threading import Thread
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
func, args, kargs = self.tasks.get()
try:
func(*args, **kargs)
except Exception, e:
print e
finally:
self.tasks.task_done()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads):
self.tasks = Queue(num_threads)
for _ in range(num_threads):
Worker(self.tasks)
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
if __name__ == ''__main__'':
from random import randrange
from time import sleep
delays = [randrange(1, 10) for i in range(100)]
def wait_delay(d):
print ''sleeping for (%d)sec'' % d
sleep(d)
pool = ThreadPool(20)
for i, d in enumerate(delays):
pool.add_task(wait_delay, d)
pool.wait_completion()
Para admitir las devoluciones de llamada al finalizar la tarea, simplemente puede agregar la devolución de llamada a la tupla de tareas.
Sí, y parece tener (más o menos) la misma API.
import multiprocessing
def worker(lnk):
....
def start_process():
.....
....
if(PROCESS):
pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE,
initializer=start_process)
pool.map(worker, inputs)
....