cassandra apache-spark rdd

cassandra - Spark: cómo unirse a los RDD por rango de tiempo



apache-spark (3)

Tengo un delicado problema con Spark, donde simplemente no puedo entender.

Tenemos dos RDD (provenientes de Cassandra). RDD1 contiene Actions y RDD2 contiene datos Historic . Ambos tienen una identificación en la cual pueden ser emparejados / unidos. Pero el problema es que las dos tablas tienen una relación N: N ship. Actions contienen varias filas con la misma identificación y también lo hace Historic . Aquí hay algunas fechas de ejemplo de ambas tablas.

Actions tiempo de las Actions es en realidad una marca de tiempo

id | time | valueX 1 | 12:05 | 500 1 | 12:30 | 500 2 | 12:30 | 125

Set_at Historic es en realidad una marca de tiempo

id | set_at| valueY 1 | 11:00 | 400 1 | 12:15 | 450 2 | 12:20 | 50 2 | 12:25 | 75

¿Cómo podemos unir estas dos tablas de una manera, que obtenemos un resultado como este?

1 | 100 # 500 - 400 for Actions#1 with time 12:05 because Historic was in that time at 400 1 | 50 # 500 - 450 for Actions#2 with time 12:30 because H. was in that time at 450 2 | 50 # 125 - 75 for Actions#3 with time 12:30 because H. was in that time at 75

No puedo encontrar una buena solución que se sienta bien, sin hacer muchas iteraciones sobre enormes conjuntos de datos. Siempre tengo que pensar en hacer un rango del conjunto Historic y luego verificar de alguna manera si las Actions ajustan en el rango, por ejemplo, (11:00 - 12:15) para hacer el cálculo. Pero eso parece bastante lento para mí. ¿Hay alguna manera más eficiente de hacer eso? Me parece que este tipo de problema podría ser popular, pero todavía no he podido encontrar ninguna pista al respecto. ¿Cómo resolverías este problema en chispa?

Mis intentos actuales hasta el momento (en código medio hecho)

case class Historic(id: String, set_at: Long, valueY: Int) val historicRDD = sc.cassandraTable[Historic](...) historicRDD .map( row => ( row.id, row ) ) .reduceByKey(...) // transforming to another case which results in something like this; code not finished yet // (List((Range(0, 12:25), 400), (Range(12:25, NOW), 450))) // From here we could join with Actions // And then some .filter maybe to select the right Lists tuple


Después de algunas horas de pensar, intentando y fallando, se me ocurrió esta solución. No estoy seguro de si es bueno, pero debido a la falta de otras opciones, esta es mi solución.

Primero ampliamos nuestra case class Historic

