software - examples spark scala
Cómo crear el marco de datos correcto para la clasificación en Spark ML (3)
Estoy intentando ejecutar la clasificación aleatoria de bosques mediante el uso de la API Spark ML, pero estoy teniendo problemas para crear la entrada correcta de marcos de datos en la canalización.
Aquí hay datos de muestra:
age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","male","B"
31,50,"masters","female","B"
42,40,"bachelors","male","B"
age y hours_per_week son enteros, mientras que otras funciones, incluida la etiqueta salarialRange, son categóricas (String)
Cargar este archivo csv (vamos a llamarlo sample.csv) se puede hacer mediante la biblioteca Spark csv de esta manera:
val data = sqlContext.csvFile("/home/dusan/sample.csv")
De manera predeterminada, todas las columnas se importan como cadenas, por lo que debemos cambiar "edad" y "horas_por_semanas" a Int:
val toInt = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))
Solo para comprobar cómo se ve el esquema ahora:
scala> dataFixed.printSchema
root
|-- age: integer (nullable = true)
|-- hours_per_week: integer (nullable = true)
|-- education: string (nullable = true)
|-- sex: string (nullable = true)
|-- salaryRange: string (nullable = true)
Luego, configuremos el validador cruzado y la tubería:
val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf))
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)
Se muestra un error al ejecutar esta línea:
val cmModel = cv.fit(dataFixed)
java.lang.IllegalArgumentException: las "características" de campo no existen.
Es posible configurar la columna de etiquetas y la columna de características en RandomForestClassifier; sin embargo, tengo 4 columnas como predictores (características), no solo una.
¿Cómo debería organizar mi marco de datos para que tenga columnas de etiquetas y características organizadas correctamente?
Para su conveniencia, aquí está el código completo:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.{Vector, Vectors}
object SampleClassification {
def main(args: Array[String]): Unit = {
//set spark context
val conf = new SparkConf().setAppName("Simple Application").setMaster("local");
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import com.databricks.spark.csv._
//load data by using databricks "Spark CSV Library"
val data = sqlContext.csvFile("/home/dusan/sample.csv")
//by default all columns are imported as string so we need to change "age" and "hours_per_week" to Int
val toInt = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))
val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf))
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)
// this fails with error
//java.lang.IllegalArgumentException: Field "features" does not exist.
val cmModel = cv.fit(dataFixed)
}
}
¡Gracias por la ayuda!
A partir de Spark 1.4, puede usar Transformer org.apache.spark.ml.feature.VectorAssembler . Simplemente proporcione los nombres de las columnas que desea que sean características.
val assembler = new VectorAssembler()
.setInputCols(Array("col1", "col2", "col3"))
.setOutputCol("features")
y agrégalo a tu canalización.
Según la documentación de chispa sobre mllib - árboles aleatorios, me parece que debe definir el mapa de características que está utilizando y los puntos deben ser un punto etiquetado.
Esto le dirá al algoritmo qué columna debe usarse como predicción y cuáles son las características.
https://spark.apache.org/docs/latest/mllib-decision-tree.html
Simplemente necesita asegurarse de tener una columna de "features"
en su marco de datos que sea de tipo VectorUDF
como se muestra a continuación:
scala> val df2 = dataFixed.withColumnRenamed("age", "features")
df2: org.apache.spark.sql.DataFrame = [features: int, hours_per_week: int, education: string, sex: string, salaryRange: string]
scala> val cmModel = cv.fit(df2)
java.lang.IllegalArgumentException: requirement failed: Column features must be of type org.apache.spark.mllib.linalg.VectorUDT@1eef but was actually IntegerType.
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:50)
at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:118)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:164)
at org.apache.spark.ml.tuning.CrossValidator.transformSchema(CrossValidator.scala:142)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
at org.apache.spark.ml.tuning.CrossValidator.fit(CrossValidator.scala:107)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)
EDIT1
Esencialmente, debe haber dos campos en las "características" de su marco de datos para el vector de características y "etiqueta" para las etiquetas de instancia. La instancia debe ser del tipo Double
.
Para crear un campo de "características" con Tipo de Vector
primero cree un udf
como se muestra a continuación:
val toVec4 = udf[Vector, Int, Int, String, String] { (a,b,c,d) =>
val e3 = c match {
case "hs-grad" => 0
case "bachelors" => 1
case "masters" => 2
}
val e4 = d match {case "male" => 0 case "female" => 1}
Vectors.dense(a, b, e3, e4)
}
Ahora, para codificar también el campo "etiqueta", cree otro udf
como se muestra a continuación:
val encodeLabel = udf[Double, String]( _ match { case "A" => 0.0 case "B" => 1.0} )
Ahora transformamos el marco de datos original usando estos dos udf
:
val df = dataFixed.withColumn(
"features",
toVec4(
dataFixed("age"),
dataFixed("hours_per_week"),
dataFixed("education"),
dataFixed("sex")
)
).withColumn("label", encodeLabel(dataFixed("salaryRange"))).select("features", "label")
Tenga en cuenta que puede haber columnas / campos adicionales presentes en el marco de datos, pero en este caso he seleccionado solo las features
y la label
:
scala> df.show()
+-------------------+-----+
| features|label|
+-------------------+-----+
|[38.0,40.0,0.0,0.0]| 0.0|
|[28.0,40.0,1.0,1.0]| 0.0|
|[52.0,45.0,0.0,0.0]| 1.0|
|[31.0,50.0,2.0,1.0]| 1.0|
|[42.0,40.0,1.0,0.0]| 1.0|
+-------------------+-----+
Ahora depende de usted establecer los parámetros correctos para su algoritmo de aprendizaje para que funcione.