verificar - eliminar actualizaciones java
¿Cómo puedo actualizar una variable de transmisión en la transmisión por chispa? (4)
Casi todos los que se ocupan de aplicaciones de transmisión necesitan una forma de entrelazar (filtrar, buscar, etc.) datos de referencia (de DB, archivos, etc.) en los datos de transmisión. Tenemos una solución parcial de las dos partes enteras.
-
Buscar datos de referencia para usar en operaciones de transmisión
- crear un objeto CacheLookup con el caché TTL deseado
- envolver eso en Broadcast
- usar CacheLookup como parte de la lógica de transmisión
En su mayor parte, esto funciona bien, excepto por lo siguiente
-
Actualizar los datos de referencia
No hay una forma definitiva de lograr esto a pesar de las sugerencias en estos hilos, es decir: elimine la variable de transmisión anterior y cree una nueva. Múltiples incógnitas como lo que se espera entre estas operaciones.
Esta es una necesidad tan común, habría ayudado si hubiera una manera de enviar información para difundir la actualización de información variable. Con eso, es posible invalidar los cachés locales en "CacheLookup"
La segunda parte del problema aún no está resuelta. Me interesaría si hay algún enfoque viable para esto
Tengo, creo, un caso de uso relativamente común para la transmisión por chispa:
Tengo una secuencia de objetos que me gustaría filtrar en función de algunos datos de referencia
Inicialmente, pensé que esto sería algo muy simple de lograr usando una variable de difusión :
public void startSparkEngine {
Broadcast<ReferenceData> refdataBroadcast
= sparkContext.broadcast(getRefData());
final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
final ReferenceData refData = refdataBroadcast.getValue();
return obj.getField().equals(refData.getField());
}
filteredStream.foreachRDD(rdd -> {
rdd.foreach(obj -> {
// Final processing of filtered objects
});
return null;
});
}
Sin embargo, aunque con poca frecuencia, mis datos de referencia cambiarán periódicamente
Tenía la impresión de que podía modificar y
retransmitir
mi variable en el controlador y se propagaría a cada uno de los trabajadores, sin embargo, el objeto
Broadcast
no es
Serializable
y debe ser
final
.
¿Qué alternativas tengo? Las tres soluciones que se me ocurren son:
-
Mueva la búsqueda de datos de referencia a
forEachPartition
oforEachRdd
para que resida completamente en los trabajadores. Sin embargo, los datos de referencia viven en una API REST, por lo que también necesitaría almacenar de alguna manera un temporizador / contador para detener el acceso remoto para cada elemento en la secuencia. -
Reinicie el contexto de chispa cada vez que cambie la refdata, con una nueva variable de difusión.
-
Convierta los datos de referencia en un RDD , luego
join
las transmisiones de tal manera que ahora esté transmitiendoPair<MyObject, RefData>
, aunque esto enviará los datos de referencia con cada objeto.
Extendiendo la respuesta Por @Rohan Aletty. Aquí hay un código de muestra de un BroadcastWrapper que actualiza la variable de difusión basada en algunos ttl
public class BroadcastWrapper {
private Broadcast<ReferenceData> broadcastVar;
private Date lastUpdatedAt = Calendar.getInstance().getTime();
private static BroadcastWrapper obj = new BroadcastWrapper();
private BroadcastWrapper(){}
public static BroadcastWrapper getInstance() {
return obj;
}
public JavaSparkContext getSparkContext(SparkContext sc) {
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
return jsc;
}
public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
Date currentDate = Calendar.getInstance().getTime();
long diff = currentDate.getTime()-lastUpdatedAt.getTime();
if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms
if (var != null)
var.unpersist();
lastUpdatedAt = new Date(System.currentTimeMillis());
//Your logic to refresh
ReferenceData data = getRefData();
var = getSparkContext(sparkContext).broadcast(data);
}
return var;
}
}
Su código se vería así:
public void startSparkEngine() {
final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context());
stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
});
filteredStream.foreachRDD(rdd -> {
rdd.foreach(obj -> {
// Final processing of filtered objects
});
return null;
});
}
Esto funcionó para mí en multi-cluster también. Espero que esto ayude
No estoy seguro de si ya lo ha intentado, pero creo que se puede lograr una actualización de una variable de difusión sin apagar
SparkContext
.
Mediante el uso del método
unpersist()
, las copias de la variable de difusión se eliminan en cada ejecutor y tendrían que ser la variable tendría que volver a emitirse para poder acceder de nuevo.
Para su caso de uso, cuando desee actualizar su transmisión, puede:
-
Espere a que sus ejecutores terminen con una serie actual de datos
-
Retirar la variable de difusión
-
Actualizar la variable de difusión
-
Rebroadcast para enviar los nuevos datos de referencia a los ejecutores
Me estoy aprovechando bastante de
esta publicación,
pero la persona que hizo la última respuesta afirmó haberlo hecho funcionar localmente.
Es importante tener en cuenta que probablemente desee establecer el bloqueo en
true
en la unpersist para asegurarse de que los ejecutores se deshagan de los datos antiguos (para que los valores obsoletos no se vuelvan a leer en la próxima iteración).
Problema recientemente enfrentado con esto. Pensé que podría ser útil para los usuarios de scala.
La forma Scala de hacer
BroadCastWrapper
es como el siguiente ejemplo.
import java.io.{ ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag
/* wrapper lets us update brodcast variables within DStreams'' foreachRDD
without running into serialization issues */
case class BroadcastWrapper[T: ClassTag](
@transient private val ssc: StreamingContext,
@transient private val _v: T) {
@transient private var v = ssc.sparkContext.broadcast(_v)
def update(newValue: T, blocking: Boolean = false): Unit = {
v.unpersist(blocking)
v = ssc.sparkContext.broadcast(newValue)
}
def value: T = v.value
private def writeObject(out: ObjectOutputStream): Unit = {
out.writeObject(v)
}
private def readObject(in: ObjectInputStream): Unit = {
v = in.readObject().asInstanceOf[Broadcast[T]]
}
}
Cada vez que necesite llamar a la función de actualización para obtener una nueva variable de transmisión.