java - kafkatemplate - spring kafka bootstrap servers
Retraso en los consumidores que consumen mensajes en Apache Kafka (2)
Estoy usando Kafka 0.8.0 y tratando de lograr el escenario mencionado a continuación.
API de JCA (actúa como productor y envía datos a) -----> Consumer ------> HBase
Estoy enviando cada mensaje al consumidor tan pronto como obtengo los datos utilizando el Cliente JCA. Por ejemplo, tan pronto como el productor envíe el mensaje n. ° 1, quiero obtener lo mismo del consumidor y ''poner'' en HBase. Pero mi consumidor comienza a buscar los mensajes después de algunos n mensajes aleatorios. Quiero sincronizar al productor y al consumidor para que ambos comiencen a trabajar juntos.
He utilizado:
1 corredor
1 tema único
1 productor único y consumidor de alto nivel.
¿Alguien puede sugerir qué debo hacer para lograr lo mismo?
EDITADO:
Añadiendo un fragmento de código relevante.
Consumer.java
public class Consumer extends Thread {
private final ConsumerConnector consumer;
private final String topic;
PrintWriter pw = null;
int t = 0;
StringDecoder kd = new StringDecoder(null);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
Map<String, List<KafkaStream<String, Signal>>> consumerMap;
KafkaStream<String, Signal> stream;
ConsumerIterator<String, Signal> it;
public Consumer(String topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
topicCountMap.put(topic, new Integer(1));
consumerMap = consumer.createMessageStreams(topicCountMap, kd, new Serializer(
new VerifiableProperties()));
stream = consumerMap.get(topic).get(0);
it = stream.iterator();
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", KafkaProperties.zkConnect);
props.put("group.id", KafkaProperties.groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("fetch.size", "1024");
return new ConsumerConfig(props);
}
synchronized public void run() {
while (it.hasNext()) {
t = (it.next().message()).getChannelid();
System.out.println("In Consumer received msg" + t);
}
}
}
productor.java
public class Producer {
public final kafka.javaapi.producer.Producer<String, Signal> producer;
private final String topic;
private final Properties props = new Properties();
public Producer(String topic)
{
props.put("serializer.class", "org.bigdata.kafka.Serializer");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "localhost:9092");
// Use random partitioner. Don''t need the key type. Just set it to Integer.
// The message is of type userdefined Object .
producer = new kafka.javaapi.producer.Producer<String,Signal(newProducerConfig(props));
this.topic = topic;
}
}
KafkaProperties.java
public interface KafkaProperties {
final static String zkConnect = "127.0.0.1:2181";
final static String groupId = "group1";
final static String topic = "test00";
final static String kafkaServerURL = "localhost";
final static int kafkaServerPort = 9092;
final static int kafkaProducerBufferSize = 64 * 1024;
final static int connectionTimeOut = 100000;
final static int reconnectInterval = 10000;
final static String clientId = "SimpleConsumerDemoClient";
}
Así es como el consumidor se está comportando para los primeros 10 mensajes que no envía el mensaje recibido por el consumidor, pero a partir del mensaje 11 comienza a funcionar correctamente.
producer sending msg1
producer sending msg2
producer sending msg3
producer sending msg4
producer sending msg5
producer sending msg6
producer sending msg7
producer sending msg8
producer sending msg9
producer sending msg10
producer sending msg11
producer sending msg12
In Consumer received msg12
producer sending msg13
In Consumer received msg13
producer sending msg14
In Consumer received msg14
producer sending msg15
In Consumer received msg15
producer sending msg16
In Consumer received msg16
producer sending msg17
In Consumer received msg17
producer sending msg18
In Consumer received msg18
producer sending msg19
In Consumer received msg19
producer sending msg20
In Consumer received msg20
producer sending msg21
In Consumer received msg21
EDITADO: agregando la función de escucha donde el productor está enviando mensajes al consumidor. Y estoy usando la configuración del productor por defecto no la sobrescribí
public synchronized void onValueChanged(final MonitorEvent event_) {
// Get the value from the DBR
try {
final DBR dbr = event_.getDBR();
final String[] val = (String[]) dbr.getValue();
producer1.producer.send(new KeyedMessage<String, Signal>
(KafkaProperties.topic,new Signal(messageNo)));
System.out.println("producer sending msg"+messageNo);
messageNo++;
} catch (Exception ex) {
ex.printStackTrace();
}
}
Intente agregar
props.put("request.required.acks", "1")
a la configuración del productor. Por defecto, el productor no espera recibos y la entrega de mensajes no está garantizada. Por lo tanto, si inicia Broker justo antes de su prueba, el productor puede comenzar a enviar mensajes antes de que el broker se inicie por completo y se pierdan los primeros mensajes.Intente agregar
props.put("auto.offset.reset", "smallest")
a la configuración del consumidor. Es igual a la opción --from--from-beginning
begin de kafka-console-consumer.sh. Si su consumidor comienza más tarde que el productor y no hay datos de compensación guardados en Zookeeper, entonces, de forma predeterminada, comenzará a consumir solo mensajes nuevos (consulte la sección Config. Del consumidor en documentos).
Esto puede deberse a más no de particiones que no de consumidores. Verifique si el tema se crea solo con una sola partición, entonces no perderá ningún mensaje en el consumidor.