programming - python multiprocessing set_start_method
Fast Queue de solo leer matrices numpy (3)
Tengo un trabajo de multiprocesamiento en el que estoy en cola para leer solo matrices numpy, como parte de una cartera de consumidores productores.
Actualmente están en escabeche, porque este es el comportamiento predeterminado de multiprocessing.Queue
ralentiza el rendimiento.
¿Hay alguna manera pitónica de pasar referencias a la memoria compartida en lugar de develar las matrices?
Lamentablemente, las matrices se generan después de que se inicia el consumidor, y no hay una manera fácil de evitarlo. (Entonces, el enfoque de la variable global sería feo ...).
[Tenga en cuenta que en el siguiente código no esperamos que h (x0) y h (x1) se computen en paralelo. En su lugar, vemos h (x0) yg (h (x1)) calculados en paralelo (como una canalización en una CPU).]
from multiprocessing import Process, Queue
import numpy as np
class __EndToken(object):
pass
def parrallel_pipeline(buffer_size=50):
def parrallel_pipeline_with_args(f):
def consumer(xs, q):
for x in xs:
q.put(x)
q.put(__EndToken())
def parallel_generator(f_xs):
q = Queue(buffer_size)
consumer_process = Process(target=consumer,args=(f_xs,q,))
consumer_process.start()
while True:
x = q.get()
if isinstance(x, __EndToken):
break
yield x
def f_wrapper(xs):
return parallel_generator(f(xs))
return f_wrapper
return parrallel_pipeline_with_args
@parrallel_pipeline(3)
def f(xs):
for x in xs:
yield x + 1.0
@parrallel_pipeline(3)
def g(xs):
for x in xs:
yield x * 3
@parrallel_pipeline(3)
def h(xs):
for x in xs:
yield x * x
def xs():
for i in range(1000):
yield np.random.uniform(0,1,(500,2000))
if __name__ == "__main__":
rs = f(g(h(xs())))
for r in rs:
print r
Compartir memoria entre hilos o procesos
Use threading en lugar de multiprocesamiento
Como usa numpy, puede aprovechar el hecho de que el bloqueo de intérprete global se libera durante los cálculos numpy . Esto significa que puede hacer un procesamiento paralelo con hilos estándar y memoria compartida, en lugar de multiprocesamiento y comunicación entre procesos. Aquí hay una versión de su código, modificada para usar threading. Thread y Queue.Queue en lugar de multiprocesamiento. Procesar y multiprocesar.Queue. Esto pasa un ndarray numpy a través de una cola sin decapado. En mi computadora, esto funciona aproximadamente 3 veces más rápido que tu código. (Sin embargo, es solo aproximadamente un 20% más rápido que la versión en serie de su código. He sugerido algunos otros enfoques más abajo).
from threading import Thread
from Queue import Queue
import numpy as np
class __EndToken(object):
pass
def parallel_pipeline(buffer_size=50):
def parallel_pipeline_with_args(f):
def consumer(xs, q):
for x in xs:
q.put(x)
q.put(__EndToken())
def parallel_generator(f_xs):
q = Queue(buffer_size)
consumer_process = Thread(target=consumer,args=(f_xs,q,))
consumer_process.start()
while True:
x = q.get()
if isinstance(x, __EndToken):
break
yield x
def f_wrapper(xs):
return parallel_generator(f(xs))
return f_wrapper
return parallel_pipeline_with_args
@parallel_pipeline(3)
def f(xs):
for x in xs:
yield x + 1.0
@parallel_pipeline(3)
def g(xs):
for x in xs:
yield x * 3
@parallel_pipeline(3)
def h(xs):
for x in xs:
yield x * x
def xs():
for i in range(1000):
yield np.random.uniform(0,1,(500,2000))
rs = f(g(h(xs())))
%time print sum(r.sum() for r in rs) # 12.2s
Almacene matrices numpy en la memoria compartida
Otra opción, cercana a lo que solicitó, sería continuar utilizando el paquete de multiprocesamiento, pero pasar datos entre procesos utilizando matrices almacenadas en la memoria compartida. El siguiente código crea una nueva clase ArrayQueue para hacer eso. El objeto ArrayQueue se debe crear antes de los subprocesos de desove. Crea y administra un grupo de matrices numpy respaldadas por memoria compartida. Cuando se inserta una matriz de resultados en la cola, ArrayQueue copia los datos de esa matriz en una matriz de memoria compartida existente, y luego pasa la identificación de la matriz de memoria compartida a través de la cola. Esto es mucho más rápido que enviar la matriz completa a través de la cola, ya que evita el decapado de las matrices. Esto tiene un rendimiento similar al de la versión con hebras anterior (aproximadamente un 10% más lento), y puede escalar mejor si el bloqueo del intérprete global es un problema (es decir, se ejecuta un gran número de código python en las funciones).
from multiprocessing import Process, Queue, Array
import numpy as np
class ArrayQueue(object):
def __init__(self, template, maxsize=0):
if type(template) is not np.ndarray:
raise ValueError(''ArrayQueue(template, maxsize) must use a numpy.ndarray as the template.'')
if maxsize == 0:
# this queue cannot be infinite, because it will be backed by real objects
raise ValueError(''ArrayQueue(template, maxsize) must use a finite value for maxsize.'')
# find the size and data type for the arrays
# note: every ndarray put on the queue must be this size
self.dtype = template.dtype
self.shape = template.shape
self.byte_count = len(template.data)
# make a pool of numpy arrays, each backed by shared memory,
# and create a queue to keep track of which ones are free
self.array_pool = [None] * maxsize
self.free_arrays = Queue(maxsize)
for i in range(maxsize):
buf = Array(''c'', self.byte_count, lock=False)
self.array_pool[i] = np.frombuffer(buf, dtype=self.dtype).reshape(self.shape)
self.free_arrays.put(i)
self.q = Queue(maxsize)
def put(self, item, *args, **kwargs):
if type(item) is np.ndarray:
if item.dtype == self.dtype and item.shape == self.shape and len(item.data)==self.byte_count:
# get the ID of an available shared-memory array
id = self.free_arrays.get()
# copy item to the shared-memory array
self.array_pool[id][:] = item
# put the array''s id (not the whole array) onto the queue
new_item = id
else:
raise ValueError(
''ndarray does not match type or shape of template used to initialize ArrayQueue''
)
else:
# not an ndarray
# put the original item on the queue (as a tuple, so we know it''s not an ID)
new_item = (item,)
self.q.put(new_item, *args, **kwargs)
def get(self, *args, **kwargs):
item = self.q.get(*args, **kwargs)
if type(item) is tuple:
# unpack the original item
return item[0]
else:
# item is the id of a shared-memory array
# copy the array
arr = self.array_pool[item].copy()
# put the shared-memory array back into the pool
self.free_arrays.put(item)
return arr
class __EndToken(object):
pass
def parallel_pipeline(buffer_size=50):
def parallel_pipeline_with_args(f):
def consumer(xs, q):
for x in xs:
q.put(x)
q.put(__EndToken())
def parallel_generator(f_xs):
q = ArrayQueue(template=np.zeros(0,1,(500,2000)), maxsize=buffer_size)
consumer_process = Process(target=consumer,args=(f_xs,q,))
consumer_process.start()
while True:
x = q.get()
if isinstance(x, __EndToken):
break
yield x
def f_wrapper(xs):
return parallel_generator(f(xs))
return f_wrapper
return parallel_pipeline_with_args
@parallel_pipeline(3)
def f(xs):
for x in xs:
yield x + 1.0
@parallel_pipeline(3)
def g(xs):
for x in xs:
yield x * 3
@parallel_pipeline(3)
def h(xs):
for x in xs:
yield x * x
def xs():
for i in range(1000):
yield np.random.uniform(0,1,(500,2000))
print "multiprocessing with shared-memory arrays:"
%time print sum(r.sum() for r in f(g(h(xs())))) # 13.5s
Procesamiento paralelo de muestras en lugar de funciones
El código anterior es solo aproximadamente un 20% más rápido que una versión de un único subproceso (12,2 segundos frente a 14,8 segundos para la versión en serie que se muestra a continuación). Esto se debe a que cada función se ejecuta en un único subproceso o proceso, y la mayor parte del trabajo se realiza mediante xs (). El tiempo de ejecución para el ejemplo anterior es casi el mismo que si acaba de ejecutar %time print sum(1 for x in xs())
.
Si su proyecto real tiene muchas más funciones intermedias y / o son más complejas que las que mostró, entonces la carga de trabajo puede distribuirse mejor entre los procesadores, y esto puede no ser un problema. Sin embargo, si su carga de trabajo realmente se asemeja al código que proporcionó, entonces es posible que desee refactorizar su código para asignar una muestra a cada hilo en lugar de una función a cada hilo. Eso se vería como el siguiente código (se muestran las versiones de subprocesamiento y multiprocesamiento):
import multiprocessing
import threading, Queue
import numpy as np
def f(x):
return x + 1.0
def g(x):
return x * 3
def h(x):
return x * x
def final(i):
return f(g(h(x(i))))
def final_sum(i):
return f(g(h(x(i)))).sum()
def x(i):
# produce sample number i
return np.random.uniform(0, 1, (500, 2000))
def rs_serial(func, n):
for i in range(n):
yield func(i)
def rs_parallel_threaded(func, n):
todo = range(n)
q = Queue.Queue(2*n_workers)
def worker():
while True:
try:
# the global interpreter lock ensures only one thread does this at a time
i = todo.pop()
q.put(func(i))
except IndexError:
# none left to do
q.put(None)
break
threads = []
for j in range(n_workers):
t = threading.Thread(target=worker)
t.daemon=False
threads.append(t) # in case it''s needed later
t.start()
while True:
x = q.get()
if x is None:
break
else:
yield x
def rs_parallel_mp(func, n):
pool = multiprocessing.Pool(n_workers)
return pool.imap_unordered(func, range(n))
n_workers = 4
n_samples = 1000
print "serial:" # 14.8s
%time print sum(r.sum() for r in rs_serial(final, n_samples))
print "threaded:" # 10.1s
%time print sum(r.sum() for r in rs_parallel_threaded(final, n_samples))
print "mp return arrays:" # 19.6s
%time print sum(r.sum() for r in rs_parallel_mp(final, n_samples))
print "mp return results:" # 8.4s
%time print sum(r_sum for r_sum in rs_parallel_mp(final_sum, n_samples))
La versión enhebrada de este código es solo un poco más rápida que el primer ejemplo que di, y solo un 30% más rápido que la versión en serie. Esa no es una aceleración tan grande como hubiera esperado; tal vez Python todavía se está empantanado en parte por el GIL?
La versión de multiprocesamiento tiene un rendimiento significativamente más rápido que el código de multiprocesamiento original, principalmente porque todas las funciones se encadenan juntas en un único proceso, en lugar de poner en cola (y decapado) los resultados intermedios. Sin embargo, es aún más lenta que la versión en serie porque todas las matrices de resultados tienen que ser escabeche (en el proceso de trabajo) y deshechas (en el proceso principal) antes de ser devueltas por imap_unordered. Sin embargo, si puede organizarlo para que su canalización arroje resultados agregados en lugar de las matrices completas, entonces puede evitar la sobrecarga de decapado, y la versión de multiprocesamiento es la más rápida: aproximadamente un 43% más rápida que la versión en serie.
Bien, ahora, para completar, aquí hay una versión del segundo ejemplo que usa multiprocesamiento con las funciones originales de su generador en lugar de las funciones de escala más fina que se muestran arriba. Esto utiliza algunos trucos para separar las muestras entre múltiples procesos, lo que puede hacer que no sea adecuado para muchos flujos de trabajo. Pero el uso de generadores parece ser un poco más rápido que usar las funciones de escala más fina, y este método puede brindarle hasta un 54% de aceleración frente a la versión en serie que se muestra arriba. Sin embargo, eso solo está disponible si no necesita devolver las matrices completas de las funciones de trabajador.
import multiprocessing, itertools, math
import numpy as np
def f(xs):
for x in xs:
yield x + 1.0
def g(xs):
for x in xs:
yield x * 3
def h(xs):
for x in xs:
yield x * x
def xs():
for i in range(1000):
yield np.random.uniform(0,1,(500,2000))
def final():
return f(g(h(xs())))
def final_sum():
for x in f(g(h(xs()))):
yield x.sum()
def get_chunk(args):
"""Retrieve n values (n=args[1]) from a generator function (f=args[0]) and return them as a list.
This runs in a worker process and does all the computation."""
return list(itertools.islice(args[0](), args[1]))
def parallelize(gen_func, max_items, n_workers=4, chunk_size=50):
"""Pull up to max_items items from several copies of gen_func, in small groups in parallel processes.
chunk_size should be big enough to improve efficiency (one copy of gen_func will be run for each chunk)
but small enough to avoid exhausting memory (each worker will keep chunk_size items in memory)."""
pool = multiprocessing.Pool(n_workers)
# how many chunks will be needed to yield at least max_items items?
n_chunks = int(math.ceil(float(max_items)/float(chunk_size)))
# generate a suitable series of arguments for get_chunk()
args_list = itertools.repeat((gen_func, chunk_size), n_chunks)
# chunk_gen will yield a series of chunks (lists of results) from the generator function,
# totaling n_chunks * chunk_size items (which is >= max_items)
chunk_gen = pool.imap_unordered(get_chunk, args_list)
# parallel_gen flattens the chunks, and yields individual items
parallel_gen = itertools.chain.from_iterable(chunk_gen)
# limit the output to max_items items
return itertools.islice(parallel_gen, max_items)
# in this case, the parallel version is slower than a single process, probably
# due to overhead of gathering numpy arrays in imap_unordered (via pickle?)
print "serial, return arrays:" # 15.3s
%time print sum(r.sum() for r in final())
print "parallel, return arrays:" # 24.2s
%time print sum(r.sum() for r in parallelize(final, max_items=1000))
# in this case, the parallel version is more than twice as fast as the single-thread version
print "serial, return result:" # 15.1s
%time print sum(r for r in final_sum())
print "parallel, return result:" # 6.8s
%time print sum(r for r in parallelize(final_sum, max_items=1000))
Consulte el proyecto Pathos-multiprocesamiento , que evita la dependencia estándar de multiprocessing
en el decapado. Esto debería permitirle evitar las ineficiencias del decapado y darle acceso a la memoria común para recursos compartidos de solo lectura. Tenga en cuenta que, si bien Pathos se está acercando al despliegue en un paquete de pip completo, en el ínterin recomendaría instalar con pip install git+https://github.com/uqfoundation/pathos
Parece que su ejemplo no se ejecuta en mi computadora, aunque eso puede tener que ver con el hecho de que estoy ejecutando Windows (problemas que __main__
cualquier cosa que no __main__
en el espacio de nombres __main__
(nada decorado)) ... ¿Algo como esto ayudaría? (Tendría que ponerlo y desempaquetar dentro de cada uno de f (), g () y h ())
Nota * No estoy seguro de que esto sea realmente más rápido ... Solo una puñalada de lo que otros han sugerido ..
from multiprocessing import Process, freeze_support
from multiprocessing.sharedctypes import Value, Array
import numpy as np
def package(arr):
shape = Array(''i'', arr.shape, lock=False)
if arr.dtype == float:
ctype = Value(''c'', b''d'') #d for double #f for single
if arr.dtype == int:
ctype = Value(''c'', b''i'') #if statements could be avoided if data is always the same
data = Array(ctype.value, arr.reshape(-1),lock=False)
return data, shape
def unpack(data, shape):
return np.array(data[:]).reshape(shape[:])
#test
def f(args):
print(unpack(*args))
if __name__ == ''__main__'':
freeze_support()
a = np.array([1,2,3,4,5])
a_packed = package(a)
print(''array has been packaged'')
p = Process(target=f, args=(a_packed,))
print(''passing to parallel process'')
p.start()
print(''joining to parent process'')
p.join()
print(''finished'')