servers kafkatemplate kafka example bootstrap baeldung java apache-kafka

java - kafkatemplate - spring kafka bootstrap servers



Retraso en los consumidores que consumen mensajes en Apache Kafka (2)

Estoy usando Kafka 0.8.0 y tratando de lograr el escenario mencionado a continuación.

API de JCA (actúa como productor y envía datos a) -----> Consumer ------> HBase

Estoy enviando cada mensaje al consumidor tan pronto como obtengo los datos utilizando el Cliente JCA. Por ejemplo, tan pronto como el productor envíe el mensaje n. ° 1, quiero obtener lo mismo del consumidor y ''poner'' en HBase. Pero mi consumidor comienza a buscar los mensajes después de algunos n mensajes aleatorios. Quiero sincronizar al productor y al consumidor para que ambos comiencen a trabajar juntos.

He utilizado:

1 corredor

1 tema único

1 productor único y consumidor de alto nivel.

¿Alguien puede sugerir qué debo hacer para lograr lo mismo?

EDITADO:

Añadiendo un fragmento de código relevante.

Consumer.java

public class Consumer extends Thread { private final ConsumerConnector consumer; private final String topic; PrintWriter pw = null; int t = 0; StringDecoder kd = new StringDecoder(null); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); Map<String, List<KafkaStream<String, Signal>>> consumerMap; KafkaStream<String, Signal> stream; ConsumerIterator<String, Signal> it; public Consumer(String topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig()); this.topic = topic; topicCountMap.put(topic, new Integer(1)); consumerMap = consumer.createMessageStreams(topicCountMap, kd, new Serializer( new VerifiableProperties())); stream = consumerMap.get(topic).get(0); it = stream.iterator(); } private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect", KafkaProperties.zkConnect); props.put("group.id", KafkaProperties.groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("fetch.size", "1024"); return new ConsumerConfig(props); } synchronized public void run() { while (it.hasNext()) { t = (it.next().message()).getChannelid(); System.out.println("In Consumer received msg" + t); } } }

productor.java

public class Producer { public final kafka.javaapi.producer.Producer<String, Signal> producer; private final String topic; private final Properties props = new Properties(); public Producer(String topic) { props.put("serializer.class", "org.bigdata.kafka.Serializer"); props.put("key.serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", "localhost:9092"); // Use random partitioner. Don''t need the key type. Just set it to Integer. // The message is of type userdefined Object . producer = new kafka.javaapi.producer.Producer<String,Signal(newProducerConfig(props)); this.topic = topic; } }

KafkaProperties.java

public interface KafkaProperties { final static String zkConnect = "127.0.0.1:2181"; final static String groupId = "group1"; final static String topic = "test00"; final static String kafkaServerURL = "localhost"; final static int kafkaServerPort = 9092; final static int kafkaProducerBufferSize = 64 * 1024; final static int connectionTimeOut = 100000; final static int reconnectInterval = 10000; final static String clientId = "SimpleConsumerDemoClient"; }

Así es como el consumidor se está comportando para los primeros 10 mensajes que no envía el mensaje recibido por el consumidor, pero a partir del mensaje 11 comienza a funcionar correctamente.

producer sending msg1 producer sending msg2 producer sending msg3 producer sending msg4 producer sending msg5 producer sending msg6 producer sending msg7 producer sending msg8 producer sending msg9 producer sending msg10 producer sending msg11 producer sending msg12 In Consumer received msg12 producer sending msg13 In Consumer received msg13 producer sending msg14 In Consumer received msg14 producer sending msg15 In Consumer received msg15 producer sending msg16 In Consumer received msg16 producer sending msg17 In Consumer received msg17 producer sending msg18 In Consumer received msg18 producer sending msg19 In Consumer received msg19 producer sending msg20 In Consumer received msg20 producer sending msg21 In Consumer received msg21

EDITADO: agregando la función de escucha donde el productor está enviando mensajes al consumidor. Y estoy usando la configuración del productor por defecto no la sobrescribí

public synchronized void onValueChanged(final MonitorEvent event_) { // Get the value from the DBR try { final DBR dbr = event_.getDBR(); final String[] val = (String[]) dbr.getValue(); producer1.producer.send(new KeyedMessage<String, Signal> (KafkaProperties.topic,new Signal(messageNo))); System.out.println("producer sending msg"+messageNo); messageNo++; } catch (Exception ex) { ex.printStackTrace(); } }


  1. Intente agregar props.put("request.required.acks", "1") a la configuración del productor. Por defecto, el productor no espera recibos y la entrega de mensajes no está garantizada. Por lo tanto, si inicia Broker justo antes de su prueba, el productor puede comenzar a enviar mensajes antes de que el broker se inicie por completo y se pierdan los primeros mensajes.

  2. Intente agregar props.put("auto.offset.reset", "smallest") a la configuración del consumidor. Es igual a la opción --from --from-beginning begin de kafka-console-consumer.sh. Si su consumidor comienza más tarde que el productor y no hay datos de compensación guardados en Zookeeper, entonces, de forma predeterminada, comenzará a consumir solo mensajes nuevos (consulte la sección Config. Del consumidor en documentos).


Esto puede deberse a más no de particiones que no de consumidores. Verifique si el tema se crea solo con una sola partición, entonces no perderá ningún mensaje en el consumidor.