vectores una transponer producto norma multiplicación matriz matrices inversa codigo clase python numpy multiprocessing shared-memory caffe

una - Compartiendo matrices de números contiguos entre procesos en Python



producto de matrices en python (1)

Envuelve el ndarray de ndarray alrededor de ndarray multiprocesamiento RawArray()

Hay varias formas de compartir matrices numpy en memoria a través de procesos. Veamos cómo puede hacerlo utilizando el módulo de multiprocesamiento .

La primera observación importante es que numpy proporciona la función np.frombuffer() para envolver una interfaz ndarray alrededor de un objeto preexistente que admite el protocolo de búfer (como bytes() , bytearray() , array() etc.). Esto crea matrices de solo lectura a partir de objetos de solo lectura y matrices de escritura de objetos grabables.

Podemos combinar eso con la memoria compartida RawArray() que proporciona el multiprocesamiento . Tenga en cuenta que Array() no funciona para ese propósito, ya que es un objeto proxy con un bloqueo y no expone directamente la interfaz del búfer. Por supuesto, eso significa que debemos proporcionar la sincronización adecuada de nuestros RawArrays numpificados nosotros mismos.

Hay un problema que se complica con respecto a los arrays RawArrays envueltos con ndarray : cuando el multiprocesamiento envía una matriz de este tipo entre procesos, y de hecho, deberá enviar nuestros arrays, una vez creados, a ambos trabajadores: los escabeche y luego los elimina. Desafortunadamente, eso hace que se creen copias de las ndarrays en lugar de compartirlas en la memoria.

La solución, aunque un poco fea, es mantener los RawArrays tal como están hasta que se transfieran a los trabajadores y solo envolverlos en ndarrays una vez que haya comenzado el proceso de cada trabajador .

Además, habría sido preferible comunicar matrices, ya sea una simple RawArray o una ndarray- wrapped, directamente a través de un multiprocessing.Queue , pero eso tampoco funciona. Un RawArray no se puede colocar dentro de una Cola de este tipo y un ndarray -wrapped uno habría sido decapado y no recogido, por lo que, en efecto, copiado.

La solución consiste en enviar una lista de todas las matrices asignadas previamente a los procesos de trabajo y comunicar los índices a esa lista a través de las colas . Es muy parecido a pasar tokens (los índices) y quien tenga el token puede operar en la matriz asociada.

La estructura del programa principal podría verse así:

#!/usr/bin/env python3 # -*- coding: utf-8 -*- import numpy as np import queue from multiprocessing import freeze_support, set_start_method from multiprocessing import Event, Process, Queue from multiprocessing.sharedctypes import RawArray def create_shared_arrays(size, dtype=np.int32, num=2): dtype = np.dtype(dtype) if dtype.isbuiltin and dtype.char in ''bBhHiIlLfd'': typecode = dtype.char else: typecode, size = ''B'', size * dtype.itemsize return [RawArray(typecode, size) for _ in range(num)] def main(): my_dtype = np.float32 # 125000000 (size) * 4 (dtype) * 2 (num) ~= 1 GB memory usage arrays = create_shared_arrays(125000000, dtype=my_dtype) q_free = Queue() q_used = Queue() bail = Event() for arr_id in range(len(arrays)): q_free.put(arr_id) # pre-fill free queue with allocated array indices pr1 = MyDataLoader(arrays, q_free, q_used, bail, dtype=my_dtype, step=1024) pr2 = MyDataProcessor(arrays, q_free, q_used, bail, dtype=my_dtype, step=1024) pr1.start() pr2.start() pr2.join() print("/n{} joined.".format(pr2.name)) pr1.join() print("{} joined.".format(pr1.name)) if __name__ == ''__main__'': freeze_support() # On Windows, only "spawn" is available. # Also, this tests proper sharing of the arrays without "cheating". set_start_method(''spawn'') main()

Esto prepara una lista de dos matrices, dos colas : una cola "libre" donde MyDataProcessor coloca los índices de matriz con los que se realiza y MyDataLoader las obtiene, así como una cola "usada" donde MyDataLoader coloca índices de las matrices que se llenan fácilmente y MyDataProcessor las busca de - y un multiprocessing.Event para iniciar un rescate concertado de todos los trabajadores. Podríamos eliminar este último por ahora, ya que solo tenemos un productor y un consumidor de arreglos, pero no está de más estar preparado para más trabajadores.

Luego, rellenamos la cola "vacía" con todos los índices de nuestros RawArrays en la lista y creamos una instancia de cada tipo de trabajadores, pasándoles los objetos de comunicación necesarios. Empezamos ambos y solo esperamos a que se join() .

