我正在开发一个 Kafka 流应用程序,但在弄清楚如何使聚合工作时遇到了一些麻烦。
我有一个 KStream bankTransactions,其中键是类型String,值是类型JsonNode,所以我配置了我的应用程序的 Serdes
// Definition of the different Serdes used in the streams
final Serde<String> stringSerde = Serdes.String();
final Serde<JsonNode> jsonSerde = new JsonSerde();
final Serde<Long> longSerde = Serdes.Long();
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, jsonSerde.getClass().getName());
我想在KTable<String, Long>键相同但值Long将从我的 Json 中提取的值中聚合值。
所以首先我写道:
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.as("bank-total-balance")
);
我在运行时收到以下错误:
Caused by: org.apache.kafka.streams.errors.StreamsException:
A serializer (value: org.apache.kafka.connect.json.JsonSerializer) is not compatible to
the actual value type (value type: java.lang.Long).
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
我知道 Kafka 抱怨是因为我正在尝试使用默认的 Json serdes 来序列化Long. 所以从confluent的文档中阅读我尝试了这个
KTable<String, Long> totalBalances = bankTransactions
.groupByKey()
.aggregate(
() -> 0L,
(key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
Materialized.as("bank-total-balance").withValueSerde(Serdes.Long())
);
但后来我在编译时遇到错误:
Error:(121, 89) java: incompatible types:
org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted
to org.apache.kafka.common.serialization.Serde<java.lang.Object>
我尝试了不同的方式来编写此代码(例如,使用Serdes.long()而不是 my longSerdes,尝试将类型参数化Materialize,甚至尝试将我的初始化程序和聚合器编写为函数,Java 7 风格),但我无法弄清楚我做错了什么。
aggregate所以我的问题很简单:当它们不是默认的 Serdes 时,如何正确指定应该使用的 Serdes?
倚天杖
相关分类