tutorial topic strategies kafkaconsumer kafka example create apache-kafka kafka-consumer-api

apache-kafka - topic - kafka producer



¿Cómo usar la API del consumidor de Kafka 0.8.2? (2)

Sí, puedo confirmar que la versión 0.8.2.1 tuvo problemas al consumir mensajes. Ahora haciendo un consumidor simple con Java / Groovy y la versión 0.10.1.0, todo funciona perfectamente.

Ya no es necesario configurar PARTITION_ASSIGNMENT_STRATEGY .

Estoy empezando con el último documento de Kafka http://kafka.apache.org/documentation.html . Pero me encuentro con algún problema cuando trato de usar la nueva API para el consumidor. He hecho el trabajo con los siguientes pasos:

1. Añadir una nueva dependencia.

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.8.2.1</version> </dependency>

2. Añadir configuraciones

Map<String, Object> config = new HashMap<String, Object>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092"); config.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");

3. Usa la API de KafkaConsumer

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config); consumer.subscribe("topic");

Sin embargo, cuando trato de sondear el mensaje del agente, no tengo más que nulo:

Map<String, ConsumerRecords<String, String>> records = consumer.poll(0); if (records != null) process(records); else System.err.println("null");

Y luego sé lo que está mal con el consumidor después de verificar el código fuente:

@Override public Map<String, ConsumerRecords<K,V>> poll(long timeout) { // TODO Auto-generated method stub return null; }

Para empeorar las cosas, no puedo encontrar ninguna otra información útil sobre la API 0.8.2, ya que todos los usos de Kafka no son compatibles con la última versión. ¿Alguien podría ayudarme? Muchas gracias.


También estoy tratando de escribir un Consumidor sobre Kafka 0.8.2.1 para leer los mensajes producidos por el nuevo Productor.

Hasta ahora, lo que tengo es que la API del productor está lista y es utilizable, mientras que en el lado del consumidor tenemos que esperar 0.8.3, como @habsq señaló y ya descubrió que hay algún código incluido para el consumidor, pero Todavía no es funcional.

Por lo tanto, el cliente a utilizar (la API del cliente actual) es el que se encuentra en el proyecto "central" de su versión actual de Kafka, es decir, 0.8.2.1 (mejor no rebajar el cliente a ninguna otra versión).

Así que por ahora necesitamos importar dos tarros: uno para los "nuevos" clientes java y otro para el proyecto principal, dependiendo también de la versión de scala que esté usando (uso 2.11).

En mi caso, uso Graddle para administrar dependencias, así que solo necesito importar

dependencies { compile group: ''org.apache.kafka'', name: ''kafka-clients'', version: ''0.8.2.1'' compile group: ''org.apache.kafka'', name: ''kafka_2.11'', version: ''0.8.2.1'' }

Al actualizar dependencias obtendrá todas las bibliotecas necesarias.

Si está utilizando una versión diferente de Scala, simplemente cambie la versión; de todos modos, puede encontrar todas las diferentes versiones o el pom completo en maven central: http://search.maven.org/#search|ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A%220.8.2.1%22

Si utiliza la implementación de Consumer, todos los ejemplos actuales deberían funcionar como de costumbre.

PS ref: Kafka-users ml hilo http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2