scala - Rellenar huecos en series temporales Spark
apache-spark apache-spark-sql (1)
Tengo un problema con los datos de series temporales. Debido a fallas de alimentación faltan algunas marcas de tiempo en el conjunto de datos. Necesito llenar estos vacíos agregando filas, y después de eso, puedo interpolar los valores faltantes.
Datos de entrada:
periodstart usage
---------------------------------
2015-09-11 02:15 23000
2015-09-11 03:15 23344
2015-09-11 03:30 23283
2015-09-11 03:45 23786
2015-09-11 04:00 25039
Salida deseada:
periodstart usage
---------------------------------
2015-09-11 02:15 23000
2015-09-11 02:30 0
2015-09-11 02:45 0
2015-09-11 03:00 0
2015-09-11 03:15 23344
2015-09-11 03:30 23283
2015-09-11 03:45 23786
2015-09-11 04:00 25039
Ahora lo he solucionado con un ciclo while dentro de una función foreach de conjunto de datos. El problema es que tengo que recopilar el conjunto de datos primero en el controlador antes de poder hacer un ciclo while. Así que esa no es la forma correcta para Spark.
¿Alguien puede darme una mejor solución?
Este es mi código:
MissingMeasurementsDS.collect().foreach(row => {
// empty list for new generated measurements
val output = ListBuffer.empty[Measurement]
// Missing measurements
val missingMeasurements = row.getAs[Int]("missingmeasurements")
val lastTimestamp = row.getAs[Timestamp]("previousperiodstart")
//Generate missing timestamps
var i = 1
while (i <= missingMeasurements) {
//Increment timestamp with 15 minutes (900000 milliseconds)
val newTimestamp = lastTimestamp.getTime + (900000 * i)
output += Measurement(new Timestamp(newTimestamp), 0))
i += 1
}
//Join interpolated measurements with correct measurements
completeMeasurementsDS.join(output.toDS())
})
completeMeasurementsDS.show()
println("OutputDF count = " + completeMeasurementsDS.count())
Si el
DataFrame
entrada tiene la siguiente estructura:
root
|-- periodstart: timestamp (nullable = true)
|-- usage: long (nullable = true)
Scala
Determinar min / max:
val (minp, maxp) = df
.select(min($"periodstart").cast("bigint"), max($"periodstart".cast("bigint")))
.as[(Long, Long)]
.first
Establezca el paso, por ejemplo, durante 15 minutos:
val step: Long = 15 * 60
Generar rango de referencia:
val reference = spark
.range((minp / step) * step, ((maxp / step) + 1) * step, step)
.select($"id".cast("timestamp").alias("periodstart"))
Únete y llena los vacíos:
reference.join(df, Seq("periodstart"), "leftouter").na.fill(0, Seq("usage"))
Pitón
Del mismo modo en PySpark:
from pyspark.sql.functions import col, min as min_, max as max_
step = 15 * 60
minp, maxp = df.select(
min_("periodstart").cast("long"), max_("periodstart").cast("long")
).first()
reference = spark.range(
(minp / step) * step, ((maxp / step) + 1) * step, step
).select(col("id").cast("timestamp").alias("periodstart"))
reference.join(df, ["periodstart"], "leftouter")