case class Historic(id: String, set_at: Long, valueY: Int) { val set_at_map = new java.util.TreeMap[Long, Int]() // as it seems Scala doesn''t provides something like this with similar operations we''ll need a few lines later set_at_map.put(0, valueY) // Means from the beginning of Epoch ... set_at_map.put(set_at, valueY) // .. to the set_at date // This is the fun part. With .getHistoricValue we can pass any timestamp and we will get the a value of the key back that contains the passed date. For more information look at this answer: http://.com/a/13400317/1209327 def getHistoricValue(date: Long) : Option[Int] = { var e = set_at_map.floorEntry(date) if (e != null && e.getValue == null) { e = set_at_map.lowerEntry(date) } if ( e == null ) None else e.getValue() } }

La clase de casos está lista y ahora la llevamos a la acción

val historicRDD = sc.cassandraTable[Historic](...) .map( row => ( row.id, row ) ) .reduceByKey( (row1, row2) => { row1.set_at_map.put(row2.set_at, row2.valueY) // we add the historic Events up to each id row1 }) // Now we load the Actions and map it by id as we did with Historic val actionsRDD = sc.cassandraTable[Actions](...) .map( row => ( row.id, row ) ) // Now both RDDs have the same key and we can join them val fin = actionsRDD.join(historicRDD) .map( row => { ( row._1.id, ( row._2._1.id, row._2._1.valueX - row._2._2.getHistoricValue(row._2._1.time).get // returns valueY for that timestamp ) ) })

Soy totalmente nuevo en Scala, así que avíseme si podemos mejorar este código en algún lugar.


Es un problema interesante. También pasé un tiempo tratando de encontrar un enfoque. Esto es lo que se me ocurrió:

Se dan clases de casos para Action(id, time, x) e Historic(id, time, y)

  • Únete a las acciones con la historia (esto puede ser pesado)
  • filtrar todos los datos históricos no relevantes para una acción dada
  • clave los resultados por (id, time) - diferenciar la misma clave en diferentes momentos
  • reducir el historial por acción al valor máximo, dejándonos un registro histórico relevante para la acción dada

En Spark:

val actionById = actions.keyBy(_.id) val historyById = historic.keyBy(_.id) val actionByHistory = actionById.join(historyById) val filteredActionByidTime = actionByHistory.collect{ case (k,(action,historic)) if (action.time>historic.t) => ((action.id, action.time),(action,historic))} val topHistoricByAction = filteredActionByidTime.reduceByKey{ case ((a1:Action,h1:Historic),(a2:Action, h2:Historic)) => (a1, if (h1.t>h2.t) h1 else h2)} // we are done, let''s produce a report now val report = topHistoricByAction.map{case ((id,time),(action,historic)) => (id,time,action.X -historic.y)}

Usando los datos proporcionados anteriormente, el informe se ve así:

report.collect Array[(Int, Long, Int)] = Array((1,43500,100), (1,45000,50), (2,45000,50))

(Transformé el tiempo a segundos para tener una marca de tiempo simplista)


Sé que esta pregunta ha sido respondida, pero quiero agregar otra solución que funcionó para mí:

tu información -

Actions id | time | valueX 1 | 12:05 | 500 1 | 12:30 | 500 2 | 12:30 | 125 Historic id | set_at| valueY 1 | 11:00 | 400 1 | 12:15 | 450 2 | 12:20 | 50 2 | 12:25 | 75

  1. Actions sindicales e Historic

Combined id | time | valueX | record-type 1 | 12:05 | 500 | Action 1 | 12:30 | 500 | Action 2 | 12:30 | 125 | Action 1 | 11:00 | 400 | Historic 1 | 12:15 | 450 | Historic 2 | 12:20 | 50 | Historic 2 | 12:25 | 75 | Historic

  1. Escribe un particionador personalizado y usa repartitionAndSortWithinPartitions para dividir por id , pero ordena por time .

    Partition-1 1 | 11:00 | 400 | Historic 1 | 12:05 | 500 | Action 1 | 12:15 | 450 | Historic 1 | 12:30 | 500 | Action Partition-2 2 | 12:20 | 50 | Historic 2 | 12:25 | 75 | Historic 2 | 12:30 | 125 | Action

  2. Atraviesa los registros por partición.

Si se trata de un registro Historical , agréguelo a un mapa o actualice el mapa si ya tiene ese ID: valueY un registro del último valueY por cada id usando un mapa por partición.

Si es un registro de Action , obtenga el valor valueY del mapa y valueX del valueX

Un mapa M

Partition-1 traversal in order M={ 1 -> 400} // A new entry in map M 1 | 100 // M(1) = 400; 500-400 M={1 -> 450} // update M, because key already exists 1 | 50 // M(1) Partition-2 traversal in order M={ 2 -> 50} // A new entry in M M={ 2 -> 75} // update M, because key already exists 2 | 50 // M(2) = 75; 125-75

Podría intentar dividir y ordenar por time , pero necesita fusionar las particiones más tarde. Y eso podría agregar complejidad.

Esto, me pareció preferible a la combinación muchos a muchos que usualmente recibimos cuando usamos los rangos de tiempo para unirnos.