spark scala tutorial
¿Cómo crear un codificador personalizado en Spark 2.X Datasets? (3)
Los conjuntos de datos de chispas se alejan de las filas de los Encoder
para las primitivas / de Pojo. El motor Catalyst
usa un ExpressionEncoder
para convertir columnas en una expresión SQL. Sin embargo, no parece haber otras subclases de Encoder
disponibles para usar como plantilla para nuestras propias implementaciones.
Aquí hay un ejemplo de código que está contento en Spark 1.X / DataFrames que no se compila en el nuevo régimen:
//mapping each row to RDD tuple
df.map(row => {
var id: String = if (!has_id) "" else row.getAs[String]("id")
var label: String = row.getAs[String]("label")
val channels : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
val height : Int = if (!has_height) 0 else row.getAs[Int]("height")
val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
val data : Array[Byte] = row.getAs[Any]("data") match {
case str: String => str.getBytes
case arr: Array[Byte@unchecked] => arr
case _ => {
log.error("Unsupport value type")
null
}
}
(id, label, channels, height, width, data)
}).persist(StorageLevel.DISK_ONLY)
}
Obtenemos un error de compilación de
Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported
by importing spark.implicits._ Support for serializing other types will be added in future releases.
df.map(row => {
^
Entonces de alguna manera / en algún lugar debería haber un medio para
- Definir / implementar nuestro codificador personalizado.
- Aplíquelo cuando realice una asignación en el
DataFrame
(que ahora es un conjunto de datos de tipoRow
) - Registrar el codificador para su uso por otro código personalizado
Estoy buscando el código que realiza con éxito estos pasos.
¿Importaste los encoders implícitos?
importar spark.implicits._
http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.sql.Encoder
Importé spark.implicits._ Donde spark es SparkSession y resolvió el error y se importaron los codificadores personalizados.
Además, escribir un codificador personalizado es una salida que no he probado.
Solución de trabajo: - Cree SparkSession e importe lo siguiente
importar spark.implicits._
Por lo que sé, nada cambió realmente desde 1.6 y las soluciones descritas en ¿Cómo almacenar objetos personalizados en Dataset? Son las únicas opciones disponibles. Sin embargo, su código actual debería funcionar bien con los codificadores predeterminados para los tipos de productos.
Para obtener una idea de por qué su código funcionó en 1.x y es posible que no funcione en 2.0.0, tendrá que verificar las firmas. En 1.x, DataFrame.map
es un método que toma la función Row => T
y transforma RDD[Row]
en RDD[T]
.
En 2.0.0 DataFrame.map
tiene una función de tipo Row => T
, pero transforma Dataset[Row]
(también DataFrame
como DataFrame
) en Dataset[T]
tanto, T
requiere un Encoder
. Si desea obtener el comportamiento "antiguo", debe usar RDD
explícitamente:
df.rdd.map(row => ???)
Para el map
Dataset[Row]
vea Error de codificador al intentar asignar la fila del marco de datos a la fila actualizada