tutorial spark examples example apache-spark

apache-spark - examples - apache spark wikipedia



Cómo calcular los percentiles en Apache Spark (9)

Tengo un rdd de enteros (es decir, RDD[Int] ) y lo que me gustaría hacer es calcular los siguientes diez percentiles: [0th, 10th, 20th, ..., 90th, 100th] . ¿Cuál es la forma más eficiente de hacer eso?


¿Qué tal t-digest ?

https://github.com/tdunning/t-digest

Una nueva estructura de datos para la acumulación exacta en línea de estadísticas basadas en rango tales como cuantiles y medios recortados. El algoritmo t-digest también es muy compatible con los paralelos, por lo que es útil en aplicaciones de reducción de mapas y de transmisión paralela.

El algoritmo de construcción de digestión t usa una variante de clustering k-means unidimensional para generar una estructura de datos relacionada con el Q-digest. Esta estructura de datos t-digest se puede usar para estimar cuantiles o calcular otras estadísticas de rango. La ventaja del t-digest sobre el Q-digest es que el t-digest puede manejar valores de coma flotante mientras que el Q-digest está limitado a enteros. Con pequeños cambios, el t-digest puede manejar cualquier valor de cualquier conjunto ordenado que tenga algo parecido a una media. La precisión de las estimaciones de cuantiles producidas por los t-digests puede ser de órdenes de magnitud más precisas que las producidas por Q-digests a pesar del hecho de que los t-digests son más compactos cuando se almacenan en el disco.

En resumen, las características particularmente interesantes del t-digest son que

  • tiene resúmenes más pequeños que Q-digest
  • funciona tanto en dobles como enteros.
  • proporciona una precisión de una parte por millón para los cuantiles extremos y, por lo general, <1000 ppm de precisión para los cuantiles medios
  • es rápido
  • es muy simple
  • tiene una implementación de referencia que tiene> 90% de cobertura de prueba
  • se puede usar con map-reduce muy fácilmente porque los compendios pueden fusionarse

Debería ser bastante fácil usar la implementación Java de referencia de Spark.


Aquí está mi implementación de Python en Spark para calcular el percentil para un RDD que contiene valores de interés.

def percentile_threshold(ardd, percentile): assert percentile > 0 and percentile <= 100, "percentile should be larger then 0 and smaller or equal to 100" return ardd.sortBy(lambda x: x).zipWithIndex().map(lambda x: (x[1], x[0])) / .lookup(np.ceil(ardd.count() / 100 * percentile - 1))[0] # Now test it out import numpy as np randlist = range(1,10001) np.random.shuffle(randlist) ardd = sc.parallelize(randlist) print percentile_threshold(ardd,0.001) print percentile_threshold(ardd,1) print percentile_threshold(ardd,60.11) print percentile_threshold(ardd,99) print percentile_threshold(ardd,99.999) print percentile_threshold(ardd,100) # output: # 1 # 100 # 6011 # 9900 # 10000 # 10000

Por separado, definí la siguiente función para obtener el percentil 10 al 100.

def get_percentiles(rdd, stepsize=10): percentiles = [] rddcount100 = rdd.count() / 100 sortedrdd = ardd.sortBy(lambda x: x).zipWithIndex().map(lambda x: (x[1], x[0])) for p in range(0, 101, stepsize): if p == 0: pass # I am not aware of a formal definition of 0 percentile, # you can put a place holder like this if you want # percentiles.append(sortedrdd.lookup(0)[0] - 1) elif p == 100: percentiles.append(sortedrdd.lookup(np.ceil(rddcount100 * 100 - 1))[0]) else: pv = sortedrdd.lookup(np.ceil(rddcount100 * p) - 1)[0] percentiles.append(pv) return percentiles randlist = range(1,10001) np.random.shuffle(randlist) ardd = sc.parallelize(randlist) get_percentiles(ardd, 10) # [1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000]


