una - Compartiendo matrices de números contiguos entre procesos en Python
producto de matrices en python (1)
Envuelve el ndarray de ndarray
alrededor de ndarray
multiprocesamiento RawArray()
Hay varias formas de compartir matrices numpy en memoria a través de procesos. Veamos cómo puede hacerlo utilizando el módulo de multiprocesamiento .
La primera observación importante es que numpy proporciona la función np.frombuffer()
para envolver una interfaz ndarray alrededor de un objeto preexistente que admite el protocolo de búfer (como bytes()
, bytearray()
, array()
etc.). Esto crea matrices de solo lectura a partir de objetos de solo lectura y matrices de escritura de objetos grabables.
Podemos combinar eso con la memoria compartida RawArray()
que proporciona el multiprocesamiento . Tenga en cuenta que Array()
no funciona para ese propósito, ya que es un objeto proxy con un bloqueo y no expone directamente la interfaz del búfer. Por supuesto, eso significa que debemos proporcionar la sincronización adecuada de nuestros RawArrays numpificados nosotros mismos.
Hay un problema que se complica con respecto a los arrays RawArrays envueltos con ndarray : cuando el multiprocesamiento envía una matriz de este tipo entre procesos, y de hecho, deberá enviar nuestros arrays, una vez creados, a ambos trabajadores: los escabeche y luego los elimina. Desafortunadamente, eso hace que se creen copias de las ndarrays en lugar de compartirlas en la memoria.
La solución, aunque un poco fea, es mantener los RawArrays tal como están hasta que se transfieran a los trabajadores y solo envolverlos en ndarrays una vez que haya comenzado el proceso de cada trabajador .
Además, habría sido preferible comunicar matrices, ya sea una simple RawArray o una ndarray- wrapped, directamente a través de un multiprocessing.Queue
, pero eso tampoco funciona. Un RawArray no se puede colocar dentro de una Cola de este tipo y un ndarray -wrapped uno habría sido decapado y no recogido, por lo que, en efecto, copiado.
La solución consiste en enviar una lista de todas las matrices asignadas previamente a los procesos de trabajo y comunicar los índices a esa lista a través de las colas . Es muy parecido a pasar tokens (los índices) y quien tenga el token puede operar en la matriz asociada.
La estructura del programa principal podría verse así:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
import queue
from multiprocessing import freeze_support, set_start_method
from multiprocessing import Event, Process, Queue
from multiprocessing.sharedctypes import RawArray
def create_shared_arrays(size, dtype=np.int32, num=2):
dtype = np.dtype(dtype)
if dtype.isbuiltin and dtype.char in ''bBhHiIlLfd'':
typecode = dtype.char
else:
typecode, size = ''B'', size * dtype.itemsize
return [RawArray(typecode, size) for _ in range(num)]
def main():
my_dtype = np.float32
# 125000000 (size) * 4 (dtype) * 2 (num) ~= 1 GB memory usage
arrays = create_shared_arrays(125000000, dtype=my_dtype)
q_free = Queue()
q_used = Queue()
bail = Event()
for arr_id in range(len(arrays)):
q_free.put(arr_id) # pre-fill free queue with allocated array indices
pr1 = MyDataLoader(arrays, q_free, q_used, bail,
dtype=my_dtype, step=1024)
pr2 = MyDataProcessor(arrays, q_free, q_used, bail,
dtype=my_dtype, step=1024)
pr1.start()
pr2.start()
pr2.join()
print("/n{} joined.".format(pr2.name))
pr1.join()
print("{} joined.".format(pr1.name))
if __name__ == ''__main__'':
freeze_support()
# On Windows, only "spawn" is available.
# Also, this tests proper sharing of the arrays without "cheating".
set_start_method(''spawn'')
main()
Esto prepara una lista de dos matrices, dos colas : una cola "libre" donde MyDataProcessor coloca los índices de matriz con los que se realiza y MyDataLoader las obtiene, así como una cola "usada" donde MyDataLoader coloca índices de las matrices que se llenan fácilmente y MyDataProcessor las busca de - y un multiprocessing.Event
para iniciar un rescate concertado de todos los trabajadores. Podríamos eliminar este último por ahora, ya que solo tenemos un productor y un consumidor de arreglos, pero no está de más estar preparado para más trabajadores.
Luego, rellenamos la cola "vacía" con todos los índices de nuestros RawArrays en la lista y creamos una instancia de cada tipo de trabajadores, pasándoles los objetos de comunicación necesarios. Empezamos ambos y solo esperamos a que se join()
.
Así es como podría verse MyDataProcessor , que consume índices de matriz de la cola "usada" y envía los datos a alguna caja negra externa ( debugio.output
en el ejemplo):
class MyDataProcessor(Process):
def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1):
super().__init__()
self.arrays = arrays
self.q_free = q_free
self.q_used = q_used
self.bail = bail
self.dtype = dtype
self.step = step
def run(self):
# wrap RawArrays inside ndarrays
arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays]
from debugio import output as writer
while True:
arr_id = self.q_used.get()
if arr_id is None:
break
arr = arrays[arr_id]
print(''('', end='''', flush=True) # just visualizing activity
for j in range(0, len(arr), self.step):
writer.write(str(arr[j]) + ''/n'')
print('')'', end='''', flush=True) # just visualizing activity
self.q_free.put(arr_id)
writer.flush()
self.bail.set() # tell loaders to bail out ASAP
self.q_free.put(None, timeout=1) # wake up loader blocking on get()
try:
while True:
self.q_used.get_nowait() # wake up loader blocking on put()
except queue.Empty:
pass
Lo primero que hace es envolver los RawArrays recibidos en ndarrays usando ''np.frombuffer ()'' y mantener la nueva lista, por lo que se pueden usar como matrices numpy durante el tiempo de ejecución del proceso y no tiene que envolverlos una y otra vez .
Tenga en cuenta también que MyDataProcessor solo escribe en el evento self.bail
, nunca lo verifica. En cambio, si necesita que se le diga que salga, encontrará una marca de None
en la cola en lugar de un índice de matriz. Esto se hace cuando MyDataLoader no tiene más datos disponibles e inicia el proceso de desmontaje , MyDataProcessor todavía puede procesar todas las matrices válidas que están en la cola sin salir prematuramente.
Así es como se vería MyDataLoader :
class MyDataLoader(Process):
def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1):
super().__init__()
self.arrays = arrays
self.q_free = q_free
self.q_used = q_used
self.bail = bail
self.dtype = dtype
self.step = step
def run(self):
# wrap RawArrays inside ndarrays
arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays]
from debugio import input as reader
for _ in range(10): # for testing we end after a set amount of passes
if self.bail.is_set():
# we were asked to bail out while waiting on put()
return
arr_id = self.q_free.get()
if arr_id is None:
# we were asked to bail out while waiting on get()
self.q_free.put(None, timeout=1) # put it back for next loader
return
if self.bail.is_set():
# we were asked to bail out while we got a normal array
return
arr = arrays[arr_id]
eof = False
print(''<'', end='''', flush=True) # just visualizing activity
for j in range(0, len(arr), self.step):
line = reader.readline()
if not line:
eof = True
break
arr[j] = np.fromstring(line, dtype=self.dtype, sep=''/n'')
if eof:
print(''EOF>'', end='''', flush=True) # just visualizing activity
break
print(''>'', end='''', flush=True) # just visualizing activity
if self.bail.is_set():
# we were asked to bail out while we filled the array
return
self.q_used.put(arr_id) # tell processor an array is filled
if not self.bail.is_set():
self.bail.set() # tell other loaders to bail out ASAP
# mark end of data for processor as we are the first to bail out
self.q_used.put(None)
Es muy similar en estructura al otro trabajador. La razón por la que se infla un poco es porque controla el evento self.bail
en muchos puntos, a fin de reducir la posibilidad de atascarse. (No es completamente infalible, ya que hay una pequeña posibilidad de que el Evento se pueda configurar entre la verificación y el acceso a la Cola . Si ese es un problema, uno necesita usar un acceso de arbitraje primitivo de sincronización tanto al Evento como a la Cola combinados).
También envuelve los RawArrays recibidos en ndarrays al principio y lee datos de una caja negra externa ( debugio.input
en el ejemplo).
Tenga en cuenta que al jugar con los argumentos step=
para ambos trabajadores en la función main()
, podemos cambiar la proporción de la lectura y la escritura (estrictamente para fines de prueba: en un entorno de producción, step=
sería 1
, leyendo y escribiendo todos los miembros de la matriz numpy ).
El aumento de ambos valores hace que los trabajadores solo accedan a algunos de los valores de las matrices numpy , lo que acelera significativamente todo, lo que demuestra que el rendimiento no está limitado por la comunicación entre los procesos de los trabajadores. Si hubiéramos colocado matrices numpy directamente en las colas , copiándolos entre los procesos en su totalidad, aumentando el tamaño del paso no habría mejorado significativamente el rendimiento, habría permanecido lento.
Para referencia, aquí está el módulo de debugio
que usé para probar:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from ast import literal_eval
from io import RawIOBase, BufferedReader, BufferedWriter, TextIOWrapper
class DebugInput(RawIOBase):
def __init__(self, end=None):
if end is not None and end < 0:
raise ValueError("end must be non-negative")
super().__init__()
self.pos = 0
self.end = end
def readable(self):
return True
def read(self, size=-1):
if self.end is None:
if size < 0:
raise NotImplementedError("size must be non-negative")
end = self.pos + size
elif size < 0:
end = self.end
else:
end = min(self.pos + size, self.end)
lines = []
while self.pos < end:
offset = self.pos % 400
pos = self.pos - offset
if offset < 18:
i = (offset + 2) // 2
pos += i * 2 - 2
elif offset < 288:
i = (offset + 12) // 3
pos += i * 3 - 12
else:
i = (offset + 112) // 4
pos += i * 4 - 112
line = str(i).encode(''ascii'') + b''/n''
line = line[self.pos - pos:end - pos]
self.pos += len(line)
size -= len(line)
lines.append(line)
return b''''.join(lines)
def readinto(self, b):
data = self.read(len(b))
b[:len(data)] = data
return len(data)
def seekable(self):
return True
def seek(self, offset, whence=0):
if whence == 0:
pos = offset
elif whence == 1:
pos = self.pos + offset
elif whence == 2:
if self.end is None:
raise ValueError("cannot seek to end of infinite stream")
pos = self.end + offset
else:
raise NotImplementedError("unknown whence value")
self.pos = max((pos if self.end is None else min(pos, self.end)), 0)
return self.pos
class DebugOutput(RawIOBase):
def __init__(self):
super().__init__()
self.buf = b''''
self.num = 1
def writable(self):
return True
def write(self, b):
*lines, self.buf = (self.buf + b).split(b''/n'')
for line in lines:
value = literal_eval(line.decode(''ascii''))
if value != int(value) or int(value) & 255 != self.num:
raise ValueError("expected {}, got {}".format(self.num, value))
self.num = self.num % 127 + 1
return len(b)
input = TextIOWrapper(BufferedReader(DebugInput()), encoding=''ascii'')
output = TextIOWrapper(BufferedWriter(DebugOutput()), encoding=''ascii'')
Si bien he encontrado numerosas respuestas a preguntas similares a las mías, no creo que se haya abordado directamente aquí, y tengo varias preguntas adicionales. La motivación para compartir matrices de números contiguos es la siguiente:
- Estoy usando una red neuronal convolucional que se ejecuta en Caffe para realizar una regresión en imágenes a una serie de etiquetas de valor continuo.
- Las imágenes requieren preprocesamiento específico y aumento de datos.
- Las restricciones de (1) la naturaleza continua de las etiquetas (son flotantes) y (2) el aumento de datos significa que estoy preprocesando los datos en python y luego los sirvo como matrices numpy contiguas utilizando los datos en memoria Capa en Caffe.
- Cargar los datos de entrenamiento en la memoria es comparativamente lento. Me gustaría paralelizarlo de tal manera que:
(1) El python que estoy escribiendo crea una clase de "manejador de datos" que crea instancias de dos matrices de números contiguos. (2) Un proceso de trabajo alterna entre esas matrices numpy, cargando los datos del disco, realizando el preprocesamiento e insertando los datos en la matriz numpy. (3) Mientras tanto, las envolturas Python Caffe envían datos de la otra matriz a la GPU para que se ejecuten a través de la red.
Tengo algunas preguntas:
¿Es posible asignar memoria en una matriz de números contiguos y luego envolverla en un objeto de memoria compartida (no estoy seguro de que ''objeto'' sea el término correcto aquí) usando algo como la clase Array del multiprocesamiento de python?
Los arrays de Numpy tienen un atributo .ctypes, supongo que esto es útil para la creación de instancias de arrays de memoria compartida desde Array (), pero parece que no puede determinar con precisión cómo usarlos.
Si la memoria compartida se crea una instancia sin la matriz numpy, ¿permanece contigua? Si no es así, ¿hay alguna manera de asegurar que permanezca contiguo?
¿Es posible hacer algo como:
import numpy as np
from multiprocessing import Array
contArr = np.ascontiguousarray(np.zeros((n_images, n_channels, img_height, img_width)), dtype=np.float32)
sm_contArr = Array(contArr.ctypes.?, contArr?)
Luego ejemplifique al trabajador con
p.append(Process(target=some_worker_function, args=(data_to_load, sm_contArr)))
p.start()
¡Gracias!
Edición: Soy consciente de que hay varias bibliotecas que tienen funciones similares en diferentes estados de mantenimiento. Preferiría restringirlo a python y numpy puros, pero si eso no fuera posible, por supuesto estaría dispuesto a usar uno.