Apache Kafka: ejemplo de grupo de consumidores
El grupo de consumidores es un consumo de múltiples subprocesos o máquinas de los temas de Kafka.
Grupo de consumidores
Los consumidores pueden unirse a un grupo utilizando el mismo
group.id.
El paralelismo máximo de un grupo es que el número de consumidores en el grupo ← no de particiones.
Kafka asigna las particiones de un tema al consumidor en un grupo, de modo que cada partición sea consumida por exactamente un consumidor en el grupo.
Kafka garantiza que un mensaje solo lo lee un único consumidor del grupo.
Los consumidores pueden ver el mensaje en el orden en que se almacenaron en el registro.
Reequilibrio de un consumidor
Agregar más procesos / subprocesos hará que Kafka se reequilibre. Si algún consumidor o corredor no envía latidos a ZooKeeper, se puede volver a configurar a través del clúster de Kafka. Durante este reequilibrio, Kafka asignará particiones disponibles a los subprocesos disponibles, posiblemente moviendo una partición a otro proceso.
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
if(args.length < 2){
System.out.println("Usage: consumer <topic> <groupname>");
return;
}
String topic = args[0].toString();
String group = args[1].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
Compilacion
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java
Ejecución
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
Aquí hemos creado un nombre de grupo de muestra como my-group
con dos consumidores. Del mismo modo, puede crear su grupo y número de consumidores en el grupo.
Entrada
Abra la CLI del productor y envíe algunos mensajes como:
Test consumer group 01
Test consumer group 02
Salida del primer proceso
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01
Salida del segundo proceso
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02
Ahora, con suerte, habría entendido SimpleConsumer y ConsumeGroup utilizando la demostración del cliente Java. Ahora tiene una idea sobre cómo enviar y recibir mensajes utilizando un cliente Java. Continuemos la integración de Kafka con tecnologías de big data en el próximo capítulo.