spark gbt forest example classifier r scala apache-spark apache-spark-mllib

gbt - spark random forest regression



Ejecutar más de 3000 modelos de bosque aleatorio por grupo usando Spark MLlib Scala API (1)

Estoy intentando crear modelos de bosque aleatorios por grupo (School_ID, más de 3 mil) en un archivo csv de entrada de modelo grande utilizando la API de Spark Scala. Cada uno del grupo contiene aproximadamente 3000-4000 registros. Los recursos que tengo a disposición son 20-30 instancias aws m3.2xlarge.

En R, puedo construir modelos por grupo y guardarlos en una lista como esta-

library(dplyr);library(randomForest); Rf_model <- train %>% group_by(School_ID) %>% do(school= randomForest(formula=Rf_formula, data=., importance = TRUE))

La lista puede almacenarse en algún lugar y puedo llamarlos cuando necesito usarlos como a continuación:

save(Rf_model.school,file=paste0(Modelpath,"Rf_model.dat")) load(file=paste0(Modelpath,"Rf_model.dat")) pred <- predict(Rf_model.school$school[school_index][[1]], newdata=test)

Me preguntaba cómo hacer eso en Spark, si necesito o no dividir los datos por grupo primero y cómo hacerlo de manera eficiente si es necesario.

Pude dividir el archivo por School_ID según el siguiente código, pero parece que crea un trabajo individual para subconjuntos para cada iteración y tarda mucho tiempo en terminar los trabajos. ¿Hay alguna manera de hacerlo en un solo pase?

model_input.cache() val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq) val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID)) for( i <- 0 to programs.length - 1 ){ bySchoolArray(i). write.format("com.databricks.spark.csv"). option("header", "true"). save("model_input_bySchool/model_input_"+ schools(i)) }

Fuente: ¿Cómo puedo dividir un marco de datos en marcos de datos con los mismos valores de columna en SCALA y SPARK?

Editar 8/24/2015 Estoy tratando de convertir mi marco de datos en un formato que sea aceptado por el modelo de bosque aleatorio. Estoy siguiendo las instrucciones en este hilo Cómo crear un marco de datos correcto para la clasificación en Spark ML

Básicamente, creo una nueva variable "etiqueta" y almaceno mi clase en Doble. Luego combino todas mis características usando la función VectorAssembler y transformo mis datos de entrada de la siguiente manera-

val assembler = new VectorAssembler(). setInputCols(Array("COL1", "COL2", "COL3")). setOutputCol("features") val model_input = assembler.transform(model_input_raw). select("SCHOOL_ID", "label", "features")

Mensaje de error parcial (hágamelo saber si necesita el mensaje de registro completo) -

scala.MatchError: StringType (de la clase org.apache.spark.sql.types.StringType $) en org.apache.spark.ml.feature.VectorAssembler $$ anonfun $ 2.apply (VectorAssembler.scala: 57)

Esto se resuelve después de convertir todas las variables a tipos numéricos.

Editar 25/8/2015 El modelo ml no acepta la etiqueta que codifiqué manualmente, así que necesito usar StringIndexer para resolver el problema como se indica aquí . De acuerdo con la documentación oficial , la etiqueta más frecuente obtiene 0. Causa etiquetas inconsistentes en School_ID. Me preguntaba si hay una forma de crear las etiquetas sin restablecer el orden de los valores.

val indexer = new StringIndexer(). setInputCol("label_orig"). setOutputCol("label")

Cualquier sugerencia o dirección sería útil y puede plantear cualquier pregunta. ¡Gracias!


Como ya tienes un marco de datos separado para cada escuela, no hay mucho que hacer aquí. Desde sus marcos de datos, supongo que quiere usar ml.classification.RandomForestClassifier . Si es así, puedes probar algo como esto:

  1. Extrae la lógica de la tubería. Ajuste los parámetros y transformadores RandomForestClassifier según sus requisitos

    import org.apache.spark.sql.DataFrame import org.apache.spark.ml.classification.RandomForestClassifier import org.apache.spark.ml.{Pipeline, PipelineModel} def trainModel(df: DataFrame): PipelineModel = { val rf = new RandomForestClassifier() val pipeline = new Pipeline().setStages(Array(rf)) pipeline.fit(df) }

  2. Entrene modelos en cada subconjunto

    val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df))

  3. Guardar modelos

    import java.io._ def saveModel(name: String, model: PipelineModel) = { val oos = new ObjectOutputStream(new FileOutputStream(s"/some/path/$name")) oos.writeObject(model) oos.close } schools.zip(bySchoolArrayModels).foreach{ case (name, model) => saveModel(name, Model) }

  4. Opcional : dado que los subconjuntos individuales son bastante pequeños, puede intentar un enfoque similar al que he descrito aquí para enviar múltiples tareas al mismo tiempo.

  5. Si usa mllib.tree.model.RandomForestModel puede omitir 3. y usar model.save directamente. Dado que parece haber algunos problemas con la deserialización ( ¿Cómo deserializar el modelo de Pipeline en spark.ml? - por lo que puedo ver, funciona bien, pero es mejor evitarlo, supongo) podría ser un enfoque preferido.

Editar

De acuerdo con la documentación oficial :

VectorAssembler acepta los siguientes tipos de columna de entrada: todos los tipos numéricos, tipo booleano y tipo de vector.

Como el error indica que su columna es una String , debe transformarla primero, por ejemplo, utilizando StringIndexer .