具有 confluent-kafka-go 更改偏移量的 kafka 消费者

我使用此配置创建一个新消费者:


c, err := kafka.NewConsumer(&kafka.ConfigMap{

                 "bootstrap.servers": addresses,

                 "group.id":          "my_group",

                 "auto.offset.reset": "earliest",

         })

topic := "testTopic"

if err = c.SubscribeTopics([]string{topic}, nil); err != nil {

    panic(err)

}

然后我根据以下代码生成事件并使用一个事件:


events := []map[string]string{

{                                     

        "name":       "Foo",

},                                                 

{                                                  

        "name":       "Bar",                                                       

},                                                 

}                                                          

                                                                                                    

err = p.ProduceEvent(events[0])//there is a wrapper to produce events       

err = p.ProduceEvent(events[1])                                    

                                                                                   

res, err := c.ReadMessage(100 * time.Second)                                              

time.Sleep(20 * time.Second)                                                       

                                                                                                              

c.Close()                                  


当我用 描述该组时  watch /home/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_group  --describe。每一步的结果是:


  1. 产生事件后: http://img.mukewang.com/63b3f5390001725014570113.jpg

  2. 当我消费一个事件时: http://img2.mukewang.com/63b3f54100010fc014530114.jpg

  3. 关闭消费者后: http://img.mukewang.com/63b3f5470001083d12080104.jpg

我不明白为什么最后滞后为零!我只消耗了一个事件。这对我来说很奇怪,那Close会改变偏移量。任何线索表示赞赏。


慕妹3146593
浏览 233回答 1
1回答

拉风的咖菲猫

ReadMessage包裹Poll。Poll获取一批消息并在本地缓冲它们。由于您已将消费者配置为自动提交偏移量,因此它将提交所有获取的消息,甚至是那些在本地缓存且您的应用程序仍未处理的消息。这就是为什么您看到关闭消费者后没有延迟。librdkafka(因此confluent-kafka-go)没有办法配置max.pool.records,所以如果你想准确控制哪些偏移量被提交,你需要禁用自动提交偏移量并使用手动提交它们StoreOffsets:https ://github.com/confluentinc/confluent- kafka-go/issues/380#issuecomment-539903016
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go