如何获取时间戳指定的kafka偏移数据

当我尝试运行时抛出空指针错误时,我试图根据时间戳获取 Kafka 主题的偏移量,


Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();

              for (TopicPartition partition : partitions) {

                timestampsToSearch.put(partition,  startTimestamp);

              }

Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);

              for (TopicPartition partition : partitions) {

                Long seekOffset = outOffsets.get(partition).offset();

consumer.seek(partition, seekOffset);

任何帮助将不胜感激。


慕桂英4014372
浏览 445回答 2
2回答

杨__羊羊

要找到与时间戳对应的偏移量,您需要使用 方法offsetsForTimes()。例如,这将打印mytopic对应于 1 秒前的分区 0 的偏移量:try&nbsp;(KafkaConsumer<String,&nbsp;String>&nbsp;consumer&nbsp;=&nbsp;new&nbsp;KafkaConsumer<>(configs);)&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;Map<TopicPartition,&nbsp;Long>&nbsp;timestamps&nbsp;=&nbsp;new&nbsp;HashMap<>(); &nbsp;&nbsp;&nbsp;&nbsp;timestamps.put(new&nbsp;TopicPartition("mytopic",&nbsp;0),&nbsp;System.currentTimeMillis()-1*1000); &nbsp;&nbsp;&nbsp;&nbsp;Map<TopicPartition,&nbsp;OffsetAndTimestamp>&nbsp;offsets&nbsp;=&nbsp;consumer.offsetsForTimes(timestamps); &nbsp;&nbsp;&nbsp;&nbsp;System.err.println(offsets); }这将显示如下内容:{offset-test-0=(timestamp=1561469319192,&nbsp;leaderEpoch=0,&nbsp;offset=100131)}

白猪掌柜的

您可以Admin.listOffsets使用OffsetSpec.forTimestamp:Map<TopicPartition, OffsetSpec> topicOffsetSpecs = new HashMap<>();TopicPartition topicPartition = new TopicPartition("topic1", 0);OffsetSpec offsetSpec = OffsetSpec.forTimestamp(timestamp);topicOffsetSpecs.put(topicPartition, offsetSpec);admin.listOffsets(topicOffsetSpecs).all().get(); // Info for given timestamp
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java