使用 kafka 流根据消息密钥将消息发送到主题

我希望能够根据消息键的键将 Kafkastream 中的所有记录发送到不同的主题。前任。Kafka 中的流包含名称作为键,记录作为值。我想根据记录的键将这些记录分散到不同的主题

数据:(jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}),预期

  • topic1 :名称: jhon ->(jhon -> {jhonsRecord}),(jhon -> {jhonsRecord2})

  • 主题2:sean->(sean -> {seansRecord})

  • 主题3:玛丽 ->(玛丽 -> {marysRecord})

下面是我现在执行此操作的方式,但由于名称列表是 hudge,所以速度很慢。另外,即使记录了几个名字,我也需要遍历整个列表请提出修复建议

 for( String name : names )
    {
        recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
    }


跃然一笑
浏览 134回答 3
3回答

神不在的星期二

我认为你应该使用KStream::to(final TopicNameExtractor<K, V> topicExtractor)函数。它使您能够计算每条消息的主题名称。示例代码:final&nbsp;KStream<String,&nbsp;String>&nbsp;stream&nbsp;=&nbsp;???; stream.to((key,&nbsp;value,&nbsp;recordContext)&nbsp;->&nbsp;key);

墨色风雨

如果您需要为每个用户生成聚合数据,则无需为每个用户写入单独的主题。您最好在源流上编写聚合。这样,您就不会最终得到每个键一个主题,但您仍然可以独立地对每个用户运行操作。Serde<UserRecord> recordSerde = ... KStream<Stream, UserAggregate> aggregateByName = recordsByName    .groupByKey(Grouped.with(Serdes.String(), recordSerde))    .aggregate(...)    .toStream()这种方法将扩展到数百万用户,这是您目前无法通过每个用户一个主题的方法实现的。

慕田峪9158850

我想你正在寻找的是KStream#branch。以下未经测试,但显示了总体思路// get a list of predicates to branch a topic onfinal List<String> names = Arrays.asList("jhon", "sean", "mary");final Predicate[] predicates = names.stream()    .map((Function<String, Predicate<String, Object>>) n -> (s, o) -> s.equals(n))    .toArray(Predicate[]::new);// example inputfinal KStream<Object, Object> stream = new StreamsBuilder().stream("names");// split the topicKStream<String, Object>[] branches = stream.branch(predicates);for (int i = 0; i < names.size(); i++) {    branches[i].to(names.get(i));}// KStream branches[0] contains all records whose keys are "jhon"// KStream branches[1] contains all records whose keys are "sean"...
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java