Así es como podría verse MyDataProcessor , que consume índices de matriz de la cola "usada" y envía los datos a alguna caja negra externa ( debugio.output en el ejemplo):

class MyDataProcessor(Process): def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1): super().__init__() self.arrays = arrays self.q_free = q_free self.q_used = q_used self.bail = bail self.dtype = dtype self.step = step def run(self): # wrap RawArrays inside ndarrays arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays] from debugio import output as writer while True: arr_id = self.q_used.get() if arr_id is None: break arr = arrays[arr_id] print(''('', end='''', flush=True) # just visualizing activity for j in range(0, len(arr), self.step): writer.write(str(arr[j]) + ''/n'') print('')'', end='''', flush=True) # just visualizing activity self.q_free.put(arr_id) writer.flush() self.bail.set() # tell loaders to bail out ASAP self.q_free.put(None, timeout=1) # wake up loader blocking on get() try: while True: self.q_used.get_nowait() # wake up loader blocking on put() except queue.Empty: pass

Lo primero que hace es envolver los RawArrays recibidos en ndarrays usando ''np.frombuffer ()'' y mantener la nueva lista, por lo que se pueden usar como matrices numpy durante el tiempo de ejecución del proceso y no tiene que envolverlos una y otra vez .

Tenga en cuenta también que MyDataProcessor solo escribe en el evento self.bail , nunca lo verifica. En cambio, si necesita que se le diga que salga, encontrará una marca de None en la cola en lugar de un índice de matriz. Esto se hace cuando MyDataLoader no tiene más datos disponibles e inicia el proceso de desmontaje , MyDataProcessor todavía puede procesar todas las matrices válidas que están en la cola sin salir prematuramente.

Así es como se vería MyDataLoader :

class MyDataLoader(Process): def __init__(self, arrays, q_free, q_used, bail, dtype=np.int32, step=1): super().__init__() self.arrays = arrays self.q_free = q_free self.q_used = q_used self.bail = bail self.dtype = dtype self.step = step def run(self): # wrap RawArrays inside ndarrays arrays = [np.frombuffer(arr, dtype=self.dtype) for arr in self.arrays] from debugio import input as reader for _ in range(10): # for testing we end after a set amount of passes if self.bail.is_set(): # we were asked to bail out while waiting on put() return arr_id = self.q_free.get() if arr_id is None: # we were asked to bail out while waiting on get() self.q_free.put(None, timeout=1) # put it back for next loader return if self.bail.is_set(): # we were asked to bail out while we got a normal array return arr = arrays[arr_id] eof = False print(''<'', end='''', flush=True) # just visualizing activity for j in range(0, len(arr), self.step): line = reader.readline() if not line: eof = True break arr[j] = np.fromstring(line, dtype=self.dtype, sep=''/n'') if eof: print(''EOF>'', end='''', flush=True) # just visualizing activity break print(''>'', end='''', flush=True) # just visualizing activity if self.bail.is_set(): # we were asked to bail out while we filled the array return self.q_used.put(arr_id) # tell processor an array is filled if not self.bail.is_set(): self.bail.set() # tell other loaders to bail out ASAP # mark end of data for processor as we are the first to bail out self.q_used.put(None)

Es muy similar en estructura al otro trabajador. La razón por la que se infla un poco es porque controla el evento self.bail en muchos puntos, a fin de reducir la posibilidad de atascarse. (No es completamente infalible, ya que hay una pequeña posibilidad de que el Evento se pueda configurar entre la verificación y el acceso a la Cola . Si ese es un problema, uno necesita usar un acceso de arbitraje primitivo de sincronización tanto al Evento como a la Cola combinados).

También envuelve los RawArrays recibidos en ndarrays al principio y lee datos de una caja negra externa ( debugio.input en el ejemplo).

Tenga en cuenta que al jugar con los argumentos step= para ambos trabajadores en la función main() , podemos cambiar la proporción de la lectura y la escritura (estrictamente para fines de prueba: en un entorno de producción, step= sería 1 , leyendo y escribiendo todos los miembros de la matriz numpy ).

El aumento de ambos valores hace que los trabajadores solo accedan a algunos de los valores de las matrices numpy , lo que acelera significativamente todo, lo que demuestra que el rendimiento no está limitado por la comunicación entre los procesos de los trabajadores. Si hubiéramos colocado matrices numpy directamente en las colas , copiándolos entre los procesos en su totalidad, aumentando el tamaño del paso no habría mejorado significativamente el rendimiento, habría permanecido lento.

Para referencia, aquí está el módulo de debugio que usé para probar:

