java - spark - Construye una cadena con estado para diferentes eventos y asigna ID global en la chispa
spark streaming tutorial (1)
Estamos trabajando con spark 1.6
y estamos tratando de mantener la identidad global para eventos similares. Puede haber pocos "grupos" de eventos con identificación idéntica (en el ejemplo como número. Las letras se agregan solo por singularidad). Y sabemos que algunos de estos eventos son similares, así que podemos conectarlos. Queremos mantener algo como:
Z -> 1, 2, 3
X -> 4
Entonces, en el futuro, si llegan algunos eventos con ID 4, podemos asignar X
como una identidad global.
Por favor, consulte el ejemplo para una mejor ilustración:
Digamos que tenemos algunos datos de transmisión entrando al trabajo de chispa.
1a
1b
2c
2d
2e
3f
3g
3h
4i
Como el evento 1 es nuestra primera aparición, queremos asignar 1 to Z
Luego, lo que sabemos es que 1b y 2c son similares. así que queremos mantener 2->1
mapa en algún lugar 2->1
. Lo mismo ocurre con 2e y 3f, por lo que necesitamos mapear 3-2
. Así que por ahora tenemos 3 pares 1->Z
, 2->1
, 3->2
.
Y queremos crear una ruta "histórica": Z <- 1 <- 2 <- 3
Al final tendremos todos los eventos con ID = Z
1a -> Z
1b -> Z
2c -> Z
2d -> Z
2e -> Z
3f -> Z
3g -> Z
3h -> Z
4i -> X
Intentamos usar mapwithstate
pero lo único que pudimos hacer fue eso 2->1
y 3->2
. Con mapwithstate
no pudimos obtener el estado de "padre" en el estado del evento actual, por ej. el evento actual 3 con el padre 2 y no puede obtener 2 -> 1
y tampoco 1 -> Z
¿Es posible tener algún mapeo global para esto? Ya probamos acumuladores y transmisiones, pero parece no ser muy adecuado. Y no pudimos reemplazar los eventos 1 para la primera asignación y los eventos 2 para la segunda asignación con Z
Si vendrá un nuevo evento 5
y es similar con 3h, por ejemplo, tenemos que asignar el mapeo 5->Z
nuevamente.
Lo que sigue es una solución para el problema dado, usando una referencia mutable a un RDD de ''estado'' que actualizamos con nuevos resultados cada vez.
Utilizamos transform
para etiquetar la secuencia de eventos entrantes con la identificación global única haciendo una combinación de similitud. Esta es una unión "a mano" donde usamos un producto de los dos conjuntos de datos y comparamos cada entrada en pares.
Tenga en cuenta que este es un proceso costoso. Hay muchas partes que se pueden cambiar, dependiendo de las características específicas de la secuencia esperada. Por ejemplo, podríamos reemplazar el RDD de estado global por un map
local y aplicar combinaciones de map-side
para una unión de similitud más rápida, pero eso depende mucho de la cardinalidad esperada del conjunto de identificadores únicos.
Esto fue más complicado de lo que originalmente esperaba. Tómalo solo como punto de partida para una solución más robusta. Por ejemplo, la operación de union
en el RDD del estado necesita puntos de control regulares para evitar que el DAG crezca más allá del control. (Hay mucho margen de mejora, pero eso está más allá de un esfuerzo razonable para proporcionar una respuesta).
Aquí esbozo el núcleo de la solución, para el cuaderno completo de pruebas, vea UniqueGlobalStateChains.snb
// this mutable reference points to the `states` that we keep across interations
@transient var states: RDD[(String, (Int, Long))] = sparkContext.emptyRDD
// we assume an incoming Event stream. Here we prepare it for the global id-process
@transient val eventsById = eventStream.map(event => (event.id, event))
@transient val groupedEvents = eventsById.groupByKey()
// this is the core of the solution.
// We transform the incoming events into tagged events.
// As a by-product, the mutable `states` reference will get updated with the latest state mapping.
// the "chain" of events can be reconstructed ordering the states by timestamp
@transient val taggedEvents = groupedEvents.transform{ (events, currentTime) =>
val currentTransitions = states.reduceByKey{case (event1, event2) => Seq(event1, event2).maxBy{case (id, ts) => ts}}
val currentMappings = currentTransitions.map{case (globalId, (currentId, maxTx)) => (currentId, globalId)}
val newEventIds = events.keys // let''s extract the ids of the incoming (grouped) events
val similarityJoinMap = newEventIds.cartesian(currentMappings)
.collect{case (eventId, (currentId, globalId)) if (isSimilar(currentId)(eventId)) => (eventId, globalId)}
.collectAsMap
//val similarityBC = sparkContext.broadcast(similarityJoinMap)
val newGlobalKeys = newEventIds.map(id => (id, similarityJoinMap.getOrElse(id, genGlobalId())))
newGlobalKeys.cache() //avoid lazy evaluation to generate multiple global ids
val newTaggedEvents = events.join(newGlobalKeys).flatMap{case (eventId, (events, globalKey)) =>
events.map(event => (event.id,event.payload, globalKey))
}
val newStates = newGlobalKeys.map{case (eventId, globalKey) => (globalKey, (eventId, currentTime.milliseconds))}
currentState = newStates
states.unpersist(false)
states = newStates.union(states)
states.cache()
newTaggedEvents
}
Dada esta secuencia de entrada:
"1|a,1|b,3|c", "2|d,2|e,2|f", "3|g,3|h,3|i,4|j", "5|k", "4|f,1|g", "6|h"
Obtenemos:
Eventos etiquetados con una ID global:
---
1|a: gen-4180,1|b: gen-4180,3|c: gen-5819
---
2|d: gen-4180,2|e: gen-4180,2|f: gen-4180
---
3|g: gen-4180,3|h: gen-4180,3|i: gen-4180,4|j: gen-5819
---
5|k: gen-5819
---
1|g: gen-2635,4|f: gen-4180
---
6|h: gen-5819
Y podemos reconstruir la cadena de eventos que se derivan de una identificación global:
gen-4180: 1<-2<-3<-4
gen-2635: 1
gen-5819: 3<-4<-5<-6
-o-