poll partitions kafka interval example auto_offset_reset apache-kafka kafka-consumer-api

apache kafka - partitions - El consumidor no recibe mensajes, la consola kafka, la nueva api del consumidor, Kafka 0.9



max poll interval ms (12)

¿Puedes por favor intentar así?

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

Estoy haciendo el Kafka Quickstart para Kafka 0.9.0.0.

Tengo un guardián del zoológico escuchando en localhost:2181 porque corrí

bin/zookeeper-server-start.sh config/zookeeper.properties

Tengo un solo agente que escucha en localhost:9092 porque corrí

bin/kafka-server-start.sh config/server.properties

Tengo un productor que publica para el tema "prueba" porque corrí

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test yello is this thing on? let''s try another gimme more

Cuando ejecuto el antiguo consumidor de API, funciona ejecutando

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Sin embargo, cuando ejecuto el nuevo consumidor de API, no obtengo nada cuando ejecuto

bin/kafka-console-consumer.sh --new-consumer --topic test --from-beginning / --bootstrap-server localhost:9092

¿Es posible suscribirse a un tema del consumidor de la consola utilizando la nueva API? ¿Cómo puedo arreglarlo?


Ejecute el siguiente comando desde bin:

./kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

"prueba" es el nombre del tema


En kafka_2.11-0.11.0.0, el servidor zookeeper está en desuso y usa bootstrap-server, y tomará la dirección IP y el puerto del agente. Si da los parámetros correctos del agente, podrá consumir mensajes.

por ejemplo, $ bin / kafka-console-consumer.sh --bootstrap-server: 9093 --tema de prueba --desde el inicio

Estoy usando el puerto 9093, para ti puede variar.

Saludos.


En mi caja de MAC enfrentaba el mismo problema de consola-consumidor que no consume ningún mensaje cuando se usa el comando

kafka-console-consumer --bootstrap-server localhost:9095 --from-beginning --topic my-replicated-topic

Pero cuando lo intenté con

kafka-console-consumer --bootstrap-server localhost:9095 --from-beginning --topic my-replicated-topic --partition 0

Felizmente enumera los mensajes enviados. ¿Es esto un error en Kafka 1.10.11?


En mi caso, no funcionó con ninguno de los dos enfoques. También config/log4j.properties el nivel de registro a DEBUG en config/log4j.properties , inicié el consumidor de la consola.

./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic MY_TOPIC

Luego consiguió el registro de abajo

[2018-03-11 12:11:25,711] DEBUG [MetadataCache brokerId=10] Error while fetching metadata for MY_TOPIC-3: leader not available (kafka.server.MetadataCache)

El punto aquí es que tengo dos nodos kafka pero uno está inactivo, por alguna razón, de forma predeterminada, el consumidor de kafka-console no consumirá si hay alguna partición no disponible porque el nodo está inactivo (la partición 3 en ese caso). No sucede en mi aplicación.

Las posibles soluciones son

  • Inicio de los corredores de down
  • Elimine el tema y vuelva a crearlo de esa manera todas las particiones se colocarán en el nodo del intermediario en línea

Estaba recibiendo el mismo problema en mi Mac. Revisé los registros y encontré el siguiente error.

Number of alive brokers ''1'' does not meet the required replication factor ''3'' for the offsets topic (configured via ''offsets.topic.replication.factor''). This error can be ignored if the cluster is starting up and not all brokers are up yet.

Esto se puede solucionar cambiando el factor de replicación a 1. Agregue la siguiente línea en server.properties y reinicie Kafka / Zookeeper.

offsets.topic.replication.factor=1


Este problema también impacta la ingesta de datos desde el kafka mediante el uso de canales y el sumidero de datos en HDFS.

Para solucionar el problema anterior:

  1. Detener a los corredores Kafka
  2. Conéctese al clúster zookeeper y elimine / brokers z node
  3. Reiniciar los corredores kafka

No hay ningún problema con respecto a la versión del cliente kafka y la versión de scala que estamos usando el clúster. Zookeeper puede tener información incorrecta acerca de los hosts del broker.

Para verificar la acción:

Crear tema en kafka.

kafka-console-consumer --bootstrap-server slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --desde-comenzar

Abre un canal productor y alimenta algunos mensajes a él.

kafka-console-producer --broker-list slavenode03.cdh.com:9092 --topic rkkrishnaa3210

Abra un canal de consumidores para consumir el mensaje de un tema específico.

kafka-console-consumer --bootstrap-server slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --desde-comenzar

Para probar esto desde el canal:

Flume agente de configuración:

rk.sources = source1 rk.channels = channel1 rk.sinks = sink1 rk.sources.source1.type = org.apache.flume.source.kafka.KafkaSource rk.sources.source1.zookeeperConnect = ip-20-0-21-161.ec2.internal:2181 rk.sources.source1.topic = rkkrishnaa321 rk.sources.source1.groupId = flume1 rk.sources.source1.channels = channel1 rk.sources.source1.interceptors = i1 rk.sources.source1.interceptors.i1.type = timestamp rk.sources.source1.kafka.consumer.timeout.ms = 100 rk.channels.channel1.type = memory rk.channels.channel1.capacity = 10000 rk.channels.channel1.transactionCapacity = 1000 rk.sinks.sink1.type = hdfs rk.sinks.sink1.hdfs.path = /user/ce_rk/kafka/%{topic}/%y-%m-%d rk.sinks.sink1.hdfs.rollInterval = 5 rk.sinks.sink1.hdfs.rollSize = 0 rk.sinks.sink1.hdfs.rollCount = 0 rk.sinks.sink1.hdfs.fileType = DataStream rk.sinks.sink1.channel = channel1

Ejecutar agente de canalización:

flume-ng agent --conf . -f flume.conf -Dflume.root.logger=DEBUG,console -n rk

Observe los registros del consumidor de que el mensaje del tema está escrito en HDFS.

18/02/16 05:21:14 INFO internals.AbstractCoordinator: Successfully joined group flume1 with generation 1 18/02/16 05:21:14 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [rkkrishnaa3210-0] for group flume1 18/02/16 05:21:14 INFO kafka.SourceRebalanceListener: topic rkkrishnaa3210 - partition 0 assigned. 18/02/16 05:21:14 INFO kafka.KafkaSource: Kafka source source1 started. 18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean. 18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started 18/02/16 05:21:41 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false 18/02/16 05:21:42 INFO hdfs.BucketWriter: Creating /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp 18/02/16 05:21:48 INFO hdfs.BucketWriter: Closing /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp 18/02/16 05:21:48 INFO hdfs.BucketWriter: Renaming /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp to /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920 18/02/16 05:21:48 INFO hdfs.HDFSEventSink: Writer callback called.


Me acabo de encontrar con este problema y la solución fue eliminar /brokers en zookeeper y reiniciar los nodos kafka.

bin/zookeeper-shell <zk-host>:2181

y entonces

rmr /brokers

No estoy seguro de por qué esto lo resuelve.

Cuando habilité el registro de depuración, vi este mensaje de error una y otra vez en el consumidor:

2017-07-07 01:20:12 DEBUG AbstractCoordinator:548 - Sending GroupCoordinator request for group test to broker xx.xx.xx.xx:9092 (id: 1007 rack: null) 2017-07-07 01:20:12 DEBUG AbstractCoordinator:559 - Received GroupCoordinator response ClientResponse(receivedTimeMs=1499390412231, latencyMs=84, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=13,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group test 2017-07-07 01:20:12 DEBUG AbstractCoordinator:581 - Group coordinator lookup for group test failed: The group coordinator is not available. 2017-07-07 01:20:12 DEBUG AbstractCoordinator:215 - Coordinator discovery failed for group test, refreshing metadata


Para mí, la solución descrita en este hilo funcionó - https://.com/a/51540528/7568227

Comprobar si

offsets.topic.replication.factor

(o probablemente otros parámetros de configuración relacionados con la replicación) no es mayor que el número de intermediarios. Ese fue el problema en mi caso.

No hubo necesidad de usar --partition 0 después de esta corrección.

De lo contrario, recomiendo seguir el procedimiento de depuración descrito en el hilo mencionado.


Su localhost es el foo aquí. Si reemplaza la palabra localhost por el nombre de host real, debería funcionar.

Me gusta esto:

productor

./bin/kafka-console-producer.sh --broker-list / sandbox-hdp.hortonworks.com:9092 --topic test

consumidor:

./bin/kafka-console-consumer.sh --topic test --from-beginning / --bootstrap-server bin/kafka-console-consumer.sh --new-consumer / --topic test --from-beginning / --bootstrap-server localhost:9092


Tengo el mismo problema, ahora me he dado cuenta.

Cuando usa --zookeeper, se supone que se le proporcionará una dirección de zookeeper como parámetro.

Cuando utiliza --bootstrap-server, se supone que se le debe proporcionar la dirección del agente como parámetro.


Utilice esto: bin / kafka-console-consumer.sh --bootstrap-server localhost: 9092 --tema de prueba --desde-comienzo

Nota: elimine "--new-consumer" de su comando

Para referencia, consulte aquí: https://kafka.apache.org/quickstart