tablas sumar recorrer promedio para leer funciones datos data columnas count apache-spark distinct dataframe apache-spark-sql

count - sumar - recorrer data frame pandas



Spark: Cómo traducir el conteo(distinto(valor)) en las API de Dataframe (2)

Estoy tratando de comparar diferentes maneras de agregar mis datos.

Estos son mis datos de entrada con 2 elementos (página, visitante):

(PAG1,V1) (PAG1,V1) (PAG2,V1) (PAG2,V2) (PAG2,V1) (PAG1,V1) (PAG1,V2) (PAG1,V1) (PAG1,V2) (PAG1,V1) (PAG2,V2) (PAG1,V3)

Trabajando con un comando SQL en Spark SQL con este código:

import sqlContext.implicits._ case class Log(page: String, visitor: String) val logs = data.map(p => Log(p._1,p._2)).toDF() logs.registerTempTable("logs") val sqlResult= sqlContext.sql( """select page ,count(distinct visitor) as visitor from logs group by page """) val result = sqlResult.map(x=>(x(0).toString,x(1).toString)) result.foreach(println)

Obtengo esta salida:

(PAG1,3) // PAG1 has been visited by 3 different visitors (PAG2,2) // PAG2 has been visited by 2 different visitors

Ahora, me gustaría obtener el mismo resultado utilizando Dataframes and thiers API, pero no puedo obtener el mismo resultado:

import sqlContext.implicits._ case class Log(page: String, visitor: String) val logs = data.map(p => Coppia(p._1,p._2)).toDF() val result = log.select("page","visitor").groupBy("page").count().distinct result.foreach(println)

De hecho, eso es lo que obtengo como resultado:

[PAG1,8] // just the simple page count for every page [PAG2,4]

Probablemente sea algo tonto, pero no puedo verlo ahora.

¡Gracias por adelantado!

FF


Lo que necesita es la función de agregación countDistinct :

import sqlContext.implicits._ import org.apache.spark.sql.functions._ case class Log(page: String, visitor: String) val logs = data.map(p => Coppia(p._1,p._2)) .toDF() val result = log.select("page","visitor") .groupBy(''page) .agg(''page, countDistinct(''visitor)) result.foreach(println)


Puede utilizar el comando groupBy de groupBy dos veces para hacerlo. Aquí, df1 es su entrada original.

val df2 = df1.groupBy($"page",$"visitor").agg(count($"visitor").as("count"))

Este comando produciría el siguiente resultado:

page visitor count ---- ------ ---- PAG2 V2 2 PAG1 V3 1 PAG1 V1 5 PAG1 V2 2 PAG2 V1 2

Luego usa el comando groupBy nuevamente para obtener el resultado final.

df2.groupBy($"page").agg(count($"visitor").as("count"))

Salida final:

page count ---- ---- PAG1 3 PAG2 2