vectores universidad tutorial multiplicar matrices graficar ejemplos como alicante python scipy apache-spark-mllib dask joblib

universidad - tutorial python numpy



Procesamiento fuera del nĂșcleo de matrices CSR dispersas (1)

Así que no sé nada sobre joblib o dask, y mucho menos sobre el formato de datos específico de su aplicación. Pero en realidad es posible leer matrices dispersas del disco en fragmentos al tiempo que se conserva la estructura de datos dispersa.

Si bien el artículo de Wikipedia para el formato CSR hace un gran trabajo explicando cómo funciona, voy a hacer una breve recapitulación:

Algunas Matriz dispersa, por ejemplo:

1 0 2 0 0 3 4 5 6

se almacena recordando cada valor distinto de cero y la columna en la que reside:

sparse.data = 1 2 3 4 5 6 # acutal value sparse.indices = 0 2 2 0 1 2 # number of column (0-indexed)

Ahora todavía nos faltan las filas. El formato comprimido simplemente almacena cuántos valores distintos de cero hay en cada fila, en lugar de almacenar cada fila de valores individuales.

Tenga en cuenta que el recuento distinto de cero también se acumula, por lo que la siguiente matriz contiene el número de valores distintos de cero hasta e incluyendo esta fila. Para complicar aún más las cosas, la matriz siempre comienza con un 0 y, por lo tanto, contiene num_rows + 1 entradas:

sparse.indptr = 0 2 3 6

así que hasta e incluyendo la segunda fila hay 3 valores distintos de cero, a saber, 1, 2 y 3.

Ya que hemos solucionado esto, podemos comenzar a ''rebanar'' la matriz. El objetivo es construir los datos, los índices y las matrices indptr para algunos fragmentos. Supongamos que la enorme matriz original se almacena en tres archivos binarios, que leeremos de forma incremental. Usamos un generador para producir repetidamente algo de espacio.

Para esto, necesitamos saber cuántos valores distintos de cero hay en cada fragmento y leer la cantidad de valores e índices de columna correspondientes. El recuento distinto de cero se puede leer convenientemente desde la matriz indptr. Esto se logra leyendo cierta cantidad de entradas del enorme archivo indptr que corresponde al tamaño de fragmento deseado. La última entrada de esa parte del archivo indptr menos la cantidad de valores distintos de cero proporciona el número de no ceros en ese fragmento. Por lo tanto, los conjuntos de datos e índices de fragmentos se recortan de los archivos de datos e índices grandes. La matriz indptr debe ir precedida artificialmente por un cero (eso es lo que quiere el formato, no me preguntes: D).

Entonces podemos simplemente construir una matriz dispersa con los datos de fragmentos, índices e indptr para obtener una nueva matriz dispersa.

Se debe tener en cuenta que el tamaño real de la matriz no se puede reconstruir directamente a partir de las tres matrices. Es el índice de columna máximo de la matriz, o si tiene mala suerte y no hay datos en el trozo indeterminados. Entonces también necesitamos pasar el conteo de columnas.

Probablemente explique las cosas de una manera bastante complicada, así que solo lea esto como una pieza opaca de código, que implementa dicho generador:

import numpy as np import scipy.sparse def gen_batches(batch_size, sparse_data_path, sparse_indices_path, sparse_indptr_path, dtype=np.float32, column_size=None): data_item_size = dtype().itemsize with open(sparse_data_path, ''rb'') as data_file, / open(sparse_indices_path, ''rb'') as indices_file, / open(sparse_indptr_path, ''rb'') as indptr_file: nnz_before = np.fromstring(indptr_file.read(4), dtype=np.int32) while True: indptr_batch = np.frombuffer(nnz_before.tobytes() + indptr_file.read(4*batch_size), dtype=np.int32) if len(indptr_batch) == 1: break batch_indptr = indptr_batch - nnz_before nnz_before = indptr_batch[-1] batch_nnz = np.asscalar(batch_indptr[-1]) batch_data = np.frombuffer(data_file.read( data_item_size * batch_nnz), dtype=dtype) batch_indices = np.frombuffer(indices_file.read( 4 * batch_nnz), dtype=np.int32) dimensions = (len(indptr_batch)-1, column_size) matrix = scipy.sparse.csr_matrix((batch_data, batch_indices, batch_indptr), shape=dimensions) yield matrix if __name__ == ''__main__'': sparse = scipy.sparse.random(5, 4, density=0.1, format=''csr'', dtype=np.float32) sparse.data.tofile(''sparse.data'') # dtype as specified above ^^^^^^^^^^ sparse.indices.tofile(''sparse.indices'') # dtype=int32 sparse.indptr.tofile(''sparse.indptr'') # dtype=int32 print(sparse.toarray()) print(''========'') for batch in gen_batches(2, ''sparse.data'', ''sparse.indices'', ''sparse.indptr'', column_size=4): print(batch.toarray())

