cassandra - Spark: cómo unirse a los RDD por rango de tiempo
apache-spark (3)
Tengo un delicado problema con Spark, donde simplemente no puedo entender.
Tenemos dos RDD (provenientes de Cassandra). RDD1 contiene Actions
y RDD2 contiene datos Historic
. Ambos tienen una identificación en la cual pueden ser emparejados / unidos. Pero el problema es que las dos tablas tienen una relación N: N ship. Actions
contienen varias filas con la misma identificación y también lo hace Historic
. Aquí hay algunas fechas de ejemplo de ambas tablas.
Actions
tiempo de las Actions
es en realidad una marca de tiempo
id | time | valueX
1 | 12:05 | 500
1 | 12:30 | 500
2 | 12:30 | 125
Set_at Historic
es en realidad una marca de tiempo
id | set_at| valueY
1 | 11:00 | 400
1 | 12:15 | 450
2 | 12:20 | 50
2 | 12:25 | 75
¿Cómo podemos unir estas dos tablas de una manera, que obtenemos un resultado como este?
1 | 100 # 500 - 400 for Actions#1 with time 12:05 because Historic was in that time at 400
1 | 50 # 500 - 450 for Actions#2 with time 12:30 because H. was in that time at 450
2 | 50 # 125 - 75 for Actions#3 with time 12:30 because H. was in that time at 75
No puedo encontrar una buena solución que se sienta bien, sin hacer muchas iteraciones sobre enormes conjuntos de datos. Siempre tengo que pensar en hacer un rango del conjunto Historic
y luego verificar de alguna manera si las Actions
ajustan en el rango, por ejemplo, (11:00 - 12:15) para hacer el cálculo. Pero eso parece bastante lento para mí. ¿Hay alguna manera más eficiente de hacer eso? Me parece que este tipo de problema podría ser popular, pero todavía no he podido encontrar ninguna pista al respecto. ¿Cómo resolverías este problema en chispa?
Mis intentos actuales hasta el momento (en código medio hecho)
case class Historic(id: String, set_at: Long, valueY: Int)
val historicRDD = sc.cassandraTable[Historic](...)
historicRDD
.map( row => ( row.id, row ) )
.reduceByKey(...)
// transforming to another case which results in something like this; code not finished yet
// (List((Range(0, 12:25), 400), (Range(12:25, NOW), 450)))
// From here we could join with Actions
// And then some .filter maybe to select the right Lists tuple
Después de algunas horas de pensar, intentando y fallando, se me ocurrió esta solución. No estoy seguro de si es bueno, pero debido a la falta de otras opciones, esta es mi solución.
Primero ampliamos nuestra case class Historic
case class Historic(id: String, set_at: Long, valueY: Int) {
val set_at_map = new java.util.TreeMap[Long, Int]() // as it seems Scala doesn''t provides something like this with similar operations we''ll need a few lines later
set_at_map.put(0, valueY) // Means from the beginning of Epoch ...
set_at_map.put(set_at, valueY) // .. to the set_at date
// This is the fun part. With .getHistoricValue we can pass any timestamp and we will get the a value of the key back that contains the passed date. For more information look at this answer: http://.com/a/13400317/1209327
def getHistoricValue(date: Long) : Option[Int] = {
var e = set_at_map.floorEntry(date)
if (e != null && e.getValue == null) {
e = set_at_map.lowerEntry(date)
}
if ( e == null ) None else e.getValue()
}
}
La clase de casos está lista y ahora la llevamos a la acción
val historicRDD = sc.cassandraTable[Historic](...)
.map( row => ( row.id, row ) )
.reduceByKey( (row1, row2) => {
row1.set_at_map.put(row2.set_at, row2.valueY) // we add the historic Events up to each id
row1
})
// Now we load the Actions and map it by id as we did with Historic
val actionsRDD = sc.cassandraTable[Actions](...)
.map( row => ( row.id, row ) )
// Now both RDDs have the same key and we can join them
val fin = actionsRDD.join(historicRDD)
.map( row => {
( row._1.id,
(
row._2._1.id,
row._2._1.valueX - row._2._2.getHistoricValue(row._2._1.time).get // returns valueY for that timestamp
)
)
})
Soy totalmente nuevo en Scala, así que avíseme si podemos mejorar este código en algún lugar.
Es un problema interesante. También pasé un tiempo tratando de encontrar un enfoque. Esto es lo que se me ocurrió:
Se dan clases de casos para Action(id, time, x)
e Historic(id, time, y)
- Únete a las acciones con la historia (esto puede ser pesado)
- filtrar todos los datos históricos no relevantes para una acción dada
- clave los resultados por (id, time) - diferenciar la misma clave en diferentes momentos
- reducir el historial por acción al valor máximo, dejándonos un registro histórico relevante para la acción dada
En Spark:
val actionById = actions.keyBy(_.id)
val historyById = historic.keyBy(_.id)
val actionByHistory = actionById.join(historyById)
val filteredActionByidTime = actionByHistory.collect{ case (k,(action,historic)) if (action.time>historic.t) => ((action.id, action.time),(action,historic))}
val topHistoricByAction = filteredActionByidTime.reduceByKey{ case ((a1:Action,h1:Historic),(a2:Action, h2:Historic)) => (a1, if (h1.t>h2.t) h1 else h2)}
// we are done, let''s produce a report now
val report = topHistoricByAction.map{case ((id,time),(action,historic)) => (id,time,action.X -historic.y)}
Usando los datos proporcionados anteriormente, el informe se ve así:
report.collect
Array[(Int, Long, Int)] = Array((1,43500,100), (1,45000,50), (2,45000,50))
(Transformé el tiempo a segundos para tener una marca de tiempo simplista)
Sé que esta pregunta ha sido respondida, pero quiero agregar otra solución que funcionó para mí:
tu información -
Actions
id | time | valueX
1 | 12:05 | 500
1 | 12:30 | 500
2 | 12:30 | 125
Historic
id | set_at| valueY
1 | 11:00 | 400
1 | 12:15 | 450
2 | 12:20 | 50
2 | 12:25 | 75
-
Actions
sindicales eHistoric
Combined id | time | valueX | record-type 1 | 12:05 | 500 | Action 1 | 12:30 | 500 | Action 2 | 12:30 | 125 | Action 1 | 11:00 | 400 | Historic 1 | 12:15 | 450 | Historic 2 | 12:20 | 50 | Historic 2 | 12:25 | 75 | Historic
Escribe un particionador personalizado y usa repartitionAndSortWithinPartitions para dividir por
id
, pero ordena portime
.Partition-1 1 | 11:00 | 400 | Historic 1 | 12:05 | 500 | Action 1 | 12:15 | 450 | Historic 1 | 12:30 | 500 | Action Partition-2 2 | 12:20 | 50 | Historic 2 | 12:25 | 75 | Historic 2 | 12:30 | 125 | Action
Atraviesa los registros por partición.
Si se trata de un registro Historical
, agréguelo a un mapa o actualice el mapa si ya tiene ese ID: valueY
un registro del último valueY
por cada id
usando un mapa por partición.
Si es un registro de Action
, obtenga el valor valueY
del mapa y valueX
del valueX
Un mapa M
Partition-1 traversal in order M={ 1 -> 400} // A new entry in map M 1 | 100 // M(1) = 400; 500-400 M={1 -> 450} // update M, because key already exists 1 | 50 // M(1) Partition-2 traversal in order M={ 2 -> 50} // A new entry in M M={ 2 -> 75} // update M, because key already exists 2 | 50 // M(2) = 75; 125-75
Podría intentar dividir y ordenar por time
, pero necesita fusionar las particiones más tarde. Y eso podría agregar complejidad.
Esto, me pareció preferible a la combinación muchos a muchos que usualmente recibimos cuando usamos los rangos de tiempo para unirnos.