scala apache-spark apache-spark-sql time-series

scala - Rellenar huecos en series temporales Spark



apache-spark apache-spark-sql (1)

Tengo un problema con los datos de series temporales. Debido a fallas de alimentación faltan algunas marcas de tiempo en el conjunto de datos. Necesito llenar estos vacíos agregando filas, y después de eso, puedo interpolar los valores faltantes.

Datos de entrada:

periodstart usage --------------------------------- 2015-09-11 02:15 23000 2015-09-11 03:15 23344 2015-09-11 03:30 23283 2015-09-11 03:45 23786 2015-09-11 04:00 25039

Salida deseada:

periodstart usage --------------------------------- 2015-09-11 02:15 23000 2015-09-11 02:30 0 2015-09-11 02:45 0 2015-09-11 03:00 0 2015-09-11 03:15 23344 2015-09-11 03:30 23283 2015-09-11 03:45 23786 2015-09-11 04:00 25039

Ahora lo he solucionado con un ciclo while dentro de una función foreach de conjunto de datos. El problema es que tengo que recopilar el conjunto de datos primero en el controlador antes de poder hacer un ciclo while. Así que esa no es la forma correcta para Spark.

¿Alguien puede darme una mejor solución?

Este es mi código:

MissingMeasurementsDS.collect().foreach(row => { // empty list for new generated measurements val output = ListBuffer.empty[Measurement] // Missing measurements val missingMeasurements = row.getAs[Int]("missingmeasurements") val lastTimestamp = row.getAs[Timestamp]("previousperiodstart") //Generate missing timestamps var i = 1 while (i <= missingMeasurements) { //Increment timestamp with 15 minutes (900000 milliseconds) val newTimestamp = lastTimestamp.getTime + (900000 * i) output += Measurement(new Timestamp(newTimestamp), 0)) i += 1 } //Join interpolated measurements with correct measurements completeMeasurementsDS.join(output.toDS()) }) completeMeasurementsDS.show() println("OutputDF count = " + completeMeasurementsDS.count())


Si el DataFrame entrada tiene la siguiente estructura:

root |-- periodstart: timestamp (nullable = true) |-- usage: long (nullable = true)

Scala

Determinar min / max:

val (minp, maxp) = df .select(min($"periodstart").cast("bigint"), max($"periodstart".cast("bigint"))) .as[(Long, Long)] .first

Establezca el paso, por ejemplo, durante 15 minutos:

val step: Long = 15 * 60

Generar rango de referencia:

val reference = spark .range((minp / step) * step, ((maxp / step) + 1) * step, step) .select($"id".cast("timestamp").alias("periodstart"))

Únete y llena los vacíos:

reference.join(df, Seq("periodstart"), "leftouter").na.fill(0, Seq("usage"))

Pitón

Del mismo modo en PySpark:

from pyspark.sql.functions import col, min as min_, max as max_ step = 15 * 60 minp, maxp = df.select( min_("periodstart").cast("long"), max_("periodstart").cast("long") ).first() reference = spark.range( (minp / step) * step, ((maxp / step) + 1) * step, step ).select(col("id").cast("timestamp").alias("periodstart")) reference.join(df, ["periodstart"], "leftouter")