java - udemy - Spark Streaming: ¿Por qué los costos de procesamiento interno son tan altos para manejar el estado de usuario de unos pocos MB?
udemy scala spark (1)
La gestión estatal se ha mejorado en la chispa 1.6.
Consulte SPARK-2629 Administración de estado mejorada para Spark Streaming;
Y en la especificación de diseño detallado:
Mejora de la gestión del estado en Spark Streaming
Un inconveniente de rendimiento se menciona a continuación:
Necesidad de una administración de estado más optimizada que no escanee todas las claves Current updateStateByKey escanea cada clave en cada intervalo de lote, incluso si no hay datos para esa clave. Si bien esta semántica es útil en algunas cargas de trabajo, la mayoría de las cargas de trabajo solo requieren `` escanear y actualizar el estado para el que hay datos nuevos. Y solo un pequeño porcentaje de todo el estado necesita ser tocado para eso en cada intervalo de lote. The cogroup-based implementation of updateStateByKey is not designed for this; cogroup scans all the keys every time. In fact, this causes the batch processing times of updateStateByKey to increase with the number of keys in the state, even if the data rate stays fixed.
En base a nuestros experimentos, vemos que los costos de procesamiento interno de Spark Streaming requieren una gran cantidad de tiempo cuando el estado se convierte en más de un millón de objetos. Como resultado, la latencia sufre, porque tenemos que aumentar el intervalo de lote para evitar un comportamiento inestable (tiempo de procesamiento> intervalo de lote).
No tiene nada que ver con los detalles de nuestra aplicación, ya que se puede reproducir mediante el código a continuación.
¿Cuáles son exactamente esos costos de infraestructura / procesamiento interno de Spark que tardan tanto tiempo en manejar el estado del usuario? ¿Hay alguna opción para disminuir el tiempo de procesamiento además de simplemente aumentar el intervalo de lotes?
Planeamos usar state extensively: al menos 100MB más o menos en cada uno de los pocos nodos para mantener todos los datos en la memoria y solo volcarlos una vez cada hora.
Aumentar el intervalo de lote ayuda, pero queremos mantener el intervalo de lote mínimo.
La razón probablemente no sea el espacio ocupado por el estado, sino el gran gráfico de objetos, porque cuando cambiamos la lista por una gran matriz de primitivas, el problema desapareció.
Solo una suposición: podría tener algo que ver con org.apache.spark.util.SizeEstimator
usado internamente por Spark, porque se muestra al perfilar de vez en cuando.
Aquí hay una demostración simple para reproducir la imagen de arriba en iCore7 moderno:
- menos de 15 MB de estado
- sin entrada de flujo en absoluto
- la función ''updateStateByKey'' más rápida posible (ficticia)
- intervalo de lote de 1 segundo
- punto de control (requerido por Spark, debe tener) al disco local
- probado localmente y en YARN
Código:
package spark;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class SlowSparkStreamingUpdateStateDemo {
// Very simple state model
static class State implements Serializable {
final List<String> data;
State(List<String> data) {
this.data = data;
}
}
public static void main(String[] args) {
SparkConf conf = new SparkConf()
// Tried KryoSerializer, but it does not seem to help much
//.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setMaster("local[*]")
.setAppName(SlowSparkStreamingUpdateStateDemo.class.getName());
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(1));
javaStreamingContext.checkpoint("checkpoint"); // a must (if you have stateful operation)
List<Tuple2<String, State>> initialRddGeneratedData = prepareInitialRddData();
System.out.println("Estimated size, bytes: " + SizeEstimator.estimate(initialRddGeneratedData));
JavaPairRDD<String, State> initialRdd = javaStreamingContext.sparkContext().parallelizePairs(initialRddGeneratedData);
JavaPairDStream<String, State> stream = javaStreamingContext
.textFileStream(".") // fake: effectively, no input at all
.mapToPair(input -> (Tuple2<String, State>) null) // fake to get JavaPairDStream
.updateStateByKey(
(inputs, maybeState) -> maybeState, // simplest possible dummy function
new HashPartitioner(javaStreamingContext.sparkContext().defaultParallelism()),
initialRdd); // set generated state
stream.foreachRDD(rdd -> { // simplest possible action (required by Spark)
System.out.println("Is empty: " + rdd.isEmpty());
return null;
});
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
}
private static List<Tuple2<String, State>> prepareInitialRddData() {
// ''stateCount'' tuples with value = list of size ''dataListSize'' of strings of length ''elementDataSize''
int stateCount = 1000;
int dataListSize = 200;
int elementDataSize = 10;
List<Tuple2<String, State>> initialRddInput = new ArrayList<>(stateCount);
for (int stateIdx = 0; stateIdx < stateCount; stateIdx++) {
List<String> stateData = new ArrayList<>(dataListSize);
for (int dataIdx = 0; dataIdx < dataListSize; dataIdx++) {
stateData.add(RandomStringUtils.randomAlphanumeric(elementDataSize));
}
initialRddInput.add(new Tuple2<>("state" + stateIdx, new State(stateData)));
}
return initialRddInput;
}
}