structtype read print crear dataframe pyspark

dataframe - read - cambio de nombre de columnas para agregados de cuadros de datos pyspark



pyspark read csv (3)

Aunque todavía prefiero la sintaxis de dplyr , este fragmento de código servirá:

import pyspark.sql.functions as sf df.groupBy("group")/ .agg(sf.sum(''money'').alias(''money''))/ .show(100)

Se vuelve detallado.

Estoy analizando algunos datos con cuadros de datos de pyspark, supongamos que tengo un marco de datos que estoy agregando:

df.groupBy("group")/ .agg({"money":"sum"})/ .show(100)

Esto me dará:

group SUM(money#2L) A 137461285853 B 172185566943 C 271179590646

La agregación funciona bien pero no me gusta el nuevo nombre de columna "SUM (dinero # 2L)". ¿Hay alguna manera de cambiar el nombre de esta columna en algo legible por humanos desde el método .agg ? Tal vez algo más similar a lo que uno haría en dplyr :

df %>% group_by(group) %>% summarise(sum_money = sum(money))


Hice una pequeña función auxiliar para esto que podría ayudar a algunas personas.

import re from functools import partial def rename_cols(agg_df, ignore_first_n=1): """changes the default spark aggregate names `avg(colname)` to something a bit more useful. Pass an aggregated dataframe and the number of aggregation columns to ignore. """ delimiters = "(", ")" split_pattern = ''|''.join(map(re.escape, delimiters)) splitter = partial(re.split, split_pattern) split_agg = lambda x: ''_''.join(splitter(x))[0:-ignore_first_n] renamed = map(split_agg, agg_df.columns[ignore_first_n:]) renamed = zip(agg_df.columns[ignore_first_n:], renamed) for old, new in renamed: agg_df = agg_df.withColumnRenamed(old, new) return agg_df

Un ejemplo:

gb = (df.selectExpr("id", "rank", "rate", "price", "clicks") .groupby("id") .agg({"rank": "mean", "*": "count", "rate": "mean", "price": "mean", "clicks": "mean", }) ) >>> gb.columns [''id'', ''avg(rate)'', ''count(1)'', ''avg(price)'', ''avg(rank)'', ''avg(clicks)''] >>> rename_cols(gb).columns [''id'', ''avg_rate'', ''count_1'', ''avg_price'', ''avg_rank'', ''avg_clicks'']

Haciendo al menos un poco para evitar que las personas tipeen tanto.


withColumnRenamed debería hacer el truco. Aquí está el enlace a la API pyspark.sql .

df.groupBy("group")/ .agg({"money":"sum"})/ .withColumnRenamed("SUM(money)", "money") .show(100)