structtype spark createdataframe create apache-spark apache-spark-sql

apache spark - createdataframe - Generar un Spark StructType/Schema a partir de una clase de caso



spark sql pyspark (4)

Si quisiera crear un StructType (es decir, un DataFrame.schema ) a partir de una case class , ¿hay alguna forma de hacerlo sin crear un DataFrame ? Puedo hacer fácilmente:

case class TestCase(id: Long) val schema = Seq[TestCase]().toDF.schema

Pero parece excesivo crear un DataFrame cuando todo lo que quiero es el esquema.

(Si tiene curiosidad, la razón detrás de la pregunta es que estoy definiendo una función UserDefinedAggregateFunction , y para hacerlo, anula un par de métodos que devuelven StructTypes y uso clases de casos).


En lugar de reproducir manualmente la lógica para crear el objeto Encoder implícito que se pasa a toDF , se puede usar eso directamente (o, más precisamente, implícitamente de la misma manera que toDF ):

// spark: SparkSession import spark.implicits._ implicitly[Encoder[MyCaseClass]].schema

Desafortunadamente, esto en realidad sufre el mismo problema que usar org.apache.spark.sql.catalyst o Encoders como en las otras respuestas: el rasgo Encoder es experimental.

¿Como funciona esto? El método toDF en Seq proviene de un DatasetHolder , que se crea a través del implícito localSeqToDatasetHolder que se importa a través de spark.implicits._ . Esa función se define como:

implicit def localSeqToDatasetHolder[T](s: Seq[T])(implicit arg0: Encoder[T]): DatasetHolder[T]

Como puede ver, se necesita un argumento implicit Encoder[T] , que, para una case class , se puede calcular a través de newProductEncoder (también importado a través de spark.implicits._ ). Podemos reproducir esta lógica implícita para obtener un Encoder para nuestra clase de caso, a través de la conveniencia scala.Predef.implicitly (en el alcance por defecto, porque es de Predef ) que solo devolverá su argumento implícito solicitado:

def implicitly[T](implicit e: T): T


Puede hacerlo de la misma manera que lo hace SQLContext.createDataFrame :

import org.apache.spark.sql.catalyst.ScalaReflection val schema = ScalaReflection.schemaFor[TestCase].dataType.asInstanceOf[StructType]


Sé que esta pregunta tiene casi un año de antigüedad, pero la encontré y pensé que otros que también querrían saber que acabo de aprender a usar este enfoque:

import org.apache.spark.sql.Encoders val mySchema = Encoders.product[MyCaseClass].schema


en caso de que alguien quiera hacer esto para un bean Java personalizado:

ExpressionEncoder.javaBean(Event.class).schema().json()