猿问

如何在 spring-cloud-stream 中的 kafka 进程拓扑中使用交互式查询?

是否可以在 Spring Cloud Stream 中使用带有 @EnableBinding 注释的类或在带有 @StreamListener 的方法中使用交互式查询(InteractiveQueryService)?我尝试在提供的 KStreamMusicSampleApplication类和处理方法中实例化 ReadOnlyKeyValueStore,但它始终为空。

我的@StreamListener 方法正在侦听一堆KTables 和KStreams,并且在流程拓扑(例如过滤)期间,我必须检查来自KStream 的密钥是否已经存在于特定KTable 中。

我试图弄清楚如何扫描传入的 KTable 以检查密钥是否已经存在但没有运气。然后我遇到了 InteractiveQueryService,它的 get() 方法可用于检查 KTable 中的 state store materializedAs 中是否存在密钥。问题是我无法从流程拓扑(@EnableBinding 或@StreamListener)访问它。它只能从这些注释之外访问,例如 RestController。

有没有办法扫描传入的 KTable 以检查键或值是否存在?如果没有,那么我们可以在流程拓扑中访问 InteractiveQueryService 吗?


PIPIONE
浏览 87回答 1
1回答

莫回无

InteractiveQueryServiceSpring Cloud Stream 中的StreamListener. 正如您所提到的,它应该在您的主要拓扑之外使用。但是,对于您描述的用例,您仍然可以使用主流程中的状态存储。例如,如果您有一个传入KStream和 aKTable被具体化为状态存储,那么您可以调用并process以KStream这种方式访问状态存储。这是一个粗略的代码来实现这一点。您需要将其转换为适合您的特定用例,但这是一个想法。ReadOnlyKeyValueStore<Object, String> store;&nbsp;input.process(() -> new Processor<Object, Product>() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void init(ProcessorContext processorContext) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void process(Object key, Object value) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //find the key&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; store.get(key);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void close() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (state != null) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; state.close();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }, "my-store");
随时随地看视频慕课网APP

相关分类

Java
我要回答