有没有办法从 Kafka 主题获取最后一条消息?

我有一个具有多个分区的 Kafka 主题,我想知道 Java 中是否有一种方法可以获取该主题的最后一条消息。我不关心分区,我只想获取最新消息。

我已经尝试过@KafkaListener,但它仅在主题更新时才获取消息。如果应用程序打开后没有发布任何内容,则不会返回任何内容。

也许倾听者根本就不是解决问题的正确方法?


潇湘沐
浏览 261回答 2
2回答

牛魔王的故事

以下片段对我有用。你可以试试这个。评论里有解释。&nbsp; &nbsp; &nbsp; &nbsp; KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);&nbsp; &nbsp; &nbsp; &nbsp; consumer.subscribe(Collections.singletonList(topic));&nbsp; &nbsp; &nbsp; &nbsp; consumer.poll(Duration.ofSeconds(10));&nbsp; &nbsp; &nbsp; &nbsp; consumer.assignment().forEach(System.out::println);&nbsp; &nbsp; &nbsp; &nbsp; AtomicLong maxTimestamp = new AtomicLong();&nbsp; &nbsp; &nbsp; &nbsp; AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>();&nbsp; &nbsp; &nbsp; &nbsp; // get the last offsets for each partition&nbsp; &nbsp; &nbsp; &nbsp; consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("offset: "+offset);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // seek to the last offset of each partition&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; consumer.seek(topicPartition, (offset==0) ? offset:offset - 1);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // poll to get the last record in each partition&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; consumer.poll(Duration.ofSeconds(10)).forEach(record -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // the latest record in the 'topic' is the one with the highest timestamp&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (record.timestamp() > maxTimestamp.get()) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; maxTimestamp.set(record.timestamp());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; latestRecord.set(record);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(latestRecord.get());

森林海

您必须使用每个分区中的最新消息,然后在客户端进行比较(使用消息上的时间戳,如果包含它)。原因是 Kafka 不保证分区间的顺序。在分区内,您可以确定偏移量最大的消息是最新推送到该分区的消息。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java