apache-kafka purge

apache kafka - Tema de Purga Kafka



apache-kafka purge (12)

Envié un mensaje que era demasiado grande para un tema de mensaje kafka en mi máquina local, ahora recibo un error:

kafka.common.InvalidMessageSizeException: invalid message size

Aumentar el fetch.size no es ideal aquí, porque en realidad no quiero aceptar mensajes tan grandes. ¿Hay alguna forma de purgar el tema en kafka?


Actualice temporalmente el tiempo de retención en el tema a un segundo:

kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000

luego espere a que la purga surta efecto (aproximadamente un minuto). Una vez purgado, restablezca el valor de retention.ms anterior.


El consejo de Thomas es genial, pero desafortunadamente, zkCli en versiones anteriores de Zookeeper (por ejemplo, 3.3.6) no parece ser compatible con rmr . Por ejemplo, compare la implementación de línea de comandos en el Zookeeper moderno con la versión 3.3 .

Si se enfrenta a una versión anterior de Zookeeper, una solución es usar una biblioteca cliente como zc.zk para Python. Para las personas que no están familiarizadas con Python, debes instalarlo usando pip o easy_install . Luego, inicie un shell de Python ( python ) y puede hacer:

import zc.zk zk = zc.zk.ZooKeeper(''localhost:2181'') zk.delete_recursive(''brokers/MyTopic'')

o incluso

zk.delete_recursive(''brokers'')

si quieres eliminar todos los temas de Kafka.


El enfoque más simple es establecer que la fecha de los archivos de registro individuales sea más antigua que el período de retención. Luego, el agente debe limpiarlos y eliminarlos en unos segundos. Esto ofrece varias ventajas:

  1. No es necesario bajar los intermediarios, es una operación en tiempo de ejecución.
  2. Evita la posibilidad de excepciones de desplazamiento no válidas (más sobre eso a continuación).

En mi experiencia con Kafka 0.7.x, eliminar los archivos de registro y reiniciar el intermediario podría generar excepciones de compensación no válidas para ciertos consumidores. Esto ocurriría porque el intermediario reinicia las compensaciones en cero (en ausencia de cualquier archivo de registro existente), y un consumidor que consumía anteriormente del tema se volvería a conectar para solicitar una compensación específica [una vez válida]. Si esta compensación cae fuera de los límites de los nuevos registros de temas, no se produce ningún daño y el consumidor se reanuda al principio o al final. Pero, si el desplazamiento cae dentro de los límites de los nuevos registros de tema, el intermediario intenta recuperar el conjunto de mensajes pero falla porque el desplazamiento no se alinea con un mensaje real.

Esto podría mitigarse eliminando también las compensaciones del consumidor en el cuidador del zoológico para ese tema. Pero si no necesita un tema virgen y solo desea eliminar el contenido existente, simplemente ''tocar'' unos cuantos registros de temas es mucho más fácil y más confiable que detener intermediarios, eliminar registros de temas y borrar ciertos nodos de zookeeper. .


En ocasiones, si tiene un clúster saturado (demasiadas particiones o datos de temas cifrados o utiliza SSL, o el controlador está en un nodo defectuoso o la conexión es inestable, demorará mucho tiempo en eliminar dicho tema .

Sigo estos pasos, particularmente si estás usando Avro.

1: ejecutar con herramientas kafka:

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2: Ejecutar en el nodo de registro de Schema:

kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning

3: Establezca la retención del tema en la configuración original, una vez que el tema esté vacío.

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

Espero que esto ayude a alguien, ya que no se anuncia fácilmente.


Estos son los pasos que sigo para eliminar un tema llamado MyTopic :

  1. Detener al daemon Apache Kafka
  2. Elimine la carpeta de datos del tema: rm -rf /tmp/kafka-logs/MyTopic-0
  3. Elimine los metadatos del tema: zkCli.sh luego zkCli.sh rmr /brokers/MyTopic
  4. Inicie el daemon Apache Kafka

Si se pierde el paso 3, Apache Kafka continuará informando el tema como presente (por ejemplo, si ejecuta kafka-list-topic.sh ).

Probado con Apache Kafka 0.8.0.


No se pudo agregar como comentario debido al tamaño: no estoy seguro si esto es cierto, además de actualizar retention.ms y retention.bytes, pero noté que la política de limpieza de temas debe ser "eliminar" (predeterminado), si es "compacta", se va a aferrarse a los mensajes durante más tiempo, es decir, si es "compacto", también debe especificar delete.retention.ms .

./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

También tuvo que controlar los desplazamientos más antiguos / más recientes para confirmar que esto sucedió con éxito, también puede verificar el du -h / tmp / kafka-logs / test-topic-3-100- *

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" ''{sum += $3} END {print sum}'' 26599762

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" ''{sum += $3} END {print sum}'' 26599762

El otro problema es que primero debe obtener la configuración actual para que recuerde revertirla después de que la eliminación sea exitosa: ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics


Para limpiar todos los mensajes de un tema en particular usando su grupo de aplicaciones (GroupName debe ser igual que el nombre de la aplicación kafka group).

./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group


Probado en Kafka 0.8.2, para el ejemplo de inicio rápido: Primero, agregue una línea al archivo server.properties en la carpeta config:

delete.topic.enable=true

entonces, puedes ejecutar este comando:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test


Sí, detenga kafka y elimine manualmente todos los archivos del subdirectorio correspondiente (es fácil encontrarlo en el directorio de datos kafka). Después de reiniciar kafka, el tema estará vacío.


Si bien la respuesta aceptada es correcta, ese método ha quedado obsoleto. La configuración del tema ahora debe hacerse a través de kafka-configs .

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic

Las configuraciones configuradas mediante este método se pueden visualizar con el comando

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic


kafka no tiene un método directo para el tema de purga / limpieza (colas), pero puede hacerlo eliminando ese tema y recupándolo.

primero de asegurarse de que el archivo de sever.properties tenga y si no agregue delete.topic.enable=true

luego, Eliminar tema bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic

luego créelo de nuevo.

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2


Para purgar la cola, puede eliminar el tema:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

luego recréalo:

bin/kafka-topics.sh --create --zookeeper localhost:2181 / --replication-factor 1 --partitions 1 --topic test