Как отправить итоговый результат агрегации кафка-streamов из windows KTable с временным окном?

Я хотел бы сделать следующее:

  1. Потребляйте записи из темы номеров (Long’s)
  2. Совокупность (подсчет) значений для каждого 5-секундного windows
  3. Отправить результат агрегации FINAL в другую тему

Мой код выглядит так:

KStream longs = builder.stream( Serdes.String(), Serdes.Long(), "longs"); // In one ktable, count by key, on a five second tumbling window. KTable<Windowed, Long> longCounts = longs.countByKey(TimeWindows.of("longCounts", 5000L)); // Finally, sink to the long-avgs topic. longCounts.toStream((wk, v) -> wk.key()) .to("long-counts"); 

Похоже, все работает так, как ожидалось, но агрегаты отправляются в тему назначения для каждой входящей записи. Мой вопрос в том, как я могу отправить только окончательный результат агрегирования для каждого windows?

One Solution collect form web for “Как отправить итоговый результат агрегации кафка-streamов из windows KTable с временным окном?”

В streamах Кафки нет такой вещи, как «конечная агрегация». Окна постоянно открываются, чтобы обрабатывать задерживающиеся записи (конечно, windows не сохраняются навсегда, они отбрасываются до истечения времени их хранения), однако при отбрасывании windows особых действий нет.

Дополнительную информацию см. В документации Confluent: http://docs.confluent.io/current/streams/

Таким образом, для каждого обновления для агрегации создается результирующая запись (потому что streamи Kafka также обновляют результат агрегирования на поздних прибывающих записях). Ваш «конечный результат» будет последним результатом записи (до того, как окно будет отброшено). В зависимости от вашего варианта использования ручное устранение дублирования было бы способом разрешения проблемы (с использованием API нижнего рычага, transform() или process() )

Это сообщение в блоге также может помочь: https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html

Еще одно сообщение в блоге, рассматривающее этот вопрос без использования пунктуации: http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html

Interesting Posts

Ubuntu перезапускает случайно

Как обойти стирание стилей на Scala? Или, почему я не могу получить параметр типа моих коллекций?

Установка VS2010 SP1. «Функция, которую вы пытаетесь использовать, находится на сетевом ресурсе, который недоступен»

android: перемещение вида при перемещении касания (ACTION_MOVE)

Подключите два компьютера Win 7 напрямую с помощью оптоволокна и без коммутатора

Декартово произведение из 2 списков в Haskell

Вычислить среднее по группе

Одновременно используйте беспроводные и проводные соединения

Когда использовать ref и когда это не нужно в C #

Является ли сокращение максимального использования процессора в Power Options безопасным?

Какие виртуальные машины существуют для Linux помимо VirtualBox?

Как группировать значки в меню «Пуск» Windows 8?

Сканирование Brother 8880DN

Что такое тест на единицу, интеграционный тест, тест на дым, регрессионный тест?

Являются ли дни прохождения const std :: string & в качестве параметра более?

Давайте будем гением компьютера.