varianza truncada probabilidad ponderada intervalos estadistica entre cuartiles correlacion confianza calcular python apache-spark median rdd pyspark

python - truncada - Cómo encontrar medianas y cuantiles usando Spark



probabilidad y estadistica en python (4)

Spark 2.0+:

Puede usar el método approxQuantile que implementa el algoritmo Greenwald-Khanna :

Python :

df.approxQuantile("x", [0.5], 0.25)

Scala :

df.stat.approxQuantile("x", Array(0.5), 0.25)

donde el último parámetro es un error relativo. Cuanto menor sea el número, los resultados más precisos y el cálculo más costoso.

Desde Spark 2.2 ( SPARK-14352 ) admite la estimación en múltiples columnas:

df.approxQuantile(["x", "y", "z"], [0.5], 0.25)

y

df.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)

Chispa <2.0

Pitón

Como he mencionado en los comentarios, lo más probable es que no valga la pena. Si los datos son relativamente pequeños como en su caso, simplemente recopile y calcule la mediana localmente:

import numpy as np np.random.seed(323) rdd = sc.parallelize(np.random.randint(1000000, size=700000)) %time np.median(rdd.collect()) np.array(rdd.collect()).nbytes

Toma alrededor de 0.01 segundos en mi computadora de pocos años y alrededor de 5.5MB de memoria.

Si los datos son mucho más grandes, la clasificación será un factor limitante, por lo que en lugar de obtener un valor exacto, probablemente sea mejor muestrear, recopilar y calcular localmente. Pero si realmente quieres usar Spark, algo como esto debería hacer el truco (si no estropeé nada):

from numpy import floor import time def quantile(rdd, p, sample=None, seed=None): """Compute a quantile of order p ∈ [0, 1] :rdd a numeric rdd :p quantile(between 0 and 1) :sample fraction of and rdd to use. If not provided we use a whole dataset :seed random number generator seed to be used with sample """ assert 0 <= p <= 1 assert sample is None or 0 < sample <= 1 seed = seed if seed is not None else time.time() rdd = rdd if sample is None else rdd.sample(False, sample, seed) rddSortedWithIndex = (rdd. sortBy(lambda x: x). zipWithIndex(). map(lambda (x, i): (i, x)). cache()) n = rddSortedWithIndex.count() h = (n - 1) * p rddX, rddXPlusOne = ( rddSortedWithIndex.lookup(x)[0] for x in int(floor(h)) + np.array([0L, 1L])) return rddX + (h - floor(h)) * (rddXPlusOne - rddX)

Y algunas pruebas:

np.median(rdd.collect()), quantile(rdd, 0.5) ## (500184.5, 500184.5) np.percentile(rdd.collect(), 25), quantile(rdd, 0.25) ## (250506.75, 250506.75) np.percentile(rdd.collect(), 75), quantile(rdd, 0.75) (750069.25, 750069.25)

Finalmente, definamos la mediana:

from functools import partial median = partial(quantile, p=0.5)

Hasta ahora todo bien, pero tarda 4.66 s en modo local sin ninguna comunicación de red. Probablemente hay una forma de mejorar esto, pero ¿por qué molestarse?

Idioma independiente ( Hive UDAF ):

Si usa HiveContext también puede usar Hive UDAF. Con valores integrales:

rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df") sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")

Con valores continuos:

sqlContext.sql("SELECT percentile(x, 0.5) FROM df")

En percentile_approx puede pasar un argumento adicional que determina una cantidad de registros para usar.

¿Cómo puedo encontrar la mediana de un RDD de enteros usando un método distribuido, IPython y Spark? El RDD tiene aproximadamente 700,000 elementos y, por lo tanto, es demasiado grande para recolectar y encontrar la mediana.

Esta pregunta es similar a esta pregunta. Sin embargo, la respuesta a la pregunta es usar Scala, que no sé.

¿Cómo puedo calcular la mediana exacta con Apache Spark?

Usando el pensamiento para la respuesta de Scala, estoy tratando de escribir una respuesta similar en Python.

