utilizar uso usar usa topic tecnologia que porque para kafka crear como caso python apache-kafka kafka-consumer-api kafka-python

python - uso - para que se usa apache kafka



¿Cómo obtener la última compensación de una partición para un tema kafka? (5)

Con kafka-python>=1.3.4 puedes usar:

kafka.KafkaConsumer.end_offsets(partitions)

Obtener el último desplazamiento para las particiones dadas. El último desplazamiento de una partición es el desplazamiento del mensaje siguiente, es decir, el desplazamiento del último mensaje disponible + 1.

from kafka import TopicPartition from kafka.consumer import KafkaConsumer con = KafkaConsumer(bootstrap_servers = brokers) ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)] con.end_offsets(ps)

Estoy utilizando el consumidor de alto nivel de Python para Kafka y quiero conocer las últimas compensaciones para cada partición de un tema. Sin embargo no puedo hacerlo funcionar.

from kafka import TopicPartition from kafka.consumer import KafkaConsumer con = KafkaConsumer(bootstrap_servers = brokers) ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)] con.assign(ps) for p in ps: print "For partition %s highwater is %s"%(p.partition,con.highwater(p)) print "Subscription = %s"%con.subscription() print "con.seek_to_beginning() = %s"%con.seek_to_beginning()

Pero la salida que obtengo es

For partition 0 highwater is None For partition 1 highwater is None For partition 2 highwater is None For partition 3 highwater is None For partition 4 highwater is None For partition 5 highwater is None .... For partition 96 highwater is None For partition 97 highwater is None For partition 98 highwater is None For partition 99 highwater is None Subscription = None con.seek_to_beginning() = None con.seek_to_end() = None

Tengo un enfoque alternativo utilizando assign pero el resultado es el mismo

con = KafkaConsumer(bootstrap_servers = brokers) ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)] con.assign(ps) for p in ps: print "For partition %s highwater is %s"%(p.partition,con.highwater(p)) print "Subscription = %s"%con.subscription() print "con.seek_to_beginning() = %s"%con.seek_to_beginning() print "con.seek_to_end() = %s"%con.seek_to_end()

Según algunos documentos, puedo obtener este comportamiento si no se ha emitido una fetch . Pero no puedo encontrar una manera de forzar eso. ¿Qué estoy haciendo mal?

¿O existe una forma diferente / más simple de obtener las últimas compensaciones para un tema?


Finalmente, después de pasar un día en esto y varios inicios en falso, pude encontrar una solución y hacer que funcione. Publicándola para que otros puedan referirse a ella.

from kafka import SimpleClient from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy from kafka.common import OffsetRequestPayload client = SimpleClient(brokers) partitions = client.topic_partitions[topic] offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()] offsets_responses = client.send_offset_request(offset_requests) for r in offsets_responses: print "partition = %s, offset = %s"%(r.partition, r.offsets[0])


Otra forma de lograr esto es encuestando al consumidor para obtener el último desplazamiento consumido y luego usar el método seek_to_end para obtener la partición de desplazamiento disponible más reciente.

from kafka import KafkaConsumer consumer = KafkaConsumer(''my-topic'', group_id=''my-group'', bootstrap_servers=[''localhost:9092'']) consumer.poll() consumer.seek_to_end()

Este método es particularmente útil cuando se usan grupos de consumidores.

FUENTES:

  1. https://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.poll
  2. https://kafka-python.readthedocs.io/en/master/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.seek_to_end

Si desea utilizar los scripts de shell de Kafka presentes en kafka / bin, puede obtener las compensaciones más recientes y más pequeñas utilizando kafka-run-class.sh.

Para obtener el último comando de desplazamiento se verá así

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topiname

Para obtener el comando de desplazamiento más pequeño se verá así

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topiname

Puede encontrar más información sobre Get Offsets Shell en el siguiente link

¡Espero que esto ayude!


from kafka import KafkaConsumer, TopicPartition TOPIC = ''MYTOPIC'' GROUP = ''MYGROUP'' BOOTSTRAP_SERVERS = [''kafka01:9092'', ''kafka02:9092''] consumer = KafkaConsumer( bootstrap_servers=BOOTSTRAP_SERVERS, group_id=GROUP, enable_auto_commit=False ) for p in consumer.partitions_for_topic(TOPIC): tp = TopicPartition(TOPIC, p) consumer.assign([tp]) committed = consumer.committed(tp) consumer.seek_to_end(tp) last_offset = consumer.position(tp) print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset, (last_offset - committed))) consumer.close(autocommit=False)