卡夫卡流分组依据和串联

我有一个接收记录的 Kafka 流,我想根据特定字段连接消息。


流中的消息如下所示:


Key: 2099

Payload{

  email: tom@emample.com

  eventCode: 2099

}

预期输出:


key: 2099

Payload{

    emails: tom@example, bill@acme.com, jane@example.com

}

我可以让溪流运行良好,我只是不确定lamda应该包含什么。


这就是我迄今为止所做的。我不确定我是否应该使用映射,聚合或减少或组合这些操作。


final StreamsBuilder builder = new StreamsBuilder();

KStream<String, Payload> inputStream = builder.stream(INPUT_TOPIC);


inputStream

        .groupByKey()

        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000)))


                                  // Not sure what to do here …..


}).to (OUTPUT_TOPIC );


慕后森
浏览 88回答 1
1回答

莫回无

它可能是这样的东西inputStream.groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000))).aggregate(PayloadAggr::new, new Aggregator<String, Payload, PayloadAggr>() {&nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; public PayloadAggr apply(String key, Payload newValue, PayloadAggr result) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; result.setKey(key);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(result.getEmails()==null){&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; result.setEmails(newValue.getEmail());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }else{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; result.setEmails(result.getEmails() + "," + newValue.getEmail());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return result;&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }, .../* You serdes and store */}).toStream().to(OUTPUT_TOPIC);
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java