#!/usr/bin/env python3 # -*- coding: utf-8 -*- from ast import literal_eval from io import RawIOBase, BufferedReader, BufferedWriter, TextIOWrapper class DebugInput(RawIOBase): def __init__(self, end=None): if end is not None and end < 0: raise ValueError("end must be non-negative") super().__init__() self.pos = 0 self.end = end def readable(self): return True def read(self, size=-1): if self.end is None: if size < 0: raise NotImplementedError("size must be non-negative") end = self.pos + size elif size < 0: end = self.end else: end = min(self.pos + size, self.end) lines = [] while self.pos < end: offset = self.pos % 400 pos = self.pos - offset if offset < 18: i = (offset + 2) // 2 pos += i * 2 - 2 elif offset < 288: i = (offset + 12) // 3 pos += i * 3 - 12 else: i = (offset + 112) // 4 pos += i * 4 - 112 line = str(i).encode(''ascii'') + b''/n'' line = line[self.pos - pos:end - pos] self.pos += len(line) size -= len(line) lines.append(line) return b''''.join(lines) def readinto(self, b): data = self.read(len(b)) b[:len(data)] = data return len(data) def seekable(self): return True def seek(self, offset, whence=0): if whence == 0: pos = offset elif whence == 1: pos = self.pos + offset elif whence == 2: if self.end is None: raise ValueError("cannot seek to end of infinite stream") pos = self.end + offset else: raise NotImplementedError("unknown whence value") self.pos = max((pos if self.end is None else min(pos, self.end)), 0) return self.pos class DebugOutput(RawIOBase): def __init__(self): super().__init__() self.buf = b'''' self.num = 1 def writable(self): return True def write(self, b): *lines, self.buf = (self.buf + b).split(b''/n'') for line in lines: value = literal_eval(line.decode(''ascii'')) if value != int(value) or int(value) & 255 != self.num: raise ValueError("expected {}, got {}".format(self.num, value)) self.num = self.num % 127 + 1 return len(b) input = TextIOWrapper(BufferedReader(DebugInput()), encoding=''ascii'') output = TextIOWrapper(BufferedWriter(DebugOutput()), encoding=''ascii'')

Si bien he encontrado numerosas respuestas a preguntas similares a las mías, no creo que se haya abordado directamente aquí, y tengo varias preguntas adicionales. La motivación para compartir matrices de números contiguos es la siguiente:

  • Estoy usando una red neuronal convolucional que se ejecuta en Caffe para realizar una regresión en imágenes a una serie de etiquetas de valor continuo.
  • Las imágenes requieren preprocesamiento específico y aumento de datos.
  • Las restricciones de (1) la naturaleza continua de las etiquetas (son flotantes) y (2) el aumento de datos significa que estoy preprocesando los datos en python y luego los sirvo como matrices numpy contiguas utilizando los datos en memoria Capa en Caffe.
  • Cargar los datos de entrenamiento en la memoria es comparativamente lento. Me gustaría paralelizarlo de tal manera que:

(1) El python que estoy escribiendo crea una clase de "manejador de datos" que crea instancias de dos matrices de números contiguos. (2) Un proceso de trabajo alterna entre esas matrices numpy, cargando los datos del disco, realizando el preprocesamiento e insertando los datos en la matriz numpy. (3) Mientras tanto, las envolturas Python Caffe envían datos de la otra matriz a la GPU para que se ejecuten a través de la red.

Tengo algunas preguntas:

  1. ¿Es posible asignar memoria en una matriz de números contiguos y luego envolverla en un objeto de memoria compartida (no estoy seguro de que ''objeto'' sea el término correcto aquí) usando algo como la clase Array del multiprocesamiento de python?

  2. Los arrays de Numpy tienen un atributo .ctypes, supongo que esto es útil para la creación de instancias de arrays de memoria compartida desde Array (), pero parece que no puede determinar con precisión cómo usarlos.

  3. Si la memoria compartida se crea una instancia sin la matriz numpy, ¿permanece contigua? Si no es así, ¿hay alguna manera de asegurar que permanezca contiguo?

¿Es posible hacer algo como:

import numpy as np from multiprocessing import Array contArr = np.ascontiguousarray(np.zeros((n_images, n_channels, img_height, img_width)), dtype=np.float32) sm_contArr = Array(contArr.ctypes.?, contArr?)

Luego ejemplifique al trabajador con

p.append(Process(target=some_worker_function, args=(data_to_load, sm_contArr))) p.start()

¡Gracias!

Edición: Soy consciente de que hay varias bibliotecas que tienen funciones similares en diferentes estados de mantenimiento. Preferiría restringirlo a python y numpy puros, pero si eso no fuera posible, por supuesto estaría dispuesto a usar uno.