python - saveastable - Pyspark: divide varias columnas de matriz en filas
structtype pyspark (2)
Tengo un marco de datos que tiene una fila y varias columnas. Algunas de las columnas son valores únicos y otras son listas. Todas las columnas de la lista tienen la misma longitud. Quiero dividir cada columna de la lista en una fila separada, manteniendo cualquier columna que no sea de la lista tal como está.
Muestra DF:
from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode
sqlc = SQLContext(sc)
df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d=''foo'')])
# +---+---------+---------+---+
# | a| b| c| d|
# +---+---------+---------+---+
# | 1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+
Lo que quiero:
+---+---+----+------+
| a| b| c | d |
+---+---+----+------+
| 1| 1| 7 | foo |
| 1| 2| 8 | foo |
| 1| 3| 9 | foo |
+---+---+----+------+
Si solo tuviera una columna de lista, esto sería fácil simplemente haciendo una
explode
:
df_exploded = df.withColumn(''b'', explode(''b''))
# >>> df_exploded.show()
# +---+---+---------+---+
# | a| b| c| d|
# +---+---+---------+---+
# | 1| 1|[7, 8, 9]|foo|
# | 1| 2|[7, 8, 9]|foo|
# | 1| 3|[7, 8, 9]|foo|
# +---+---+---------+---+
Sin embargo, si trato de
explode
también la columna
c
, termino con un marco de datos con una longitud del cuadrado de lo que quiero:
df_exploded_again = df_exploded.withColumn(''c'', explode(''c''))
# >>> df_exploded_again.show()
# +---+---+---+---+
# | a| b| c| d|
# +---+---+---+---+
# | 1| 1| 7|foo|
# | 1| 1| 8|foo|
# | 1| 1| 9|foo|
# | 1| 2| 7|foo|
# | 1| 2| 8|foo|
# | 1| 2| 9|foo|
# | 1| 3| 7|foo|
# | 1| 3| 8|foo|
# | 1| 3| 9|foo|
# +---+---+---+---+
Lo que quiero es: para cada columna, tomar el enésimo elemento de la matriz en esa columna y agregarlo a una nueva fila. Intenté mapear y explotar en todas las columnas del marco de datos, pero tampoco parece funcionar:
df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()
flatMap
usar
flatMap
, no
map
ya que desea hacer múltiples filas de salida de cada fila de entrada.
from pyspark.sql import Row
def dualExplode(r):
rowDict = r.asDict()
bList = rowDict.pop(''b'')
cList = rowDict.pop(''c'')
for b,c in zip(bList, cList):
newDict = dict(rowDict)
newDict[''b''] = b
newDict[''c''] = c
yield Row(**newDict)
df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))
Chispa> = 2.4
Puede reemplazar
zip_
udf
con la función
arrays_zip
from pyspark.sql.functions import arrays_zip, col
(df
.withColumn("tmp", arrays_zip("b", "c"))
.withColumn("tmp", explode("tmp"))
.select("a", col("tmp.b"), col("tmp.c"), "d"))
Chispa <2.4
Con
DataFrames
y UDF:
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode
zip_ = udf(
lambda x, y: list(zip(x, y)),
ArrayType(StructType([
# Adjust types to reflect data types
StructField("first", IntegerType()),
StructField("second", IntegerType())
]))
)
(df
.withColumn("tmp", zip_("b", "c"))
# UDF output cannot be directly passed to explode
.withColumn("tmp", explode("tmp"))
.select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))
Con
RDDs
:
(df
.rdd
.flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
.toDF(["a", "b", "c", "d"]))
Ambas soluciones son ineficientes debido a la sobrecarga de comunicación de Python. Si el tamaño de los datos es fijo, puede hacer algo como esto:
from functools import reduce
from pyspark.sql import DataFrame
# Length of array
n = 3
# For legacy Python you''ll need a separate function
# in place of method accessor
reduce(
DataFrame.unionAll,
(df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
for i in range(n))
).toDF("a", "b", "c", "d")
o incluso:
from pyspark.sql.functions import array, struct
# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
for i in range(n)
]))
(df
.withColumn("tmp", tmp)
.select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))
Esto debería ser significativamente más rápido en comparación con UDF o RDD. Generalizado para admitir un número arbitrario de columnas:
# This uses keyword only arguments
# If you use legacy Python you''ll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
return explode(array(*[
struct(*[col(c).getItem(i).alias(c) for c in colnames])
for i in range(n)
]))
df.withColumn("tmp", zip_and_explode("b", "c", n=3))