software - spark scala tutorial
¿Por qué es "No se puede encontrar el codificador para el tipo almacenado en un conjunto de datos" al crear un conjunto de datos de clase de caso personalizado? (3)
Aclararía con una respuesta a mi propia pregunta, que si el objetivo es definir un marco SparkData literal simple, en lugar de usar tuplas de Scala y conversión implícita, la ruta más simple es usar la API de Spark directamente así:
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
val simpleSchema = StructType(
StructField("a", StringType) ::
StructField("b", IntegerType) ::
StructField("c", IntegerType) ::
StructField("d", IntegerType) ::
StructField("e", IntegerType) :: Nil)
val data = List(
Row("001", 1, 0, 3, 4),
Row("001", 3, 4, 1, 7),
Row("001", null, 0, 6, 4),
Row("003", 1, 4, 5, 7),
Row("003", 5, 4, null, 2),
Row("003", 4, null, 9, 2),
Row("003", 2, 3, 0, 1)
)
val df = spark.createDataFrame(data.asJava, simpleSchema)
Spark 2.0 (final) con Scala 2.11.8.
El siguiente código súper simple produce el error de compilación
Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
import org.apache.spark.sql.SparkSession
case class SimpleTuple(id: Int, desc: String)
object DatasetTest {
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder.
master("local")
.appName("example")
.getOrCreate()
val dataset = sparkSession.createDataset(dataList)
}
}
Los
Datasets
Spark requieren
Encoders
para el tipo de datos que está a punto de almacenarse.
Para los tipos comunes (atómicos, tipos de productos) hay varios codificadores predefinidos disponibles, pero primero debe importarlos desde
SparkSession.implicits
para que funcione:
val sparkSession: SparkSession = ???
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)
Alternativamente, puede proporcionar directamente un explícito
import org.apache.spark.sql.{Encoder, Encoders}
val dataset = sparkSession.createDataset(dataList)(Encoders.product[SimpleTuple])
o implícito
implicit val enc: Encoder[SimpleTuple] = Encoders.product[SimpleTuple]
val dataset = sparkSession.createDataset(dataList)
Encoder
para el tipo almacenado.
Tenga en cuenta que los
Enocders
también proporcionan una serie de
Encoders
predefinidos para tipos atómicos, y los
Encoders
para los complejos, pueden derivarse con
ExpressionEncoder
.
Otras lecturas:
- Para los objetos personalizados que no están cubiertos por los codificadores integrados, consulte ¿Cómo almacenar objetos personalizados en el conjunto de datos?
-
Para los objetos de
Row
, debe proporcionar elEncoder
explícitamente como se muestra en el error del codificador al intentar asignar la fila del marco de datos a la fila actualizada
Para otros usuarios (el suyo es correcto), tenga en cuenta que también es importante que la
case class
se defina fuera del alcance del
object
.
Entonces:
Falla:
object DatasetTest {
case class SimpleTuple(id: Int, desc: String)
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
val dataset = sparkSession.createDataset(dataList)
}
}
Agregue los implicits, todavía falla con el mismo error:
object DatasetTest {
case class SimpleTuple(id: Int, desc: String)
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)
}
}
Trabajos:
case class SimpleTuple(id: Int, desc: String)
object DatasetTest {
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)
}
}
Aquí está el error relevante: https://issues.apache.org/jira/browse/SPARK-13540 , por lo que esperamos que se solucione en la próxima versión de Spark 2.
(Editar: Parece que esa corrección de errores está realmente en Spark 2.0.0 ... Así que no estoy seguro de por qué esto todavía falla).