我有一个接收记录的 Kafka 流,我想根据特定字段连接消息。
流中的消息如下所示:
Key: 2099
Payload{
email: tom@emample.com
eventCode: 2099
}
预期输出:
key: 2099
Payload{
emails: tom@example, bill@acme.com, jane@example.com
}
我可以让溪流运行良好,我只是不确定lamda应该包含什么。
这就是我迄今为止所做的。我不确定我是否应该使用映射,聚合或减少或组合这些操作。
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Payload> inputStream = builder.stream(INPUT_TOPIC);
inputStream
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000)))
// Not sure what to do here …..
}).to (OUTPUT_TOPIC );
莫回无
相关分类