spark org mllib kmeans examples example clustering scala apache-spark apache-spark-mllib

scala - org - spark apache 2018



Forma óptima de crear una tubería de ml en Apache Spark para el conjunto de datos con un gran número de columnas (2)

Estoy trabajando con Spark 2.1.1 en un conjunto de datos con ~ 2000 características y tratando de crear una tubería ML básica, que consta de algunos transformadores y un clasificador.

Supongamos por simplicidad que el Pipeline con el que estoy trabajando consiste en un VectorAssembler, un StringIndexer y un Classifier, que sería un uso común.

// Pipeline elements val assmbleFeatures: VectorAssembler = new VectorAssembler() .setInputCols(featureColumns) .setOutputCol("featuresRaw") val labelIndexer: StringIndexer = new StringIndexer() .setInputCol("TARGET") .setOutputCol("indexedLabel") // Train a RandomForest model. val rf: RandomForestClassifier = new RandomForestClassifier() .setLabelCol("indexedLabel") .setFeaturesCol("featuresRaw") .setMaxBins(30) // add the params, unique to this classifier val paramGrid = new ParamGridBuilder() .addGrid(rf.numTrees, Array(5)) .addGrid(rf.maxDepth, Array(5)) .build() // Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages. val evaluator = new BinaryClassificationEvaluator() .setMetricName("areaUnderROC") .setLabelCol("indexedLabel")

Si los pasos de la tubería están separados en una tubería de transformador (VectorAssembler + StringIndexer) y una segunda tubería de clasificador, y si las columnas innecesarias se dejan caer entre ambas tuberías, el entrenamiento tiene éxito. Esto significa que para reutilizar los modelos, se deben guardar dos PipelineModels después del entrenamiento y se debe introducir un paso de preprocesamiento intermedio.

// Split indexers and forest in two Pipelines. val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain) // Transform data and drop all columns, except those needed for training val dfTrainT = prePipeline.transform(dfTrain) val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col)) val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*) val mainPipeline = new Pipeline().setStages(Array(rf)) val cv = new CrossValidator() .setEstimator(mainPipeline) .setEvaluator(evaluator) .setEstimatorParamMaps(paramGrid) .setNumFolds(2) val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]

La solución (mucho) más limpia sería fusionar todas las etapas de tuberías en una tubería.

val pipeline = new Pipeline() .setStages(Array(labelIndexer, assmbleFeatures, rf)) val cv = new CrossValidator() .setEstimator(pipeline) .setEvaluator(evaluator) .setEstimatorParamMaps(paramGrid) .setNumFolds(2) // This will fail! val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

Sin embargo, poner todos PipelineStages en un solo oleoducto conduce a la siguiente excepción, probablemente debido al problema que finalmente resolverá this RP:

ERROR CodeGenerator: no se pudo compilar: org.codehaus.janino.JaninoRuntimeException: grupo constante para la clase org.apache.spark.sql.catalyst.expressions.GeneratedClass $ SpecificUnsafeProjection ha crecido más allá del límite JVM de 0xFFFF

La razón de esto es que VectorAssembler duplica de manera efectiva (en este ejemplo) la cantidad de datos en el DataFrame, ya que no hay ningún transformador que pueda eliminar las columnas innecesarias. (Ver el ensamblador de vectores de la tubería de chispa, soltar otras columnas )

Para el ejemplo funciona en el conjunto de datos golub y son necesarios los siguientes pasos de preprocesamiento:

import org.apache.spark.sql.types.DoubleType import org.apache.spark.ml.classification.RandomForestClassifier import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage} import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.feature._ import org.apache.spark.sql._ import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100) // Those steps are necessary, otherwise training would fail either way val colsToDrop = df.columns.take(5000) val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*) // Split df in train and test sets val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3)) // Feature columns are columns except "TARGET" val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

Como soy nuevo en Spark, no estoy seguro de cuál sería la mejor manera de resolver este problema. ¿Sugerirías ...

  1. para crear un nuevo transformador, que cae columnas y que se puede incorporar a la tubería?
  2. dividir ambas tuberías e introducir el paso intermedio
  3. ¿Algo más? :)

¿O me estoy perdiendo algo importante (pasos de tubería, relaciones públicas, etc.) que resolvería este problema?

Editar:

Implementé un nuevo Transformer DroppingVectorAssembler , que elimina columnas innecesarias, sin embargo, se lanza la misma excepción.

Además de eso, establecer spark.sql.codegen.wholeStage en false no resuelve el problema.


El error de janino que está obteniendo se debe a que, según el conjunto de características, el código generado se vuelve más grande.

