texto puedo porque por mensajes mensaje mandar internet gratis fotos enviar desde con como java apache-kafka

java - porque - ¿Cómo puedo enviar mensajes grandes con Kafka(más de 15 MB)?



porque no puedo enviar fotos por mensaje de texto (6)

Envío mensajes de cadena a Kafka V. 0.8 con la API de Java Producer. Si el tamaño del mensaje es de aproximadamente 15 MB, obtengo una MessageSizeTooLargeException . He intentado configurar message.max.bytes en 40 MB, pero sigo teniendo la excepción. Pequeños mensajes trabajados sin problemas.

(La excepción aparece en el productor, no tengo un consumidor en esta aplicación).

¿Qué puedo hacer para deshacerme de esta excepción?

Mi ejemplo de configuración del productor

private ProducerConfig kafkaConfig() { Properties props = new Properties(); props.put("metadata.broker.list", BROKERS); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); props.put("message.max.bytes", "" + 1024 * 1024 * 40); return new ProducerConfig(props); }

Registro de errores:

4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224] kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(Unknown Source) at kafka.producer.Producer.send(Unknown Source) at kafka.javaapi.producer.Producer.send(Unknown Source)


Es necesario anular las siguientes propiedades:

Configuración de agente ($ KAFKA_HOME / config / server.properties)

  • replica.fetch.max.bytes
  • mensaje.max.bytes

Configuraciones del consumidor ($ KAFKA_HOME / config / consumer.properties)
Este paso no funcionó para mí. Lo agrego a la aplicación del consumidor y estaba funcionando bien

  • fetch.message.max.bytes

Reinicie el servidor.

consulte esta documentación para obtener más información: http://kafka.apache.org/08/configuration.html


La idea es tener el mismo tamaño de mensaje que se envía desde Kafka Producer a Kafka Broker y luego Kafka Consumer lo recibe.

Productor de Kafka -> Kafka Broker -> Kafka Consumer

Supongamos que si el requisito es enviar 15 MB de mensaje, entonces el Productor, el Agente y el Consumidor, los tres, deben estar sincronizados.

Kafka Producer envía 15 MB -> Kafka Broker permite / almacena 15 MB -> Kafka Consumer recibe 15 MB

Por lo tanto, la configuración debería ser A.) En el agente: mensaje.max.bytes = 15728640 replica.fetch.max.bytes = 15728640

B.) En el consumidor: fetch.message.max.bytes = 15728640


La respuesta de @laughing_man es bastante precisa. Pero todavía quería dar una recomendación.

Kafka no está destinado a manejar mensajes grandes.

Mi recomendación es que su API debe usar el almacenamiento en la nube Ex S3, y simplemente enviar a Kafka o a cualquier agente de mensajes una referencia de S3. Debe encontrar un lugar para conservar sus datos, tal vez sea una unidad de red, tal vez sea lo que sea, pero no debería ser intermediario de mensajes.

Ahora, si no quieres ir con la solución anterior

El tamaño máximo del mensaje es de 1 MB (la configuración de sus corredores se llama message.max.bytes ) Apache Kafka . Si realmente lo necesitara mucho, podría aumentar eso y asegurarse de aumentar los buffers de red para sus productores y consumidores.

Y si realmente te importa dividir tu mensaje, asegúrate de que cada división de mensajes tenga exactamente la misma clave para que se envíe a la misma partición, y el contenido de tu mensaje debe informar una "identificación de pieza" para que tu consumidor pueda reconstruir completamente el mensaje .

También puede explorar la compresión si su mensaje está basado en texto (gzip, snappy, compresión lz4), lo que puede reducir el tamaño de los datos, pero no de forma mágica.

Nuevamente, debe usar un sistema externo para almacenar esos datos y simplemente enviar una referencia externa a Kafka. Esa es una arquitectura muy común, y una con la que debe ir y ser ampliamente aceptada.

Tenga en cuenta que Kafka funciona mejor solo si los mensajes son enormes en cantidad pero no en tamaño.


Necesitas ajustar tres (o cuatro) propiedades:

  • Lado del consumidor: fetch.message.max.bytes : esto determinará el tamaño más grande de un mensaje que el consumidor puede buscar.
  • Lado del replica.fetch.max.bytes : replica.fetch.max.bytes : esto permitirá que las réplicas en los agentes envíen mensajes dentro del clúster y se aseguren de que los mensajes se repliquen correctamente. Si esto es demasiado pequeño, entonces el mensaje nunca se replicará y, por lo tanto, el consumidor nunca verá el mensaje porque el mensaje nunca se confirmará (replicará completamente).
  • Lado del agente: message.max.bytes : este es el tamaño más grande del mensaje que puede recibir el agente de un productor.
  • Lado del agente (por tema): max.message.bytes : este es el tamaño más grande del mensaje que el agente le permitirá agregar al tema. Este tamaño es validado en pre-compresión. (El valor predeterminado es el message.max.bytes del message.max.bytes .)

Descubrí el camino difícil con el número 2: no recibes NINGUNA excepción, mensaje o advertencia de Kafka, así que asegúrate de tener esto en cuenta cuando estés enviando mensajes grandes.


Se requieren cambios menores para Kafka 0.10 y el nuevo consumidor en comparación con la respuesta de laughing_man :

  • Corredor: sin cambios, todavía necesita aumentar las propiedades message.max.bytes y replica.fetch.max.bytes . message.max.bytes tiene que ser igual o menor (*) que replica.fetch.max.bytes .
  • Productor: Aumente max.request.size para enviar el mensaje más grande.
  • Consumidor: Aumente max.partition.fetch.bytes para recibir mensajes más grandes.

(*) Lea los comentarios para obtener más información sobre message.max.bytes <= replica.fetch.max.bytes


Una cosa clave para recordar es que el atributo message.max.bytes debe estar sincronizado con la propiedad fetch.message.max.bytes del consumidor. el tamaño de recuperación debe ser al menos tan grande como el tamaño máximo del mensaje, de lo contrario, podría haber una situación en la que los productores puedan enviar mensajes más grandes de lo que el consumidor puede consumir / recuperar. Puede que valga la pena echarle un vistazo.
¿Qué versión de Kafka estás usando? También proporcione algunos detalles más traza que está recibiendo. ¿hay algo como ... payload size of xxxx larger than 1000000 en el registro?