我们有一个由 3 个盒子组成的 2.3 Kafka 集群。当我们几天前将它升级到 2.3 时,我们注意到那些日志消息导致两个代理上的一个主题分区的 replicaFetcher 线程崩溃:
[2019-08-09 15:02:43,520] ERROR [ReplicaFetcher replicaId=4, leaderId=3, fetcherId=0] Unexpected error occurred while processing data for partition __consumer_offsets-21 at offset 57542337 (kafka.server.R
eplicaFetcherThread)
kafka.common.UnexpectedAppendOffsetException: Unexpected offset in append to __consumer_offsets-21. First offset 57542333 is less than the next offset 57542337. First 10 offsets in append: List(57542333,
57542334, 57542335, 57542336, 57542337, 57542338, 57542339, 57542340, 57542341, 57542342), last offset in append: 57570869. Log start offset = 56949140
at kafka.log.Log.$anonfun$append$2(Log.scala:929)
at kafka.log.Log.maybeHandleIOException(Log.scala:2065)
影响是一个 broker 不能成为这个主题分区的 ISR(实际上第二个 broker 有同样的问题,所以我们只有一个 ISR,它是领导者)。
我仍然对这条消息感到困惑,我无法正确理解它,所以我无法找到解决此问题的正确方法。我真的很想了解这里发生了什么,但不确定我是否理解以下代码:
if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
当 replicaFetcher 必须附加记录时,它如何访问 nextOffset 信息..?不确定这个分析到底做了什么(要追加的当前记录?):
val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)
和这个 :
nextOffsetMetadata
这是下一批记录吗?它如何访问任何“下一个”记录元数据?
如果有人可以澄清这一点,那就太好了。与此同时,解决这个问题的方法会很好,但我还是更愿意清楚地理解它。
编辑:
经过一些研究,有些事情变得更加清晰。nextOffset 只是活动段的最新偏移量+1(这些元数据来自 loadSegments() 调用)。
综上所述,这里发生了什么:副本从领导者那里获取段,其起始偏移量低于活动段的最新偏移量。所以这是我的问题,为什么副本不只是截断?
慕容708150
湖上湖
相关分类