Yo separaría los pasos en diferentes tuberías y StringIndexer las características innecesarias, StringIndexer los modelos intermedios como StringIndexer y OneHotEncoder y los cargaría durante la etapa de predicción, lo que también es útil porque las transformaciones serían más rápidas para los datos que se deben pronosticar.

Finalmente, no necesita mantener las columnas de características después de ejecutar la etapa VectorAssembler ya que transforma las características en un feature vector y una columna de label , y eso es todo lo que necesita para ejecutar las predicciones.

Ejemplo de canalización en Scala con guardado de pasos intermedios- (API de chispa anterior)

Además, si está utilizando una versión anterior de chispa como 1.6.0, debe verificar la versión parcheada, es decir, 2.1.1 o 2.2.0 o 1.6.4 o, de lo contrario, golpearía el error de Janino incluso con alrededor de 400 columnas de características.


El error de janino se debe al número de variables constantes creadas durante el proceso del optimizador. El límite máximo de variables constantes permitido en la JVM es ((2 ^ 16) -1). Si se excede este límite, obtiene el Constant pool for class ... has grown past JVM limit of 0xFFFF

La JIRA que solucionará este problema es SPARK-18016 , pero todavía está en progreso en este momento.

Es probable que su código falle durante la etapa VectorAssembler , cuando tiene que funcionar contra miles de columnas durante una única tarea de optimización.

La solución alternativa que desarrollé para este problema es crear un "vector de vectores" trabajando contra subconjuntos de las columnas y luego uniendo los resultados al final para crear un vector de característica singular. Esto evita que cualquier tarea de optimización individual exceda el límite constante de JVM. No es elegante, pero lo he usado en datasets que alcanzan el rango de 10k columnas.

Este método también le permite mantener una única canalización, aunque requiere algunos pasos adicionales para que funcione (creando los sub-vectores). Después de que haya creado el vector de características de los subvectores, puede soltar las columnas originales si lo desea.

Código de ejemplo:

// IMPORT DEPENDENCIES import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column} import org.apache.spark.ml.feature.VectorAssembler import org.apache.spark.ml.{Pipeline, PipelineModel} // Create first example dataframe val exampleDF = spark.createDataFrame(Seq( (1, 1, 2, 3, 8, 4, 5, 1, 3, 2, 0, 4, 2, 8, 1, 1, 2, 3, 8, 4, 5), (2, 4, 3, 8, 7, 9, 8, 2, 3, 3, 2, 6, 5, 4, 2, 4, 3, 8, 7, 9, 8), (3, 6, 1, 9, 2, 3, 6, 3, 8, 5, 1, 2, 3, 5, 3, 6, 1, 9, 2, 3, 6), (4, 7, 8, 6, 9, 4, 5, 4, 9, 8, 2, 4, 9, 2, 4, 7, 8, 6, 9, 4, 5), (5, 9, 2, 7, 8, 7, 3, 5, 3, 4, 8, 0, 6, 2, 5, 9, 2, 7, 8, 7, 3), (6, 1, 1, 4, 2, 8, 4, 6, 3, 9, 8, 8, 9, 3, 6, 1, 1, 4, 2, 8, 4) )).toDF("uid", "col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "colA", "colB", "colC", "colD", "colE", "colF", "colG", "colH", "colI", "colJ", "colK") // Create multiple column lists using the sliding method val Array(colList1, colList2, colList3, colList4) = exampleDF.columns.filter(_ != "uid").sliding(5,5).toArray // Create a vector assembler for each column list val colList1_assembler = new VectorAssembler().setInputCols(colList1).setOutputCol("colList1_vec") val colList2_assembler = new VectorAssembler().setInputCols(colList2).setOutputCol("colList2_vec") val colList3_assembler = new VectorAssembler().setInputCols(colList3).setOutputCol("colList3_vec") val colList4_assembler = new VectorAssembler().setInputCols(colList4).setOutputCol("colList4_vec") // Create a vector assembler using column list vectors as input val features_assembler = new VectorAssembler().setInputCols(Array("colList1_vec","colList2_vec","colList3_vec","colList4_vec")).setOutputCol("features") // Create the pipeline with column list vector assemblers first, then the final vector of vectors assembler last val pipeline = new Pipeline().setStages(Array(colList1_assembler,colList2_assembler,colList3_assembler,colList4_assembler,features_assembler)) // Fit and transform the data val featuresDF = pipeline.fit(exampleDF).transform(exampleDF) // Get the number of features in "features" vector val featureLength = (featuresDF.schema(featuresDF.schema.fieldIndex("features")).metadata.getMetadata("ml_attr").getLong("num_attrs")) // Print number of features in "features vector" print(featureLength)

(Nota: El método de creación de las listas de columnas realmente debería hacerse de manera programática, pero he mantenido este ejemplo simple por el bien de entender el concepto).