tutorial spark read functions example español create scala apache-spark dataframe apache-spark-sql

scala - read - spark sql functions



¿Cómo dividir un marco de datos en marcos de datos con los mismos valores de columna? (2)

Es muy simple (si la versión de chispa es 2) si crea el marco de datos como una tabla temporal.

df1.createOrReplaceTempView("df1")

Y ahora puedes hacer las consultas,

var df2 = spark.sql("select * from df1 where state = ''FL''") var df3 = spark.sql("select * from df1 where state = ''MN''") var df4 = spark.sql("select * from df1 where state = ''AL''")

Ahora tienes el df2, df3, df4. Si desea tenerlos como lista, puede usar,

df2.collect() df3.collect()

o incluso mapa / función de filtro. Consulte https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes

Ceniza

Usando Scala, ¿cómo puedo dividir dataFrame en múltiples dataFrame (ya sea matriz o colección) con el mismo valor de columna. Por ejemplo, quiero dividir el siguiente DataFrame:

ID Rate State 1 24 AL 2 35 MN 3 46 FL 4 34 AL 5 78 MN 6 99 FL

a:

conjunto de datos 1

ID Rate State 1 24 AL 4 34 AL

conjunto de datos 2

ID Rate State 2 35 MN 5 78 MN

conjunto de datos 3

ID Rate State 3 46 FL 6 99 FL


Puede recopilar valores de estado únicos y simplemente asignar sobre la matriz resultante:

val states = df.select("State").distinct.collect.flatMap(_.toSeq) val byStateArray = states.map(state => df.where($"State" <=> state))

o para mapear:

val byStateMap = states .map(state => (state -> df.where($"State" <=> state))) .toMap

Lo mismo en Python:

from itertools import chain from pyspark.sql.functions import col states = chain(*df.select("state").distinct().collect()) # PySpark 2.3 and later # In 2.2 and before col("state") == state) # should give the same outcome, ignoring NULLs # if NULLs are important # (lit(state).isNull() & col("state").isNull()) | (col("state") == state) df_by_state = {state: df.where(col("state").eqNullSafe(state)) for state in states}

El problema obvio aquí es que requiere un escaneo de datos completo para cada nivel, por lo que es una operación costosa. Si está buscando una manera de dividir la salida, consulte también ¿Cómo divido un RDD en dos o más RDD?

En particular, puede escribir un Dataset particionado por la columna de interés:

val path: String = ??? df.write.partitionBy("State").parquet(path)

y lea de nuevo si es necesario:

// Depend on partition prunning for { state <- states } yield spark.read.parquet(path).where($"State" === state) // or explicitly read the partition for { state <- states } yield spark.read.parquet(s"$path/State=$state")

Dependiendo del tamaño de los datos, el número de niveles de división, almacenamiento y nivel de persistencia de la entrada puede ser más rápido o más lento que los filtros múltiples.