KStreams:如何获取记录的(原始)主题?

我有以下


//Config setup

Properties props = ...; //setup


List<String> topicList = Arrays.asList({"A", "B", "C"});


StreamBuilder builder = new StreamBuilder();

KStream<String, String> source = builder.stream(topicList);


source

  .map((k,v) -> {


    //How can i get the topic of the record here


  })

  .to((k,v,r) -> {//busy code for topic routing});


new KafkaStream(builder.build(), properties).start();


德玛西亚99
浏览 78回答 1
1回答

慕哥6287543

您可以使用ProcessorContext.topic()获取所需的主题名称。要访问 ProcessorContext,请使用 KStream.process() 为其提供适当的Processor实现。您也可以使用 KStream.transform():KStream<InputKeyType, InputValueType> stream2 = stream.transform(new TransformerSupplier<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>>() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public Transformer<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>> get() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return new Transformer<InputKeyType, InputValueType, KeyValue<OutputKeyType, OutputValueType>>() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; private ProcessorContext context;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void init(ProcessorContext context) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; this.context = context;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public KeyValue<OutputKeyType, OutputValueType> transform(InputKeyType key, InputValueType value) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; this.context.topic() // topic name you need&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // logic here&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return new KeyValue<>(OutputKeyType key, OutputValueType value);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void close() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; };&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; });
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java