scala apache-spark count time-series

Spark scala: cómo hacer count() acondicionando en dos filas



apache-spark time-series (1)

Soy novato en spark scala y me disculpo por hacer una pregunta tonta (si lo es). Estoy atascado en un problema que simplifiqué de la siguiente manera:

Hay un marco de datos con tres columnas, "machineID" es la identidad de una máquina. "startTime" es la marca de tiempo de inicio de una tarea. "endTime" es la marca de tiempo de finalización de una tarea.

Mi objetivo es contar cuántos intervalos de inactividad tiene cada máquina.
Por ejemplo,
en la tabla a continuación, la primera y la segunda fila muestran que la máquina n. ° 1 comenzó en el tiempo 0 y finalizó en el momento 3, y comenzó de nuevo en el tiempo 4, por lo que el intervalo de tiempo [3, 4] está inactivo. Para la 3ra y 4ta fila, la máquina n. ° 1 comenzó en el tiempo 10 y finalizó en el momento 20, y comenzó de nuevo inmediatamente, por lo que no hay tiempo de inactividad.

machineID, startTime, endTime 1, 0, 3 1, 4, 8 1, 10, 20 1, 20, 31 ... 1, 412, 578 ... 2, 231, 311 2, 781, 790 ...

El marco de datos ya ha sido groupBy ("machineID").
Estoy usando spark 2.0.1 y scala 2.11.8


Para acceder a las filas anteriores / siguientes en un DataFrame, podemos usar las funciones de Ventana. En este caso, vamos a usar lag para acceder a la hora de finalización anterior, agrupada por machineId.

import org.apache.spark.sql.expressions.Window // Dataframe Schema case class MachineData(id:String, start:Int, end:Int) // Sample Data machineDF.show +---+-----+---+ | id|start|end| +---+-----+---+ | 1| 0| 3| | 1| 4| 8| | 1| 10| 20| | 1| 20| 31| | 1| 412|578| | 2| 231|311| | 2| 781|790| +---+-----+---+ // define the window as a partition over machineId, ordered by start (time) val byMachine = Window.partitionBy($"id").orderBy($"start") // we define a new column, "previous end" using the Lag Window function over the previously defined window val prevEnd = lag($"end", 1).over(byMachine) // new DF with the prevEnd column val withPrevEnd = machineDF.withColumn("prevEnd", prevEnd) withPrevEnd.show +---+-----+---+-------+ | id|start|end|prevEnd| +---+-----+---+-------+ | 1| 0| 3| null| | 1| 4| 8| 3| | 1| 10| 20| 8| | 1| 20| 31| 20| | 1| 412|578| 31| | 2| 231|311| null| | 2| 781|790| 311| +---+-----+---+-------+ // we''re calculating the idle intervals as the numerical diff as an example val idleIntervals = withPrevEnd.withColumn("diff", $"start"-$"prevEnd") idleIntervals.show +---+-----+---+-------+----+ | id|start|end|prevEnd|diff| +---+-----+---+-------+----+ | 1| 0| 3| null|null| | 1| 4| 8| 3| 1| | 1| 10| 20| 8| 2| | 1| 20| 31| 20| 0| | 1| 412|578| 31| 381| | 2| 231|311| null|null| | 2| 781|790| 311| 470| +---+-----+---+-------+----+ // to calculate the total, we are summing over the differences. Adapt this as your business logic requires. val totalIdleIntervals = idleIntervals.select($"id",$"diff").groupBy($"id").agg(sum("diff")) +---+---------+ | id|sum(diff)| +---+---------+ | 1| 384| | 2| 470| +---+---------+