numpy.ndarray.tofile () solo almacena matrices binarias, por lo que debe recordar el formato de datos. scipy.sparse representa los índices y indptr como int32, por lo que es una limitación para el tamaño total de la matriz.

También comparé el código con la evaluación comparativa y encontré que el constructor de la matriz scsy csr es el cuello de botella para matrices pequeñas. Su millaje puede variar aunque esto es solo una ''prueba de principio''.

Si hay una necesidad de una implementación más sofisticada, o algo es demasiado obsceno, solo dame un golpe :)

¿Cómo se puede aplicar alguna función en paralelo en fragmentos de una matriz CSR dispersa guardada en el disco usando Python? Secuencialmente, esto podría hacerse, por ejemplo, guardando el arreglo CSR con joblib.dump abriéndolo con joblib.load(.., mmap_mode="r") y procesando los fragmentos de filas uno por uno. ¿Podría hacerse esto de manera más eficiente con dask ?

En particular, suponiendo que uno no necesita todas las operaciones posibles de núcleo en matrices dispersas, sino solo la capacidad de cargar bloques de fila en paralelo (cada fragmento es una matriz de CSR) y aplicarles alguna función (en mi caso, sería ser, por ejemplo, estimator.predict(X) de scikit-learn).

Además, ¿hay algún formato de archivo en el disco que sea adecuado para esta tarea? Joblib funciona pero no estoy seguro del rendimiento (paralelo) de las matrices CSR cargadas como mapas de memoria; spark.mllib parece usar algún formato de almacenamiento disperso personalizado (que no parece tener un analizador de Python puro) o formato LIBSVM (el analizador en scikit-learn es, en mi experiencia, mucho más lento que joblib.dump ) .. .

Nota: He leído documentation , varios problemas al respecto en https://github.com/dask/dask/ pero todavía no estoy seguro de cómo abordar este problema de la mejor manera.

Editar: para dar un ejemplo más práctico, a continuación se muestra el código que funciona en dask para matrices densas pero falla al usar matrices dispersas con este error ,

import numpy as np import scipy.sparse import joblib import dask.array as da from sklearn.utils import gen_batches np.random.seed(42) joblib.dump(np.random.rand(100000, 1000), ''X_dense.pkl'') joblib.dump(scipy.sparse.random(10000, 1000000, format=''csr''), ''X_csr.pkl'') fh = joblib.load(''X_dense.pkl'', mmap_mode=''r'') # computing the results without dask results = np.vstack((fh[sl, :].sum(axis=1)) for sl in gen_batches(fh.shape[0], batch_size)) # computing the results with dask x = da.from_array(fh, chunks=(2000)) results = x.sum(axis=1).compute()

Editar2: siguiendo la discusión a continuación, el ejemplo a continuación supera el error anterior pero obtiene los de IndexError: tuple index out of range en dask/array/core.py:L3413 ,

import dask # +imports from the example above dask.set_options(get=dask.get) # disable multiprocessing fh = joblib.load(''X_csr.pkl'', mmap_mode=''r'') def func(x): if x.ndim == 0: # dask does some heuristics with dummy data, if the x is a 0d array # the sum command would fail return x res = np.asarray(x.sum(axis=1, keepdims=True)) return res Xd = da.from_array(fh, chunks=(2000)) results_new = Xd.map_blocks(func).compute()