scala - withcolumn - ¿Cómo definir el esquema para el tipo personalizado en Spark SQL?
spark dataframe filter (1)
El siguiente código de ejemplo intenta colocar algunos objetos de caso en un marco de datos. El código incluye la definición de una jerarquía de objetos de caso y una clase de caso que utiliza este rasgo:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
sealed trait Some
case object AType extends Some
case object BType extends Some
case class Data( name : String, t: Some)
object Example {
def main(args: Array[String]) : Unit = {
val conf = new SparkConf()
.setAppName( "Example" )
.setMaster( "local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
df.show()
}
}
Cuando ejecuto el código, desafortunadamente encuentro la siguiente excepción:
java.lang.UnsupportedOperationException: Schema for type Some is not supported
Preguntas
-
¿Existe la posibilidad de agregar o definir un esquema para ciertos tipos (aquí escriba
Some
)? -
¿Existe otro enfoque para representar este tipo de enumeraciones?
-
Traté de usar
Enumeration
directamente, pero también sin éxito. (vea abajo)
-
Traté de usar
Código de
Enumeration
:
object Some extends Enumeration {
type Some = Value
val AType, BType = Value
}
Gracias por adelantado. Espero que el mejor enfoque sea no usar cadenas en su lugar.
Spark 2.0.0+ :
UserDefinedType
se ha hecho privado en Spark 2.0.0 y, por ahora, no tiene un reemplazo
UserDefinedType
con el
Dataset
.
Ver: SPARK-14155 (Ocultar tipo definido por el usuario en Spark 2.0)
La mayoría de las veces el
Dataset
tipo estático puede servir como reemplazo. Hay un Jira
SPARK-7768
pendiente para hacer que la API UDT vuelva a ser pública con la versión 2.4 de destino.
Consulte también ¿Cómo almacenar objetos personalizados en el conjunto de datos?
Chispa <2.0.0
¿Existe la posibilidad de agregar o definir un esquema para ciertos tipos (aquí escriba Some)?
Supongo que la respuesta depende de cuánto necesites esto.
Parece que es posible crear un
UserDefinedType
pero requiere acceso a
DeveloperApi
y no es exactamente sencillo o está bien documentado.
import org.apache.spark.sql.types._
@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some
class SomeUDT extends UserDefinedType[Some] {
override def sqlType: DataType = IntegerType
override def serialize(obj: Any) = {
obj match {
case AType => 0
case BType => 1
}
}
override def deserialize(datum: Any): Some = {
datum match {
case 0 => AType
case 1 => BType
}
}
override def userClass: Class[Some] = classOf[Some]
}
Probablemente deberías anular
hashCode
y
equals
también.
Su contraparte de PySpark puede verse así:
from enum import Enum, unique
from pyspark.sql.types import UserDefinedType, IntegerType
class SomeUDT(UserDefinedType):
@classmethod
def sqlType(self):
return IntegerType()
@classmethod
def module(cls):
return cls.__module__
@classmethod
def scalaUDT(cls): # Required in Spark < 1.5
return ''net.zero323.enum.SomeUDT''
def serialize(self, obj):
return obj.value
def deserialize(self, datum):
return {x.value: x for x in Some}[datum]
@unique
class Some(Enum):
__UDT__ = SomeUDT()
AType = 0
BType = 1
En Spark <1.5 Python UDT requiere un UDT de Scala emparejado, pero parece que ya no es el caso en 1.5.
Para un UDT simple como puede usar tipos simples (por ejemplo,
IntegerType
lugar de
Struct
completo).