Kafka Consumer for Spark escrito en Scala para Kafka API 0.10: deserializador AVRO personalizado
apache-spark apache-kafka (1)
Estoy actualizando mi API de Kafka de la aplicación Spark Scala a la v. 0.10. Solía crear un método personalizado para la deserialización del mensaje que viene en formato de cadena de bytes.
Me he dado cuenta de que hay una forma de pasar StringDeserializer o ByteArrayDeserializer como parámetro a cualquier clave o valor.
Sin embargo, no puedo encontrar ninguna información sobre cómo crear un deserializador de esquemas Avro personalizado para que kafkaStream pueda usarlo cuando creoDirectStream y consumo datos de Kafka.
¿Es posible?
Es posible. Deserializer<T>
sobrescribir la Deserializer<T>
definida en org.apache.kafka.common.serialization
y debe apuntar key.deserializer
o value.deserializer
a su clase personalizada a través de la clase ConsumerStrategy[K, V]
que contiene el Kafka parámetros. Por ejemplo:
import org.apache.kafka.common.serialization.Deserializer
class AvroDeserializer extends Deserializer[Array[Byte]] {
override def configure(map: util.Map[String, _], b: Boolean): Unit = ???
override def close(): Unit = ???
override def deserialize(s: String, bytes: Array[Byte]): Array[Byte] = ???
}
Y entonces:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import my.location.with.AvroDeserializer
val ssc: StreamingContext = ???
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[AvroDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("sometopic")
val stream = KafkaUtils.createDirectStream[String, MyTypeWithAvroDeserializer](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)