tutorial spark software learn for data big scala apache-spark apache-spark-dataset apache-spark-encoders

software - spark scala tutorial



¿Por qué es "No se puede encontrar el codificador para el tipo almacenado en un conjunto de datos" al crear un conjunto de datos de clase de caso personalizado? (3)

Aclararía con una respuesta a mi propia pregunta, que si el objetivo es definir un marco SparkData literal simple, en lugar de usar tuplas de Scala y conversión implícita, la ruta más simple es usar la API de Spark directamente así:

import org.apache.spark.sql._ import org.apache.spark.sql.types._ import scala.collection.JavaConverters._ val simpleSchema = StructType( StructField("a", StringType) :: StructField("b", IntegerType) :: StructField("c", IntegerType) :: StructField("d", IntegerType) :: StructField("e", IntegerType) :: Nil) val data = List( Row("001", 1, 0, 3, 4), Row("001", 3, 4, 1, 7), Row("001", null, 0, 6, 4), Row("003", 1, 4, 5, 7), Row("003", 5, 4, null, 2), Row("003", 4, null, 9, 2), Row("003", 2, 3, 0, 1) ) val df = spark.createDataFrame(data.asJava, simpleSchema)

Spark 2.0 (final) con Scala 2.11.8. El siguiente código súper simple produce el error de compilación Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

import org.apache.spark.sql.SparkSession case class SimpleTuple(id: Int, desc: String) object DatasetTest { val dataList = List( SimpleTuple(5, "abc"), SimpleTuple(6, "bcd") ) def main(args: Array[String]): Unit = { val sparkSession = SparkSession.builder. master("local") .appName("example") .getOrCreate() val dataset = sparkSession.createDataset(dataList) } }


Los Datasets Spark requieren Encoders para el tipo de datos que está a punto de almacenarse. Para los tipos comunes (atómicos, tipos de productos) hay varios codificadores predefinidos disponibles, pero primero debe importarlos desde SparkSession.implicits para que funcione:

val sparkSession: SparkSession = ??? import sparkSession.implicits._ val dataset = sparkSession.createDataset(dataList)

Alternativamente, puede proporcionar directamente un explícito

import org.apache.spark.sql.{Encoder, Encoders} val dataset = sparkSession.createDataset(dataList)(Encoders.product[SimpleTuple])

o implícito

implicit val enc: Encoder[SimpleTuple] = Encoders.product[SimpleTuple] val dataset = sparkSession.createDataset(dataList)

Encoder para el tipo almacenado.

Tenga en cuenta que los Enocders también proporcionan una serie de Encoders predefinidos para tipos atómicos, y los Encoders para los complejos, pueden derivarse con ExpressionEncoder .

Otras lecturas:

  • Para los objetos personalizados que no están cubiertos por los codificadores integrados, consulte ¿Cómo almacenar objetos personalizados en el conjunto de datos?
  • Para los objetos de Row , debe proporcionar el Encoder explícitamente como se muestra en el error del codificador al intentar asignar la fila del marco de datos a la fila actualizada

Para otros usuarios (el suyo es correcto), tenga en cuenta que también es importante que la case class se defina fuera del alcance del object . Entonces:

Falla:

object DatasetTest { case class SimpleTuple(id: Int, desc: String) val dataList = List( SimpleTuple(5, "abc"), SimpleTuple(6, "bcd") ) def main(args: Array[String]): Unit = { val sparkSession = SparkSession.builder .master("local") .appName("example") .getOrCreate() val dataset = sparkSession.createDataset(dataList) } }

Agregue los implicits, todavía falla con el mismo error:

object DatasetTest { case class SimpleTuple(id: Int, desc: String) val dataList = List( SimpleTuple(5, "abc"), SimpleTuple(6, "bcd") ) def main(args: Array[String]): Unit = { val sparkSession = SparkSession.builder .master("local") .appName("example") .getOrCreate() import sparkSession.implicits._ val dataset = sparkSession.createDataset(dataList) } }

Trabajos:

case class SimpleTuple(id: Int, desc: String) object DatasetTest { val dataList = List( SimpleTuple(5, "abc"), SimpleTuple(6, "bcd") ) def main(args: Array[String]): Unit = { val sparkSession = SparkSession.builder .master("local") .appName("example") .getOrCreate() import sparkSession.implicits._ val dataset = sparkSession.createDataset(dataList) } }

Aquí está el error relevante: https://issues.apache.org/jira/browse/SPARK-13540 , por lo que esperamos que se solucione en la próxima versión de Spark 2.

(Editar: Parece que esa corrección de errores está realmente en Spark 2.0.0 ... Así que no estoy seguro de por qué esto todavía falla).