versión verificar eliminar descargar cómo actualizar actualizaciones java scala apache-spark spark-streaming rdd broadcast

verificar - eliminar actualizaciones java



¿Cómo puedo actualizar una variable de transmisión en la transmisión por chispa? (4)

Casi todos los que se ocupan de aplicaciones de transmisión necesitan una forma de entrelazar (filtrar, buscar, etc.) datos de referencia (de DB, archivos, etc.) en los datos de transmisión. Tenemos una solución parcial de las dos partes enteras.

  1. Buscar datos de referencia para usar en operaciones de transmisión

    • crear un objeto CacheLookup con el caché TTL deseado
    • envolver eso en Broadcast
    • usar CacheLookup como parte de la lógica de transmisión

En su mayor parte, esto funciona bien, excepto por lo siguiente

  1. Actualizar los datos de referencia

    No hay una forma definitiva de lograr esto a pesar de las sugerencias en estos hilos, es decir: elimine la variable de transmisión anterior y cree una nueva. Múltiples incógnitas como lo que se espera entre estas operaciones.

Esta es una necesidad tan común, habría ayudado si hubiera una manera de enviar información para difundir la actualización de información variable. Con eso, es posible invalidar los cachés locales en "CacheLookup"

La segunda parte del problema aún no está resuelta. Me interesaría si hay algún enfoque viable para esto

Tengo, creo, un caso de uso relativamente común para la transmisión por chispa:

Tengo una secuencia de objetos que me gustaría filtrar en función de algunos datos de referencia

Inicialmente, pensé que esto sería algo muy simple de lograr usando una variable de difusión :

public void startSparkEngine { Broadcast<ReferenceData> refdataBroadcast = sparkContext.broadcast(getRefData()); final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> { final ReferenceData refData = refdataBroadcast.getValue(); return obj.getField().equals(refData.getField()); } filteredStream.foreachRDD(rdd -> { rdd.foreach(obj -> { // Final processing of filtered objects }); return null; }); }

Sin embargo, aunque con poca frecuencia, mis datos de referencia cambiarán periódicamente

Tenía la impresión de que podía modificar y retransmitir mi variable en el controlador y se propagaría a cada uno de los trabajadores, sin embargo, el objeto Broadcast no es Serializable y debe ser final .

¿Qué alternativas tengo? Las tres soluciones que se me ocurren son:

  1. Mueva la búsqueda de datos de referencia a forEachPartition o forEachRdd para que resida completamente en los trabajadores. Sin embargo, los datos de referencia viven en una API REST, por lo que también necesitaría almacenar de alguna manera un temporizador / contador para detener el acceso remoto para cada elemento en la secuencia.

  2. Reinicie el contexto de chispa cada vez que cambie la refdata, con una nueva variable de difusión.

  3. Convierta los datos de referencia en un RDD , luego join las transmisiones de tal manera que ahora esté transmitiendo Pair<MyObject, RefData> , aunque esto enviará los datos de referencia con cada objeto.


Extendiendo la respuesta Por @Rohan Aletty. Aquí hay un código de muestra de un BroadcastWrapper que actualiza la variable de difusión basada en algunos ttl

public class BroadcastWrapper { private Broadcast<ReferenceData> broadcastVar; private Date lastUpdatedAt = Calendar.getInstance().getTime(); private static BroadcastWrapper obj = new BroadcastWrapper(); private BroadcastWrapper(){} public static BroadcastWrapper getInstance() { return obj; } public JavaSparkContext getSparkContext(SparkContext sc) { JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc); return jsc; } public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){ Date currentDate = Calendar.getInstance().getTime(); long diff = currentDate.getTime()-lastUpdatedAt.getTime(); if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms if (var != null) var.unpersist(); lastUpdatedAt = new Date(System.currentTimeMillis()); //Your logic to refresh ReferenceData data = getRefData(); var = getSparkContext(sparkContext).broadcast(data); } return var; } }

Su código se vería así:

public void startSparkEngine() { final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> { Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context()); stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField())); }); filteredStream.foreachRDD(rdd -> { rdd.foreach(obj -> { // Final processing of filtered objects }); return null; }); }

Esto funcionó para mí en multi-cluster también. Espero que esto ayude


No estoy seguro de si ya lo ha intentado, pero creo que se puede lograr una actualización de una variable de difusión sin apagar SparkContext . Mediante el uso del método unpersist() , las copias de la variable de difusión se eliminan en cada ejecutor y tendrían que ser la variable tendría que volver a emitirse para poder acceder de nuevo. Para su caso de uso, cuando desee actualizar su transmisión, puede:

  1. Espere a que sus ejecutores terminen con una serie actual de datos

  2. Retirar la variable de difusión

  3. Actualizar la variable de difusión

  4. Rebroadcast para enviar los nuevos datos de referencia a los ejecutores

Me estoy aprovechando bastante de esta publicación, pero la persona que hizo la última respuesta afirmó haberlo hecho funcionar localmente. Es importante tener en cuenta que probablemente desee establecer el bloqueo en true en la unpersist para asegurarse de que los ejecutores se deshagan de los datos antiguos (para que los valores obsoletos no se vuelvan a leer en la próxima iteración).


Problema recientemente enfrentado con esto. Pensé que podría ser útil para los usuarios de scala.

La forma Scala de hacer BroadCastWrapper es como el siguiente ejemplo.

import java.io.{ ObjectInputStream, ObjectOutputStream } import org.apache.spark.broadcast.Broadcast import org.apache.spark.streaming.StreamingContext import scala.reflect.ClassTag /* wrapper lets us update brodcast variables within DStreams'' foreachRDD without running into serialization issues */ case class BroadcastWrapper[T: ClassTag]( @transient private val ssc: StreamingContext, @transient private val _v: T) { @transient private var v = ssc.sparkContext.broadcast(_v) def update(newValue: T, blocking: Boolean = false): Unit = { v.unpersist(blocking) v = ssc.sparkContext.broadcast(newValue) } def value: T = v.value private def writeObject(out: ObjectOutputStream): Unit = { out.writeObject(v) } private def readObject(in: ObjectInputStream): Unit = { v = in.readObject().asInstanceOf[Broadcast[T]] } }

Cada vez que necesite llamar a la función de actualización para obtener una nueva variable de transmisión.