我们有以下用例:
从主题读取(预期吞吐量是一个键每 2 秒记录一次),groupByKey 并执行 30 分钟窗口的窗口聚合,跳跃周期为 1 分钟。聚合只是附加收到的记录。
当应用程序启动时,一切正常,但在后期阶段,当聚合大小增加时,应用程序会变慢并滞后
拓扑结构:
KStream<String, Foo> numericStream = builder.stream("topic", Consumed.with(Serdes.String(), FooSerde));
static Duration WINDOW_MS = Duration.ofMinutes(30);
static Duration ADVANCE_MS = Duration.ofMinutes(15);
KStream<Windowed<String>, Foo1> windowedStream = numericStream.peek((key, value) -> System.out.println(value.getDateTime()))
.groupByKey()
.windowedBy((TimeWindows.of(WINDOW_MS).advanceBy(ADVANCE_MS)).grace(Duration.ofMillis(30)))
.aggregate(new Initializer<Foo1>() {
@Override
public Foo1 apply() {
return new Foo1();
}},
(key, value, aggregate) -> {
aggregate.append(value);
return aggregate;
},
Materialized.<String, Foo1, WindowStore<Bytes,byte[]>>as("some_name").withValueSerde(Foo1Serde))
.toStream()
.peek((key, value) -> System.out.println(" Key: "+key+ " Start: "+getISTTime(key.window().start()) + " End: "+ getISTTime(key.window().end()) +" Count: " + value.getCount() ));
每条记录的大小约为20KB。当聚合大小超过 10MB 左右时,记录的处理时间会超过 2 秒,因此会出现滞后。
COMMIT_INTERVAL_MS_CONFIG 设置为 0,因为状态存储应始终与最新数据包保持同步,并且状态存储会被查询并且间隔不同。
如何消除应用程序的延迟,是否与 RocksDB I/O 操作有关?因为计数操作而不是聚合操作没有任何滞后
每个主题有 3 个分区,但是具有相同键的记录会转到同一分区,那么线程/多个实例会有帮助吗?
我们也在考虑在不使用窗口的情况下执行此操作,窗口是否会对较大的聚合产生这种滞后?
慕田峪7331174
相关分类