tutorial spark scala apache-spark apache-spark-dataset apache-spark-encoders

spark scala tutorial



¿Cómo crear un codificador personalizado en Spark 2.X Datasets? (3)

Los conjuntos de datos de chispas se alejan de las filas de los Encoder para las primitivas / de Pojo. El motor Catalyst usa un ExpressionEncoder para convertir columnas en una expresión SQL. Sin embargo, no parece haber otras subclases de Encoder disponibles para usar como plantilla para nuestras propias implementaciones.

Aquí hay un ejemplo de código que está contento en Spark 1.X / DataFrames que no se compila en el nuevo régimen:

//mapping each row to RDD tuple df.map(row => { var id: String = if (!has_id) "" else row.getAs[String]("id") var label: String = row.getAs[String]("label") val channels : Int = if (!has_channels) 0 else row.getAs[Int]("channels") val height : Int = if (!has_height) 0 else row.getAs[Int]("height") val width : Int = if (!has_width) 0 else row.getAs[Int]("width") val data : Array[Byte] = row.getAs[Any]("data") match { case str: String => str.getBytes case arr: Array[Byte@unchecked] => arr case _ => { log.error("Unsupport value type") null } } (id, label, channels, height, width, data) }).persist(StorageLevel.DISK_ONLY)

}

Obtenemos un error de compilación de

Error:(56, 11) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. df.map(row => { ^

Entonces de alguna manera / en algún lugar debería haber un medio para

  • Definir / implementar nuestro codificador personalizado.
  • Aplíquelo cuando realice una asignación en el DataFrame (que ahora es un conjunto de datos de tipo Row )
  • Registrar el codificador para su uso por otro código personalizado

Estoy buscando el código que realiza con éxito estos pasos.



Importé spark.implicits._ Donde spark es SparkSession y resolvió el error y se importaron los codificadores personalizados.

Además, escribir un codificador personalizado es una salida que no he probado.

Solución de trabajo: - Cree SparkSession e importe lo siguiente

importar spark.implicits._


Por lo que sé, nada cambió realmente desde 1.6 y las soluciones descritas en ¿Cómo almacenar objetos personalizados en Dataset? Son las únicas opciones disponibles. Sin embargo, su código actual debería funcionar bien con los codificadores predeterminados para los tipos de productos.

Para obtener una idea de por qué su código funcionó en 1.x y es posible que no funcione en 2.0.0, tendrá que verificar las firmas. En 1.x, DataFrame.map es un método que toma la función Row => T y transforma RDD[Row] en RDD[T] .

En 2.0.0 DataFrame.map tiene una función de tipo Row => T , pero transforma Dataset[Row] (también DataFrame como DataFrame ) en Dataset[T] tanto, T requiere un Encoder . Si desea obtener el comportamiento "antiguo", debe usar RDD explícitamente:

df.rdd.map(row => ???)

Para el map Dataset[Row] vea Error de codificador al intentar asignar la fila del marco de datos a la fila actualizada