tutorial spark software que examples example scala apache-spark apache-spark-sql apache-spark-mllib

software - examples spark scala



Cómo crear el marco de datos correcto para la clasificación en Spark ML (3)

Estoy intentando ejecutar la clasificación aleatoria de bosques mediante el uso de la API Spark ML, pero estoy teniendo problemas para crear la entrada correcta de marcos de datos en la canalización.

Aquí hay datos de muestra:

age,hours_per_week,education,sex,salaryRange 38,40,"hs-grad","male","A" 28,40,"bachelors","female","A" 52,45,"hs-grad","male","B" 31,50,"masters","female","B" 42,40,"bachelors","male","B"

age y hours_per_week son enteros, mientras que otras funciones, incluida la etiqueta salarialRange, son categóricas (String)

Cargar este archivo csv (vamos a llamarlo sample.csv) se puede hacer mediante la biblioteca Spark csv de esta manera:

val data = sqlContext.csvFile("/home/dusan/sample.csv")

De manera predeterminada, todas las columnas se importan como cadenas, por lo que debemos cambiar "edad" y "horas_por_semanas" a Int:

val toInt = udf[Int, String]( _.toInt) val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))

Solo para comprobar cómo se ve el esquema ahora:

scala> dataFixed.printSchema root |-- age: integer (nullable = true) |-- hours_per_week: integer (nullable = true) |-- education: string (nullable = true) |-- sex: string (nullable = true) |-- salaryRange: string (nullable = true)

Luego, configuremos el validador cruzado y la tubería:

val rf = new RandomForestClassifier() val pipeline = new Pipeline().setStages(Array(rf)) val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

Se muestra un error al ejecutar esta línea:

val cmModel = cv.fit(dataFixed)

java.lang.IllegalArgumentException: las "características" de campo no existen.

Es posible configurar la columna de etiquetas y la columna de características en RandomForestClassifier; sin embargo, tengo 4 columnas como predictores (características), no solo una.

¿Cómo debería organizar mi marco de datos para que tenga columnas de etiquetas y características organizadas correctamente?

Para su conveniencia, aquí está el código completo:

import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.ml.classification.RandomForestClassifier import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.tuning.CrossValidator import org.apache.spark.ml.Pipeline import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions._ import org.apache.spark.mllib.linalg.{Vector, Vectors} object SampleClassification { def main(args: Array[String]): Unit = { //set spark context val conf = new SparkConf().setAppName("Simple Application").setMaster("local"); val sc = new SparkContext(conf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ import com.databricks.spark.csv._ //load data by using databricks "Spark CSV Library" val data = sqlContext.csvFile("/home/dusan/sample.csv") //by default all columns are imported as string so we need to change "age" and "hours_per_week" to Int val toInt = udf[Int, String]( _.toInt) val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week"))) val rf = new RandomForestClassifier() val pipeline = new Pipeline().setStages(Array(rf)) val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator) // this fails with error //java.lang.IllegalArgumentException: Field "features" does not exist. val cmModel = cv.fit(dataFixed) } }

¡Gracias por la ayuda!


A partir de Spark 1.4, puede usar Transformer org.apache.spark.ml.feature.VectorAssembler . Simplemente proporcione los nombres de las columnas que desea que sean características.

val assembler = new VectorAssembler() .setInputCols(Array("col1", "col2", "col3")) .setOutputCol("features")

y agrégalo a tu canalización.


Según la documentación de chispa sobre mllib - árboles aleatorios, me parece que debe definir el mapa de características que está utilizando y los puntos deben ser un punto etiquetado.

Esto le dirá al algoritmo qué columna debe usarse como predicción y cuáles son las características.

https://spark.apache.org/docs/latest/mllib-decision-tree.html


Simplemente necesita asegurarse de tener una columna de "features" en su marco de datos que sea de tipo VectorUDF como se muestra a continuación:

scala> val df2 = dataFixed.withColumnRenamed("age", "features") df2: org.apache.spark.sql.DataFrame = [features: int, hours_per_week: int, education: string, sex: string, salaryRange: string] scala> val cmModel = cv.fit(df2) java.lang.IllegalArgumentException: requirement failed: Column features must be of type org.apache.spark.mllib.linalg.VectorUDT@1eef but was actually IntegerType. at scala.Predef$.require(Predef.scala:233) at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37) at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:50) at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71) at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:118) at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164) at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108) at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:164) at org.apache.spark.ml.tuning.CrossValidator.transformSchema(CrossValidator.scala:142) at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59) at org.apache.spark.ml.tuning.CrossValidator.fit(CrossValidator.scala:107) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)

EDIT1

Esencialmente, debe haber dos campos en las "características" de su marco de datos para el vector de características y "etiqueta" para las etiquetas de instancia. La instancia debe ser del tipo Double .

Para crear un campo de "características" con Tipo de Vector primero cree un udf como se muestra a continuación:

val toVec4 = udf[Vector, Int, Int, String, String] { (a,b,c,d) => val e3 = c match { case "hs-grad" => 0 case "bachelors" => 1 case "masters" => 2 } val e4 = d match {case "male" => 0 case "female" => 1} Vectors.dense(a, b, e3, e4) }

Ahora, para codificar también el campo "etiqueta", cree otro udf como se muestra a continuación:

val encodeLabel = udf[Double, String]( _ match { case "A" => 0.0 case "B" => 1.0} )

Ahora transformamos el marco de datos original usando estos dos udf :

val df = dataFixed.withColumn( "features", toVec4( dataFixed("age"), dataFixed("hours_per_week"), dataFixed("education"), dataFixed("sex") ) ).withColumn("label", encodeLabel(dataFixed("salaryRange"))).select("features", "label")

Tenga en cuenta que puede haber columnas / campos adicionales presentes en el marco de datos, pero en este caso he seleccionado solo las features y la label :

scala> df.show() +-------------------+-----+ | features|label| +-------------------+-----+ |[38.0,40.0,0.0,0.0]| 0.0| |[28.0,40.0,1.0,1.0]| 0.0| |[52.0,45.0,0.0,0.0]| 1.0| |[31.0,50.0,2.0,1.0]| 1.0| |[42.0,40.0,1.0,0.0]| 1.0| +-------------------+-----+

Ahora depende de usted establecer los parámetros correctos para su algoritmo de aprendizaje para que funcione.