procesos multitarea ejemplos crear concurrentes con python multithreading missing-features

ejemplos - multitarea en python



¿Grupo de subprocesos similar al grupo de multiprocesamiento? (9)

Acabo de descubrir que en realidad hay una interfaz de grupo basada en subprocesos en el módulo de multiprocessing , sin embargo, está oculta en cierta medida y no está bien documentada.

Se puede importar vía

from multiprocessing.pool import ThreadPool

Se implementa utilizando una clase de proceso ficticia que envuelve un hilo de python. Esta clase de proceso basada en subprocesos se puede encontrar en multiprocessing.dummy que se menciona brevemente en los multiprocessing.dummy . Este módulo ficticio supuestamente proporciona toda la interfaz de multiprocesamiento basada en subprocesos.

¿Hay una clase de grupo para subprocesos de trabajo, similar a la clase de grupo del módulo de multiprocesamiento?

Me gusta, por ejemplo, la manera fácil de paralelizar una función de mapa.

def long_running_func(p): c_func_no_gil(p) p = multiprocessing.Pool(4) xs = p.map(long_running_func, range(100))

sin embargo, me gustaría hacerlo sin la sobrecarga de crear nuevos procesos.

Sé sobre el GIL. Sin embargo, en mi caso de uso, la función será una función C unida a IO para la cual la envoltura python liberará el GIL antes de la llamada a la función real.

¿Tengo que escribir mi propio grupo de subprocesos?


Aquí está el resultado que finalmente termine usando. Es una versión modificada de las clases por dgorissen arriba.

Archivo: threadpool.py

from queue import Queue, Empty import threading from threading import Thread class Worker(Thread): _TIMEOUT = 2 """ Thread executing tasks from a given tasks queue. Thread is signalable, to exit """ def __init__(self, tasks, th_num): Thread.__init__(self) self.tasks = tasks self.daemon, self.th_num = True, th_num self.done = threading.Event() self.start() def run(self): while not self.done.is_set(): try: func, args, kwargs = self.tasks.get(block=True, timeout=self._TIMEOUT) try: func(*args, **kwargs) except Exception as e: print(e) finally: self.tasks.task_done() except Empty as e: pass return def signal_exit(self): """ Signal to thread to exit """ self.done.set() class ThreadPool: """Pool of threads consuming tasks from a queue""" def __init__(self, num_threads, tasks=[]): self.tasks = Queue(num_threads) self.workers = [] self.done = False self._init_workers(num_threads) for task in tasks: self.tasks.put(task) def _init_workers(self, num_threads): for i in range(num_threads): self.workers.append(Worker(self.tasks, i)) def add_task(self, func, *args, **kwargs): """Add a task to the queue""" self.tasks.put((func, args, kwargs)) def _close_all_threads(self): """ Signal all threads to exit and lose the references to them """ for workr in self.workers: workr.signal_exit() self.workers = [] def wait_completion(self): """Wait for completion of all the tasks in the queue""" self.tasks.join() def __del__(self): self._close_all_threads() def create_task(func, *args, **kwargs): return (func, args, kwargs)

Usar la piscina

from random import randrange from time import sleep delays = [randrange(1, 10) for i in range(30)] def wait_delay(d): print(''sleeping for (%d)sec'' % d) sleep(d) pool = ThreadPool(20) for i, d in enumerate(delays): pool.add_task(wait_delay, d) pool.wait_completion()



En Python 3 puedes usar concurrent.futures.ThreadPoolExecutor , es decir:

executor = ThreadPoolExecutor(max_workers=10) a = executor.submit(my_function)

Consulte la docs para obtener más información y ejemplos.


Hola, para usar el grupo de hilos en Python puedes usar esta biblioteca:

from multiprocessing.dummy import Pool as ThreadPool

y luego para usar, a esta biblioteca le gusta eso:

pool = ThreadPool(threads) results = pool.map(service, tasks) pool.close() pool.join() return results

Los subprocesos son el número de subprocesos que desea y las tareas son una lista de tareas que la mayoría se asigna al servicio.


La sobrecarga de crear los nuevos procesos es mínima, especialmente cuando solo son 4 de ellos. Dudo que este sea un punto caliente de rendimiento de su aplicación. Manténgalo simple, optimice dónde tiene que hacerlo y hacia dónde apuntan los resultados.


No hay construido en el grupo basado en subproceso. Sin embargo, puede ser muy rápido implementar una cola de productor / consumidor con la clase Queue .

Desde: https://docs.python.org/2/library/queue.html

from threading import Thread from Queue import Queue def worker(): while True: item = q.get() do_work(item) q.task_done() q = Queue() for i in range(num_worker_threads): t = Thread(target=worker) t.daemon = True t.start() for item in source(): q.put(item) q.join() # block until all tasks are done


Para algo muy simple y ligero (ligeramente modificado desde here ):

from Queue import Queue from threading import Thread class Worker(Thread): """Thread executing tasks from a given tasks queue""" def __init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.start() def run(self): while True: func, args, kargs = self.tasks.get() try: func(*args, **kargs) except Exception, e: print e finally: self.tasks.task_done() class ThreadPool: """Pool of threads consuming tasks from a queue""" def __init__(self, num_threads): self.tasks = Queue(num_threads) for _ in range(num_threads): Worker(self.tasks) def add_task(self, func, *args, **kargs): """Add a task to the queue""" self.tasks.put((func, args, kargs)) def wait_completion(self): """Wait for completion of all the tasks in the queue""" self.tasks.join() if __name__ == ''__main__'': from random import randrange from time import sleep delays = [randrange(1, 10) for i in range(100)] def wait_delay(d): print ''sleeping for (%d)sec'' % d sleep(d) pool = ThreadPool(20) for i, d in enumerate(delays): pool.add_task(wait_delay, d) pool.wait_completion()

Para admitir las devoluciones de llamada al finalizar la tarea, simplemente puede agregar la devolución de llamada a la tupla de tareas.


Sí, y parece tener (más o menos) la misma API.

import multiprocessing def worker(lnk): .... def start_process(): ..... .... if(PROCESS): pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process) else: pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, initializer=start_process) pool.map(worker, inputs) ....