apache-kafka - applications - apache kafka windows
Clase de particionador KafKa, asigne un mensaje a la particiĆ³n dentro del tema usando la clave (3)
Soy nuevo en kafka, por lo que me disculpo si parezco estúpido, pero lo que entendí hasta ahora es ... Una secuencia de mensajes puede definirse como un tema, como una categoría. Y cada tema se divide en una o más particiones (cada partición puede tener múltiples réplicas). entonces actúan en paralelo
Desde el sitio principal de Kafka dicen
El productor puede elegir qué mensaje asignar a qué partición dentro del tema. Esto se puede hacer de forma rotatoria simplemente para equilibrar la carga o se puede hacer de acuerdo con alguna función de partición semántica (por ejemplo, en función de alguna de las claves del mensaje).
¿Significa esto que mientras consumo podré elegir el desplazamiento de mensaje de una partición en particular? Al ejecutar varias particiones, ¿es posible elegir una partición específica, es decir, la partición 0?
En Kafka 0.7 inicio rápido dicen
Enviar un mensaje con una clave de partición. Los mensajes con la misma clave se envían a la misma partición.
Y la clave se puede proporcionar al crear el productor de la siguiente manera
ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-key", "test-message");
producer.send(data);
Ahora, ¿cómo consumo el mensaje basado en esta clave? ¿Cuál es el impacto real de usar esta clave mientras se produce en Kafka?
Mientras creamos el productor en 0.8beta, podemos proporcionar el atributo de clase de particionador a través del archivo de configuración. La clase de particionador personalizado se puede crear implementando la interfaz del particionador kafka. Pero poco confuso cómo funciona exactamente. 0.8 doc tampoco explica mucho. ¿Algún consejo o mi falta algo?
¿Significa esto que mientras consumo podré elegir el desplazamiento de mensaje de una partición en particular? Al ejecutar varias particiones, ¿es posible elegir una partición específica, es decir, la partición 0?
Sí, puede elegir un mensaje de una partición específica de su consumidor, pero si desea que se identifique dinámicamente, entonces depende de la lógica de cómo haya implementado la Clase Partitioner en su productor.
Ahora, ¿cómo consumo el mensaje basado en esta clave? ¿Cuál es el impacto real de usar esta clave mientras se produce en Kafka?
Hay dos formas de consumir el mensaje. Uno está usando Zookeeper Host y otro es Static Host. El servidor Zookeper consume mensajes de todas las particiones. Sin embargo, si está utilizando Static Host, puede proporcionarle al intermediario el número de partición que debe consumir.
Por favor verifique abajo el ejemplo de Kafka 0.8
Productor
KeyedMessage<String, String> data = new KeyedMessage<String, String>(<<topicName>>, <<KeyForPartition>>, <<Message>>);
Clase de particion
public int partition(Object arg0, int arg1) {
// arg0 is the key given while producing, arg1 is the number of
// partition the broker has
long organizationId = Long.parseLong((String) arg0);
// if the given key is less than the no of partition available then send
// it according to the key given Else send it to the last partition
if (arg1 < organizationId) {
return (arg1 - 1);
}
// return (int) (organizationId % arg1);
return Integer.parseInt((String) arg0);
}
Así que la clase partiotioner decide dónde enviar el mensaje en función de su lógica.
Consumidor (PN: He utilizado la integración de Storm Kafka 0.8)
HostPort hosts = new HostPort("10.**.**.***",9092);
GlobalPartitionInformation gpi = new GlobalPartitionInformation();
gpi.addPartition(0, hosts);
gpi.addPartition(2, hosts);
StaticHosts statHost = new StaticHosts(gpi);
SpoutConfig spoutConf = new SpoutConfig(statHost, <<topicName>>, "/kafkastorm", <<spoutConfigId>>);
Cada tema en Kafka se divide en muchas particiones. La partición permite un consumo paralelo que aumenta el rendimiento.
El productor publica el mensaje a un tema utilizando la biblioteca cliente de productor de Kafka que equilibra los mensajes a través de las particiones disponibles utilizando un Particionador. El intermediario al que se conecta el productor se encarga de enviar el mensaje al intermediario, que es el líder de esa partición utilizando la información del propietario de la partición en zookeeper. Los consumidores utilizan la biblioteca de consumidores de alto nivel de Kafka (que maneja los cambios del líder del corredor, gestionando la información de compensación en zookeeper y descubriendo la información del propietario de la partición, etc. implícitamente) para consumir mensajes de particiones en flujos; cada flujo puede asignarse a unas pocas particiones, dependiendo de cómo el consumidor elija crear los flujos de mensajes.
Por ejemplo, si hay 10 particiones para un tema y 3 instancias de consumidor (C1, C2, C3 iniciadas en ese orden) todas pertenecientes al mismo Grupo de consumidores, podemos tener diferentes modelos de consumo que permiten leer el paralelismo de la siguiente manera
Cada consumidor utiliza un solo flujo. En este modelo, cuando C1 inicia, las 10 particiones del tema se asignan a la misma secuencia y C1 comienza a consumir desde esa secuencia. Cuando se inicia C2, Kafka reequilibra las particiones entre las dos corrientes. Por lo tanto, cada flujo se asignará a 5 particiones (dependiendo del algoritmo de rebalanceo, también podría ser de 4 contra 6) y cada consumidor consume de su flujo. De forma similar, cuando se inicia C3, las particiones se vuelven a equilibrar entre las 3 secuencias. Tenga en cuenta que en este modelo, al consumir desde un flujo asignado a más de una partición, el orden de los mensajes se mezclará entre las particiones.
Cada consumidor usa más de un flujo (por ejemplo, C1 usa 3, C2 usa 3 y C3 usa 4). En este modelo, cuando C1 se inicia, las 10 particiones se asignan a las 3 secuencias y C1 puede consumir de las 3 secuencias simultáneamente utilizando múltiples subprocesos. Cuando se inicia C2, las particiones se vuelven a equilibrar entre los 6 flujos y, al igual que cuando se inicia C3, las particiones se vuelven a equilibrar entre los 10 flujos. Cada consumidor puede consumir simultáneamente desde múltiples flujos. Tenga en cuenta que el número de flujos y particiones aquí son iguales. En caso de que el número de flujos exceda las particiones, algunos flujos no recibirán ningún mensaje ya que no se les asignará ninguna partición.
Esto es lo que he encontrado hasta ahora ..
Defina nuestra propia clase de particionador personalizado implementando la interfaz de Partitioner kafka. El método implementado tendrá dos argumentos, primero la clave que proporcionamos del productor y luego el número de partición disponible. Así que podemos definir nuestra propia lógica para establecer qué clave de mensaje va a qué partición.
Ahora, mientras creamos el productor, podemos especificar nuestra propia clase de particionador usando el atributo "partitioner.class"
props.put("partitioner.class", "path.to.custom.partitioner.class");
Si no lo mencionamos, Kafka usará su clase predeterminada e intentará distribuir el mensaje de manera uniforme entre las particiones disponibles.
También informe a Kafka cómo serializar la clave.
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
Ahora, si enviamos un mensaje usando una clave en el productor, el mensaje se enviará a una partición específica (basada en nuestra lógica escrita en la clase de particionador personalizado), y en el nivel del consumidor (SimpleConsumer) podemos especificar la partición para recuperar la mensajes especificos
En caso de que necesitemos pasar una cadena como una clave, la misma debe manejarse en la clase de particionador personalizado (tome el valor hash de la clave y luego tome los dos primeros dígitos, etc.)