我已经实现了一个简单的 Kafka 死信记录处理器。
使用控制台生产者生成的记录时,它可以完美运行。
但是我发现我们的 Kafka Streams 应用程序不能保证为接收器主题生成记录,每生成一条记录,偏移量就会增加 1。
死信处理器背景:
我有一个场景,在发布处理它所需的所有数据之前可能会收到记录。当流应用程序不匹配记录以进行处理时,它们将移动到死信主题,而不是继续向下流。当新数据发布时,我们将来自死信主题的最新消息转储回流应用程序的源主题,以便使用新数据进行重新处理。
死信处理器:
在运行应用程序开始时记录每个分区的结束偏移量
结束偏移量标记停止处理给定死信主题的记录的点,以避免在重新处理的记录返回死信主题时出现无限循环。
应用程序从上次通过消费者组运行产生的最后一个偏移量恢复。
应用程序正在使用事务并KafkaProducer#sendOffsetsToTransaction
提交最后产生的偏移量。
为了跟踪我的范围内的所有记录何时为主题的分区处理,我的服务将其最后产生的从生产者的偏移量与消费者保存的结束偏移量映射进行比较。当我们到达结束偏移量时,消费者通过暂停该分区KafkaConsumer#pause
,当所有分区都暂停时(意味着它们达到保存的结束偏移量)然后调用它退出。
偏移量和消费者位置 Kafka 为分区中的每条记录维护一个数字偏移量。此偏移量充当该分区内记录的唯一标识符,并且还表示消费者在分区中的位置。例如,位置 5 的消费者已经消费了偏移量为 0 到 4 的记录,接下来将接收偏移量为 5 的记录。
Kafka Producer API引用下一个偏移量也总是 +1。
将指定偏移量列表发送给消费者组协调器,并将这些偏移量标记为当前事务的一部分。仅当事务成功提交时,这些偏移量才会被视为已提交。提交的偏移量应该是您的应用程序将使用的下一条消息,即 lastProcessedMessageOffset + 1。
但是您可以在我的调试器中清楚地看到,单个分区消耗的记录不是一次递增 1...
我想这可能是一个 Kafka 配置问题,max.message.bytes
但没有一个真正有意义。然后我想也许是因为加入,但没有看到任何方式会改变制片人的运作方式。
不确定它是否相关,但我们所有的 Kafka 应用程序都在使用 Avro 和 Schema Registry...
无论生产方法如何,偏移量是否应该始终递增 1,或者使用 Kafka 流 API 是否可能无法提供与普通生产者消费者客户端相同的保证?
有什么我完全想念的吗?
繁花不似锦
万千封印
相关分类