distinct apache-spark

distinct - Eficiente conde distintivo con Apache Spark



apache-spark (6)

100 millones de clientes hacen clic 100 mil millones de veces en las páginas de algunos sitios web (digamos 100 sitios web). Y la secuencia de clics está disponible para usted en un gran conjunto de datos.

Usando las abstracciones de Apache Spark, ¿cuál es la forma más eficiente de contar visitantes distintos por sitio web?


He tenido que hacer cosas similares, una cosa de eficiencia que puedes hacer (que no es realmente una chispa) es asignar tus identificaciones de vistor a listas de bytes en lugar de GUID Strings, puedes guardar 4x de espacio luego (como 2 Chars es codificación hexadecimal) de un solo byte, y un Char usa 2 bytes en una cadena).

// Inventing these custom types purely for this question - don''t do this in real life! type VistorID = List[Byte] type WebsiteID = Int val visitors: RDD[(WebsiteID, VisitorID)] = ??? visitors.distinct().mapValues(_ => 1).reduceByKey(_ + _)

Tenga en cuenta que también podría hacer:

visitors.distinct().map(_._1).countByValue()

pero esto no escala también.


Noté que la función distinta básica puede ser significativamente más rápida cuando la ejecuta en un RDD que al ejecutarla en una colección de DataFrame. Por ejemplo:

DataFrame df = sqlContext.load(...) df.distinct.count // 0.8 s df.rdd.distinct.count // 0.2 s


Si lo desea por página web, entonces visitors.distinct()... es ineficiente. Si hay muchos visitantes y muchas páginas web, entonces se distingue entre una gran cantidad de combinaciones (webpage, visitor) , que pueden abrumar la memoria.

Aquí hay otra forma:

visitors.groupByKey().map { case (webpage, visitor_iterable) => (webpage, visitor_iterable.toArray.distinct.length) }

Esto requiere que los visitantes de una sola página web quepan en la memoria, por lo que puede no ser el mejor en todos los casos.


Si los data son un RDD de pares (de sitio, visitante), entonces data.countApproxDistinctByKey(0.05) le dará un RDD de (sitio, recuento). El parámetro se puede reducir para obtener más precisión a cambio de un mayor procesamiento.



visitors.distinct().count() serían las formas obvias, con la primera manera en distinct puedes especificar el nivel de paralelismo y también ver una mejora en la velocidad. Si es posible configurar visitantes como una secuencia y usar D-streams, eso haría el recuento en tiempo real. Puede transmitir directamente desde un directorio y usar los mismos métodos que en el RDD como:

val file = ssc.textFileStream("...") file.distinct().count()

La última opción es usar def countApproxDistinct(relativeSD: Double = 0.05): Long sin embargo, esto se etiqueta como experimental, pero sería significativamente más rápido que count si relativeSD (std deviation) es mayor.

EDITAR: dado que desea el conteo por sitio web, puede reducir el ID del sitio web, esto se puede hacer de manera eficiente (con combinadores) ya que el recuento es total. Si tiene un RDD de tuplas de id. De usuario de nombre de sitio web, puede hacerlo. visitors.countDistinctByKey() o visitors.countApproxDistinctByKey() , una vez más, el aproximado es experimental. Para usar aproximadamente distintas teclas, necesitas un PairRDD

Nota interesante: si estás de acuerdo con las aproximaciones y quieres resultados rápidos, es posible que desees examinar blinkDB hecho por las mismas personas que los laboratorios de amplificadores de chispa.