python - textvariable - tk combobox
Cómo transformar datos con ventana deslizante sobre datos de series temporales en Pyspark (3)
Para agregar a la respuesta de venuktan , aquí es cómo crear una ventana deslizante basada en el tiempo usando Spark SQL y retener el contenido completo de la ventana, en lugar de tomar un agregado de la misma. Esto era necesario en mi caso de uso de preprocesamiento de datos de series temporales en ventanas deslizantes para la entrada en Spark ML.
Una limitación de este enfoque es que suponemos que desea tomar ventanas deslizantes con el tiempo.
En primer lugar, puede crear su Spark DataFrame, por ejemplo leyendo un archivo CSV:
df = spark.read.csv(''foo.csv'')
Suponemos que su archivo CSV tiene dos columnas: una de las cuales es una marca de tiempo de Unix y la otra es una columna de la que desea extraer ventanas deslizantes.
from pyspark.sql import functions as f
window_duration = ''1000 millisecond''
slide_duration = ''500 millisecond''
df.withColumn("_c0", f.from_unixtime(f.col("_c0"))) /
.groupBy(f.window("_c0", window_duration, slide_duration)) /
.agg(f.collect_list(f.array(''_c1''))) /
.withColumnRenamed(''collect_list(array(_c1))'', ''sliding_window'')
Bonificación: para convertir esta columna de matriz al formato DenseVector requerido para Spark ML, consulte el enfoque UDF aquí .
Bono adicional: para anidar la columna resultante, de modo que cada elemento de su ventana deslizante tenga su propia columna, intente este enfoque aquí .
Espero que esto ayude, por favor avíseme si puedo aclarar algo.
Estoy tratando de extraer características basadas en una ventana deslizante sobre datos de series temporales.
En Scala, parece que hay una función
sliding
basada en
esta publicación
y
la documentación
import org.apache.spark.mllib.rdd.RDDFunctions._
sc.parallelize(1 to 100, 10)
.sliding(3)
.map(curSlice => (curSlice.sum / curSlice.size))
.collect()
Mis preguntas, ¿hay funciones similares en PySpark? ¿O cómo logramos transformaciones de ventanas deslizantes similares si aún no existe tal función?
Por lo que puedo decir, la función
sliding
no está disponible desde Python y
SlidingRDD
es una clase privada y no se puede acceder fuera de
MLlib
.
Si usa el
sliding
en un RDD existente, puede crear el
sliding
del pobre de la siguiente manera:
def sliding(rdd, n):
assert n > 0
def gen_window(xi, n):
x, i = xi
return [(i - offset, (i, x)) for offset in xrange(n)]
return (
rdd.
zipWithIndex(). # Add index
flatMap(lambda xi: gen_window(xi, n)). # Generate pairs with offset
groupByKey(). # Group to create windows
# Sort values to ensure order inside window and drop indices
mapValues(lambda vals: [x for (i, x) in sorted(vals)]).
sortByKey(). # Sort to makes sure we keep original order
values(). # Get values
filter(lambda x: len(x) == n)) # Drop beginning and end
Alternativamente, puede probar algo como esto (con una pequeña ayuda de
toolz
)
from toolz.itertoolz import sliding_window, concat
def sliding2(rdd, n):
assert n > 1
def get_last_el(i, iter):
"""Return last n - 1 elements from the partition"""
return [(i, [x for x in iter][(-n + 1):])]
def slide(i, iter):
"""Prepend previous items and return sliding window"""
return sliding_window(n, concat([last_items.value[i - 1], iter]))
def clean_last_items(last_items):
"""Adjust for empty or to small partitions"""
clean = {-1: [None] * (n - 1)}
for i in range(rdd.getNumPartitions()):
clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):]
return {k: tuple(v) for k, v in clean.items()}
last_items = sc.broadcast(clean_last_items(
rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))
return rdd.mapPartitionsWithIndex(slide)
spark 1.4 tiene funciones de ventana, como se describe aquí: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
Espero que ayude, por favor hágamelo saber.