withcolumn spark scala apache-spark apache-spark-sql case-class

scala - withcolumn - ¿Cómo definir el esquema para el tipo personalizado en Spark SQL?



spark dataframe filter (1)

El siguiente código de ejemplo intenta colocar algunos objetos de caso en un marco de datos. El código incluye la definición de una jerarquía de objetos de caso y una clase de caso que utiliza este rasgo:

import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.sql.SQLContext sealed trait Some case object AType extends Some case object BType extends Some case class Data( name : String, t: Some) object Example { def main(args: Array[String]) : Unit = { val conf = new SparkConf() .setAppName( "Example" ) .setMaster( "local[*]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF() df.show() } }

Cuando ejecuto el código, desafortunadamente encuentro la siguiente excepción:

java.lang.UnsupportedOperationException: Schema for type Some is not supported

Preguntas

  • ¿Existe la posibilidad de agregar o definir un esquema para ciertos tipos (aquí escriba Some )?
  • ¿Existe otro enfoque para representar este tipo de enumeraciones?
    • Traté de usar Enumeration directamente, pero también sin éxito. (vea abajo)

Código de Enumeration :

object Some extends Enumeration { type Some = Value val AType, BType = Value }

Gracias por adelantado. Espero que el mejor enfoque sea no usar cadenas en su lugar.


Spark 2.0.0+ :

UserDefinedType se ha hecho privado en Spark 2.0.0 y, por ahora, no tiene un reemplazo UserDefinedType con el Dataset .

Ver: SPARK-14155 (Ocultar tipo definido por el usuario en Spark 2.0)

La mayoría de las veces el Dataset tipo estático puede servir como reemplazo. Hay un Jira SPARK-7768 pendiente para hacer que la API UDT vuelva a ser pública con la versión 2.4 de destino.

Consulte también ¿Cómo almacenar objetos personalizados en el conjunto de datos?

Chispa <2.0.0

¿Existe la posibilidad de agregar o definir un esquema para ciertos tipos (aquí escriba Some)?

Supongo que la respuesta depende de cuánto necesites esto. Parece que es posible crear un UserDefinedType pero requiere acceso a DeveloperApi y no es exactamente sencillo o está bien documentado.

import org.apache.spark.sql.types._ @SQLUserDefinedType(udt = classOf[SomeUDT]) sealed trait Some case object AType extends Some case object BType extends Some class SomeUDT extends UserDefinedType[Some] { override def sqlType: DataType = IntegerType override def serialize(obj: Any) = { obj match { case AType => 0 case BType => 1 } } override def deserialize(datum: Any): Some = { datum match { case 0 => AType case 1 => BType } } override def userClass: Class[Some] = classOf[Some] }

Probablemente deberías anular hashCode y equals también.

Su contraparte de PySpark puede verse así:

from enum import Enum, unique from pyspark.sql.types import UserDefinedType, IntegerType class SomeUDT(UserDefinedType): @classmethod def sqlType(self): return IntegerType() @classmethod def module(cls): return cls.__module__ @classmethod def scalaUDT(cls): # Required in Spark < 1.5 return ''net.zero323.enum.SomeUDT'' def serialize(self, obj): return obj.value def deserialize(self, datum): return {x.value: x for x in Some}[datum] @unique class Some(Enum): __UDT__ = SomeUDT() AType = 0 BType = 1

En Spark <1.5 Python UDT requiere un UDT de Scala emparejado, pero parece que ya no es el caso en 1.5.

Para un UDT simple como puede usar tipos simples (por ejemplo, IntegerType lugar de Struct completo).