猿问

Kafka Consumer:如何以编程方式从 Go Sarama 中的特定偏移量消费

最近,我开始学习使用kafka工作。我正在从事的项目使用sarama。


为了阅读我使用的消息ConsumerGroup。


foo如果返回,我需要在一段时间后再次阅读该消息false。如何才能做到这一点?


func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {


    for message := range claim.Messages() {


            if ok := foo(message); ok {

                session.MarkMessage(message, "")

            } else {

                // ???

            }


    }


    return nil

}


RISEBY
浏览 141回答 1
1回答

白衣染霜花

Setup()您可以通过在您的消费者组的回调中包含以下内容来将消费者组的偏移量重置为较旧的偏移量:func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {     sess.ResetOffset(topic, partition, offset, "")     return nil}您也可以通过控制台实现相同的目的:kafka-consumer-groups \     --bootstrap-server localhost:9092 \     --group my-consumer-group \     --topic myTopicName \     --reset-offsets \     --to-offfset 100 \     --execute
随时随地看视频慕课网APP

相关分类

Go
我要回答