Sé que primero quiero ordenar el RDD . No se como. Veo el sortBy ( sortBy este RDD por el keyfunc dado) y sortByKey ( sortByKey este RDD , que se supone que consiste en pares (clave, valor)). Creo que ambos usan el valor clave y mi RDD solo tiene elementos enteros.

  1. Primero, estaba pensando en hacer myrdd.sortBy(lambda x: x) ?
  2. A continuación, encontraré la longitud de rdd ( rdd.count() ).
  3. Finalmente, quiero encontrar el elemento o 2 elementos en el centro del rdd. Necesito ayuda con este método también.

EDITAR:

Tuve una idea. Tal vez pueda indexar mi RDD y luego clave = índice y valor = elemento. ¿Y luego puedo intentar ordenar por valor? No sé si esto es posible porque solo hay un método sortByKey .


Agregar una solución si solo desea un método RDD y no desea pasar a DF. Este fragmento puede obtener un percentil para un RDD del doble.

Si ingresa el percentil como 50, debe obtener su mediana requerida. Avíseme si hay casos de esquina que no se tienen en cuenta.

/** * Gets the nth percentile entry for an RDD of doubles * * @param inputScore : Input scores consisting of a RDD of doubles * @param percentile : The percentile cutoff required (between 0 to 100), e.g 90%ile of [1,4,5,9,19,23,44] = ~23. * It prefers the higher value when the desired quantile lies between two data points * @return : The number best representing the percentile in the Rdd of double */ def getRddPercentile(inputScore: RDD[Double], percentile: Double): Double = { val numEntries = inputScore.count().toDouble val retrievedEntry = (percentile * numEntries / 100.0 ).min(numEntries).max(0).toInt inputScore .sortBy { case (score) => score } .zipWithIndex() .filter { case (score, index) => index == retrievedEntry } .map { case (score, index) => score } .collect()(0) }


Aquí está el método que utilicé usando funciones de ventana (con pyspark 2.2.0).

from pyspark.sql import DataFrame class median(): """ Create median class with over method to pass partition """ def __init__(self, df, col, name): assert col self.column=col self.df = df self.name = name def over(self, window): from pyspark.sql.functions import percent_rank, pow, first first_window = window.orderBy(self.column) # first, order by column we want to compute the median for df = self.df.withColumn("percent_rank", percent_rank().over(first_window)) # add percent_rank column, percent_rank = 0.5 coressponds to median second_window = window.orderBy(pow(df.percent_rank-0.5, 2)) # order by (percent_rank - 0.5)^2 ascending return df.withColumn(self.name, first(self.column).over(second_window)) # the first row of the window corresponds to median def addMedian(self, col, median_name): """ Method to be added to spark native DataFrame class """ return median(self, col, median_name) # Add method to DataFrame class DataFrame.addMedian = addMedian

Luego llame al método addMedian para calcular la mediana de col2:

from pyspark.sql import Window median_window = Window.partitionBy("col1") df = df.addMedian("col2", "median").over(median_window)

Finalmente puede agrupar por si es necesario.

df.groupby("col1", "median")


He escrito la función que toma el marco de datos como entrada y devuelve un marco de datos que tiene una mediana como salida sobre una partición y order_col es la columna para la que queremos calcular la mediana para part_col es el nivel en el que queremos calcular la mediana para :

from pyspark.sql import Window import pyspark.sql.functions as F def calculate_median(dataframe, part_col, order_col): win = Window.partitionBy(*part_col).orderBy(order_col) # count_row = dataframe.groupby(*part_col).distinct().count() dataframe.persist() dataframe.count() temp = dataframe.withColumn("rank", F.row_number().over(win)) temp = temp.withColumn( "count_row_part", F.count(order_col).over(Window.partitionBy(part_col)) ) temp = temp.withColumn( "even_flag", F.when( F.col("count_row_part") %2 == 0, F.lit(1) ).otherwise( F.lit(0) ) ).withColumn( "mid_value", F.floor(F.col("count_row_part")/2) ) temp = temp.withColumn( "avg_flag", F.when( (F.col("even_flag")==1) & (F.col("rank") == F.col("mid_value"))| ((F.col("rank")-1) == F.col("mid_value")), F.lit(1) ).otherwise( F.when( F.col("rank") == F.col("mid_value")+1, F.lit(1) ) ) ) temp.show(10) return temp.filter( F.col("avg_flag") == 1 ).groupby( part_col + ["avg_flag"] ).agg( F.avg(F.col(order_col)).alias("median") ).drop("avg_flag")