猿问

如何发送时间窗口KTable的最终Kafka流聚合结果?

我想做的是:

  1. 使用数字主题(Long‘s)中的记录
  2. 汇总(计数)每5秒窗口的值
  3. 将最终聚合结果发送到另一个主题

我的代码看起来如下:

KStream<String, Long> longs = builder.stream(
        Serdes.String(), Serdes.Long(), "longs");

// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts = 
        longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
        .to("long-counts");

看起来一切都像预期的那样工作,但是聚合被发送到每个传入记录的目标主题。我的问题是如何只发送每个窗口的最终聚合结果?


慕神8447489
浏览 916回答 2
2回答

慕哥9229398

:Kafka流即将推出的特性将为您提供一个配置选项(可以配置其大小的缓冲区/缓存)来控制Kafka流的下游/输出数据速率。如果设置更大的缓冲区大小,则会合并更多下游更新,从而降低下游速率。&nbsp;
随时随地看视频慕课网APP
我要回答