昨天,我从日志中发现,在 Kafka 组协调员发起组重新平衡后,kafka 正在重新消费一些消息。这些消息已在两天前被消耗(从日志中确认)。
日志中还报告了另外两个重新平衡,但它们不再重新使用消息。那么为什么第一次重新平衡会导致重新消费消息呢?存在哪些问题?
我正在使用 golang kafka 客户端。这是代码
config := sarama.NewConfig() config.Version = version config.Consumer.Offsets.Initial = sarama.OffsetOldest
并且我们在声明消息之前处理消息,所以我们似乎正在为 kafka 使用“至少发送一次”策略。我们在一台机器上有三个代理,在另一台机器上只有一个消费者线程(go 例程)。
对于这种现象有什么解释吗?我认为这些消息肯定已经提交了,因为它们是在两天前被消耗的,或者为什么 kafka 会在没有提交的情况下保留偏移量超过两天?
消费代码示例:
func (consumer *Consumer) ConsumeClaim(session
sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
realHanlder(message) // consumed data here
session.MarkMessage(message, "") // mark offset
}
return nil
}
添加:
应用程序重新启动后发生重新平衡。还有另外两次重新启动并没有导致重新启动
卡夫卡的配置
log.retention.check.interval.ms=300000
log.retention.hours=168zookeeper.connection.timeout.ms
=
6000group.initial.rebalance.delay.ms=
0delete.topic.enable = true
auto.create.topics .enable=假
ITMISS
相关分类