Apache Kafka: ejemplo de productor simple

Creemos una aplicación para publicar y consumir mensajes usando un cliente Java. El cliente productor de Kafka consta de las siguientes API.

API de KafkaProducer

Entendamos el conjunto más importante de API de productor de Kafka en esta sección. La parte central de la API de KafkaProducer es la clase KafkaProducer . La clase KafkaProducer proporciona una opción para conectar un intermediario de Kafka en su constructor con los siguientes métodos.

  • La clase KafkaProducer proporciona un método de envío para enviar mensajes de forma asincrónica a un tema. La firma de send () es la siguiente

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - El productor gestiona un búfer de registros en espera de ser enviados.

  • Callback - Una devolución de llamada proporcionada por el usuario para ejecutar cuando el servidor ha reconocido el registro (nulo indica que no hay devolución de llamada).

  • La clase KafkaProducer proporciona un método de descarga para garantizar que todos los mensajes enviados anteriormente se hayan completado realmente. La sintaxis del método flush es la siguiente:

public void flush()
  • La clase KafkaProducer proporciona el método partitionFor, que ayuda a obtener los metadatos de la partición para un tema determinado. Esto se puede utilizar para particiones personalizadas. La firma de este método es la siguiente:

public Map metrics()

Devuelve el mapa de métricas internas mantenido por el productor.

  • public void close (): la clase KafkaProducer proporciona bloques de métodos de cierre hasta que se completan todas las solicitudes enviadas anteriormente.

API de productor

La parte central de Producer API es la clase Producer . La clase Producer proporciona una opción para conectar Kafka Broker en su constructor mediante los siguientes métodos.

La clase de productor

La clase de productor proporciona el método de envío a send mensajes a uno o varios temas con las siguientes firmas.

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

Hay dos tipos de productores: Sync y Async.

La misma configuración de API se aplica también al productor de sincronización . La diferencia entre ellos es que un productor de sincronización envía mensajes directamente, pero envía mensajes en segundo plano. Se prefiere el productor asincrónico cuando desea un mayor rendimiento. En las versiones anteriores como 0.8, un productor asíncrono no tiene una devolución de llamada para que send () registre los controladores de errores. Esto solo está disponible en la versión actual de 0.9.

cierre del vacío público ()

La clase de productor proporciona close método para cerrar las conexiones del grupo de productores a todos los intermediarios de Kafka.

Ajustes de configuración

Los parámetros de configuración principales de la API del productor se enumeran en la siguiente tabla para una mejor comprensión:

S. No Parámetros de configuración y descripción
1

client.id

identifica la aplicación del productor

2

producer.type

ya sea sincronizado o asincrónico

3

acks

La configuración de acks controla los criterios según las solicitudes del productor se consideran completas.

4

retries

Si la solicitud del productor falla, vuelva a intentarlo automáticamente con un valor específico.

5

bootstrap.servers

lista inicial de corredores.

6

linger.ms

si desea reducir el número de solicitudes, puede establecer linger.ms en algo mayor que algún valor.

7

key.serializer

Clave para la interfaz del serializador.

8

value.serializer

valor para la interfaz del serializador.

9

batch.size

Tamaño del búfer.

10

buffer.memory

controla la cantidad total de memoria disponible para el productor para el almacenamiento en búfer.

API ProducerRecord

ProducerRecord es un par clave / valor que se envía al constructor de la clase Cluster.ProducerRecord de Kafka para crear un registro con pares de partición, clave y valor utilizando la siguiente firma.

public ProducerRecord (string topic, int partition, k key, v value)
  • Topic - nombre del tema definido por el usuario que se agregará al registro.

  • Partition - recuento de particiones

  • Key - La clave que se incluirá en el registro.

  • Value - Grabar contenido
public ProducerRecord (string topic, k key, v value)

El constructor de la clase ProducerRecord se utiliza para crear un registro con pares clave, valor y sin partición.

  • Topic - Crear un tema para asignar registro.

  • Key - clave para el registro.

  • Value - registrar contenidos.

public ProducerRecord (string topic, v value)

La clase ProducerRecord crea un registro sin partición ni clave.

  • Topic - crear un tema.

  • Value - registrar contenidos.

Los métodos de la clase ProducerRecord se enumeran en la siguiente tabla:

S. No Métodos de clase y descripción
1

public string topic()

El tema se agregará al registro.

2

public K key()

Clave que se incluirá en el registro. Si no hay tal clave, aquí se volverá a convertir nulo.

3

public V value()

Registre el contenido.

4

partition()

Recuento de particiones para el registro

Aplicación SimpleProducer

Antes de crear la aplicación, primero inicie ZooKeeper y Kafka Broker y luego cree su propio tema en Kafka Broker usando el comando Create topic. Después de eso, cree una clase Java llamada Sim-pleProducer.java y escriba la siguiente codificación.

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Compilation - La aplicación se puede compilar usando el siguiente comando.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution - La aplicación se puede ejecutar usando el siguiente comando.

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

