我试图在 Kafka Streams 的帮助下实现以下逻辑:
听一些来自主题的参考数据,例如。ref-data-topic
并StateStore
从中创建一个全局。
收听来自另一个主题的消息,data-topic
这些消息必须根据 ref 数据进行验证并发送到success
或errors
主题。
下面是示例伪代码:
class SomeProcessor implements Processor<String, String> {
private KeyValueStore<String, String> refDataStore;
@Override
public void init(final ProcessorContext context) {
refDataStore = (KeyValueStore) context.getStateStore("ref-data-store");
}
@Override
public void process(String key String value) {
Object refData = refDataStore.get("some_key");
// business logic here
if(ok) {
sendValueToTopic("success");
} else {
sendValueToTopic("errors");
}
}
}
或者实现这种理想行为的规范方法是什么?
就像我现在想到的另一种方法是用验证信息丰富处理器中的数据,然后将所有内容发送到一个主题中,让客户端处理例如validationStatus接收到的消息。
虽然,我真的很想有一个包含两个主题的解决方案,因为例如,在这种情况下,我可以使用 Kafka Connectsuccess topic直接链接到某个数据存储并以error topic某种方式进行处理。同样,在只有一个主题的方法中,我不知道如何实现这个“store_only_successfully_validated_entities”用例。
有什么想法和建议吗?
相关分类