猿问

重新使用未提交偏移量的消息

我有一个自定义的Kafka Consumer,用于将一些请求发送到REST API。根据API的响应,我要么提交偏移量,要么跳过不提交的消息。


最小示例:


while (true) {


    ConsumerRecords<String, Object> records = consumer.poll(200);

    for (ConsumerRecord<String, Object> record : records) {


        // Sending a POST request and retrieving the answer

        // ...


        if (responseCode.startsWith("2")) {

            try { 

               consumer.commitSync();

            } catch(CommitFailedException ex) {

              ex.printStackTrace(); 

            }

        } else {

              // Do Nothing

        }

    }

}

现在,当来自REST API的响应未以开头时,2不会提交偏移量,但是不会重新使用该消息。如何强制使用者重新使用未提交的偏移量的邮件?


侃侃无极
浏览 159回答 2
2回答

繁星coding

提交偏移量只是存储使用者当前偏移量(也称为位置)的一种方法。因此,如果它停止了,它(或新的使用者实例将接管)可以找到其先前的位置并从那里重新开始消费。因此,即使您不提交,一旦收到记录,使用者的位置也会被移动。如果要重新使用某些记录,则必须更改使用者的当前位置。在Java客户端中,您可以使用来设置位置seek()。在您的方案中,您可能想计算相对于当前位置的新位置。如果是这样,您可以使用查找当前位置position()。
随时随地看视频慕课网APP

相关分类

Java
我要回答