我使用此配置创建一个新消费者:
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": addresses,
"group.id": "my_group",
"auto.offset.reset": "earliest",
})
topic := "testTopic"
if err = c.SubscribeTopics([]string{topic}, nil); err != nil {
panic(err)
}
然后我根据以下代码生成事件并使用一个事件:
events := []map[string]string{
{
"name": "Foo",
},
{
"name": "Bar",
},
}
err = p.ProduceEvent(events[0])//there is a wrapper to produce events
err = p.ProduceEvent(events[1])
res, err := c.ReadMessage(100 * time.Second)
time.Sleep(20 * time.Second)
c.Close()
当我用 描述该组时 watch /home/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my_group --describe。每一步的结果是:
产生事件后:
当我消费一个事件时:
关闭消费者后:
我不明白为什么最后滞后为零!我只消耗了一个事件。这对我来说很奇怪,那Close
会改变偏移量。任何线索表示赞赏。
拉风的咖菲猫
相关分类