Convierta su RDD en un RDD de doble, y luego use la .histogram(10) . Ver DoubleRDD ScalaDoc


Descubrí esta esencia

https://gist.github.com/felixcheung/92ae74bc349ea83a9e29

que contiene la siguiente función:

/** * compute percentile from an unsorted Spark RDD * @param data: input data set of Long integers * @param tile: percentile to compute (eg. 85 percentile) * @return value of input data at the specified percentile */ def computePercentile(data: RDD[Long], tile: Double): Double = { // NIST method; data to be sorted in ascending order val r = data.sortBy(x => x) val c = r.count() if (c == 1) r.first() else { val n = (tile / 100d) * (c + 1d) val k = math.floor(n).toLong val d = n - k if (k <= 0) r.first() else { val index = r.zipWithIndex().map(_.swap) val last = c if (k >= c) { index.lookup(last - 1).head } else { index.lookup(k - 1).head + d * (index.lookup(k).head - index.lookup(k - 1).head) } } } }


En base a la respuesta dada en UDAF mediana en Spark / Scala , utilicé un UDAF para calcular los percentiles sobre las ventanas de chispa (chispa 2.1):

Primero, un UDAF genérico abstracto utilizado para otras agregaciones

import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer abstract class GenericUDAF extends UserDefinedAggregateFunction { def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil) def bufferSchema: StructType = StructType( StructField("window_list", ArrayType(DoubleType, false)) :: Nil ) def deterministic: Boolean = true def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = new ArrayBuffer[Double]() } def update(buffer: MutableAggregationBuffer,input: org.apache.spark.sql.Row): Unit = { var bufferVal = buffer.getAs[mutable.WrappedArray[Double]](0).toBuffer bufferVal+=input.getAs[Double](0) buffer(0) = bufferVal } def merge(buffer1: MutableAggregationBuffer, buffer2: org.apache.spark.sql.Row): Unit = { buffer1(0) = buffer1.getAs[ArrayBuffer[Double]](0) ++ buffer2.getAs[ArrayBuffer[Double]](0) } def dataType: DataType def evaluate(buffer: Row): Any }

Luego el Percentile UDAF personalizado para deciles:

import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer class DecilesUDAF extends GenericUDAF { override def dataType: DataType = ArrayType(DoubleType, false) override def evaluate(buffer: Row): Any = { val sortedWindow = buffer.getAs[mutable.WrappedArray[Double]](0).sorted.toBuffer val windowSize = sortedWindow.size if (windowSize == 0) return null if (windowSize == 1) return (0 to 10).map(_ => sortedWindow.head).toArray (0 to 10).map(i => sortedWindow(Math.min(windowSize-1, i*windowSize/10))).toArray } }

El UDAF es luego instanciado y llamado a través de una ventana particionada y ordenada:

val deciles = new DecilesUDAF() df.withColumn("mt_deciles", deciles(col("mt")).over(myWindow))

A continuación, puede dividir la matriz resultante en múltiples columnas con getItem:

def splitToColumns(size: Int, splitCol:String)(df: DataFrame) = { (0 to size).foldLeft(df) { case (df_arg, i) => df_arg.withColumn("mt_decile_"+i, col(splitCol).getItem(i)) } } df.transform(splitToColumns(10, "mt_deciles" ))

El UDAF es más lento que las funciones de chispa nativas, pero siempre que cada bolsa agrupada o cada ventana sea relativamente pequeña y se ajuste a un solo ejecutor, debería estar bien. La principal ventaja es usar paralelismo de chispa. Con poco esfuerzo, este código podría extenderse a n-cuantiles.

Probé el código usando esta función:

