theme tema plantillas para pagina hacer desde crear con como cero bootstrap bluuweb aprender java apache-zookeeper apache-kafka

plantillas - Cómo crear un tema en Kafka a través de Java



falcon master wordpress (4)

La API de AdminUtils está en desuso. Hay una nueva API AdminZkClient que podemos usar para administrar temas en el servidor Kafka.

String zookeeperHost = "127.0.0.1:2181"; Boolean isSucre = false; int sessionTimeoutMs = 200000; int connectionTimeoutMs = 15000; int maxInFlightRequests = 10; Time time = Time.SYSTEM; String metricGroup = "myGroup"; String metricType = "myType"; KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSucre,sessionTimeoutMs, connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType); AdminZkClient adminZkClient = new AdminZkClient(zkClient); String topicName1 = "myTopic"; int partitions = 3; int replication = 1; Properties topicConfig = new Properties(); adminZkClient.createTopic(topicName1,partitions,replication, topicConfig,RackAwareMode.Disabled$.MODULE$);

Puede consultar este enlace para obtener más información: https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java/

Quiero crear un tema en Kafka (kafka_2.8.0-0.8.1.1) a través de java. Funciona bien si creo un tema en el símbolo del sistema, y ​​si envío un mensaje a través de java api. Pero quiero crear un tema a través de java api. Después de una larga búsqueda encontré debajo del código,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000); AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

Intenté el código anterior y se muestra que el tema está creado, pero no puedo insertar el mensaje en el tema. ¿Algún error en mi código? ¿O cualquier otra forma de lograr lo anterior?


Lo arreglé .. Después de una larga investigación ..

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000); AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

A partir del código anterior, ZkClient creará un tema, pero esta información del tema no tendrá conocimiento del kafka. Entonces, lo que tenemos que hacer es, necesitamos crear un objeto para ZkClient de la siguiente manera,

Primero importa la siguiente declaración,

import kafka.utils.ZKStringSerializer$;

y crear objeto para ZkClient de la siguiente manera,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$); AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

Edición 1: (para comentario @ajkret)

El código anterior no funcionará para kafka> 0.9 ya que la API ha sido cambiada, use el código siguiente para kafka> 0.9

import java.util.Properties; import kafka.admin.AdminUtils; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; public class KafkaTopicCreationInJava { public static void main(String[] args) throws Exception { ZkClient zkClient = null; ZkUtils zkUtils = null; try { String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181"; int sessionTimeOutInMs = 15 * 1000; // 15 secs int connectionTimeOutInMs = 10 * 1000; // 10 secs zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$); zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); String topicName = "testTopic"; int noOfPartitions = 2; int noOfReplication = 3; Properties topicConfiguration = new Properties(); AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration); } catch (Exception ex) { ex.printStackTrace(); } finally { if (zkClient != null) { zkClient.close(); } } } }


Para aquellos que intentan lograr esto en kafka v0.10.2.1 y tienen problemas con el error de serialización '' java.io.StreamCorruptedException: invalid stream header: 3139322E '' a continuación se muestra un código de trabajo de ejemplo con las importaciones necesarias.

import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkMarshallingError; import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.utils.ZKStringSerializer; import kafka.utils.ZkUtils; public static void createTopic(String topicName, int numPartitions, int numReplication) { ZkClient zkClient = null; ZkUtils zkUtils = null; try { String zookeeperHosts = "199.98.916.902:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181"; int sessionTimeOutInMs = 15 * 1000; // 15 secs int connectionTimeOutInMs = 10 * 1000; // 10 secs zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs); //Ref: https://gist.github.com/jjkoshy/3842975 zkClient.setZkSerializer(new ZkSerializer() { @Override public byte[] serialize(Object o) throws ZkMarshallingError { return ZKStringSerializer.serialize(o); } @Override public Object deserialize(byte[] bytes) throws ZkMarshallingError { return ZKStringSerializer.deserialize(bytes); } }); zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); int noOfPartitions = 2; int noOfReplication = 3; Properties topicConfiguration = new Properties(); AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration, RackAwareMode.Enforced$.MODULE$); } catch (Exception ex) { ex.printStackTrace(); } finally { if (zkClient != null) { zkClient.close(); } } }


Solo un puntero para cualquiera que vea esto con una versión actualizada de Kafka (Al momento de escribir esto, estaba usando Kafka v0.10.0.0) .

Tienes que cambiar;

AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, topicConfiguration);

A los siguientes;

AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplications, true, Enforced$.MODULE$);

También es una buena idea cerrar la conexión una vez finalizada;

zkClient.close();