Как отправить большие сообщения с помощью Kafka (более 15 МБ)?
Я отправляю String-сообщения в Kafka V. 0.8 с API Java Producer. Если размер сообщения составляет около 15 МБ, я получаю сообщение MessageSizeTooLargeException
. Я попытался установить message.max.bytes
на 40 МБ, но я все еще получаю исключение. Маленькие сообщения работали без проблем.
(Исключение появляется у производителя, у меня нет потребителя в этом приложении.)
Что я могу сделать, чтобы избавиться от этого исключения?
- Повернуть изображение вокруг символа (JAVA)
- Java: установить таймаут на определенный блок кода?
- возможно ли выполнить ответы JSON только с JDK или HttpComponents?
- Прямоугольное соответствие Regex - Java
- Как получить текущую дату / время в Java
Мой пример конфигурации производителя
private ProducerConfig kafkaConfig() { Properties props = new Properties(); props.put("metadata.broker.list", BROKERS); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "1"); props.put("message.max.bytes", "" + 1024 * 1024 * 40); return new ProducerConfig(props); }
Журнал ошибок:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException 5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224] kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(Unknown Source) at kafka.producer.Producer.send(Unknown Source) at kafka.javaapi.producer.Producer.send(Unknown Source)
- Вычислить прошедшее время в Java / Groovy
- Можно ли отключить HttpSession в web.xml?
- FixedThreadPool против CachedThreadPool: меньшее из двух зол
- Eclipse не будет открываться из-за переменных среды
- Как подключить несколько файлов к электронной почте с помощью JavaMail?
- iText / BouncyCastle ClassNotFound org.bouncycastle.asn1.DEREncodable и org.bouncycastle.tsp.TimeStampTokenInfo
- Предоставление имени JMenuItem его ActionListener
- Как понимать происходит - до согласованного
Вам необходимо настроить три (или четыре) свойства:
- Сторона потребителя:
fetch.message.max.bytes
– это определяет наибольший размер сообщения, которое может быть выбрано потребителем. - Сторона брокера:
replica.fetch.max.bytes
– это позволит репликам в брокерах отправлять сообщения в кластере и проверять правильность репликации сообщений. Если это слишком мало, тогда сообщение никогда не будет реплицировано, и поэтому потребитель никогда не увидит сообщение, потому что сообщение никогда не будет выполнено (полностью реплицировано). - Сторона брокера:
message.max.bytes
– это самый большой размер сообщения, которое может получить брокер от производителя. - Сторона брокера (по теме):
max.message.bytes
– это самый большой размер сообщения, которое брокер позволит добавить в эту тему. Этот размер проверяется на предварительное сжатие. (По умолчанию сообщениеmessage.max.bytes
).
Я нашел трудный путь по номеру 2 – вы не получаете никаких исключений, сообщений или предупреждений от Kafka, поэтому не забудьте подумать об этом, когда отправляете большие сообщения.
Незначительные изменения, необходимые для Kafka 0.10 и нового потребителя по сравнению с ответом laughing_man :
- Брокер: никаких изменений, вам все равно необходимо увеличить свойства
message.max.bytes
иreplica.fetch.max.bytes
.message.max.bytes
должен быть равен или меньше (*), чемreplica.fetch.max.bytes
. - Продюсер: Увеличьте
max.request.size
чтобы отправить сообщение большего размера. - Потребитель. Увеличьте
max.partition.fetch.bytes
чтобы получать большие сообщения.
(*) Прочитайте комментарии, чтобы узнать больше о message.max.bytes
<= replica.fetch.max.bytes
Вам необходимо переопределить следующие свойства:
Брокерские конфигурации ($ KAFKA_HOME / config / server.properties)
- replica.fetch.max.bytes
- message.max.bytes
Конфигурации потребителей ($ KAFKA_HOME / config / consumer.properties)
Этот шаг не помог мне. Я добавляю его в потребительское приложение, и он отлично работает
- fetch.message.max.bytes
Перезагрузите сервер.
посмотрите эту документацию для получения дополнительной информации: http://kafka.apache.org/08/configuration.html
Идея состоит в том, чтобы иметь равный размер сообщения, отправляемого от Kafka Producer в Kafka Broker, а затем полученным Kafka Consumer, т.е.
Производитель Кафки -> Кафка Брокер -> Кафка Потребитель
Предположим, что если требуется отправить 15 МБ сообщения, то Продюсер, Брокер и Потребитель, все три, должны быть синхронизированы.
Kafka Producer отправляет 15 МБ -> Kafka Broker разрешает / хранит 15 MB -> Kafka Consumer получает 15 МБ
Поэтому настройка должна быть равна A.) В брокере: message.max.bytes = 15728640 replica.fetch.max.bytes = 15728640
B.) Потребитель: fetch.message.max.bytes = 15728640
Необходимо помнить, что атрибут message.max.bytes
должен быть синхронизирован с свойством fetch.message.max.bytes
. размер выборки должен быть как минимум равным максимальному размеру сообщения, иначе может возникнуть ситуация, когда производители могут отправлять сообщения, большие, чем потребитель может потреблять / извлекать. Возможно, стоит взглянуть на это.
Какую версию Kafka вы используете? Также предоставите более подробную информацию о том, что вы получаете. есть ли что-то вроде … payload size of xxxx larger than 1000000
в журнал?