spark query manipulating dataframes data create crear python apache-spark pyspark spark-dataframe

python - query - Agregar la suma de la columna como nueva columna en el marco de datos de PySpark



pyspark dataframe to pandas (4)

Esto no era obvio. No veo una suma basada en filas de las columnas definidas en la API de marcos de datos de la chispa.

Versión 2

Esto se puede hacer de una manera bastante simple:

newdf = df.withColumn(''total'', sum(df[col] for col in df.columns))

df.columns es suministrado por pyspark como una lista de cadenas que dan todos los nombres de columna en el Spark Dataframe. Para una suma diferente, puede proporcionar cualquier otra lista de nombres de columna en su lugar.

No probé esto como mi primera solución porque no estaba seguro de cómo se comportaría. Pero funciona.

Versión 1

Esto es demasiado complicado, pero también funciona.

Puedes hacerlo:

  1. use df.columns para obtener una lista de los nombres de las columnas
  2. Usa esa lista de nombres para hacer una lista de las columnas.
  3. pase esa lista a algo que invoque la función de adición sobrecargada de la columna de una manera funcional de tipo plegable

Con python''s reduce , un cierto conocimiento de cómo funciona la sobrecarga del operador y el código pyspark para las columnas here convierte en:

def column_add(a,b): return a.__add__(b) newdf = df.withColumn(''total_col'', reduce(column_add, ( df[col] for col in df.columns ) ))

Tenga en cuenta que esto es una reducción de python, no una reducción de chispa RDD, y el término de paréntesis en el segundo parámetro para reducir requiere el paréntesis porque es una expresión generadora de listas.

Probado, funciona!

$ pyspark >>> df = sc.parallelize([{''a'': 1, ''b'':2, ''c'':3}, {''a'':8, ''b'':5, ''c'':6}, {''a'':3, ''b'':1, ''c'':0}]).toDF().cache() >>> df DataFrame[a: bigint, b: bigint, c: bigint] >>> df.columns [''a'', ''b'', ''c''] >>> def column_add(a,b): ... return a.__add__(b) ... >>> df.withColumn(''total'', reduce(column_add, ( df[col] for col in df.columns ) )).collect() [Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]

Estoy usando PySpark y tengo un marco de datos Spark con un montón de columnas numéricas. Quiero agregar una columna que sea la suma de todas las demás columnas.

Supongamos que mi marco de datos tenía las columnas "a", "b" y "c". Sé que puedo hacerlo:

df.withColumn(''total_col'', df.a + df.b + df.c)

El problema es que no quiero escribir cada columna individualmente y agregarlas, especialmente si tengo muchas columnas. Quiero poder hacer esto automáticamente o especificando una lista de nombres de columna que quiero agregar. Hay otra manera de hacer esto?


La forma más directa de hacerlo es usar la función expr

from pyspark.sql.functions import * data = data.withColumn(''total'', expr("col1 + col2 + col3 + col4"))


La solución

newdf = df.withColumn(''total'', sum(df[col] for col in df.columns))

Publicado por @Paul trabaja. Sin embargo, estaba recibiendo el error, tantos otros como he visto,

TypeError: ''Column'' object is not callable

Después de algún tiempo encontré el problema (al menos en mi caso). El problema es que previamente importé algunas funciones de pyspark con la línea

from pyspark.sql.functions import udf, col, count, sum, when, avg, mean, min

por lo tanto, la línea importó el comando sum pyspark mientras que df.withColumn(''total'', sum(df[col] for col in df.columns)) se supone que usa la función de sum python normal.

Puede eliminar la referencia de la función pyspark con del sum .

De lo contrario en mi caso cambié la importación a

import pyspark.sql.functions as F

y luego hizo referencia a las funciones como F.sum .


Mi problema fue similar al anterior (un poco más complejo) ya que tuve que agregar sumas de columnas consecutivas como nuevas columnas en el marco de datos de PySpark. Este enfoque utiliza el código de la versión 1 de Paul anterior:

import pyspark from pyspark.sql import SparkSession import pandas as pd spark = SparkSession.builder.appName(''addColAsCumulativeSUM'').getOrCreate() df=spark.createDataFrame(data=[(1,2,3),(4,5,6),(3,2,1)/ ,(6,1,-4),(0,2,-2),(6,4,1)/ ,(4,5,2),(5,-3,-5),(6,4,-1)]/ ,schema=[''x1'',''x2'',''x3'']) df.show() +---+---+---+ | x1| x2| x3| +---+---+---+ | 1| 2| 3| | 4| 5| 6| | 3| 2| 1| | 6| 1| -4| | 0| 2| -2| | 6| 4| 1| | 4| 5| 2| | 5| -3| -5| | 6| 4| -1| +---+---+---+ colnames=df.columns

Agregue nuevas columnas que sean sumas acumulativas (consecutivas):

for i in range(0,len(colnames)): colnameLst= colnames[0:i+1] colname = ''cm''+ str(i+1) df = df.withColumn(colname, sum(df[col] for col in colnameLst))

df.show ()

+---+---+---+---+---+---+ | x1| x2| x3|cm1|cm2|cm3| +---+---+---+---+---+---+ | 1| 2| 3| 1| 3| 6| | 4| 5| 6| 4| 9| 15| | 3| 2| 1| 3| 5| 6| | 6| 1| -4| 6| 7| 3| | 0| 2| -2| 0| 2| 0| | 6| 4| 1| 6| 10| 11| | 4| 5| 2| 4| 9| 11| | 5| -3| -5| 5| 2| -3| | 6| 4| -1| 6| 10| 9| +---+---+---+---+---+---+

Las columnas de ''suma acumulativa'' añadidas son las siguientes:

cm1 = x1 cm2 = x1 + x2 cm3 = x1 + x2 + x3