vectorassembler stringindexer stopwords standardscaler apache-spark apache-spark-sql pyspark pca apache-spark-ml

apache spark - stringindexer - Pyspark y PCA: ¿cómo puedo extraer los vectores propios de este PCA? ¿Cómo puedo calcular la varianza que están explicando?



stopwords pyspark (3)

Estoy reduciendo la dimensionalidad de un Spark DataFrame con modelo PCA con pyspark (usando la biblioteca spark ml ) de la siguiente manera:

pca = PCA(k=3, inputCol="features", outputCol="pca_features") model = pca.fit(data)

donde los data son un Spark DataFrame con una columna labed features que es un DenseVector de 3 dimensiones:

data.take(1) Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u''class1'')

Después de la adaptación, transformo los datos:

transformed = model.transform(data) transformed.first() Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u''class1'', pca_features=DenseVector([-0.33256, 0.8668, 0.625]))

Mi pregunta es: ¿cómo puedo extraer los vectores propios de esta PCA? ¿Cómo puedo calcular la varianza que están explicando?


La respuesta más fácil a su pregunta es ingresar una matriz de identidad a su modelo.

identity_input = [(Vectors.dense([1.0, .0, 0.0, .0, 0.0]),),(Vectors.dense([.0, 1.0, .0, .0, .0]),), / (Vectors.dense([.0, 0.0, 1.0, .0, .0]),),(Vectors.dense([.0, 0.0, .0, 1.0, .0]),), (Vectors.dense([.0, 0.0, .0, .0, 1.0]),)] df_identity = sqlContext.createDataFrame(identity_input,["features"]) identity_features = model.transform(df_identity)

Esto debería darte componentes principales.

Creo que la respuesta de eliasah es mejor en términos del marco Spark porque desertnaut está resolviendo el problema usando las funciones numpy en lugar de las acciones de Spark. Sin embargo, falta la respuesta de eliasah para normalizar los datos. Entonces, agregaría las siguientes líneas a la respuesta de eliasah:

from pyspark.ml.feature import StandardScaler standardizer = StandardScaler(withMean=True, withStd=False, inputCol=''features'', outputCol=''std_features'') model = standardizer.fit(df) output = model.transform(df) pca_features = output.select("std_features").rdd.map(lambda row : row[0]) mat = RowMatrix(pca_features) svd = computeSVD(mat,5,True)

Evaluativamente, svd.V e identity_features.select ("pca_features"). Collect () deberían tener valores idénticos.

Editar: resumí PCA y su uso en Spark y sklearn en esto aquí


Bueno, parece increíble, pero de hecho no hay una forma de extraer esa información de una descomposición de PCA (al menos a partir de Spark 1.5). Pero, nuevamente, ha habido muchas "quejas" similares; consulte aquí , por ejemplo, para no poder extraer los mejores parámetros de un CrossValidatorModel .

Afortunadamente, hace algunos meses, asistí al MOOC ''Scalable Machine Learning'' de AMPLab (Berkeley) & Databricks, es decir, los creadores de Spark, donde implementamos un pipeline completo de PCA ''a mano'' como parte de las asignaciones de tareas. Modifiqué mis funciones desde entonces (puede estar seguro, recibí todo el crédito :-), para trabajar con dataframes como entradas (en lugar de RDD), del mismo formato que el suyo (es decir, Filas de DenseVectors contienen las características numéricas) .

Primero tenemos que definir una función intermedia, estimatedCovariance , de la siguiente manera:

import numpy as np def estimateCovariance(df): """Compute the covariance matrix for a given dataframe. Note: The multi-dimensional covariance array should be calculated using outer products. Don''t forget to normalize the data by first subtracting the mean. Args: df: A Spark dataframe with a column named ''features'', which (column) consists of DenseVectors. Returns: np.ndarray: A multi-dimensional array where the number of rows and columns both equal the length of the arrays in the input dataframe. """ m = df.select(df[''features'']).map(lambda x: x[0]).mean() dfZeroMean = df.select(df[''features'']).map(lambda x: x[0]).map(lambda x: x-m) # subtract the mean return dfZeroMean.map(lambda x: np.outer(x,x)).sum()/df.count()

Entonces, podemos escribir una función pca principal de la siguiente manera:

from numpy.linalg import eigh def pca(df, k=2): """Computes the top `k` principal components, corresponding scores, and all eigenvalues. Note: All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns each eigenvectors as a column. This function should also return eigenvectors as columns. Args: df: A Spark dataframe with a ''features'' column, which (column) consists of DenseVectors. k (int): The number of principal components to return. Returns: tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of scores, eigenvalues). Eigenvectors is a multi-dimensional array where the number of rows equals the length of the arrays in the input `RDD` and the number of columns equals `k`. The `RDD` of scores has the same number of rows as `data` and consists of arrays of length `k`. Eigenvalues is an array of length d (the number of features). """ cov = estimateCovariance(df) col = cov.shape[1] eigVals, eigVecs = eigh(cov) inds = np.argsort(eigVals) eigVecs = eigVecs.T[inds[-1:-(col+1):-1]] components = eigVecs[0:k] eigVals = eigVals[inds[-1:-(col+1):-1]] # sort eigenvals score = df.select(df[''features'']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T) ) # Return the `k` principal components, `k` scores, and all eigenvalues return components.T, score, eigVals

Prueba

Veamos primero los resultados con el método existente, usando los datos de ejemplo de la documentación de Spark ML PCA (modificándolos para que sean todos DenseVectors ):

