apache-spark - read - spark sql java
Dividir la columna de cadena del marco de datos de Spark en varias columnas (3)
He visto a varias personas sugiriendo que
Dataframe.explode
es una forma útil de hacer esto, pero da como resultado más filas que el marco de datos original, que no es lo que quiero en absoluto.
Simplemente quiero hacer el equivalente de Dataframe de lo muy simple:
rdd.map(lambda row: row + [row.my_str_col.split(''-'')])
que toma algo parecido a:
col1 | my_str_col
-----+-----------
18 | 856-yygrm
201 | 777-psgdg
y lo convierte a esto:
col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
18 | 856-yygrm | 856 | yygrm
201 | 777-psgdg | 777 | psgdg
Soy consciente de
pyspark.sql.functions.split()
, pero da como resultado una columna de matriz anidada en lugar de dos columnas de nivel superior como quiero.
Idealmente, quiero que estas nuevas columnas también se nombren.
Aquí hay una solución para el caso general que no implica la necesidad de conocer la longitud de la matriz con anticipación, utilizando
collect
o
udf
s.
Desafortunadamente, esto solo funciona para la versión 2.1 y posteriores de
spark
, porque requiere la función
posexplode
.
Supongamos que tiene el siguiente DataFrame:
df = spark.createDataFrame(
[
[1, ''A, B, C, D''],
[2, ''E, F, G''],
[3, ''H, I''],
[4, ''J'']
]
, ["num", "letters"]
)
df.show()
#+---+----------+
#|num| letters|
#+---+----------+
#| 1|A, B, C, D|
#| 2| E, F, G|
#| 3| H, I|
#| 4| J|
#+---+----------+
Divida la columna de
letters
y luego use
posexplode
para explotar la matriz resultante junto con la posición en la matriz.
Luego use
pyspark.sql.functions.expr
para tomar el elemento en el índice
pos
en esta matriz.
import pyspark.sql.functions as f
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)/
.show()
#+---+------------+---+---+
#|num| letters|pos|val|
#+---+------------+---+---+
#| 1|[A, B, C, D]| 0| A|
#| 1|[A, B, C, D]| 1| B|
#| 1|[A, B, C, D]| 2| C|
#| 1|[A, B, C, D]| 3| D|
#| 2| [E, F, G]| 0| E|
#| 2| [E, F, G]| 1| F|
#| 2| [E, F, G]| 2| G|
#| 3| [H, I]| 0| H|
#| 3| [H, I]| 1| I|
#| 4| [J]| 0| J|
#+---+------------+---+---+
Ahora creamos dos nuevas columnas a partir de este resultado.
El primero es el nombre de nuestra nueva columna, que será una concatenación de
letter
y el índice en la matriz.
La segunda columna será el valor en el índice correspondiente en la matriz.
Obtenemos este último explotando la funcionalidad de
pyspark.sql.functions.expr
que nos permite
usar valores de columna como parámetros
.
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)/
.drop("val")/
.select(
"num",
f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
f.expr("letters[pos]").alias("val")
)/
.show()
#+---+-------+---+
#|num| name|val|
#+---+-------+---+
#| 1|letter0| A|
#| 1|letter1| B|
#| 1|letter2| C|
#| 1|letter3| D|
#| 2|letter0| E|
#| 2|letter1| F|
#| 2|letter2| G|
#| 3|letter0| H|
#| 3|letter1| I|
#| 4|letter0| J|
#+---+-------+---+
Ahora podemos simplemente
groupBy
the
num
y
pivot
el DataFrame.
Poniendo todo eso junto, obtenemos:
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)/
.drop("val")/
.select(
"num",
f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
f.expr("letters[pos]").alias("val")
)/
.groupBy("num").pivot("name").agg(f.first("val"))/
.show()
#+---+-------+-------+-------+-------+
#|num|letter0|letter1|letter2|letter3|
#+---+-------+-------+-------+-------+
#| 1| A| B| C| D|
#| 3| H| I| null| null|
#| 2| E| F| G| null|
#| 4| J| null| null| null|
#+---+-------+-------+-------+-------+
Encontré una solución para el caso desigual general (o cuando obtienes las columnas anidadas, obtenidas con la función .split ()):
import pyspark.sql.functions as f
@f.udf(StructType([StructField(col_3, StringType(), True),
StructField(col_4, StringType(), True)]))
def splitCols(array):
return array[0], ''''.join(array[1:len(array)])
df = df.withColumn("name", splitCols(f.split(f.col("my_str_col"), ''-'')))/
.select(df.columns+[''name.*''])
Básicamente, solo necesita seleccionar todas las columnas anteriores + las anidadas ''column_name. *'' Y las obtendrá como dos columnas de nivel superior en este caso.
pyspark.sql.functions.split()
es el enfoque correcto aquí: simplemente necesita aplanar la columna ArrayType anidada en varias columnas de nivel superior.
En este caso, donde cada matriz solo contiene 2 elementos, es muy fácil.
Simplemente use
Column.getItem()
para recuperar cada parte de la matriz como una columna:
split_col = pyspark.sql.functions.split(df[''my_str_col''], ''-'')
df = df.withColumn(''NAME1'', split_col.getItem(0))
df = df.withColumn(''NAME2'', split_col.getItem(1))
El resultado será:
col1 | my_str_col | NAME1 | NAME2
-----+------------+-------+------
18 | 856-yygrm | 856 | yygrm
201 | 777-psgdg | 777 | psgdg
No estoy seguro de cómo resolvería esto en un caso general donde las matrices anidadas no tenían el mismo tamaño de fila a fila.