python - Columna GroupBy y filas de filtro con valor máximo en Pyspark
apache-spark apache-spark-sql (2)
Estoy casi seguro de que esto se ha preguntado antes, pero una búsqueda a través de stackoverflow no respondió mi pregunta. No es un duplicado de [2] ya que quiero el valor máximo, no el elemento más frecuente. Soy nuevo en pyspark e intento hacer algo realmente simple: quiero agrupar por la columna "A" y luego solo mantener la fila de cada grupo que tiene el valor máximo en la columna "B". Me gusta esto:
df_cleaned = df.groupBy("A").agg(F.max("B"))
Desafortunadamente, esto arroja todas las demás columnas: df_cleaned solo contiene las columnas "A" y el valor máximo de B. ¿Cómo mantengo las filas? ("A B C"...)
Otro enfoque posible es aplicar unir el marco de datos consigo mismo especificando "leftsemi". Este tipo de unión incluye todas las columnas del marco de datos en el lado izquierdo y ninguna columna en el lado derecho.
Por ejemplo:
import pyspark.sql.functions as f
data = [
(''a'', 5, ''c''),
(''a'', 8, ''d''),
(''a'', 7, ''e''),
(''b'', 1, ''f''),
(''b'', 3, ''g'')
]
df = sqlContext.createDataFrame(data, ["A", "B", "C"])
df.show()
+---+---+---+
| A| B| C|
+---+---+---+
| a| 5| c|
| a| 8| d|
| a| 7| e|
| b| 1| f|
| b| 3| g|
+---+---+---+
El valor máximo de la columna B por columna A se puede seleccionar haciendo:
df.groupBy(''A'').agg(f.max(''B'')
+---+---+
| A| B|
+---+---+
| a| 8|
| b| 3|
+---+---+
Usando esta expresión como un lado derecho en una semiunión izquierda, y renombrando la columna obtenida
max(B)
nuevo a su nombre original
B
, podemos obtener el resultado necesario:
df.join(df.groupBy(''A'').agg(f.max(''B'').alias(''B'')),on=''B'',how=''leftsemi'').show()
+---+---+---+
| B| A| C|
+---+---+---+
| 3| b| g|
| 8| a| d|
+---+---+---+
El plan físico detrás de esta solución y el de la respuesta aceptada son diferentes y todavía no me queda claro cuál funcionará mejor en grandes marcos de datos.
Se puede obtener el mismo resultado utilizando la sintaxis de SQL SQL haciendo:
df.registerTempTable(''table'')
q = ''''''SELECT *
FROM table a LEFT SEMI
JOIN (
SELECT
A,
max(B) as max_B
FROM table
GROUP BY A
) t
ON a.A=t.A AND a.B=t.max_B
''''''
sqlContext.sql(q).show()
+---+---+---+
| A| B| C|
+---+---+---+
| b| 3| g|
| a| 8| d|
+---+---+---+
Puede hacer esto sin un
udf
usando una
Window
.
Considere el siguiente ejemplo:
import pyspark.sql.functions as f
data = [
(''a'', 5),
(''a'', 8),
(''a'', 7),
(''b'', 1),
(''b'', 3)
]
df = sqlCtx.createDataFrame(data, ["A", "B"])
df.show()
#+---+---+
#| A| B|
#+---+---+
#| a| 5|
#| a| 8|
#| a| 7|
#| b| 1|
#| b| 3|
#+---+---+
Cree una
Window
para particionar por la columna
A
y úsela para calcular el máximo de cada grupo.
Luego filtre las filas de modo que el valor en la columna
B
sea igual al máximo.
from pyspark.sql import Window
w = Window.partitionBy(''A'')
df.withColumn(''maxB'', f.max(''B'').over(w))/
.where(f.col(''B'') == f.col(''maxB''))/
.drop(''maxB'')/
.show()
#+---+---+
#| A| B|
#+---+---+
#| a| 8|
#| b| 3|
#+---+---+
O equivalentemente usando
pyspark-sql
:
df.registerTempTable(''table'')
q = "SELECT A, B FROM (SELECT *, MAX(B) OVER (PARTITION BY A) AS maxB FROM table) M WHERE B = maxB"
sqlCtx.sql(q).show()
#+---+---+
#| A| B|
#+---+---+
#| b| 3|
#| a| 8|
#+---+---+