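I'm new to Apache Kafka. While reading the code of a Kafka Streams application, I came across an aggregation operation. I tried to understand it on my own, and I would like confirmation that my interpretation is correct.
Below is the code snippet that reads from the topic and performs the aggregation: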
// json Serde
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
KStreamBuilder builder = new KStreamBuilder();
// read from the topic 'bank-transactions' as `KStream`. I provided the producer below
KStream<String, JsonNode> bankTransactions = builder.stream(Serdes.String(), jsonSerde, "bank-transactions");
// we define the grouping and aggregation here
KTable<String, JsonNode> bankBalance = bankTransactions.groupByKey(Serdes.String(), jsonSerde)
.aggregate(
() -> initialBalance,
(key, transaction, balance) -> newBalance(transaction, balance),
jsonSerde,
"bank-balance-agg"
);
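To reason about question b, here is a minimal plain-Java simulation of the per-key behavior of an aggregation like the one above. It assumes the usual semantics: the initializer supplies a starting value only when a key has no stored aggregate yet, and after that the aggregator receives the previously stored aggregate. AggregateSketch, its nested Aggregator interface, and the Integer balance are hypothetical simplifications. This is not Kafka code, and the real newBalance works on JsonNode values and is not shown here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java simulation (no Kafka dependency) of a grouped aggregate that keeps
// one running value per key. Hypothetical illustration, not Kafka Streams internals.
class AggregateSketch {

    // Hypothetical interface mirroring the shape (key, value, aggregate) -> newAggregate.
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    // The HashMap stands in for a state store such as "bank-balance-agg": key -> latest aggregate.
    static <K, V, A> Map<K, A> aggregate(List<Map.Entry<K, V>> records,
                                         Supplier<A> initializer,
                                         Aggregator<K, V, A> aggregator) {
        Map<K, A> store = new HashMap<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            // The initializer runs only if this key has no aggregate yet;
            // otherwise the previously stored aggregate is passed to the aggregator.
            A current = store.containsKey(key) ? store.get(key) : initializer.get();
            store.put(key, aggregator.apply(key, record.getValue(), current));
        }
        return store;
    }

    public static void main(String[] args) {
        // (name, amount) pairs, like the producer's record key and "amount" field.
        List<Map.Entry<String, Integer>> records = List.of(
                Map.entry("john", 10), Map.entry("alice", 5), Map.entry("john", 7));
        // Simplification: the balance is an Integer sum instead of a JsonNode.
        Map<String, Integer> balances =
                aggregate(records, () -> 0, (name, amount, balance) -> balance + amount);
        System.out.println(balances.get("john") + " " + balances.get("alice"));
    }
}
```

Run as-is it prints 17 5: the initializer ran once for john, and his second transaction was added to the stored 10 rather than to a fresh initial value.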
The data for the bank-transactions topic is produced as follows:
public static ProducerRecord<String, String> newRandomTransaction(String name) {
// creates an empty json {}
ObjectNode transaction = JsonNodeFactory.instance.objectNode();
Integer amount = ThreadLocalRandom.current().nextInt(0, 100);
// Instant.now() is to get the current time using Java 8
Instant now = Instant.now();
// we write the data to the json document
transaction.put("name", name);
transaction.put("amount", amount);
transaction.put("time", now.toString());
return new ProducerRecord<>("bank-transactions", name, transaction.toString());
}
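In the producer, the record key is the name passed to newRandomTransaction, and the value is the JSON text from transaction.toString(). To make question a concrete, the sketch below shows conceptually what a String serde does with that key: it turns the String into bytes for Kafka and turns the bytes back into a String. It assumes only UTF-8 encoding, which is the default for Kafka's String serializer and deserializer; StringSerdeSketch and its methods are hypothetical, plain Java rather than Kafka classes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical plain-Java illustration of a String serde's job:
// String key -> bytes (serialize) and bytes -> String key (deserialize), using UTF-8.
class StringSerdeSketch {

    static byte[] serialize(String data) {
        // Null is passed through unchanged.
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    static String deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The producer uses the customer name as the record key.
        byte[] wire = serialize("john");
        System.out.println(wire.length + " " + deserialize(wire));
    }
}
```

Running main prints 4 john. The serde only converts between objects and bytes; it does not decide how records are grouped or aggregated.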
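I have two questions about the grouping and aggregation:
a. Do the Serdes.String() and jsonSerde arguments passed to groupByKey only perform serialization and deserialization of the stream data? Serdes.String() is for the name string in the newRandomTransaction method.
b. My assertion is that key and transaction in the aggregation function (key, transaction, balance) -> newBalance(transaction, balance) are read from the bank-transactions topic, while balance comes from initialBalance on the previous line. Is that correct?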
Although the application runs without problems, I am still confused when I try to debug it.