Apache Kafka: integración con Spark
En este capítulo, discutiremos cómo integrar Apache Kafka con Spark Streaming API.
Sobre Spark
Spark Streaming API permite el procesamiento de transmisiones escalable, de alto rendimiento y tolerante a fallas de transmisiones de datos en vivo. Los datos se pueden ingerir de muchas fuentes como Kafka, Flume, Twitter, etc., y se pueden procesar mediante algoritmos complejos como funciones de alto nivel como mapear, reducir, unir y ventana. Finalmente, los datos procesados se pueden enviar a sistemas de archivos, bases de datos y tableros de control en vivo. Los conjuntos de datos distribuidos resistentes (RDD) son una estructura de datos fundamental de Spark. Es una colección distribuida inmutable de objetos. Cada conjunto de datos en RDD se divide en particiones lógicas, que se pueden calcular en diferentes nodos del clúster.
Integración con Spark
Kafka es una potencial plataforma de mensajería e integración para Spark Streaming. Kafka actúa como el eje central para los flujos de datos en tiempo real y se procesan utilizando algoritmos complejos en Spark Streaming. Una vez que se procesan los datos, Spark Streaming podría publicar los resultados en otro tema de Kafka o almacenarlos en HDFS, bases de datos o paneles. El siguiente diagrama muestra el flujo conceptual.
Ahora, veamos en detalle las API de Kafka-Spark.
API SparkConf
Representa la configuración de una aplicación Spark. Se utiliza para establecer varios parámetros de Spark como pares clave-valor.
La
clase SparkConf
tiene los siguientes métodos:
set(string key, string value) - establecer la variable de configuración.
remove(string key) - eliminar clave de la configuración.
setAppName(string name) - establezca el nombre de la aplicación para su aplicación.
get(string key) - obtener clave
API StreamingContext
Este es el principal punto de entrada para la funcionalidad Spark. Un SparkContext representa la conexión a un clúster de Spark y se puede usar para crear RDD, acumuladores y variables de difusión en el clúster. La firma se define como se muestra a continuación.
public StreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)
master - URL del clúster para conectarse (por ejemplo, mesos: // host: puerto, spark: // host: puerto, local [4]).
appName - un nombre para su trabajo, para mostrar en la interfaz de usuario web del clúster
batchDuration - el intervalo de tiempo en el que los datos de transmisión se dividirán en lotes
public StreamingContext(SparkConf conf, Duration batchDuration)
Cree un StreamingContext proporcionando la configuración necesaria para un nuevo SparkContext.
conf - Parámetros de chispa
batchDuration - el intervalo de tiempo en el que los datos de transmisión se dividirán en lotes
API de KafkaUtils
La API de KafkaUtils se utiliza para conectar el clúster de Kafka a la transmisión de Spark. Esta API tiene el método significativo createStream
firma definido como se muestra a continuación.
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
StreamingContext ssc, String zkQuorum, String groupId,
scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
El método que se muestra arriba se utiliza para crear un flujo de entrada que extrae mensajes de Kafka Brokers.
ssc - Objeto StreamingContext.
zkQuorum - Quórum de cuidador del zoológico.
groupId : El ID de grupo de este consumidor.
topics - devolver un mapa de temas para consumir.
storageLevel - Nivel de almacenamiento a utilizar para almacenar los objetos recibidos.
La API de KafkaUtils tiene otro método createDirectStream, que se utiliza para crear un flujo de entrada que extrae mensajes directamente de Kafka Brokers sin utilizar ningún receptor. Esta secuencia puede garantizar que cada mensaje de Kafka se incluya en las transformaciones exactamente una vez.
La aplicación de muestra se realiza en Scala. Para compilar la aplicación, descargue e instale sbt
, scala build tool (similar a maven). El código de la aplicación principal se presenta a continuación.
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
Crear script
La integración de spark-kafka depende del tarro de integración de Spark, Spark Streaming y Spark Kafka. Cree un nuevo archivo build.sbt
y especifique los detalles de la aplicación y su dependencia. El sbt
descargará el archivo
jar necesario mientras compila y empaqueta la aplicación.
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
Compilación / Empaquetado
Ejecute el siguiente comando para compilar y empaquetar el archivo jar de la aplicación. Necesitamos enviar el archivo jar a la consola Spark para ejecutar la aplicación.
sbt package
Sometiéndose a Spark
Inicie la CLI de Kafka Producer (explicado en el capítulo anterior), cree un nuevo tema llamado my-first-topic
y proporcione algunos mensajes de muestra como se muestra a continuación.
Another spark test message
Ejecute el siguiente comando para enviar la aplicación a la consola Spark.
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
El resultado de muestra de esta aplicación se muestra a continuación.
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..