topandas spark examples crear python apache-spark user-defined-functions pyspark spark-dataframe

examples - python spark dataframe



Apache Spark: asigne el resultado de UDF a varias columnas de marcos de datos (1)

Estoy usando pyspark, cargando un archivo csv grande en un marco de datos con spark-csv, y como paso de preprocesamiento necesito aplicar una variedad de operaciones a los datos disponibles en una de las columnas (que contiene una cadena json) . Eso devolverá los valores de X, cada uno de los cuales debe almacenarse en su propia columna separada.

Esa funcionalidad será implementada en un UDF. Sin embargo, no estoy seguro de cómo devolver una lista de valores de ese UDF y alimentarlos en columnas individuales. A continuación se muestra un ejemplo simple:

(...) from pyspark.sql.functions import udf def udf_test(n): return [n/2, n%2] test_udf=udf(udf_test) df.select(''amount'',''trans_date'').withColumn("test", test_udf("amount")).show(4)

Eso produce lo siguiente:

+------+----------+--------------------+ |amount|trans_date| test| +------+----------+--------------------+ | 28.0|2016-02-07| [14.0, 0.0]| | 31.01|2016-02-07|[15.5050001144409...| | 13.41|2016-02-04|[6.70499992370605...| | 307.7|2015-02-17|[153.850006103515...| | 22.09|2016-02-05|[11.0450000762939...| +------+----------+--------------------+ only showing top 5 rows

¿Cuál sería la mejor manera de almacenar los dos valores (en este ejemplo) devueltos por el udf en columnas separadas? En este momento se están escribiendo como cadenas:

df.select(''amount'',''trans_date'').withColumn("test", test_udf("amount")).printSchema() root |-- amount: float (nullable = true) |-- trans_date: string (nullable = true) |-- test: string (nullable = true)


No es posible crear varias columnas de nivel superior a partir de una sola llamada UDF, pero puede crear una nueva struct . Requiere un UDF con el tipo de returnType especificado:

from pyspark.sql.functions import udf from pyspark.sql.types import * schema = StructType([ StructField("foo", FloatType(), False), StructField("bar", FloatType(), False) ]) def udf_test(n): return (n / 2, n % 2) if n and n != 0.0 else (float(''nan''), float(''nan'')) test_udf = udf(udf_test, schema) df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"]) foobars = df.select(test_udf("y").alias("foobar")) foobars.printSchema() ## root ## |-- foobar: struct (nullable = true) ## | |-- foo: float (nullable = false) ## | |-- bar: float (nullable = false)

Además aplana el esquema con una simple select :

foobars.select("foobar.foo", "foobar.bar").show() ## +---+---+ ## |foo|bar| ## +---+---+ ## |1.0|0.0| ## |1.5|1.0| ## +---+---+

Vea también Derivar múltiples columnas de una sola columna en un Spark DataFrame