run kafka images hub docker apache-zookeeper apache-kafka google-cloud-platform kubernetes

docker - images - Kafka en Kubernetes multinodo



kubectl run (4)

Así que mi objetivo aquí es establecer un grupo de varios agentes kafka de forma distribuida. Pero no veo la manera de hacer que los agentes se den cuenta el uno del otro.

Por lo que yo entiendo, cada corredor necesita una identificación separada en su configuración, que no puedo garantizar o configurar si lanzo los contenedores de kubernetes.

¿También necesitan tener el mismo publicised_host?

¿Hay algún parámetro que me falta que deba cambiarse para que los nodos se descubran entre sí?

¿Sería viable realizar dicha configuración al final del Dockerfile con un script? ¿Y / o un volumen compartido?

Actualmente estoy tratando de hacer esto con la imagen de spotify / kafka que tiene una combinación preconfigurada de zookeeper + kafka, en Kubernetes vainilla.


Esto aparece prominentemente en mis búsquedas, pero contiene información bastante desactualizada. Para actualizar esto con una solución más moderna, debe usar una implementación StatefulSet , que generará pods que tienen un contador de enteros en lugar de un hash en su nombre, por ej. kafka-controller-0.

Este es, por supuesto, el nombre de host, por lo que a partir de ahí es una cuestión sencilla de extraer un identificador de intermediario fijo e invariable utilizando awk:

hostname | awk -F''-'' ''{print $3}''

Los contenedores más populares disponibles para Kafka en estos días tienen un comando de ID de corredor.


Hice esto usando Docker-Componer (La diferencia para Kubernetes sería que pasaría la ID a través de su service.yaml y tendría 2 servicios):

kafka1: build: kafka-0.8.1/ ports: - 9092 links: - zookeeper environment: - ID=1 kafka2: build: kafka-0.8.1/ ports: - 9092 links: - zookeeper environment: - ID=2

Config:

broker.id=${ID} port=9092 advertised.host.name=${HOST} advertised.port=9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/kafka/kafka-logs-${ID} num.partitions=200 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false zookeeper.connect=${DOCKER_ZOOKEEPER_1_PORT_2181_TCP_ADDR}:${DOCKER_ZOOKEEPER_1_PORT_2181_TCP_PORT} zookeeper.connection.timeout.ms=6000

sh:

#!/bin/bash echo "Running config" export HOST=`grep $HOSTNAME /etc/hosts | awk ''{print $1}''` export ID=${ID:?} perl -p -i -e ''s//$/{([^}]+)/}/defined $ENV{$1} ? $ENV{$1} : $&/eg'' < /broker.template > $KAFKA_HOME/config/server.properties echo "Done" echo "starting kafka with:" echo "$KAFKA_HOME/config/server.properties" echo "" cat $KAFKA_HOME/config/server.properties $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties


Mi solución para esto ha sido utilizar la dirección IP como identificación : recortar los puntos y obtener una identificación única que también está disponible fuera del contenedor en otros contenedores.

Con un Servicio, puede obtener acceso a las direcciones IP de los contenedores múltiples (consulte mi respuesta aquí sobre cómo hacer esto: ¿cuál es la mejor manera de permitir que las cápsulas de kubenetes se comuniquen entre sí?

para que pueda obtener sus identificaciones también si utiliza direcciones IP como ID única. El único problema es que las identificaciones no son continuas o comienzan en 0, pero zookeeper / kafka no parece importarle.

EDIT 1:

El seguimiento se refiere a la configuración de Zookeeper:

Cada nodo ZK necesita saber de los otros nodos. El servicio de descubrimiento de Kubernetes conoce los nodos que están dentro de un Servicio, por lo que la idea es iniciar un Servicio con los nodos ZK.

Este Servicio debe iniciarse ANTES de crear el ReplicationController (RC) de los pods de Zookeeper.

El script de inicio del contenedor ZK necesitará:

  • espere a que el servicio de descubrimiento llene el servicio ZK con sus nodos (eso toma unos segundos, por ahora acabo de agregar un descanso 10 al comienzo de mi script de inicio pero de manera más confiable debería buscar que el servicio tenga al menos 3 nodos) en eso.)
  • busque los contenedores que forman el Servicio en el servicio de descubrimiento: esto se hace consultando el API. la variable de entorno KUBERNETES_SERVICE_HOST está disponible en cada contenedor. El punto final para encontrar la descripción del servicio es entonces

