algorithm scala apache-spark

algorithm - Apache Spark-Manejo de ventanas deslizantes en RDD temporales



scala apache-spark (1)

He estado trabajando bastante con Apache Spark en los últimos meses, pero ahora he recibido una tarea bastante difícil, calcular el promedio / mínimo / máximo, etc. en una ventana deslizante sobre un RDD emparejado donde el componente clave es una etiqueta de fecha y El componente de valor es una matriz. Por lo tanto, cada función de agregación también debe devolver una matriz, donde se promedia el promedio de todas las celdas en el período de tiempo.

Quiero poder decir que quiero el promedio por cada 7 días, con una ventana deslizante de un día. La unidad de movimiento de la ventana deslizante es siempre una, y luego la unidad del tamaño de la ventana (por lo tanto, si es cada 12 semanas, la unidad de movimiento de la ventana es 1).

Mi pensamiento inicial ahora es simplemente iterar, si queremos un promedio por X días, X veces, y cada vez que simplemente agrupemos los elementos por su fecha, con un desplazamiento.

Así que si tenemos este escenario:

Días: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

Matrices: ABCDEFGHIJKLMNO

Y queremos el promedio por 5 días, lo repetiré 5 veces y mostraré la agrupación aquí:

Primera iteración:

Grupo 1: (1, A) (2, B) (3, C) (4, D) (5, E)

Grupo 2: (6, F) (7, G) (8, H) (9, I) (10, J)

Grupo 3: (11, K) (12, L) (13, M) (14, N) (15, O)

Segunda iteración:

Grupo 1: (2, B) (3, C) (4, D) (5, E) (6, F)

Grupo 2: (7, G) (8, H) (9, I) (10, J), (11, K)

Grupo 3: (12, L) (13, M) (14, N) (15, O)

Etcétera, y para cada grupo, tengo que hacer un procedimiento de plegado / reducción para obtener el promedio.

Sin embargo, como puede imaginar, esto es bastante lento y probablemente sea una forma bastante mala de hacerlo. No puedo realmente encontrar una mejor manera de hacerlo.


Si te conviertes en un DataFrame, todo esto se vuelve mucho más simple: puedes unirte automáticamente a los datos y encontrar el promedio. Digamos que tengo una serie de datos como este:

tsDF.show date amount 1970-01-01 10.0 1970-01-01 5.0 1970-01-01 7.0 1970-01-02 14.0 1970-01-02 13.9 1970-01-03 1.0 1970-01-03 5.0 1970-01-03 9.0 1970-01-04 9.0 1970-01-04 5.8 1970-01-04 2.8 1970-01-04 8.9 1970-01-05 8.1 1970-01-05 2.1 1970-01-05 2.78 1970-01-05 20.78

Que se enrolla como:

tsDF.groupBy($"date").agg($"date", sum($"amount"), count($"date")).show date SUM(amount) COUNT(date) 1970-01-01 22.0 3 1970-01-02 27.9 2 1970-01-03 15.0 3 1970-01-04 26.5 4 1970-01-05 33.76 4

Luego necesitaría crear un UDF para cambiar la fecha de la condición de unión (tenga en cuenta que solo estoy usando una ventana de 2 días usando offset = -2 ):

def dateShift(myDate: java.sql.Date): java.sql.Date = { val offset = -2; val cal = Calendar.getInstance; cal.setTime(myDate); cal.add(Calendar.DATE, offset); new java.sql.Date(cal.getTime.getTime) } val udfDateShift = udf[java.sql.Date,java.sql.Date](dateShift)

Y luego podría encontrar fácilmente un promedio móvil de 2 días como este:

val windowDF = tsDF.select($"date") .groupBy($"date") .agg($"date") .join( tsDF.select($"date" as "r_date", $"amount" as "r_amount"), $"r_date" > udfDateShift($"date") and $"r_date" <= $"date" ) .groupBy($"date") .agg($"date",avg($"r_amount") as "2 day avg amount / record") val windowDF.show date 2 day avg amount / record 1970-01-01 7.333333333333333 1970-01-02 9.98 1970-01-03 8.58 1970-01-04 5.928571428571429 1970-01-05 7.5325

Si bien esto no es exactamente lo que estaba tratando de hacer, usted ve cómo puede usar una auto-unión DataFrame para extraer promedios en ejecución de un conjunto de datos. Espero que hayas encontrado esto útil.