如何处理 Kafka Streams 中的不同时区?

因此,我正在评估 Kafka Streams 以及它可以做些什么来查看它是否适合我的用例,因为我需要每隔 15 分钟、每小时、每天聚合一次传感器的数据,并且由于它的 Windowing 功能而发现它很有用。因为我可以通过应用创建窗口,windowedBy()KGroupedStream问题是窗口是在 UTC 中创建的,我希望我的数据按其原始时区而不是按 UTC 时区分组,因为它阻碍了聚合,所以任何人都可以帮助我解决这个问题。



HUWWW
浏览 214回答 2
2回答

明月笑刀无情

您可以使用自定义“移动”时间戳TimestampExtractor- 在将结果写回输出主题之前,您可以使用 aTransformer并通过context.forward(key, value, To.all().withTimestamps()).功能请求票:https ://issues.apache.org/jira/browse/KAFKA-7911

侃侃无极

因此,为了解决这个问题,我创建了自定义TimestampExtractor并使用它来更改流窗口创建时间以记录来自有效负载的时间,如下所示。public class RecordTimeStampExtractor implements TimestampExtractor {&nbsp; &nbsp; @Override&nbsp; &nbsp; public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {&nbsp; &nbsp; &nbsp; &nbsp; JsonObject data = (JsonObject) new JsonParser().parse(record.value().toString());&nbsp; &nbsp; &nbsp; &nbsp; Timestamp recordTimestamp = Timestamp.valueOf(data.get(Constant.SLOT).getAsString());&nbsp; &nbsp; &nbsp; &nbsp; return recordTimestamp.getTime();&nbsp; &nbsp; }}所以现在我已经用我的本地时区测试了它,因为昨天是 IST 05:30,它的工作正常,kafka 流也正在根据记录时间戳创建窗口。也将使用其他时区进行测试并更新答案
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java