tutorial started spark getting scala apache-spark apache-spark-sql

scala - started - spark python



Acoplar DataFrame de forma automática y elegante en Spark SQL (7)

Todos,

¿Existe una forma elegante y aceptada de aplanar una tabla Spark SQL (parquet) con columnas que estén anidadas en StructType

Por ejemplo

Si mi esquema es:

foo |_bar |_baz x y z

¿Cómo lo selecciono en una forma tabular aplanada sin tener que recurrir a la ejecución manual?

df.select("foo.bar","foo.baz","x","y","z")

En otras palabras, ¿cómo StructType el resultado del código anterior programáticamente dado solo un StructType y un DataFrame


Aquí hay una función que está haciendo lo que usted quiere y que puede tratar con varias columnas anidadas que contienen columnas con el mismo nombre, con un prefijo:

from pyspark.sql import functions as F def flatten_df(nested_df): flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != ''struct''] nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == ''struct''] flat_df = nested_df.select(flat_cols + [F.col(nc+''.''+c).alias(nc+''_''+c) for nc in nested_cols for c in nested_df.select(nc+''.*'').columns]) return flat_df

Antes de:

root |-- x: string (nullable = true) |-- y: string (nullable = true) |-- foo: struct (nullable = true) | |-- a: float (nullable = true) | |-- b: float (nullable = true) | |-- c: integer (nullable = true) |-- bar: struct (nullable = true) | |-- a: float (nullable = true) | |-- b: float (nullable = true) | |-- c: integer (nullable = true)

Después:

root |-- x: string (nullable = true) |-- y: string (nullable = true) |-- foo_a: float (nullable = true) |-- foo_b: float (nullable = true) |-- foo_c: integer (nullable = true) |-- bar_a: float (nullable = true) |-- bar_b: float (nullable = true) |-- bar_c: integer (nullable = true)


Estoy mejorando mi respuesta anterior y ofrezco una solución a mi propio problema expresado en los comentarios de la respuesta aceptada.

Esta solución aceptada crea una matriz de objetos de columna y la utiliza para seleccionar estas columnas. En Spark, si tiene un DataFrame anidado, puede seleccionar la columna secundaria como esta: df.select("Parent.Child") y esto devuelve un DataFrame con los valores de la columna secundaria y se denomina Hijo . Pero si tiene nombres idénticos para los atributos de diferentes estructuras padre, pierde la información sobre el padre y puede terminar con nombres de columna idénticos y ya no puede acceder a ellos por su nombre, ya que no son ambiguos.

Este fue mi problema

Encontré una solución a mi problema, quizás también pueda ayudar a alguien más. Llamé al flattenSchema separado:

val flattenedSchema = flattenSchema(df.schema)

y esto devolvió una matriz de objetos de columna. En lugar de usar esto en el select() , que devolvería un DataFrame con columnas nombradas por el hijo del último nivel, asigné los nombres de las columnas originales como cadenas, luego, después de seleccionar la columna Parent.Child , lo renombrará como Parent.Child lugar de Child (también reemplazé los puntos con guiones bajos para mi conveniencia):

val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_")))

Y luego puede usar la función de selección como se muestra en la respuesta original:

var newDf = df.select(renamedCols:_*)


He estado usando liners que resultan en un esquema aplanado con 5 columnas de barra, baz, x, y, z:

df.select("foo.*", "x", "y", "z")

En cuanto a explode : normalmente reservo explode para aplanar una lista. Por ejemplo, si tiene una columna idList que es una lista de cadenas, podría hacer:

df.withColumn("flattenedId", functions.explode(col("idList"))) .drop("idList")

Eso dará como resultado un nuevo Dataframe con una columna llamada flattenedId (ya no es una lista)


La respuesta corta es que no hay una forma "aceptada" de hacer esto, pero puede hacerlo de manera muy elegante con una función recursiva que genere su declaración select(...) recorriendo el DataFrame.schema .

La función recursiva debe devolver una Array[Column] . Cada vez que la función golpea un StructType , se llama a sí misma y agrega la Array[Column] devuelta a su propia Array[Column] .

Algo como:

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = { schema.fields.flatMap(f => { val colName = if (prefix == null) f.name else (prefix + "." + f.name) f.dataType match { case st: StructType => flattenSchema(st, colName) case _ => Array(col(colName)) } }) }

Entonces lo usarías así:

df.select(flattenSchema(df.schema):_*)


Solo quería compartir mi solución para Pyspark: es más o menos una traducción de la solución de @David Griffin, por lo que admite cualquier nivel de objetos anidados.

from pyspark.sql.types import StructType, ArrayType def flatten(schema, prefix=None): fields = [] for field in schema.fields: name = prefix + ''.'' + field.name if prefix else field.name dtype = field.dataType if isinstance(dtype, ArrayType): dtype = dtype.elementType if isinstance(dtype, StructType): fields += flatten(dtype, prefix=name) else: fields.append(name) return fields df.select(flatten(df.schema)).show()


También puede usar SQL para seleccionar columnas como planas.

  1. Obtener el esquema de marco de datos original
  2. Generar cadena SQL, navegando por el esquema
  3. Consulta tu marco de datos original

Hice una implementación en Java: https://gist.github.com/ebuildy/3de0e2855498e5358e4eed1a4f72ea48

(Use el método recursivo también, prefiero el modo SQL, para que pueda probarlo fácilmente a través de Spark-shell).


DataFrame#flattenSchema un método DataFrame#flattenSchema al proyecto de fuente abierta spark-daria .

Aquí es cómo puede utilizar la función con su código.

import com.github.mrpowers.spark.daria.sql.DataFrameExt._ df.flattenSchema().show() +-------+-------+---------+----+---+ |foo.bar|foo.baz| x| y| z| +-------+-------+---------+----+---+ | this| is|something|cool| ;)| +-------+-------+---------+----+---+

También puede especificar diferentes delimitadores de nombre de columna con el método flattenSchema() .

df.flattenSchema(delimiter = "_").show() +-------+-------+---------+----+---+ |foo_bar|foo_baz| x| y| z| +-------+-------+---------+----+---+ | this| is|something|cool| ;)| +-------+-------+---------+----+---+

Este parámetro delimitador es sorprendentemente importante. Si está aplanando su esquema para cargar la tabla en Redshift, no podrá usar puntos como delimitador.

Aquí está el fragmento de código completo para generar esta salida.

val data = Seq( Row(Row("this", "is"), "something", "cool", ";)") ) val schema = StructType( Seq( StructField( "foo", StructType( Seq( StructField("bar", StringType, true), StructField("baz", StringType, true) ) ), true ), StructField("x", StringType, true), StructField("y", StringType, true), StructField("z", StringType, true) ) ) val df = spark.createDataFrame( spark.sparkContext.parallelize(data), StructType(schema) ) df.flattenSchema().show()

El código subyacente es similar al código de David Griffin (en caso de que no quiera agregar la dependencia de spark-daria a su proyecto).

object StructTypeHelpers { def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = { schema.fields.flatMap(structField => { val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name structField.dataType match { case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName) case _ => Array(col(codeColName).alias(colName)) } }) } } object DataFrameExt { implicit class DataFrameMethods(df: DataFrame) { def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = { df.select( StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _* ) } } }