Spark scala: cómo hacer count() acondicionando en dos filas
apache-spark time-series (1)
Soy novato en spark scala y me disculpo por hacer una pregunta tonta (si lo es). Estoy atascado en un problema que simplifiqué de la siguiente manera:
Hay un marco de datos con tres columnas, "machineID" es la identidad de una máquina. "startTime" es la marca de tiempo de inicio de una tarea. "endTime" es la marca de tiempo de finalización de una tarea.
Mi objetivo es contar cuántos intervalos de inactividad tiene cada máquina.
Por ejemplo,
en la tabla a continuación, la primera y la segunda fila muestran que la máquina n. ° 1 comenzó en el tiempo 0 y finalizó en el momento 3, y comenzó de nuevo en el tiempo 4, por lo que el intervalo de tiempo [3, 4] está inactivo. Para la 3ra y 4ta fila, la máquina n. ° 1 comenzó en el tiempo 10 y finalizó en el momento 20, y comenzó de nuevo inmediatamente, por lo que no hay tiempo de inactividad.
machineID, startTime, endTime
1, 0, 3
1, 4, 8
1, 10, 20
1, 20, 31
...
1, 412, 578
...
2, 231, 311
2, 781, 790
...
El marco de datos ya ha sido groupBy ("machineID").
Estoy usando spark 2.0.1 y scala 2.11.8
Para acceder a las filas anteriores / siguientes en un DataFrame, podemos usar las funciones de Ventana. En este caso, vamos a usar lag
para acceder a la hora de finalización anterior, agrupada por machineId.
import org.apache.spark.sql.expressions.Window
// Dataframe Schema
case class MachineData(id:String, start:Int, end:Int)
// Sample Data
machineDF.show
+---+-----+---+
| id|start|end|
+---+-----+---+
| 1| 0| 3|
| 1| 4| 8|
| 1| 10| 20|
| 1| 20| 31|
| 1| 412|578|
| 2| 231|311|
| 2| 781|790|
+---+-----+---+
// define the window as a partition over machineId, ordered by start (time)
val byMachine = Window.partitionBy($"id").orderBy($"start")
// we define a new column, "previous end" using the Lag Window function over the previously defined window
val prevEnd = lag($"end", 1).over(byMachine)
// new DF with the prevEnd column
val withPrevEnd = machineDF.withColumn("prevEnd", prevEnd)
withPrevEnd.show
+---+-----+---+-------+
| id|start|end|prevEnd|
+---+-----+---+-------+
| 1| 0| 3| null|
| 1| 4| 8| 3|
| 1| 10| 20| 8|
| 1| 20| 31| 20|
| 1| 412|578| 31|
| 2| 231|311| null|
| 2| 781|790| 311|
+---+-----+---+-------+
// we''re calculating the idle intervals as the numerical diff as an example
val idleIntervals = withPrevEnd.withColumn("diff", $"start"-$"prevEnd")
idleIntervals.show
+---+-----+---+-------+----+
| id|start|end|prevEnd|diff|
+---+-----+---+-------+----+
| 1| 0| 3| null|null|
| 1| 4| 8| 3| 1|
| 1| 10| 20| 8| 2|
| 1| 20| 31| 20| 0|
| 1| 412|578| 31| 381|
| 2| 231|311| null|null|
| 2| 781|790| 311| 470|
+---+-----+---+-------+----+
// to calculate the total, we are summing over the differences. Adapt this as your business logic requires.
val totalIdleIntervals = idleIntervals.select($"id",$"diff").groupBy($"id").agg(sum("diff"))
+---+---------+
| id|sum(diff)|
+---+---------+
| 1| 384|
| 2| 470|
+---+---------+