structtype spark saveastable read python apache-spark dataframe pyspark apache-spark-sql

python - saveastable - Pyspark: divide varias columnas de matriz en filas



structtype pyspark (2)

Tengo un marco de datos que tiene una fila y varias columnas. Algunas de las columnas son valores únicos y otras son listas. Todas las columnas de la lista tienen la misma longitud. Quiero dividir cada columna de la lista en una fila separada, manteniendo cualquier columna que no sea de la lista tal como está.

Muestra DF:

from pyspark import Row from pyspark.sql import SQLContext from pyspark.sql.functions import explode sqlc = SQLContext(sc) df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d=''foo'')]) # +---+---------+---------+---+ # | a| b| c| d| # +---+---------+---------+---+ # | 1|[1, 2, 3]|[7, 8, 9]|foo| # +---+---------+---------+---+

Lo que quiero:

+---+---+----+------+ | a| b| c | d | +---+---+----+------+ | 1| 1| 7 | foo | | 1| 2| 8 | foo | | 1| 3| 9 | foo | +---+---+----+------+

Si solo tuviera una columna de lista, esto sería fácil simplemente haciendo una explode :

df_exploded = df.withColumn(''b'', explode(''b'')) # >>> df_exploded.show() # +---+---+---------+---+ # | a| b| c| d| # +---+---+---------+---+ # | 1| 1|[7, 8, 9]|foo| # | 1| 2|[7, 8, 9]|foo| # | 1| 3|[7, 8, 9]|foo| # +---+---+---------+---+

Sin embargo, si trato de explode también la columna c , termino con un marco de datos con una longitud del cuadrado de lo que quiero:

df_exploded_again = df_exploded.withColumn(''c'', explode(''c'')) # >>> df_exploded_again.show() # +---+---+---+---+ # | a| b| c| d| # +---+---+---+---+ # | 1| 1| 7|foo| # | 1| 1| 8|foo| # | 1| 1| 9|foo| # | 1| 2| 7|foo| # | 1| 2| 8|foo| # | 1| 2| 9|foo| # | 1| 3| 7|foo| # | 1| 3| 8|foo| # | 1| 3| 9|foo| # +---+---+---+---+

Lo que quiero es: para cada columna, tomar el enésimo elemento de la matriz en esa columna y agregarlo a una nueva fila. Intenté mapear y explotar en todas las columnas del marco de datos, pero tampoco parece funcionar:

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()


flatMap usar flatMap , no map ya que desea hacer múltiples filas de salida de cada fila de entrada.

from pyspark.sql import Row def dualExplode(r): rowDict = r.asDict() bList = rowDict.pop(''b'') cList = rowDict.pop(''c'') for b,c in zip(bList, cList): newDict = dict(rowDict) newDict[''b''] = b newDict[''c''] = c yield Row(**newDict) df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))


Chispa> = 2.4

Puede reemplazar zip_ udf con la función arrays_zip

from pyspark.sql.functions import arrays_zip, col (df .withColumn("tmp", arrays_zip("b", "c")) .withColumn("tmp", explode("tmp")) .select("a", col("tmp.b"), col("tmp.c"), "d"))

Chispa <2.4

Con DataFrames y UDF:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType from pyspark.sql.functions import col, udf, explode zip_ = udf( lambda x, y: list(zip(x, y)), ArrayType(StructType([ # Adjust types to reflect data types StructField("first", IntegerType()), StructField("second", IntegerType()) ])) ) (df .withColumn("tmp", zip_("b", "c")) # UDF output cannot be directly passed to explode .withColumn("tmp", explode("tmp")) .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))

Con RDDs :

(df .rdd .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)]) .toDF(["a", "b", "c", "d"]))

Ambas soluciones son ineficientes debido a la sobrecarga de comunicación de Python. Si el tamaño de los datos es fijo, puede hacer algo como esto:

from functools import reduce from pyspark.sql import DataFrame # Length of array n = 3 # For legacy Python you''ll need a separate function # in place of method accessor reduce( DataFrame.unionAll, (df.select("a", col("b").getItem(i), col("c").getItem(i), "d") for i in range(n)) ).toDF("a", "b", "c", "d")

o incluso:

from pyspark.sql.functions import array, struct # SQL level zip of arrays of known size # followed by explode tmp = explode(array(*[ struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c")) for i in range(n) ])) (df .withColumn("tmp", tmp) .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))

Esto debería ser significativamente más rápido en comparación con UDF o RDD. Generalizado para admitir un número arbitrario de columnas:

# This uses keyword only arguments # If you use legacy Python you''ll have to change signature # Body of the function can stay the same def zip_and_explode(*colnames, n): return explode(array(*[ struct(*[col(c).getItem(i).alias(c) for c in colnames]) for i in range(n) ])) df.withColumn("tmp", zip_and_explode("b", "c", n=3))