spark sobre que preguntas mensajeria jabber instantanea historial grupos crear conversaciones como borrar bloquear sql apache-spark pyspark apache-spark-sql spark-dataframe

sql - que - preguntas sobre mensajeria instantanea



Encuentre la fila máxima por grupo en Spark DataFrame (2)

Creo que lo que podría estar buscando son funciones de ventana: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

Aquí hay un ejemplo en Scala (no tengo un Spark Shell con Hive disponible en este momento, por lo que no pude probar el código, pero creo que debería funcionar):

case class MyRow(name: String, id_sa: String, id_sb: String) val myDF = sc.parallelize(Array( MyRow("n1", "a1", "b1"), MyRow("n2", "a1", "b2"), MyRow("n3", "a1", "b2"), MyRow("n1", "a2", "b2") )).toDF("name", "id_sa", "id_sb") import org.apache.spark.sql.expressions.Window val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc) myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb")

Probablemente haya formas más eficientes de lograr los mismos resultados con las funciones de Windows, pero espero que esto le indique la dirección correcta.

Estoy tratando de usar marcos de datos Spark en lugar de RDD, ya que parecen ser de más alto nivel que los RDD y tienden a producir un código más legible.

En un clúster de 14 nodos de Google Dataproc, tengo alrededor de 6 millones de nombres que son traducidos a ID por dos sistemas diferentes: sa y sb . Cada Row contiene name , id_sa e id_sb . Mi objetivo es producir una asignación de id_sa a id_sb modo que para cada id_sa , el id_sb correspondiente sea el id más frecuente entre todos los nombres adjuntos a id_sa .

Tratemos de aclarar con un ejemplo. Si tengo las siguientes filas:

[Row(name=''n1'', id_sa=''a1'', id_sb=''b1''), Row(name=''n2'', id_sa=''a1'', id_sb=''b2''), Row(name=''n3'', id_sa=''a1'', id_sb=''b2''), Row(name=''n4'', id_sa=''a2'', id_sb=''b2'')]

Mi objetivo es producir un mapeo de a1 a b2 . De hecho, los nombres asociados a a1 son n1 , n2 y n3 , que se asignan respectivamente a b1 , b2 y b2 , por lo que b2 es el mapeo más frecuente en los nombres asociados a a1 . Del mismo modo, a2 se asignará a b2 . Está bien suponer que siempre habrá un ganador: no es necesario romper los lazos.

Esperaba poder usar groupBy(df.id_sa) en mi marco de datos, pero no sé qué hacer a continuación. Esperaba una agregación que pudiera producir, al final, las siguientes filas:

[Row(id_sa=a1, max_id_sb=b2), Row(id_sa=a2, max_id_sb=b2)]

Pero tal vez estoy tratando de usar la herramienta incorrecta y debería volver a usar RDD.


Usando join (resultará en más de una fila en el grupo en caso de empate):

import pyspark.sql.functions as F from pyspark.sql.functions import count, col cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts") maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs") cnts.join(maxs, (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa")) ).select(col("cnts.id_sa"), col("cnts.id_sb"))

Uso de funciones de ventana (se eliminarán los lazos):

from pyspark.sql.functions import row_number from pyspark.sql.window import Window w = Window().partitionBy("id_sa").orderBy(col("cnt").desc()) (cnts .withColumn("rn", row_number().over(w)) .where(col("rn") == 1) .select("id_sa", "id_sb"))

Usando la struct ordenamiento:

from pyspark.sql.functions import struct (cnts .groupBy("id_sa") .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max")) .select(col("id_sa"), col("max.id_sb")))

Consulte también ¿Cómo seleccionar la primera fila de cada grupo?