python - read - PySpark: calcular el máximo de fila del subconjunto de columnas y agregarlo a un dataframe existente
pyspark sql examples (1)
Me gustaría calcular el máximo de un subconjunto de columnas para cada fila y agregarlo como una nueva columna para el Dataframe
existente.
Logré hacer esto de una manera muy incómoda:
def add_colmax(df,subset_columns,colnm):
''''''
calculate the maximum of the selected "subset_columns" from dataframe df for each row,
new column containing row wise maximum is added to dataframe df.
df: dataframe. It must contain subset_columns as subset of columns
colnm: Name of the new column containing row-wise maximum of subset_columns
subset_columns: the subset of columns from w
''''''
from pyspark.sql.functions import monotonicallyIncreasingId
from pyspark.sql import Row
def get_max_row_with_None(row):
return float(np.max(row))
df_subset = df.select(subset_columns)
rdd = df_subset.map( get_max_row_with_None)
df_rowsum = rdd.map(Row(colnm)).toDF()
df_rowsum = df_rowsum.withColumn("id",monotonicallyIncreasingId())
df = df.withColumn("id",monotonicallyIncreasingId())
df = df.join(df_rowsum,df.id == df_rowsum.id).drop(df.id).drop(df_rowsum.id)
return df
Esta función funciona como:
rdd1 = sc.parallelize([("foo", 1.0,3.0,None),
("bar", 2.0,2.0,-10),
("baz", 3.3,1.2,10.0)])
df1 = sqlContext.createDataFrame(rdd1, (''v1'', ''v2'',''v3'',''v4''))
df_new = add_colmax(df1,[''v2'',''v3'',''v4''],"rowsum")
df_new.collect()
devoluciones:
[Row(v1=u''bar'', v2=2.0, v3=2.0, v4=-10, rowsum=2.0),
Row(v1=u''baz'', v2=3.3, v3=1.2, v4=None, rowsum=3.3),
Row(v1=u''foo'', v2=1.0, v3=3.0, v4=None, rowsum=3.0)]
Creo que si pudiera usar las funciones definidas por el usuario con withColumn
, esto se puede hacer mucho más simple. Pero no pude encontrar la forma de hacerlo. Por favor, avíseme si tiene una forma más sencilla de lograr esto. Estoy usando la chispa 1.6
Comencemos con un par de importaciones
from pyspark.sql.functions import col, lit, coalesce, greatest
A continuación, define menos infinito literal:
minf = lit(float("-inf"))
Asigne columnas y pase el resultado al greatest
:
rowmax = greatest(*[coalesce(col(x), minf) for x in [''v2'',''v3'',''v4'']])
Finalmente con withColumn
:
df1.withColumn("rowmax", rowmax)
con resultado:
+---+---+---+----+------+
| v1| v2| v3| v4|rowmax|
+---+---+---+----+------+
|foo|1.0|3.0|null| 3.0|
|bar|2.0|2.0| -10| 2.0|
|baz|3.3|1.2|null| 3.3|
+---+---+---+----+------+
Puede usar el mismo patrón con diferentes operaciones sabias en filas reemplazando minf
con elemento neutral. Por ejemplo:
rowsum = sum([coalesce(col(x), lit(0)) for x in [''v2'',''v3'',''v4'']])
o:
from operator import mul
from functools import reduce
rowproduct = reduce(
mul,
[coalesce(col(x), lit(1)) for x in [''v2'',''v3'',''v4'']]
)
Su propio código podría simplificarse significativamente con udf
:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
def get_max_row_with_None_(*cols):
return float(max(x for x in cols if x is not None))
get_max_row_with_None = udf(get_max_row_with_None_, DoubleType())
df1.withColumn("rowmax", get_max_row_with_None(''v2'',''v3'',''v4''))
Reemplace el minf
con el lit(float("inf"))
y el greatest
con el least
para obtener el valor más pequeño por fila.