structtype spark read apache-spark pyspark apache-spark-sql spark-dataframe pyspark-sql

apache-spark - read - spark sql java



Dividir la columna de cadena del marco de datos de Spark en varias columnas (3)

He visto a varias personas sugiriendo que Dataframe.explode es una forma útil de hacer esto, pero da como resultado más filas que el marco de datos original, que no es lo que quiero en absoluto. Simplemente quiero hacer el equivalente de Dataframe de lo muy simple:

rdd.map(lambda row: row + [row.my_str_col.split(''-'')])

que toma algo parecido a:

col1 | my_str_col -----+----------- 18 | 856-yygrm 201 | 777-psgdg

y lo convierte a esto:

col1 | my_str_col | _col3 | _col4 -----+------------+-------+------ 18 | 856-yygrm | 856 | yygrm 201 | 777-psgdg | 777 | psgdg

Soy consciente de pyspark.sql.functions.split() , pero da como resultado una columna de matriz anidada en lugar de dos columnas de nivel superior como quiero.

Idealmente, quiero que estas nuevas columnas también se nombren.


Aquí hay una solución para el caso general que no implica la necesidad de conocer la longitud de la matriz con anticipación, utilizando collect o udf s. Desafortunadamente, esto solo funciona para la versión 2.1 y posteriores de spark , porque requiere la función posexplode .

Supongamos que tiene el siguiente DataFrame:

df = spark.createDataFrame( [ [1, ''A, B, C, D''], [2, ''E, F, G''], [3, ''H, I''], [4, ''J''] ] , ["num", "letters"] ) df.show() #+---+----------+ #|num| letters| #+---+----------+ #| 1|A, B, C, D| #| 2| E, F, G| #| 3| H, I| #| 4| J| #+---+----------+

Divida la columna de letters y luego use posexplode para explotar la matriz resultante junto con la posición en la matriz. Luego use pyspark.sql.functions.expr para tomar el elemento en el índice pos en esta matriz.

import pyspark.sql.functions as f df.select( "num", f.split("letters", ", ").alias("letters"), f.posexplode(f.split("letters", ", ")).alias("pos", "val") )/ .show() #+---+------------+---+---+ #|num| letters|pos|val| #+---+------------+---+---+ #| 1|[A, B, C, D]| 0| A| #| 1|[A, B, C, D]| 1| B| #| 1|[A, B, C, D]| 2| C| #| 1|[A, B, C, D]| 3| D| #| 2| [E, F, G]| 0| E| #| 2| [E, F, G]| 1| F| #| 2| [E, F, G]| 2| G| #| 3| [H, I]| 0| H| #| 3| [H, I]| 1| I| #| 4| [J]| 0| J| #+---+------------+---+---+

Ahora creamos dos nuevas columnas a partir de este resultado. El primero es el nombre de nuestra nueva columna, que será una concatenación de letter y el índice en la matriz. La segunda columna será el valor en el índice correspondiente en la matriz. Obtenemos este último explotando la funcionalidad de pyspark.sql.functions.expr que nos permite usar valores de columna como parámetros .

df.select( "num", f.split("letters", ", ").alias("letters"), f.posexplode(f.split("letters", ", ")).alias("pos", "val") )/ .drop("val")/ .select( "num", f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"), f.expr("letters[pos]").alias("val") )/ .show() #+---+-------+---+ #|num| name|val| #+---+-------+---+ #| 1|letter0| A| #| 1|letter1| B| #| 1|letter2| C| #| 1|letter3| D| #| 2|letter0| E| #| 2|letter1| F| #| 2|letter2| G| #| 3|letter0| H| #| 3|letter1| I| #| 4|letter0| J| #+---+-------+---+

Ahora podemos simplemente groupBy the num y pivot el DataFrame. Poniendo todo eso junto, obtenemos:

df.select( "num", f.split("letters", ", ").alias("letters"), f.posexplode(f.split("letters", ", ")).alias("pos", "val") )/ .drop("val")/ .select( "num", f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"), f.expr("letters[pos]").alias("val") )/ .groupBy("num").pivot("name").agg(f.first("val"))/ .show() #+---+-------+-------+-------+-------+ #|num|letter0|letter1|letter2|letter3| #+---+-------+-------+-------+-------+ #| 1| A| B| C| D| #| 3| H| I| null| null| #| 2| E| F| G| null| #| 4| J| null| null| null| #+---+-------+-------+-------+-------+


Encontré una solución para el caso desigual general (o cuando obtienes las columnas anidadas, obtenidas con la función .split ()):

import pyspark.sql.functions as f @f.udf(StructType([StructField(col_3, StringType(), True), StructField(col_4, StringType(), True)])) def splitCols(array): return array[0], ''''.join(array[1:len(array)]) df = df.withColumn("name", splitCols(f.split(f.col("my_str_col"), ''-'')))/ .select(df.columns+[''name.*''])

Básicamente, solo necesita seleccionar todas las columnas anteriores + las anidadas ''column_name. *'' Y las obtendrá como dos columnas de nivel superior en este caso.


pyspark.sql.functions.split() es el enfoque correcto aquí: simplemente necesita aplanar la columna ArrayType anidada en varias columnas de nivel superior. En este caso, donde cada matriz solo contiene 2 elementos, es muy fácil. Simplemente use Column.getItem() para recuperar cada parte de la matriz como una columna:

split_col = pyspark.sql.functions.split(df[''my_str_col''], ''-'') df = df.withColumn(''NAME1'', split_col.getItem(0)) df = df.withColumn(''NAME2'', split_col.getItem(1))

El resultado será:

col1 | my_str_col | NAME1 | NAME2 -----+------------+-------+------ 18 | 856-yygrm | 856 | yygrm 201 | 777-psgdg | 777 | psgdg

No estoy seguro de cómo resolvería esto en un caso general donde las matrices anidadas no tenían el mismo tamaño de fila a fila.