from pyspark.ml.feature import * from pyspark.mllib.linalg import Vectors data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),), (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),), (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)] df = sqlContext.createDataFrame(data,["features"]) pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features") model = pca_extracted.fit(df) model.transform(df).collect() [Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])), Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])), Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]

Entonces, con nuestro método:

comp, score, eigVals = pca(df) score.collect() [array([ 1.64857282, 4.0132827 ]), array([-4.64510433, 1.11679727]), array([-6.42888054, 5.33795143])]

Permítanme enfatizar que no usamos ningún método de collect() en las funciones que hemos definido, la score es un RDD , como debería ser.

Tenga en cuenta que los signos de nuestra segunda columna son todos opuestos a los derivados por el método existente; pero esto no es un problema: según la Introducción al aprendizaje estadístico (libremente descargable), co-autor de Hastie & Tibshirani, p. 382

Cada vector de carga de componente principal es único, hasta un cambio de signo. Esto significa que dos paquetes de software diferentes producirán los mismos vectores de carga del componente principal, aunque los signos de esos vectores de carga pueden diferir. Los signos pueden diferir porque cada vector de carga del componente principal especifica una dirección en el espacio p-dimensional: voltear el signo no tiene ningún efecto ya que la dirección no cambia. [...] Del mismo modo, los vectores de puntaje son únicos hasta una vuelta de signo, ya que la varianza de Z es la misma que la varianza de -Z.

Finalmente, ahora que tenemos los valores propios disponibles, es trivial escribir una función para el porcentaje de la varianza explicada:

def varianceExplained(df, k=1): """Calculate the fraction of variance explained by the top `k` eigenvectors. Args: df: A Spark dataframe with a ''features'' column, which (column) consists of DenseVectors. k: The number of principal components to consider. Returns: float: A number between 0 and 1 representing the percentage of variance explained by the top `k` eigenvectors. """ components, scores, eigenvalues = pca(df, k) return sum(eigenvalues[0:k])/sum(eigenvalues) varianceExplained(df,1) # 0.79439325322305299

Como prueba, también verificamos si la varianza explicada en nuestros datos de ejemplo es 1.0, para k = 5 (dado que los datos originales son 5 dimensiones):

varianceExplained(df,5) # 1.0

Esto debería hacer su trabajo de manera eficiente ; no dude en presentar cualquier aclaración que pueda necesitar.

[Desarrollado y probado con Spark 1.5.0 y 1.5.1]


EDITAR:

PCA y SVD finalmente están disponibles en pyspark starting spark 2.2.0 de acuerdo con este boleto resuelto de JIRA SPARK-6227 .

Respuesta original:

La respuesta dada por @desertnaut es realmente excelente desde una perspectiva teórica, pero quería presentar otro enfoque sobre cómo calcular la SVD y extraer los autovectores.

from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper from pyspark.mllib.linalg.distributed import RowMatrix class SVD(JavaModelWrapper): """Wrapper around the SVD scala case class""" @property def U(self): """ Returns a RowMatrix whose columns are the left singular vectors of the SVD if computeU was set to be True.""" u = self.call("U") if u is not None: return RowMatrix(u) @property def s(self): """Returns a DenseVector with singular values in descending order.""" return self.call("s") @property def V(self): """ Returns a DenseMatrix whose columns are the right singular vectors of the SVD.""" return self.call("V")

Esto define nuestro objeto SVD. Ahora podemos definir nuestro método computeSVD utilizando Java Wrapper.

def computeSVD(row_matrix, k, computeU=False, rCond=1e-9): """ Computes the singular value decomposition of the RowMatrix. The given row matrix A of dimension (m X n) is decomposed into U * s * V''T where * s: DenseVector consisting of square root of the eigenvalues (singular values) in descending order. * U: (m X k) (left singular vectors) is a RowMatrix whose columns are the eigenvectors of (A X A'') * v: (n X k) (right singular vectors) is a Matrix whose columns are the eigenvectors of (A'' X A) :param k: number of singular values to keep. We might return less than k if there are numerically zero singular values. :param computeU: Whether of not to compute U. If set to be True, then U is computed by A * V * sigma^-1 :param rCond: the reciprocal condition number. All singular values smaller than rCond * sigma(0) are treated as zero, where sigma(0) is the largest singular value. :returns: SVD object """ java_model = row_matrix._java_matrix_wrapper.call("computeSVD", int(k), computeU, float(rCond)) return SVD(java_model)

Ahora, apliquemos eso a un ejemplo:

from pyspark.ml.feature import * from pyspark.mllib.linalg import Vectors data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),), (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),), (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)] df = sqlContext.createDataFrame(data,["features"]) pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features") model = pca_extracted.fit(df) features = model.transform(df) # this create a DataFrame with the regular features and pca_features # We can now extract the pca_features to prepare our RowMatrix. pca_features = features.select("pca_features").rdd.map(lambda row : row[0]) mat = RowMatrix(pca_features) # Once the RowMatrix is ready we can compute our Singular Value Decomposition svd = computeSVD(mat,2,True) svd.s # DenseVector([9.491, 4.6253]) svd.U.rows.collect() # [DenseVector([0.1129, -0.909]), DenseVector([0.463, 0.4055]), DenseVector([0.8792, -0.0968])] svd.V # DenseMatrix(2, 2, [-0.8025, -0.5967, -0.5967, 0.8025], 0)