sql - que - preguntas sobre mensajeria instantanea
Encuentre la fila máxima por grupo en Spark DataFrame (2)
Creo que lo que podría estar buscando son funciones de ventana: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
Aquí hay un ejemplo en Scala (no tengo un Spark Shell con Hive disponible en este momento, por lo que no pude probar el código, pero creo que debería funcionar):
case class MyRow(name: String, id_sa: String, id_sb: String)
val myDF = sc.parallelize(Array(
MyRow("n1", "a1", "b1"),
MyRow("n2", "a1", "b2"),
MyRow("n3", "a1", "b2"),
MyRow("n1", "a2", "b2")
)).toDF("name", "id_sa", "id_sb")
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc)
myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb")
Probablemente haya formas más eficientes de lograr los mismos resultados con las funciones de Windows, pero espero que esto le indique la dirección correcta.
Estoy tratando de usar marcos de datos Spark en lugar de RDD, ya que parecen ser de más alto nivel que los RDD y tienden a producir un código más legible.
En un clúster de 14 nodos de Google Dataproc, tengo alrededor de 6 millones de nombres que son traducidos a ID por dos sistemas diferentes:
sa
y
sb
.
Cada
Row
contiene
name
,
id_sa
e
id_sb
.
Mi objetivo es producir una asignación de
id_sa
a
id_sb
modo que para cada
id_sa
, el
id_sb
correspondiente sea el id más frecuente entre todos los nombres adjuntos a
id_sa
.
Tratemos de aclarar con un ejemplo. Si tengo las siguientes filas:
[Row(name=''n1'', id_sa=''a1'', id_sb=''b1''),
Row(name=''n2'', id_sa=''a1'', id_sb=''b2''),
Row(name=''n3'', id_sa=''a1'', id_sb=''b2''),
Row(name=''n4'', id_sa=''a2'', id_sb=''b2'')]
Mi objetivo es producir un mapeo de
a1
a
b2
.
De hecho, los nombres asociados a
a1
son
n1
,
n2
y
n3
, que se asignan respectivamente a
b1
,
b2
y
b2
, por lo que
b2
es el mapeo más frecuente en los nombres asociados a
a1
.
Del mismo modo,
a2
se asignará a
b2
.
Está bien suponer que siempre habrá un ganador: no es necesario romper los lazos.
Esperaba poder usar
groupBy(df.id_sa)
en mi marco de datos, pero no sé qué hacer a continuación.
Esperaba una agregación que pudiera producir, al final, las siguientes filas:
[Row(id_sa=a1, max_id_sb=b2),
Row(id_sa=a2, max_id_sb=b2)]
Pero tal vez estoy tratando de usar la herramienta incorrecta y debería volver a usar RDD.
Usando
join
(resultará en más de una fila en el grupo en caso de empate):
import pyspark.sql.functions as F
from pyspark.sql.functions import count, col
cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")
cnts.join(maxs,
(col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))
Uso de funciones de ventana (se eliminarán los lazos):
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())
(cnts
.withColumn("rn", row_number().over(w))
.where(col("rn") == 1)
.select("id_sa", "id_sb"))
Usando la
struct
ordenamiento:
from pyspark.sql.functions import struct
(cnts
.groupBy("id_sa")
.agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
.select(col("id_sa"), col("max.id_sb")))
Consulte también ¿Cómo seleccionar la primera fila de cada grupo?