apache spark - robbins - chispa de recuperación punto de control de recuperación es muy, muy lento
robbins administracion 12 edicion pdf (3)
- Objetivo: leer desde Kinesis y almacenar datos en S3 en formato Parquet a través de streaming de chispa.
- Situación: la aplicación se ejecuta correctamente inicialmente, ejecutando lotes de 1 hora y el tiempo de procesamiento es de menos de 30 minutos en promedio. Por alguna razón, digamos que la aplicación falla y tratamos de reiniciar desde el punto de control. El procesamiento ahora toma para siempre y no avanza. Intentamos probar lo mismo en un intervalo de lotes de 1 minuto, el procesamiento funciona bien y toma 1.2 minutos para que se complete el lote. Cuando nos recuperamos del punto de control, toma aproximadamente 15 minutos para cada lote.
- Notas: estamos usando s3 para los puntos de control usando 1 ejecutor, con 19 g de memoria y 3 núcleos por ejecutor
Adjuntando las capturas de pantalla:
Primera ejecución: antes de la recuperación del punto de control
Tratando de recuperar desde el punto de control:
Config.scala
object Config {
val sparkConf = new SparkConf
val sc = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)
val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)
val checkpointDirectory = sc.hadoopConfiguration.get("checkpointDirectory")
// sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))
val numStreams = 2
def getSparkContext(): SparkContext = {
this.sc
}
def getSqlContext(): HiveContext = {
this.sqlContext
}
}
S3Basin.scala
object S3Basin {
def main(args: Array[String]): Unit = {
Kinesis.startStreaming(s3basinFunction _)
}
def s3basinFunction(streams : DStream[Array[Byte]]): Unit ={
streams.foreachRDD(jsonRDDRaw =>{
println(s"Old partitions ${jsonRDDRaw.partitions.length}")
val jsonRDD = jsonRDDRaw.coalesce(10,true)
println(s"New partitions ${jsonRDD.partitions.length}")
if(!jsonRDD.isEmpty()){
val sqlContext = SQLContext.getOrCreate(jsonRDD.context)
sqlContext.read.json(jsonRDD.map(f=>{
val str = new String(f)
if(str.startsWith("{/"message/"")){
str.substring(11,str.indexOf("@version")-2)
}
else{
str
}
})).registerTempTable("events")
sqlContext.sql(
"""
|select
|to_date(from_utc_timestamp(from_unixtime(at), ''US/Pacific'')) as event_date,
|hour(from_utc_timestamp(from_unixtime(at), ''US/Pacific'')) as event_hour,
|*
|from events
""".stripMargin).coalesce(1).write.mode(SaveMode.Append).partitionBy("event_date", "event_hour","verb").parquet(Config.eventsS3Path)
sqlContext.dropTempTable("events")
}
})
}
}
Kinesis.scala
object Kinesis{
def functionToCreateContext(streamFunc: (DStream[Array[Byte]]) => Unit): StreamingContext = {
val streamingContext = new StreamingContext(Config.sc, Minutes(Config.sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1))) // new context
streamingContext.checkpoint(Config.checkpointDirectory) // set checkpoint directory
val sc = Config.getSparkContext
var awsCredentails : BasicAWSCredentials = null
val kinesisClient = if(Config.useIAMInstanceRole){
new AmazonKinesisClient()
}
else{
awsCredentails = new BasicAWSCredentials(sc.hadoopConfiguration.get("kinesis.awsAccessKeyId"),sc.hadoopConfiguration.get("kinesis.awsSecretAccessKey"))
new AmazonKinesisClient(awsCredentails)
}
val endpointUrl = sc.hadoopConfiguration.get("kinesis.endpointUrl")
val appName = sc.hadoopConfiguration.get("kinesis.appName")
val streamName = sc.hadoopConfiguration.get("kinesis.streamName")
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
val batchInterval = Minutes(sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1))
// Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
// on sequence number of records that have been received. Same as batchInterval for this
// example.
val kinesisCheckpointInterval = batchInterval
// Get the region name from the endpoint URL to save Kinesis Client Library metadata in
// DynamoDB of the same region as the Kinesis stream
val regionName = sc.hadoopConfiguration.get("kinesis.regionName")
val kinesisStreams = (0 until Config.numStreams).map { i =>
println(s"creating stream for $i")
if(Config.useIAMInstanceRole){
KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
}else{
KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2,awsCredentails.getAWSAccessKeyId,awsCredentails.getAWSSecretKey)
}
}
val unionStreams = streamingContext.union(kinesisStreams)
streamFunc(unionStreams)
streamingContext
}
def startStreaming(streamFunc: (DStream[Array[Byte]]) => Unit) = {
val sc = Config.getSparkContext
if(sc.defaultParallelism < Config.numStreams+1){
throw new Exception(s"Number of shards = ${Config.numStreams} , number of processor = ${sc.defaultParallelism}")
}
val streamingContext = StreamingContext.getOrCreate(Config.checkpointDirectory, () => functionToCreateContext(streamFunc))
// sys.ShutdownHookThread {
// println("Gracefully stopping Spark Streaming Application")
// streamingContext.stop(true, true)
// println("Application stopped greacefully")
// }
//
streamingContext.start()
streamingContext.awaitTermination()
}
}
Cuando se reinicia un controlador fallido, ocurre lo siguiente:
- Recuperar cálculo: la información de los puntos de control se usa para reiniciar el controlador, reconstruir los contextos y reiniciar todos los receptores.
- Recuperar metadatos de bloque: se recuperarán los metadatos de todos los bloques que serán necesarios para continuar el procesamiento.
- Volver a generar trabajos incompletos: para los lotes con procesamiento que no se ha completado debido a la falla, los RDD y los trabajos correspondientes se regeneran utilizando los metadatos del bloque recuperado.
- Lea el bloque guardado en los registros: cuando se ejecutan esos trabajos, los datos del bloque se leen directamente desde los registros de escritura anticipada. Esto recupera todos los datos necesarios que se guardaron de forma confiable en los registros.
- Reenviar datos no reconocidos: la fuente enviará nuevamente los datos almacenados en el búfer que no se guardaron en el registro en el momento de la falla. ya que no había sido confirmado por el receptor.
Dado que todos estos pasos se realizan en el controlador, su lote de 0 eventos lleva mucho tiempo. Esto debería suceder solo con el primer lote, entonces las cosas serán normales.
Referencia here .
Tuve problemas similares antes, mi aplicación se vuelve cada vez más lenta.
intente liberar memoria después de usar rdd, llame a rdd.unpersist()
https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#unpersist(boolean)
o spark.streaming.backpressure.enabled
a true
http://spark.apache.org/docs/latest/streaming-programming-guide.html#requirements
Además, verifique la configuración de su locality
, tal vez demasiados datos se mueven.
plantearon un problema de Jira: https://issues.apache.org/jira/browse/SPARK-19304
El problema es porque leemos más datos por iteración de lo que se requiere y luego descartamos los datos. Esto se puede evitar agregando un límite a la getResults
a getResults
aws.