scala - puede - Error de codificador al intentar asignar la fila del marco de datos a la fila actualizada
new data frame arcgis (2)
Cuando estoy tratando de hacer lo mismo en mi código como se menciona a continuación
dataframe.map(row => {
val row1 = row.getAs[String](1)
val make = if (row1.toLowerCase == "tesla") "S" else row1
Row(row(0),make,row(2))
})
He tomado la referencia anterior desde aquí: Scala: ¿Cómo puedo reemplazar el valor en Dataframs usando scala? Pero recibo un error de codificador como
No se puede encontrar el codificador para el tipo almacenado en un conjunto de datos. Los tipos primitivos (Int, S tring, etc.) y los tipos de productos (clases de casos) se admiten al importar spark.im plicits._ El soporte para serializar otros tipos se agregará en futuras versiones.
Nota: ¡Estoy usando spark 2.0!
No hay nada inesperado aquí. Está intentando utilizar el código que se ha escrito con Spark 1.xy ya no es compatible con Spark 2.0:
-
en 1.x
DataFrame.map
es((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
-
en 2.x
Dataset[Row].map
es((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
Para ser sincero, tampoco tenía mucho sentido en 1.x.
Independientemente de la versión, simplemente puede usar la API
DataFrame
:
import org.apache.spark.sql.functions.{when, lower}
val df = Seq(
(2012, "Tesla", "S"), (1997, "Ford", "E350"),
(2015, "Chevy", "Volt")
).toDF("year", "make", "model")
df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
Si realmente desea usar el
map
, debe usar un
Dataset
estáticamente tipado:
import spark.implicits._
case class Record(year: Int, make: String, model: String)
df.as[Record].map {
case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
case rec => rec
}
o al menos devolver un objeto que tendrá codificador implícito:
df.map {
case Row(year: Int, make: String, model: String) =>
(year, if(make.toLowerCase == "tesla") "S" else make, model)
}
Finalmente, si por alguna razón
completamente loca
realmente desea mapear sobre
Dataset[Row]
, debe proporcionar el codificador requerido:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
StructField("year", IntegerType),
StructField("make", StringType),
StructField("model", StringType)
))
val encoder = RowEncoder(schema)
df.map {
case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
Row(year, "S", model)
case row => row
} (encoder)
Para el escenario donde el esquema de trama de datos se conoce de antemano, la respuesta dada por @ zero323 es la solución
pero para un escenario con esquema dinámico / o pasar un marco de datos múltiple a una función genérica: el siguiente código nos ha funcionado al migrar desde 1.6.1 desde 2.2.0
import org.apache.spark.sql.Row
val df = Seq(
(2012, "Tesla", "S"), (1997, "Ford", "E350"),
(2015, "Chevy", "Volt")
).toDF("year", "make", "model")
val data = df.rdd.map(row => {
val row1 = row.getAs[String](1)
val make = if (row1.toLowerCase == "tesla") "S" else row1
Row(row(0),make,row(2))
})
Este código se ejecuta en ambas versiones de spark.
desventaja: la optimización proporcionada por la chispa en el marco de datos / conjuntos de datos api no se aplicará.