有没有办法直接从处理器内部将数据发送到 Kafka 主题?

我试图在 Kafka Streams 的帮助下实现以下逻辑:

  1. 听一些来自主题的参考数据,例如。ref-data-topicStateStore从中创建一个全局。

  2. 收听来自另一个主题的消息,data-topic这些消息必须根据 ref 数据进行验证并发送到successerrors主题

下面是示例伪代码:

class SomeProcessor implements Processor<String, String> {


    private KeyValueStore<String, String> refDataStore;


    @Override

    public void init(final ProcessorContext context) {

        refDataStore = (KeyValueStore) context.getStateStore("ref-data-store");

    }


    @Override

    public void process(String key String value) {

        Object refData = refDataStore.get("some_key");


        // business logic here


        if(ok) {

           sendValueToTopic("success");

        } else {

           sendValueToTopic("errors");

        }

    }

}

或者实现这种理想行为的规范方法是什么?


就像我现在想到的另一种方法是用验证信息丰富处理器中的数据,然后将所有内容发送到一个主题中,让客户端处理例如validationStatus接收到的消息。


虽然,我真的很想有一个包含两个主题的解决方案,因为例如,在这种情况下,我可以使用 Kafka Connectsuccess topic直接链接到某个数据存储并以error topic某种方式进行处理。同样,在只有一个主题的方法中,我不知道如何实现这个“store_only_successfully_validated_entities”用例。


有什么想法和建议吗?


猛跑小猪
浏览 118回答 1
1回答
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java