time-series hdfs moving-average apache-spark

time series - Apache Spark Moving Average



time-series hdfs (3)

La media móvil es un problema complicado para Spark y cualquier sistema distribuido. Cuando los datos se distribuyen en varias máquinas, habrá algunas ventanas de tiempo que cruzan las particiones. Tenemos que duplicar los datos al inicio de las particiones, de modo que el cálculo de la media móvil por partición proporcione una cobertura completa.

Aquí hay una manera de hacer esto en Spark. Los datos de ejemplo:

val ts = sc.parallelize(0 to 100, 10) val window = 3

Un simple particionador que pone cada fila en la partición que especificamos con la clave:

class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner { def numPartitions = p def getPartition(key: Any) = key.asInstanceOf[Int] }

Cree los datos con la primera window - 1 fila copiada en la partición anterior:

val partitioned = ts.mapPartitionsWithIndex((i, p) => { val overlap = p.take(window - 1).toArray val spill = overlap.iterator.map((i - 1, _)) val keep = (overlap.iterator ++ p).map((i, _)) if (i == 0) keep else keep ++ spill }).partitionBy(new StraightPartitioner(ts.partitions.length)).values

Simplemente calcule el promedio móvil en cada partición:

val movingAverage = partitioned.mapPartitions(p => { val sorted = p.toSeq.sorted val olds = sorted.iterator val news = sorted.iterator var sum = news.take(window - 1).sum (olds zip news).map({ case (o, n) => { sum += n val v = sum sum -= o v }}) })

Debido a los segmentos duplicados, esto no tendrá vacíos en la cobertura.

scala> movingAverage.collect.sameElements(3 to 297 by 3) res0: Boolean = true

Tengo un gran archivo en HDFS que tiene puntos de datos de la serie de tiempo (precios de las acciones de Yahoo).

Quiero encontrar el promedio móvil de la serie temporal, ¿cómo hago para escribir el trabajo Apache Spark para hacer eso?


Puede usar la función deslizante de MLLIB, que probablemente hace lo mismo que la respuesta de Daniel. Tendrá que ordenar los datos por tiempo antes de usar la función deslizante.

import org.apache.spark.mllib.rdd.RDDFunctions._ sc.parallelize(1 to 100, 10) .sliding(3) .map(curSlice => (curSlice.sum / curSlice.size)) .collect()


Spark 1.4 introdujo las funciones de ventana , lo que significa que puede hacer una media móvil de la siguiente manera ajustar la ventana con filas entre :

val schema = Seq("id", "cykle", "value") val data = Seq( (1, 1, 1), (1, 2, 11), (1, 3, 1), (1, 4, 11), (1, 5, 1), (1, 6, 11), (2, 1, 1), (2, 2, 11), (2, 3, 1), (2, 4, 11), (2, 5, 1), (2, 6, 11) ) val dft = sc.parallelize(data).toDF(schema: _*) dft.select(''*).show // PARTITION BY id ORDER BY cykle ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING (5) val w = Window.partitionBy("id").orderBy("cykle").rowsBetween(-2, 2) val x = dft.select($"id",$"cykle",avg($"value").over(w)) x.show

Salida (en zeppelin):

schema: Seq[String] = List(id, cykle, value) data: Seq[(Int, Int, Int)] = List((1,1,1), (1,2,11), (1,3,1), (1,4,11), (1,5,1), (1,6,11), (2,1,1), (2,2,11), (2,3,1), (2,4,11), (2,5,1), (2,6,11)) dft: org.apache.spark.sql.DataFrame = [id: int, cykle: int, value: int] +---+-----+-----+ | id|cykle|value| +---+-----+-----+ | 1| 1| 1| | 1| 2| 11| | 1| 3| 1| | 1| 4| 11| | 1| 5| 1| | 1| 6| 11| | 2| 1| 1| | 2| 2| 11| | 2| 3| 1| | 2| 4| 11| | 2| 5| 1| | 2| 6| 11| +---+-----+-----+ w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@55cd666f x: org.apache.spark.sql.DataFrame = [id: int, cykle: int, ''avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING: double] +---+-----+-------------------------------------------------------------------------+ | id|cykle|''avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING| +---+-----+-------------------------------------------------------------------------+ | 1| 1| 4.333333333333333| | 1| 2| 6.0| | 1| 3| 5.0| | 1| 4| 7.0| | 1| 5| 6.0| | 1| 6| 7.666666666666667| | 2| 1| 4.333333333333333| | 2| 2| 6.0| | 2| 3| 5.0| | 2| 4| 7.0| | 2| 5| 6.0| | 2| 6| 7.666666666666667| +---+-----+————————————————————————————————————+