apache kafka - quickstart - ¿Cómo obtener datos del antiguo punto de compensación en Kafka?
que es zookeeper kafka (7)
Estoy usando zookeeper para obtener datos de kafka. Y aquí siempre obtengo datos del último punto de compensación. ¿Hay alguna manera de especificar el tiempo de compensación para obtener datos antiguos?
Hay una opción autooffset.reset. Acepta el más pequeño o el más grande. ¿Alguien puede explicar qué es el más pequeño y el más grande? ¿Autooffset.reset ayuda a obtener datos del antiguo punto de compensación en lugar del último punto de compensación?
¿Has probado esto?
bin / kafka-console-consumer.sh --bootstrap-servidor localhost: 9092 -estánda de prueba --desde el principio
Imprimirá todos los mensajes para el tema dado, "prueba" en este ejemplo.
Más detalles en este enlace https://kafka.apache.org/quickstart
Consulte el documento sobre configuración de kafka: http://kafka.apache.org/08/configuration.html para su consulta sobre los valores más pequeños y más grandes del parámetro de compensación.
Por cierto, mientras exploraba kafka, me preguntaba cómo reproducir todos los mensajes para un consumidor. Me refiero a si un grupo de consumidores ha sondeado todos los mensajes y quiere volver a obtenerlos.
La forma en que se puede lograr es borrar los datos del zookeeper. Use la clase kafka.utils.ZkUtils para eliminar un nodo en zookeeper. Debajo está su uso:
ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
De la documentation Kafka dicen que "kafka.api.OffsetRequest.EarliestTime () encuentra el comienzo de los datos en los registros y comienza la transmisión desde allí, kafka.api.OffsetRequest.LatestTime () solo transmitirá nuevos mensajes. ese desplazamiento 0 es el desplazamiento inicial, ya que los mensajes envejecen fuera del registro a lo largo del tiempo ".
Utilice SimpleConsumerExample aquí: documentation
Pregunta similar: Kafka High Level Consumer Fetch Todos los mensajes del tema Uso de Java API (Equivalente a --desde el principio)
Esto podría ayudar
Kafka Protocol Doc es una gran fuente para jugar con request / response / Offsets / Messages: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol utiliza el ejemplo de Simple Consumer como donde el siguiente código demuestra el estado:
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 100000)
.build();
FetchResponse fetchResponse = simpleConsumer.fetch(req);
establezca readOffset para iniciar el desplazamiento inicial desde. pero debe verificar el desplazamiento máximo, así como los anteriores, proporcionarán un recuento de compensaciones limitado según FetchSize en el último parámetro del método addFetch.
Los consumidores pertenecen siempre a un grupo y, para cada partición, el Zookeeper realiza un seguimiento del progreso de ese grupo de consumidores en la partición.
Para buscar desde el principio, puede eliminar todos los datos asociados con el progreso como Hussain se refirió
ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
También puede especificar el desplazamiento de la partición que desea, como se especifica en core / src / main / scala / kafka / tools / UpdateOffsetsInZK.scala
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
Sin embargo, el desplazamiento no está indexado en el tiempo, pero usted sabe para cada partición que es una secuencia.
Si su mensaje contiene una marca de tiempo (y tenga en cuenta que esta marca de tiempo no tiene nada que ver con el momento en que Kafka recibió su mensaje), puede intentar hacer un indexador que intente recuperar una entrada en pasos incrementando el desplazamiento en N, y almacene el tupla (tema X, parte 2, desplazamiento 100, marca de tiempo) en algún lugar.
Cuando desee recuperar entradas desde un momento específico en el tiempo, puede aplicar una búsqueda binaria a su índice aproximado hasta que encuentre la entrada que desea y busque desde allí.
Usando KafkaConsumer puedes usar Seek, SeekToBeginning y SeekToEnd para moverte en la transmisión.
Además, si no se proporciona ninguna partición, buscará la primera compensación para todas las particiones actualmente asignadas.
Por ahora
Las preguntas frecuentes de Kafka dan una respuesta a este problema.
¿Cómo puedo obtener compensaciones de mensajes para una determinada marca de tiempo con OffsetRequest?
Kafka permite consultar las compensaciones de los mensajes por tiempo y lo hace en la granularidad del segmento. El parámetro de marca de tiempo es la marca de tiempo de unix y consultar el desplazamiento por marca de tiempo devuelve el último desplazamiento posible del mensaje que se agrega a más tardar en la marca de tiempo determinada. Hay 2 valores especiales de la marca de tiempo: el más reciente y el más antiguo. Para cualquier otro valor de la marca de tiempo de unix, Kafka obtendrá el desplazamiento de inicio del segmento de registro que se crea a más tardar en la marca de tiempo determinada. Debido a esto, y dado que la solicitud de compensación solo se sirve en la granularidad del segmento, la solicitud de recuperación de compensación arroja resultados menos precisos para segmentos de mayor tamaño.
Para obtener resultados más precisos, puede configurar el tamaño del segmento de registro en función del tiempo (log.roll.ms) en lugar del tamaño (log.segment.bytes). Sin embargo, se debe tener cuidado ya que hacerlo podría aumentar el número de manejadores de archivos debido a la rotación frecuente de segmentos de registro.
Plan futuro
Kafka agregará la marca de tiempo al formato de mensaje. Referirse a
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata