python - truncada - Cómo encontrar medianas y cuantiles usando Spark
probabilidad y estadistica en python (4)
Spark 2.0+:
Puede usar el método
approxQuantile
que implementa el
algoritmo Greenwald-Khanna
:
Python :
df.approxQuantile("x", [0.5], 0.25)
Scala :
df.stat.approxQuantile("x", Array(0.5), 0.25)
donde el último parámetro es un error relativo. Cuanto menor sea el número, los resultados más precisos y el cálculo más costoso.
Desde Spark 2.2 ( SPARK-14352 ) admite la estimación en múltiples columnas:
df.approxQuantile(["x", "y", "z"], [0.5], 0.25)
y
df.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)
Chispa <2.0
Pitón
Como he mencionado en los comentarios, lo más probable es que no valga la pena. Si los datos son relativamente pequeños como en su caso, simplemente recopile y calcule la mediana localmente:
import numpy as np
np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, size=700000))
%time np.median(rdd.collect())
np.array(rdd.collect()).nbytes
Toma alrededor de 0.01 segundos en mi computadora de pocos años y alrededor de 5.5MB de memoria.
Si los datos son mucho más grandes, la clasificación será un factor limitante, por lo que en lugar de obtener un valor exacto, probablemente sea mejor muestrear, recopilar y calcular localmente. Pero si realmente quieres usar Spark, algo como esto debería hacer el truco (si no estropeé nada):
from numpy import floor
import time
def quantile(rdd, p, sample=None, seed=None):
"""Compute a quantile of order p ∈ [0, 1]
:rdd a numeric rdd
:p quantile(between 0 and 1)
:sample fraction of and rdd to use. If not provided we use a whole dataset
:seed random number generator seed to be used with sample
"""
assert 0 <= p <= 1
assert sample is None or 0 < sample <= 1
seed = seed if seed is not None else time.time()
rdd = rdd if sample is None else rdd.sample(False, sample, seed)
rddSortedWithIndex = (rdd.
sortBy(lambda x: x).
zipWithIndex().
map(lambda (x, i): (i, x)).
cache())
n = rddSortedWithIndex.count()
h = (n - 1) * p
rddX, rddXPlusOne = (
rddSortedWithIndex.lookup(x)[0]
for x in int(floor(h)) + np.array([0L, 1L]))
return rddX + (h - floor(h)) * (rddXPlusOne - rddX)
Y algunas pruebas:
np.median(rdd.collect()), quantile(rdd, 0.5)
## (500184.5, 500184.5)
np.percentile(rdd.collect(), 25), quantile(rdd, 0.25)
## (250506.75, 250506.75)
np.percentile(rdd.collect(), 75), quantile(rdd, 0.75)
(750069.25, 750069.25)
Finalmente, definamos la mediana:
from functools import partial
median = partial(quantile, p=0.5)
Hasta ahora todo bien, pero tarda 4.66 s en modo local sin ninguna comunicación de red. Probablemente hay una forma de mejorar esto, pero ¿por qué molestarse?
Idioma independiente ( Hive UDAF ):
Si usa
HiveContext
también puede usar Hive UDAF.
Con valores integrales:
rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df")
sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")
Con valores continuos:
sqlContext.sql("SELECT percentile(x, 0.5) FROM df")
En
percentile_approx
puede pasar un argumento adicional que determina una cantidad de registros para usar.
¿Cómo puedo encontrar la mediana de un
RDD
de enteros usando un método distribuido, IPython y Spark?
El
RDD
tiene aproximadamente 700,000 elementos y, por lo tanto, es demasiado grande para recolectar y encontrar la mediana.
Esta pregunta es similar a esta pregunta. Sin embargo, la respuesta a la pregunta es usar Scala, que no sé.
¿Cómo puedo calcular la mediana exacta con Apache Spark?
Usando el pensamiento para la respuesta de Scala, estoy tratando de escribir una respuesta similar en Python.
Sé que primero quiero ordenar el
RDD
.
No se como.
Veo el
sortBy
(
sortBy
este RDD por el
keyfunc
dado) y
sortByKey
(
sortByKey
este
RDD
, que se supone que consiste en pares (clave, valor)).
Creo que ambos usan el valor clave y mi
RDD
solo tiene elementos enteros.
-
Primero, estaba pensando en hacer
myrdd.sortBy(lambda x: x)
? -
A continuación, encontraré la longitud de rdd (
rdd.count()
). - Finalmente, quiero encontrar el elemento o 2 elementos en el centro del rdd. Necesito ayuda con este método también.
EDITAR:
Tuve una idea.
Tal vez pueda indexar mi
RDD
y luego clave = índice y valor = elemento.
¿Y luego puedo intentar ordenar por valor?
No sé si esto es posible porque solo hay un método
sortByKey
.
Agregar una solución si solo desea un método RDD y no desea pasar a DF. Este fragmento puede obtener un percentil para un RDD del doble.
Si ingresa el percentil como 50, debe obtener su mediana requerida. Avíseme si hay casos de esquina que no se tienen en cuenta.
/**
* Gets the nth percentile entry for an RDD of doubles
*
* @param inputScore : Input scores consisting of a RDD of doubles
* @param percentile : The percentile cutoff required (between 0 to 100), e.g 90%ile of [1,4,5,9,19,23,44] = ~23.
* It prefers the higher value when the desired quantile lies between two data points
* @return : The number best representing the percentile in the Rdd of double
*/
def getRddPercentile(inputScore: RDD[Double], percentile: Double): Double = {
val numEntries = inputScore.count().toDouble
val retrievedEntry = (percentile * numEntries / 100.0 ).min(numEntries).max(0).toInt
inputScore
.sortBy { case (score) => score }
.zipWithIndex()
.filter { case (score, index) => index == retrievedEntry }
.map { case (score, index) => score }
.collect()(0)
}
Aquí está el método que utilicé usando funciones de ventana (con pyspark 2.2.0).
from pyspark.sql import DataFrame
class median():
""" Create median class with over method to pass partition """
def __init__(self, df, col, name):
assert col
self.column=col
self.df = df
self.name = name
def over(self, window):
from pyspark.sql.functions import percent_rank, pow, first
first_window = window.orderBy(self.column) # first, order by column we want to compute the median for
df = self.df.withColumn("percent_rank", percent_rank().over(first_window)) # add percent_rank column, percent_rank = 0.5 coressponds to median
second_window = window.orderBy(pow(df.percent_rank-0.5, 2)) # order by (percent_rank - 0.5)^2 ascending
return df.withColumn(self.name, first(self.column).over(second_window)) # the first row of the window corresponds to median
def addMedian(self, col, median_name):
""" Method to be added to spark native DataFrame class """
return median(self, col, median_name)
# Add method to DataFrame class
DataFrame.addMedian = addMedian
Luego llame al método addMedian para calcular la mediana de col2:
from pyspark.sql import Window
median_window = Window.partitionBy("col1")
df = df.addMedian("col2", "median").over(median_window)
Finalmente puede agrupar por si es necesario.
df.groupby("col1", "median")
He escrito la función que toma el marco de datos como entrada y devuelve un marco de datos que tiene una mediana como salida sobre una partición y order_col es la columna para la que queremos calcular la mediana para part_col es el nivel en el que queremos calcular la mediana para :
from pyspark.sql import Window
import pyspark.sql.functions as F
def calculate_median(dataframe, part_col, order_col):
win = Window.partitionBy(*part_col).orderBy(order_col)
# count_row = dataframe.groupby(*part_col).distinct().count()
dataframe.persist()
dataframe.count()
temp = dataframe.withColumn("rank", F.row_number().over(win))
temp = temp.withColumn(
"count_row_part",
F.count(order_col).over(Window.partitionBy(part_col))
)
temp = temp.withColumn(
"even_flag",
F.when(
F.col("count_row_part") %2 == 0,
F.lit(1)
).otherwise(
F.lit(0)
)
).withColumn(
"mid_value",
F.floor(F.col("count_row_part")/2)
)
temp = temp.withColumn(
"avg_flag",
F.when(
(F.col("even_flag")==1) &
(F.col("rank") == F.col("mid_value"))|
((F.col("rank")-1) == F.col("mid_value")),
F.lit(1)
).otherwise(
F.when(
F.col("rank") == F.col("mid_value")+1,
F.lit(1)
)
)
)
temp.show(10)
return temp.filter(
F.col("avg_flag") == 1
).groupby(
part_col + ["avg_flag"]
).agg(
F.avg(F.col(order_col)).alias("median")
).drop("avg_flag")