我有一个 akka 应用程序(在 JAVA 中),用于commitablePartitionedSource
使用来自 kafka 主题的消息。我有几个消费者群体,可以为多个主题吸引消费者。这是由动态配置驱动的,我可以在其中暂时关闭消费者,并可能在稍后重新启动它们。
当这个消费者重新启动时,我只想阅读新消息,而不是从我离开的地方开始。
有没有办法从 akka-alpakka 消费者那里获取 kafkaConsumer 对象,以便我可以在处理之前使用 seekToEnd()?请让我知道是否还有其他方法可以实现这一目标?也许使用 akka 配置或不同类型的消费者?我不想维护自己的偏移量(希望不是唯一的选择)
我的配置设置为latest
在我启动消费者组时获取偏移量,但由于我正在关闭并重新启动单个消费者,它总是从我停止的地方开始消费。
我尝试为一个主题创建一个消费者组,但我有很多主题,而且结果非常耗费资源。我还寻找一种方法来清除存储在 kafka 中的该主题的偏移量,但没有成功。
叮当猫咪
明月笑刀无情
相关分类