sistema puede proyección new grave error encontro definicion data coordenadas continuar cambiar bloquear aplicación scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

scala - puede - Error de codificador al intentar asignar la fila del marco de datos a la fila actualizada



new data frame arcgis (2)

Cuando estoy tratando de hacer lo mismo en mi código como se menciona a continuación

dataframe.map(row => { val row1 = row.getAs[String](1) val make = if (row1.toLowerCase == "tesla") "S" else row1 Row(row(0),make,row(2)) })

He tomado la referencia anterior desde aquí: Scala: ¿Cómo puedo reemplazar el valor en Dataframs usando scala? Pero recibo un error de codificador como

No se puede encontrar el codificador para el tipo almacenado en un conjunto de datos. Los tipos primitivos (Int, S tring, etc.) y los tipos de productos (clases de casos) se admiten al importar spark.im plicits._ El soporte para serializar otros tipos se agregará en futuras versiones.

Nota: ¡Estoy usando spark 2.0!


No hay nada inesperado aquí. Está intentando utilizar el código que se ha escrito con Spark 1.xy ya no es compatible con Spark 2.0:

  • en 1.x DataFrame.map es ((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
  • en 2.x Dataset[Row].map es ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]

Para ser sincero, tampoco tenía mucho sentido en 1.x. Independientemente de la versión, simplemente puede usar la API DataFrame :

import org.apache.spark.sql.functions.{when, lower} val df = Seq( (2012, "Tesla", "S"), (1997, "Ford", "E350"), (2015, "Chevy", "Volt") ).toDF("year", "make", "model") df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))

Si realmente desea usar el map , debe usar un Dataset estáticamente tipado:

import spark.implicits._ case class Record(year: Int, make: String, model: String) df.as[Record].map { case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S") case rec => rec }

o al menos devolver un objeto que tendrá codificador implícito:

df.map { case Row(year: Int, make: String, model: String) => (year, if(make.toLowerCase == "tesla") "S" else make, model) }

Finalmente, si por alguna razón completamente loca realmente desea mapear sobre Dataset[Row] , debe proporcionar el codificador requerido:

import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types._ import org.apache.spark.sql.Row // Yup, it would be possible to reuse df.schema here val schema = StructType(Seq( StructField("year", IntegerType), StructField("make", StringType), StructField("model", StringType) )) val encoder = RowEncoder(schema) df.map { case Row(year, make: String, model) if make.toLowerCase == "tesla" => Row(year, "S", model) case row => row } (encoder)


Para el escenario donde el esquema de trama de datos se conoce de antemano, la respuesta dada por @ zero323 es la solución

pero para un escenario con esquema dinámico / o pasar un marco de datos múltiple a una función genérica: el siguiente código nos ha funcionado al migrar desde 1.6.1 desde 2.2.0

import org.apache.spark.sql.Row val df = Seq( (2012, "Tesla", "S"), (1997, "Ford", "E350"), (2015, "Chevy", "Volt") ).toDF("year", "make", "model") val data = df.rdd.map(row => { val row1 = row.getAs[String](1) val make = if (row1.toLowerCase == "tesla") "S" else row1 Row(row(0),make,row(2)) })

Este código se ejecuta en ambas versiones de spark.

desventaja: la optimización proporcionada por la chispa en el marco de datos / conjuntos de datos api no se aplicará.