我有一个自定义的Kafka Consumer,用于将一些请求发送到REST API。根据API的响应,我要么提交偏移量,要么跳过不提交的消息。
最小示例:
while (true) {
ConsumerRecords<String, Object> records = consumer.poll(200);
for (ConsumerRecord<String, Object> record : records) {
// Sending a POST request and retrieving the answer
// ...
if (responseCode.startsWith("2")) {
try {
consumer.commitSync();
} catch(CommitFailedException ex) {
ex.printStackTrace();
}
} else {
// Do Nothing
}
}
}
现在,当来自REST API的响应未以开头时,2不会提交偏移量,但是不会重新使用该消息。如何强制使用者重新使用未提交的偏移量的邮件?
繁星coding
相关分类