URL="http(s)://$USERNAME:$PASSWORD@${KUBERNETES_SERVICE_HOST/api/v1/namespaces/${NAMESPACE}/endpoints/${SERVICE_NAME}"

donde NAMESPACE está default menos que lo hayas cambiado, y SERVICE_NAME sería el cuidador del zoológico si nombró a su servicio zookeeper.

allí se obtiene la descripción de los contenedores que forman el Servicio, con su ip en un campo "ip". Tu puedes hacer:

curl -s $URL | grep ''/"ip/"'' | awk ''{print $2}'' | awk -F/" ''{print $2}''

para obtener la lista de IP en el Servicio. Con eso, rellene el zoo.cfg en el nodo usando la ID definida anteriormente

Es posible que necesite el NOMBRE DE USUARIO y la CONTRASEÑA para llegar al punto final en servicios como el motor de contenedor de google. Deben colocarse en un volumen secreto (ver documento aquí: http://kubernetes.io/v1.0/docs/user-guide/secrets.html )

También necesitaría usar curl -s --insecure en Google Container Engine a menos que tenga la molestia de agregar el certificado CA a sus pods

Básicamente agregue el volumen al contenedor y busque los valores del archivo. (Al contrario de lo que dice el documento, NO coloque el / n al final del nombre de usuario o contraseña cuando codifique base64: simplemente complica su vida al leerlos)

EDICION 2:

Otra cosa que deberá hacer en los nodos de Kafka es obtener el IP y los nombres de host, y ponerlos en el archivo / etc / hosts. Parece que Kafka necesita conocer los nodos por nombres de host, y estos no están establecidos dentro de los nodos de servicio por defecto

EDIT 3:

Después de muchas pruebas y pensamientos, usar IP como ID puede no ser tan bueno: depende de cómo configure el almacenamiento. para cualquier tipo de servicio distribuido como zookeeper, kafka, mongo, hdfs, es posible que desee utilizar el tipo de almacenamiento emptyDir, por lo que es solo en ese nodo (montar un tipo de almacenamiento remoto frustra el propósito de distribuir estos servicios) emptyDir se relacionará con los datos en el mismo nodo, por lo que parece más lógico utilizar el NODE ID (nodo IP) como el ID, porque entonces un pod que se reinicia en el mismo nodo tendrá los datos. Eso evita la posible corrupción de los datos (si un nuevo nodo comienza a escribir en el mismo directorio que no está realmente vacío, quién sabe qué puede suceder) y también con Kafka, los temas se le asignan a un intermediario.id, si cambia la identificación del intermediario, zookeeper no actualiza el tema broker.id y parece que el tema está disponible, PERO apunta al corrector.id incorrecto y es un desastre.

Hasta ahora todavía no he encontrado cómo obtener la IP del nodo, pero creo que es posible buscar en la API buscando los nombres de los pods de servicio y luego el nodo en el que están desplegados.

EDIT 4

Para obtener el nodo IP, puede obtener el nombre de host del pod == nombre de los puntos finales API / api / v1 / namespaces / default / endpoints / como se explicó anteriormente. luego puede obtener la IP del nodo del nombre del pod con / api / v1 / namespaces / default / pods /

PD: esto está inspirado en el ejemplo del repositorio de Kubernetes (ejemplo para repensar aquí: https://github.com/kubernetes/kubernetes/tree/master/examples/rethinkdb