def testDecilesUDAF = { val window = W.partitionBy("user") val deciles = new DecilesUDAF() val schema = StructType(StructField("mt", DoubleType) :: StructField("user", StringType) :: Nil) val rows1 = (1 to 20).map(i => Row(i.toDouble, "a")) val rows2 = (21 to 40).map(i => Row(i.toDouble, "b")) val df = spark.createDataFrame(spark.sparkContext.makeRDD[Row](rows1++rows2), schema) df.withColumn("deciles", deciles(col("mt")).over(window)) .transform(splitToColumns(10, "deciles" )) .drop("deciles") .show(100, truncate=false) }

Primeras 3 líneas de salida:

+----+----+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+ |mt |user|mt_decile_0|mt_decile_1|mt_decile_2|mt_decile_3|mt_decile_4|mt_decile_5|mt_decile_6|mt_decile_7|mt_decile_8|mt_decile_9|mt_decile_10| +----+----+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+ |21.0|b |21.0 |23.0 |25.0 |27.0 |29.0 |31.0 |33.0 |35.0 |37.0 |39.0 |40.0 | |22.0|b |21.0 |23.0 |25.0 |27.0 |29.0 |31.0 |33.0 |35.0 |37.0 |39.0 |40.0 | |23.0|b |21.0 |23.0 |25.0 |27.0 |29.0 |31.0 |33.0 |35.0 |37.0 |39.0 |40.0 |


Otra forma alternativa puede ser usar top y last en RDD de double. Por ejemplo, val percentile_99th_value = scores.top ((count / 100) .toInt) .last

Este método es más adecuado para percentiles individuales.


Si N por ciento es pequeño como 10, 20%, entonces haré lo siguiente:

  1. Calcule el tamaño del conjunto de datos, rdd.count (), sáltelo tal vez ya lo sepa y tome como argumento.

  2. En lugar de ordenar todo el conjunto de datos, descubriré la parte superior (N) de cada partición. Para eso, tendría que averiguar N = qué es N% de rdd.count, luego ordenar las particiones y tomar la parte superior (N) de cada partición. Ahora tiene un conjunto de datos mucho más pequeño para ordenar.

3.rdd.sortPor

4.zipWithIndex

5.filter (índice <topN)


Si no le importa convertir su RDD a un DataFrame y usar un UDAF de Hive, puede usar percentile . Suponiendo que ha cargado HiveContext hiveContext en el alcance:

hiveContext.sql("SELECT percentile(x, array(0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9)) FROM yourDataFrame")

Descubrí este UDAF de Hive en esta respuesta.


Usted puede :

  1. Ordene el conjunto de datos a través de rdd.sortBy ()
  2. Calcule el tamaño del conjunto de datos a través de rdd.count ()
  3. Zip con índice para facilitar la recuperación del percentil
  4. Recupere el percentil deseado a través de rdd.lookup (), p. Ej., Para el 10º percentil rdd.lookup (tamaño 0.1 *)

Para calcular la mediana y el percentil 99: getPercentiles (rdd, new double [] {0.5, 0.99}, size, numPartitions);

En Java 8:

public static double[] getPercentiles(JavaRDD<Double> rdd, double[] percentiles, long rddSize, int numPartitions) { double[] values = new double[percentiles.length]; JavaRDD<Double> sorted = rdd.sortBy((Double d) -> d, true, numPartitions); JavaPairRDD<Long, Double> indexed = sorted.zipWithIndex().mapToPair((Tuple2<Double, Long> t) -> t.swap()); for (int i = 0; i < percentiles.length; i++) { double percentile = percentiles[i]; long id = (long) (rddSize * percentile); values[i] = indexed.lookup(id).get(0); } return values; }

Tenga en cuenta que esto requiere ordenar el conjunto de datos, O (n.log (n)) y puede ser costoso en grandes conjuntos de datos.

La otra respuesta que sugiere simplemente calcular un histograma no calcularía correctamente el percentil: aquí hay un contraejemplo: un conjunto de datos compuesto por 100 números, 99 números que son 0 y un número que es 1. Terminas con todos los 99 0 en el primer bin, y el 1 en el último bin, con 8 contenedores vacíos en el medio.