Как отправить большие сообщения с помощью Kafka (более 15 МБ)?

Я отправляю String-сообщения в Kafka V. 0.8 с API Java Producer. Если размер сообщения составляет около 15 МБ, я получаю сообщение MessageSizeTooLargeException . Я попытался установить message.max.bytes на 40 МБ, но я все еще получаю исключение. Маленькие сообщения работали без проблем.

(Исключение появляется у производителя, у меня нет потребителя в этом приложении.)

Что я могу сделать, чтобы избавиться от этого исключения?

Мой пример конфигурации производителя

 private ProducerConfig kafkaConfig() { Properties props = new Properties(); props.put("metadata.broker.list", BROKERS); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); props.put("message.max.bytes", "" + 1024 * 1024 * 40); return new ProducerConfig(props); } 

Журнал ошибок:

 4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224] kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(Unknown Source) at kafka.producer.Producer.send(Unknown Source) at kafka.javaapi.producer.Producer.send(Unknown Source) 

Вам необходимо настроить три (или четыре) свойства:

  • Сторона потребителя: fetch.message.max.bytes – это определяет наибольший размер сообщения, которое может быть выбрано потребителем.
  • Сторона брокера: replica.fetch.max.bytes – это позволит репликам в брокерах отправлять сообщения в кластере и проверять правильность репликации сообщений. Если это слишком мало, тогда сообщение никогда не будет реплицировано, и поэтому потребитель никогда не увидит сообщение, потому что сообщение никогда не будет выполнено (полностью реплицировано).
  • Сторона брокера: message.max.bytes – это самый большой размер сообщения, которое может получить брокер от производителя.
  • Сторона брокера (по теме): max.message.bytes – это самый большой размер сообщения, которое брокер позволит добавить в эту тему. Этот размер проверяется на предварительное сжатие. (По умолчанию сообщение message.max.bytes ).

Я нашел трудный путь по номеру 2 – вы не получаете никаких исключений, сообщений или предупреждений от Kafka, поэтому не забудьте подумать об этом, когда отправляете большие сообщения.

Незначительные изменения, необходимые для Kafka 0.10 и нового потребителя по сравнению с ответом laughing_man :

  • Брокер: никаких изменений, вам все равно необходимо увеличить свойства message.max.bytes и replica.fetch.max.bytes . message.max.bytes должен быть равен или меньше (*), чем replica.fetch.max.bytes .
  • Продюсер: Увеличьте max.request.size чтобы отправить сообщение большего размера.
  • Потребитель. Увеличьте max.partition.fetch.bytes чтобы получать большие сообщения.

(*) Прочитайте комментарии, чтобы узнать больше о message.max.bytes <= replica.fetch.max.bytes

Вам необходимо переопределить следующие свойства:

Брокерские конфигурации ($ KAFKA_HOME / config / server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

Конфигурации потребителей ($ KAFKA_HOME / config / consumer.properties)
Этот шаг не помог мне. Я добавляю его в потребительское приложение, и он отлично работает

  • fetch.message.max.bytes

Перезагрузите сервер.

посмотрите эту документацию для получения дополнительной информации: http://kafka.apache.org/08/configuration.html

Идея состоит в том, чтобы иметь равный размер сообщения, отправляемого от Kafka Producer в Kafka Broker, а затем полученным Kafka Consumer, т.е.

Производитель Кафки -> Кафка Брокер -> Кафка Потребитель

Предположим, что если требуется отправить 15 МБ сообщения, то Продюсер, Брокер и Потребитель, все три, должны быть синхронизированы.

Kafka Producer отправляет 15 МБ -> Kafka Broker разрешает / хранит 15 MB -> Kafka Consumer получает 15 МБ

Поэтому настройка должна быть равна A.) В брокере: message.max.bytes = 15728640 replica.fetch.max.bytes = 15728640

B.) Потребитель: fetch.message.max.bytes = 15728640

Необходимо помнить, что атрибут message.max.bytes должен быть синхронизирован с свойством fetch.message.max.bytes . размер выборки должен быть как минимум равным максимальному размеру сообщения, иначе может возникнуть ситуация, когда производители могут отправлять сообщения, большие, чем потребитель может потреблять / извлекать. Возможно, стоит взглянуть на это.
Какую версию Kafka вы используете? Также предоставите более подробную информацию о том, что вы получаете. есть ли что-то вроде … payload size of xxxx larger than 1000000 в журнал?

  • Отключено появление и появление предупреждений Java на приложениях Java Web Start
  • как печатать выбранные строки JTable
  • Как изменить формат даты на Java?
  • Редкие матрицы / массивы в Java
  • В Java, как я могу анализировать XML как строку вместо файла?
  • Используйте EL $ {XY} непосредственно в scriptlet
  • Поиск в ArrayList с пользовательскими объектами для определенных строк
  • Java: Синтаксис и смысл за «[B @ 1ef9157»? Binary / адрес?
  • Как отладить приложение Spring Boot с Eclipse?
  • Получите ценность ячейки, как она была представлена ​​в excel
  • Разница между @Before, @BeforeClass, @BeforeEach и @BeforeAll
  • Давайте будем гением компьютера.