Ejemplo de consumidor simple

A partir de ahora hemos creado un productor para enviar mensajes al clúster de Kafka. Ahora creemos un consumidor para consumir mensajes del clúster de Kafka. La API de KafkaConsumer se utiliza para consumir mensajes del clúster de Kafka. El constructor de la clase KafkaConsumer se define a continuación.

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - Devuelve un mapa de las configuraciones del consumidor.

La clase KafkaConsumer tiene los siguientes métodos significativos que se enumeran en la siguiente tabla.

S. No Método y descripción
1

public java.util.Set<TopicPar-tition> assignment()

Obtiene el conjunto de particiones actualmente asignadas por el consumidor.

2

public string subscription()

Suscríbase a la lista de temas dada para obtener particiones asignadas dinámicamente.

3

public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)

Suscríbase a la lista de temas dada para obtener particiones asignadas dinámicamente.

4

public void unsubscribe()

Anule la suscripción de los temas de la lista de particiones dada.

5

public void sub-scribe(java.util.List<java.lang.String> topics)

Suscríbase a la lista de temas dada para obtener particiones asignadas dinámicamente. Si la lista de temas dada está vacía, se trata igual que unsubscribe ().

6

public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)

El patrón de argumento se refiere al patrón de suscripción en el formato de expresión regular y el argumento del oyente recibe notificaciones del patrón de suscripción.

7

public void as-sign(java.util.List<TopicParti-tion> partitions)

Asigne manualmente una lista de particiones al cliente.

8

poll()

Obtenga datos para los temas o particiones especificados mediante una de las API de suscripción / asignación. Esto devolverá un error, si los temas no están suscritos antes del sondeo de datos.

9

public void commitSync()

Confirme las compensaciones devueltas en la última encuesta () para toda la lista suscrita de temas y particiones. La misma operación se aplica a commitAsyn ().

10

public void seek(TopicPartition partition, long offset)

Obtenga el valor de compensación actual que el consumidor usará en el próximo método poll ().

11

public void resume()

Reanudar las particiones en pausa.

12

public void wakeup()

Despierta al consumidor.

API ConsumerRecord

La API ConsumerRecord se utiliza para recibir registros del clúster de Kafka. Esta API consta de un nombre de tema, un número de partición, desde el que se recibe el registro y un desplazamiento que apunta al registro en una partición de Kafka. La clase ConsumerRecord se utiliza para crear un registro de consumidor con un nombre de tema específico, recuento de particiones y pares <clave, valor>. Tiene la siguiente firma.

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • Topic - El nombre del tema para el registro de consumidor recibido del clúster de Kafka.

  • Partition - Partición para el tema.

  • Key - La clave del registro, si no existe una clave se devolverá nula.

  • Value - Grabar contenidos.

API ConsumerRecords

La API ConsumerRecords actúa como un contenedor para ConsumerRecord. Esta API se utiliza para mantener la lista de ConsumerRecord por partición para un tema en particular. Su constructor se define a continuación.

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition - Devuelve un mapa de partición para un tema en particular.

  • Records - Lista de devolución de ConsumerRecord.

La clase ConsumerRecords tiene los siguientes métodos definidos.

S. No Métodos y descripción
1

public int count()

El número de registros para todos los temas.

2

public Set partitions()

El conjunto de particiones con datos en este conjunto de registros (si no se devolvieron datos, el conjunto está vacío).

3

public Iterator iterator()

Iterator le permite recorrer una colección, obteniendo o eliminando elementos.

4

public List records()

Obtenga la lista de registros para la partición dada.

Ajustes de configuración

Las opciones de configuración para la configuración principal de la API del cliente consumidor se enumeran a continuación:

S. No Configuración y descripción
1

bootstrap.servers

Bootstrapping lista de corredores.

2

group.id

Asigna un consumidor individual a un grupo.

3

enable.auto.commit

Habilite la confirmación automática para compensaciones si el valor es verdadero; de lo contrario, no se confirma.

4

auto.commit.interval.ms

Devuelve la frecuencia con la que se escriben en ZooKeeper las compensaciones consumidas actualizadas.

5

session.timeout.ms

Indica cuántos milisegundos esperará Kafka a que ZooKeeper responda a una solicitud (lectura o escritura) antes de darse por vencido y continuar consumiendo mensajes.

Aplicación SimpleConsumer

Los pasos de la aplicación del productor siguen siendo los mismos aquí. Primero, inicie su corredor de ZooKeeper y Kafka. Luego cree una aplicación SimpleConsumer con la clase java llamada SimpleCon-sumer.java y escriba el siguiente código.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation - La aplicación se puede compilar usando el siguiente comando.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution − La aplicación se puede ejecutar usando el siguiente comando

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Input- Abra la CLI del productor y envíe algunos mensajes al tema. Puede poner la entrada simple como 'Hola consumidor'.

Output - Lo siguiente será la salida.

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer