apache kafka - partitions - El consumidor no recibe mensajes, la consola kafka, la nueva api del consumidor, Kafka 0.9
max poll interval ms (12)
¿Puedes por favor intentar así?
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
Estoy haciendo el Kafka Quickstart para Kafka 0.9.0.0.
Tengo un guardián del zoológico escuchando en localhost:2181
porque corrí
bin/zookeeper-server-start.sh config/zookeeper.properties
Tengo un solo agente que escucha en localhost:9092
porque corrí
bin/kafka-server-start.sh config/server.properties
Tengo un productor que publica para el tema "prueba" porque corrí
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
yello
is this thing on?
let''s try another
gimme more
Cuando ejecuto el antiguo consumidor de API, funciona ejecutando
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Sin embargo, cuando ejecuto el nuevo consumidor de API, no obtengo nada cuando ejecuto
bin/kafka-console-consumer.sh --new-consumer --topic test --from-beginning /
--bootstrap-server localhost:9092
¿Es posible suscribirse a un tema del consumidor de la consola utilizando la nueva API? ¿Cómo puedo arreglarlo?
Ejecute el siguiente comando desde bin:
./kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
"prueba" es el nombre del tema
En kafka_2.11-0.11.0.0, el servidor zookeeper está en desuso y usa bootstrap-server, y tomará la dirección IP y el puerto del agente. Si da los parámetros correctos del agente, podrá consumir mensajes.
por ejemplo, $ bin / kafka-console-consumer.sh --bootstrap-server: 9093 --tema de prueba --desde el inicio
Estoy usando el puerto 9093, para ti puede variar.
Saludos.
En mi caja de MAC enfrentaba el mismo problema de consola-consumidor que no consume ningún mensaje cuando se usa el comando
kafka-console-consumer --bootstrap-server localhost:9095 --from-beginning --topic my-replicated-topic
Pero cuando lo intenté con
kafka-console-consumer --bootstrap-server localhost:9095 --from-beginning --topic my-replicated-topic --partition 0
Felizmente enumera los mensajes enviados. ¿Es esto un error en Kafka 1.10.11?
En mi caso, no funcionó con ninguno de los dos enfoques. También config/log4j.properties
el nivel de registro a DEBUG en config/log4j.properties
, inicié el consumidor de la consola.
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic MY_TOPIC
Luego consiguió el registro de abajo
[2018-03-11 12:11:25,711] DEBUG [MetadataCache brokerId=10] Error while fetching metadata for MY_TOPIC-3: leader not available (kafka.server.MetadataCache)
El punto aquí es que tengo dos nodos kafka pero uno está inactivo, por alguna razón, de forma predeterminada, el consumidor de kafka-console no consumirá si hay alguna partición no disponible porque el nodo está inactivo (la partición 3 en ese caso). No sucede en mi aplicación.
Las posibles soluciones son
- Inicio de los corredores de down
- Elimine el tema y vuelva a crearlo de esa manera todas las particiones se colocarán en el nodo del intermediario en línea
Estaba recibiendo el mismo problema en mi Mac. Revisé los registros y encontré el siguiente error.
Number of alive brokers ''1'' does not meet the required replication factor ''3'' for the offsets topic (configured via ''offsets.topic.replication.factor'').
This error can be ignored if the cluster is starting up and not all brokers are up yet.
Esto se puede solucionar cambiando el factor de replicación a 1. Agregue la siguiente línea en server.properties y reinicie Kafka / Zookeeper.
offsets.topic.replication.factor=1
Este problema también impacta la ingesta de datos desde el kafka mediante el uso de canales y el sumidero de datos en HDFS.
Para solucionar el problema anterior:
- Detener a los corredores Kafka
- Conéctese al clúster zookeeper y elimine / brokers z node
- Reiniciar los corredores kafka
No hay ningún problema con respecto a la versión del cliente kafka y la versión de scala que estamos usando el clúster. Zookeeper puede tener información incorrecta acerca de los hosts del broker.
Para verificar la acción:
Crear tema en kafka.
kafka-console-consumer --bootstrap-server slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --desde-comenzar
Abre un canal productor y alimenta algunos mensajes a él.
kafka-console-producer --broker-list slavenode03.cdh.com:9092 --topic rkkrishnaa3210
Abra un canal de consumidores para consumir el mensaje de un tema específico.
kafka-console-consumer --bootstrap-server slavenode01.cdh.com:9092 --topic rkkrishnaa3210 --desde-comenzar
Para probar esto desde el canal:
Flume agente de configuración:
rk.sources = source1
rk.channels = channel1
rk.sinks = sink1
rk.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
rk.sources.source1.zookeeperConnect = ip-20-0-21-161.ec2.internal:2181
rk.sources.source1.topic = rkkrishnaa321
rk.sources.source1.groupId = flume1
rk.sources.source1.channels = channel1
rk.sources.source1.interceptors = i1
rk.sources.source1.interceptors.i1.type = timestamp
rk.sources.source1.kafka.consumer.timeout.ms = 100
rk.channels.channel1.type = memory
rk.channels.channel1.capacity = 10000
rk.channels.channel1.transactionCapacity = 1000
rk.sinks.sink1.type = hdfs
rk.sinks.sink1.hdfs.path = /user/ce_rk/kafka/%{topic}/%y-%m-%d
rk.sinks.sink1.hdfs.rollInterval = 5
rk.sinks.sink1.hdfs.rollSize = 0
rk.sinks.sink1.hdfs.rollCount = 0
rk.sinks.sink1.hdfs.fileType = DataStream
rk.sinks.sink1.channel = channel1
Ejecutar agente de canalización:
flume-ng agent --conf . -f flume.conf -Dflume.root.logger=DEBUG,console -n rk
Observe los registros del consumidor de que el mensaje del tema está escrito en HDFS.
18/02/16 05:21:14 INFO internals.AbstractCoordinator: Successfully joined group flume1 with generation 1
18/02/16 05:21:14 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [rkkrishnaa3210-0] for group flume1
18/02/16 05:21:14 INFO kafka.SourceRebalanceListener: topic rkkrishnaa3210 - partition 0 assigned.
18/02/16 05:21:14 INFO kafka.KafkaSource: Kafka source source1 started.
18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
18/02/16 05:21:14 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: source1 started
18/02/16 05:21:41 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/02/16 05:21:42 INFO hdfs.BucketWriter: Creating /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp
18/02/16 05:21:48 INFO hdfs.BucketWriter: Closing /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp
18/02/16 05:21:48 INFO hdfs.BucketWriter: Renaming /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920.tmp to /user/ce_rk/kafka/rkkrishnaa3210/18-02-16/FlumeData.1518758501920
18/02/16 05:21:48 INFO hdfs.HDFSEventSink: Writer callback called.
Me acabo de encontrar con este problema y la solución fue eliminar /brokers
en zookeeper y reiniciar los nodos kafka.
bin/zookeeper-shell <zk-host>:2181
y entonces
rmr /brokers
No estoy seguro de por qué esto lo resuelve.
Cuando habilité el registro de depuración, vi este mensaje de error una y otra vez en el consumidor:
2017-07-07 01:20:12 DEBUG AbstractCoordinator:548 - Sending GroupCoordinator request for group test to broker xx.xx.xx.xx:9092 (id: 1007 rack: null) 2017-07-07 01:20:12 DEBUG AbstractCoordinator:559 - Received GroupCoordinator response ClientResponse(receivedTimeMs=1499390412231, latencyMs=84, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=13,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group test 2017-07-07 01:20:12 DEBUG AbstractCoordinator:581 - Group coordinator lookup for group test failed: The group coordinator is not available. 2017-07-07 01:20:12 DEBUG AbstractCoordinator:215 - Coordinator discovery failed for group test, refreshing metadata
Para mí, la solución descrita en este hilo funcionó - https://.com/a/51540528/7568227
Comprobar si
offsets.topic.replication.factor
(o probablemente otros parámetros de configuración relacionados con la replicación) no es mayor que el número de intermediarios. Ese fue el problema en mi caso.
No hubo necesidad de usar --partition 0 después de esta corrección.
De lo contrario, recomiendo seguir el procedimiento de depuración descrito en el hilo mencionado.
Su localhost es el foo aquí. Si reemplaza la palabra localhost por el nombre de host real, debería funcionar.
Me gusta esto:
productor
./bin/kafka-console-producer.sh --broker-list /
sandbox-hdp.hortonworks.com:9092 --topic test
consumidor:
./bin/kafka-console-consumer.sh --topic test --from-beginning /
--bootstrap-server bin/kafka-console-consumer.sh --new-consumer /
--topic test --from-beginning /
--bootstrap-server localhost:9092
Tengo el mismo problema, ahora me he dado cuenta.
Cuando usa --zookeeper, se supone que se le proporcionará una dirección de zookeeper como parámetro.
Cuando utiliza --bootstrap-server, se supone que se le debe proporcionar la dirección del agente como parámetro.
Utilice esto: bin / kafka-console-consumer.sh --bootstrap-server localhost: 9092 --tema de prueba --desde-comienzo
Nota: elimine "--new-consumer" de su comando
Para referencia, consulte aquí: https://kafka.apache.org/quickstart