structtype read examples crear python apache-spark pyspark pyspark-sql

python - read - PySpark: calcular el máximo de fila del subconjunto de columnas y agregarlo a un dataframe existente



pyspark sql examples (1)

Me gustaría calcular el máximo de un subconjunto de columnas para cada fila y agregarlo como una nueva columna para el Dataframe existente.

Logré hacer esto de una manera muy incómoda:

def add_colmax(df,subset_columns,colnm): '''''' calculate the maximum of the selected "subset_columns" from dataframe df for each row, new column containing row wise maximum is added to dataframe df. df: dataframe. It must contain subset_columns as subset of columns colnm: Name of the new column containing row-wise maximum of subset_columns subset_columns: the subset of columns from w '''''' from pyspark.sql.functions import monotonicallyIncreasingId from pyspark.sql import Row def get_max_row_with_None(row): return float(np.max(row)) df_subset = df.select(subset_columns) rdd = df_subset.map( get_max_row_with_None) df_rowsum = rdd.map(Row(colnm)).toDF() df_rowsum = df_rowsum.withColumn("id",monotonicallyIncreasingId()) df = df.withColumn("id",monotonicallyIncreasingId()) df = df.join(df_rowsum,df.id == df_rowsum.id).drop(df.id).drop(df_rowsum.id) return df

Esta función funciona como:

rdd1 = sc.parallelize([("foo", 1.0,3.0,None), ("bar", 2.0,2.0,-10), ("baz", 3.3,1.2,10.0)]) df1 = sqlContext.createDataFrame(rdd1, (''v1'', ''v2'',''v3'',''v4'')) df_new = add_colmax(df1,[''v2'',''v3'',''v4''],"rowsum") df_new.collect()

devoluciones:

[Row(v1=u''bar'', v2=2.0, v3=2.0, v4=-10, rowsum=2.0), Row(v1=u''baz'', v2=3.3, v3=1.2, v4=None, rowsum=3.3), Row(v1=u''foo'', v2=1.0, v3=3.0, v4=None, rowsum=3.0)]

Creo que si pudiera usar las funciones definidas por el usuario con withColumn , esto se puede hacer mucho más simple. Pero no pude encontrar la forma de hacerlo. Por favor, avíseme si tiene una forma más sencilla de lograr esto. Estoy usando la chispa 1.6


Comencemos con un par de importaciones

from pyspark.sql.functions import col, lit, coalesce, greatest

A continuación, define menos infinito literal:

minf = lit(float("-inf"))

Asigne columnas y pase el resultado al greatest :

rowmax = greatest(*[coalesce(col(x), minf) for x in [''v2'',''v3'',''v4'']])

Finalmente con withColumn :

df1.withColumn("rowmax", rowmax)

con resultado:

+---+---+---+----+------+ | v1| v2| v3| v4|rowmax| +---+---+---+----+------+ |foo|1.0|3.0|null| 3.0| |bar|2.0|2.0| -10| 2.0| |baz|3.3|1.2|null| 3.3| +---+---+---+----+------+

Puede usar el mismo patrón con diferentes operaciones sabias en filas reemplazando minf con elemento neutral. Por ejemplo:

rowsum = sum([coalesce(col(x), lit(0)) for x in [''v2'',''v3'',''v4'']])

o:

from operator import mul from functools import reduce rowproduct = reduce( mul, [coalesce(col(x), lit(1)) for x in [''v2'',''v3'',''v4'']] )

Su propio código podría simplificarse significativamente con udf :

from pyspark.sql.types import DoubleType from pyspark.sql.functions import udf def get_max_row_with_None_(*cols): return float(max(x for x in cols if x is not None)) get_max_row_with_None = udf(get_max_row_with_None_, DoubleType()) df1.withColumn("rowmax", get_max_row_with_None(''v2'',''v3'',''v4''))

Reemplace el minf con el lit(float("inf")) y el greatest con el least para obtener el valor más pequeño por fila.