java - Kafka: escribiendo serializador personalizado
json serialization (2)
Estoy tratando de construir un POC con Kafka 0.8.1. Estoy usando mi propia clase java como un mensaje Kafka que tiene un montón de tipos de datos String. No puedo usar la clase de serializador por defecto o la clase de serializador de cadena que viene con la biblioteca Kafka. Supongo que necesito escribir mi propio serializador y alimentarlo a las propiedades del productor. Si sabe que está escribiendo un ejemplo de serializador personalizado en Kafka (en java), por favor comparta. Aprecio mucho, muchas gracias.
Las cosas necesarias para escribir un serializador personalizado son:
- Implementar el
Encoder
con un objeto especificado para el genérico.- Se requiere el suministro de un constructor
VerifiableProperties
- Se requiere el suministro de un constructor
- Reemplace el
toBytes(...)
asegurándose de que se devuelva una matriz de bytes - Inyecte la clase serializador en
ProducerConfig
Declarar un serializador personalizado para un productor
Como anotó en su pregunta, Kafka proporciona un medio para declarar un serializador específico para un productor. La clase de serializador se establece en una instancia de ProducerConfig
y esa instancia se utiliza para construir la clase de Producer
deseada.
Si sigue el Ejemplo de productor de Kafka, construirá ProducerConfig
través de un objeto Properties
. Al construir su archivo de propiedades, asegúrese de incluir:
props.put("serializer.class", "path.to.your.CustomSerializer");
Con la ruta a la clase que desea que Kafka use para serializar los mensajes antes de agregarlos al registro.
Creando un serializador personalizado que Kafka entiende
Escribir un serializador personalizado que Kafka pueda interpretar correctamente requiere la implementación de la clase Scala del Encoder[T]
que proporciona Kafka. Implementar rasgos en java es extraño , pero el siguiente método funcionó para serializar JSON en mi proyecto:
public class JsonEncoder implements Encoder<Object> {
private static final Logger logger = Logger.getLogger(JsonEncoder.class);
// instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
private static final ObjectMapper objectMapper = new ObjectMapper();
public JsonEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(Object object) {
try {
return objectMapper.writeValueAsString(object).getBytes();
} catch (JsonProcessingException e) {
logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
}
return "".getBytes();
}
}
Su pregunta hace que parezca que está usando un objeto (llamémoslo CustomMessage
) para todos los mensajes CustomMessage
a su registro. Si ese es el caso, su serializador podría verse más como esto:
package com.project.serializer;
public class CustomMessageEncoder implements Encoder<CustomMessage> {
public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(CustomMessage customMessage) {
return customMessage.toBytes();
}
}
Lo que dejaría su configuración de propiedad para verse así:
props.put("serializer.class", "path.to.your.CustomSerializer");
Necesitas implementar tanto codificar como decodificar.
public class JsonEncoder implements Encoder<Object> {
private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);
public JsonEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(Object object) {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(object).getBytes();
} catch (JsonProcessingException e) {
LOGGER.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
}
return "".getBytes();
}
}
El codificador decodificador
public class JsonDecoder implements Decoder<Object> {
private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);
public JsonDecoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public Object fromBytes(byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.readValue(bytes, Map.class);
} catch (IOException e) {
LOGGER.error(String.format("Json processing failed for object: %s", bytes.toString()), e);
}
return null;
}
}
La entrada pom
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.4.1.3</version>
</dependency>
Establecer el codificador predeterminado en la propiedad Kafka
properties.put("serializer.class","kafka.serializer.DefaultEncoder");
El código del escritor y lector es el siguiente
byte[] bytes = encoder.toBytes(map);
KeyedMessage<String, byte[]> message =new KeyedMessage<String, byte[]>(this.topic, bytes);
JsonDecoder decoder = new JsonDecoder(null);
Map map = (Map) decoder.fromBytes(it.next().message());