scala - sources - spark streaming kafka
Cómo escribirle a Kafka desde Spark Streaming (6)
Estoy usando Spark Streaming para procesar datos entre dos colas de Kafka, pero parece que no puedo encontrar una buena manera de escribir sobre Kafka desde Spark. He intentado esto:
input.foreachRDD(rdd =>
rdd.foreachPartition(partition =>
partition.foreach{
case x:String=>{
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String,String](props)
val message=new ProducerRecord[String, String]("output",null,x)
producer.send(message)
}
}
)
)
y funciona según lo previsto, pero instaurar un nuevo KafkaProducer para cada mensaje es claramente inviable en un contexto real y estoy tratando de evitarlo.
KafkaProducer no es serializable, obviamente.
Me gustaría mantener una referencia a una única instancia para cada proceso y acceder a ella cuando necesito enviar un mensaje. ¿Cómo puedo hacer eso?
¿Por qué no es factible? Fundamentalmente, cada partición de cada RDD se ejecutará de forma independiente (y puede ejecutarse en un nodo de clúster diferente), por lo que debe volver a hacer la conexión (y cualquier sincronización) al comienzo de la tarea de cada partición. Si la sobrecarga de eso es demasiado alta, entonces debe aumentar el tamaño del lote en su StreamingContext
hasta que sea aceptable (obv. Hay un costo de latencia para hacer esto).
(Si no está manejando miles de mensajes en cada partición, ¿está seguro de que necesita una transmisión de chispa? ¿Lo haría mejor con una aplicación independiente?)
Estaba teniendo el mismo problema y encontré http://allegro.tech/2015/08/spark-kafka-integration.html .
El autor resuelve el problema creando 1 productor por ejecutor. En lugar de enviar al productor, solo envía una "receta" sobre cómo crear un productor en un ejecutor al transmitirlo.
val kafkaSink = sparkContext.broadcast(KafkaSink(conf))
Él usa una envoltura que crea perezosamente al productor:
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}
object KafkaSink {
def apply(config: Map[String, Object]): KafkaSink = {
val f = () => {
val producer = new KafkaProducer[String, String](config)
sys.addShutdownHook {
producer.close()
}
producer
}
new KafkaSink(f)
}
}
El contenedor es serializable porque el productor de Kafka se inicializa justo antes del primer uso en un ejecutor. El controlador guarda la referencia al contenedor y el contenedor envía los mensajes usando el productor de cada ejecutor:
dstream.foreachRDD { rdd =>
rdd.foreach { message =>
kafkaSink.value.send("topicName", message)
}
}
Esto podría ser lo que quieres hacer. Básicamente, crea un productor para cada partición de registros.
input.foreachRDD(rdd =>
rdd.foreachPartition(
partitionOfRecords =>
{
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String,String](props)
partitionOfRecords.foreach
{
case x:String=>{
println(x)
val message=new ProducerRecord[String, String]("output",null,x)
producer.send(message)
}
}
})
)
Espero que ayude
Hay un escritor de Streaming Kafka mantenido por Cloudera (en realidad se separó de un Spark JIRA [1] ). Básicamente, crea un productor por partición, lo que amortiza el tiempo dedicado a crear objetos "pesados" en una colección de elementos (con suerte grande).
El escritor se puede encontrar aquí: https://github.com/cloudera/spark-kafka-writer
Mi primer consejo sería intentar crear una nueva instancia en foreachPartition y medir si eso es lo suficientemente rápido para sus necesidades (crear instancias de objetos pesados en foreachPartition es lo que sugiere la documentación oficial).
Otra opción es usar un grupo de objetos como se ilustra en este ejemplo:
Sin embargo, me resultó difícil implementarlo al usar el punto de control.
Otra versión que funciona bien para mí es una fábrica como se describe en la siguiente publicación de blog, solo tiene que comprobar si proporciona suficiente paralelismo para sus necesidades (consulte la sección de comentarios):
Sí, desafortunadamente Spark (1.x, 2.x) no hace que sea sencillo escribir a Kafka de manera eficiente.
Sugeriría el siguiente enfoque:
- Utilice (y reutilice) una instancia de
KafkaProducer
por proceso de ejecutor / JVM.
Aquí está la configuración de alto nivel para este enfoque:
- Primero, debes "envolver" al
KafkaProducer
de Kafka porque, como dijiste, no es serializable. Envolverlo te permite "enviarlo" a los ejecutores. La idea clave aquí es usar un valorlazy val
para retrasar la creación de instancias del productor hasta su primer uso, lo cual es una solución efectiva para que no tenga que preocuparse de queKafkaProducer
no seKafkaProducer
serializar. - Usted "envía" el productor envuelto a cada ejecutor utilizando una variable de difusión.
- Dentro de su lógica de procesamiento real, accede al productor empaquetado a través de la variable de difusión y la usa para escribir los resultados de procesamiento en Kafka.
Los fragmentos de código a continuación funcionan con Spark Streaming a partir de Spark 2.0.
Paso 1: Envolver KafkaProducer
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
/* This is the key idea that allows us to work around running into
NotSerializableExceptions. */
lazy val producer = createProducer()
def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))
def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))
}
object MySparkKafkaProducer {
import scala.collection.JavaConversions._
def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
val createProducerFunc = () => {
val producer = new KafkaProducer[K, V](config)
sys.addShutdownHook {
// Ensure that, on executor JVM shutdown, the Kafka producer sends
// any buffered messages to Kafka before shutting down.
producer.close()
}
producer
}
new MySparkKafkaProducer(createProducerFunc)
}
def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)
}
Paso 2: use una variable de difusión para darle a cada ejecutor su propia instancia KafkaProducer
envuelta
import org.apache.kafka.clients.producer.ProducerConfig
val ssc: StreamingContext = {
val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
new StreamingContext(sparkConf, Seconds(1))
}
ssc.checkpoint("checkpoint-directory")
val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", "broker1:9092")
p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}
Paso 3: Escribe desde Spark Streaming a Kafka, reutilizando la misma instancia envuelta de KafkaProducer
(para cada ejecutor)
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
kafkaProducer.value.send("my-output-topic", record)
}.toStream
metadata.foreach { metadata => metadata.get() }
}
}
Espero que esto ayude.