the textvariable texto tamaño modificar entry como comando borrar python apache-spark time-series pyspark

python - textvariable - tk combobox



Cómo transformar datos con ventana deslizante sobre datos de series temporales en Pyspark (3)

Para agregar a la respuesta de venuktan , aquí es cómo crear una ventana deslizante basada en el tiempo usando Spark SQL y retener el contenido completo de la ventana, en lugar de tomar un agregado de la misma. Esto era necesario en mi caso de uso de preprocesamiento de datos de series temporales en ventanas deslizantes para la entrada en Spark ML.

Una limitación de este enfoque es que suponemos que desea tomar ventanas deslizantes con el tiempo.

En primer lugar, puede crear su Spark DataFrame, por ejemplo leyendo un archivo CSV:

df = spark.read.csv(''foo.csv'')

Suponemos que su archivo CSV tiene dos columnas: una de las cuales es una marca de tiempo de Unix y la otra es una columna de la que desea extraer ventanas deslizantes.

from pyspark.sql import functions as f window_duration = ''1000 millisecond'' slide_duration = ''500 millisecond'' df.withColumn("_c0", f.from_unixtime(f.col("_c0"))) / .groupBy(f.window("_c0", window_duration, slide_duration)) / .agg(f.collect_list(f.array(''_c1''))) / .withColumnRenamed(''collect_list(array(_c1))'', ''sliding_window'')

Bonificación: para convertir esta columna de matriz al formato DenseVector requerido para Spark ML, consulte el enfoque UDF aquí .

Bono adicional: para anidar la columna resultante, de modo que cada elemento de su ventana deslizante tenga su propia columna, intente este enfoque aquí .

Espero que esto ayude, por favor avíseme si puedo aclarar algo.

Estoy tratando de extraer características basadas en una ventana deslizante sobre datos de series temporales. En Scala, parece que hay una función sliding basada en esta publicación y la documentación

import org.apache.spark.mllib.rdd.RDDFunctions._ sc.parallelize(1 to 100, 10) .sliding(3) .map(curSlice => (curSlice.sum / curSlice.size)) .collect()

Mis preguntas, ¿hay funciones similares en PySpark? ¿O cómo logramos transformaciones de ventanas deslizantes similares si aún no existe tal función?


Por lo que puedo decir, la función sliding no está disponible desde Python y SlidingRDD es una clase privada y no se puede acceder fuera de MLlib .

Si usa el sliding en un RDD existente, puede crear el sliding del pobre de la siguiente manera:

def sliding(rdd, n): assert n > 0 def gen_window(xi, n): x, i = xi return [(i - offset, (i, x)) for offset in xrange(n)] return ( rdd. zipWithIndex(). # Add index flatMap(lambda xi: gen_window(xi, n)). # Generate pairs with offset groupByKey(). # Group to create windows # Sort values to ensure order inside window and drop indices mapValues(lambda vals: [x for (i, x) in sorted(vals)]). sortByKey(). # Sort to makes sure we keep original order values(). # Get values filter(lambda x: len(x) == n)) # Drop beginning and end

Alternativamente, puede probar algo como esto (con una pequeña ayuda de toolz )

from toolz.itertoolz import sliding_window, concat def sliding2(rdd, n): assert n > 1 def get_last_el(i, iter): """Return last n - 1 elements from the partition""" return [(i, [x for x in iter][(-n + 1):])] def slide(i, iter): """Prepend previous items and return sliding window""" return sliding_window(n, concat([last_items.value[i - 1], iter])) def clean_last_items(last_items): """Adjust for empty or to small partitions""" clean = {-1: [None] * (n - 1)} for i in range(rdd.getNumPartitions()): clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):] return {k: tuple(v) for k, v in clean.items()} last_items = sc.broadcast(clean_last_items( rdd.mapPartitionsWithIndex(get_last_el).collectAsMap())) return rdd.mapPartitionsWithIndex(slide)