猿问

这种聚合在 Kafka 流中是如何工作的?

我是 Apache Kafka 的新手。我阅读了一个 Steam 应用程序的代码,偶然发现了聚合操作。我试图自己理解它,如果我的解释正确,我需要确认。


下面提供了从主题和聚合中读取的代码片段,


// json Serde

final Serializer<JsonNode> jsonSerializer = new JsonSerializer();

final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();

final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);



KStreamBuilder builder = new KStreamBuilder();


// read from the topic 'bank-transactions' as `KStream`. I provided the producer below 

KStream<String, JsonNode> bankTransactions = builder.stream(Serdes.String(), jsonSerde, "bank-transactions");


// we define the grouping and aggregation here 

KTable<String, JsonNode> bankBalance = bankTransactions.groupByKey(Serdes.String(), jsonSerde)

    .aggregate(

            () -> initialBalance,

            (key, transaction, balance) -> newBalance(transaction, balance),

            jsonSerde,

            "bank-balance-agg"

    );

bank-transactions主题的数据流产生如下,


public static ProducerRecord<String, String> newRandomTransaction(String name) {

    // creates an empty json {}

    ObjectNode transaction = JsonNodeFactory.instance.objectNode();


    Integer amount = ThreadLocalRandom.current().nextInt(0, 100);


    // Instant.now() is to get the current time using Java 8

    Instant now = Instant.now();


    // we write the data to the json document

    transaction.put("name", name);

    transaction.put("amount", amount);

    transaction.put("time", now.toString());


    return new ProducerRecord<>("bank-transactions", name, transaction.toString());

}


我有两个关于分组和聚合的问题,


一种。是否按groupByKey分组,Serdes.String()并且jsonSerde仅对 Steam 数据执行序列化和反序列化?该Serdes.String()是在名称字符串newRandomTransaction的方法。


湾 我的断言是该行的key, transaction内部aggregation功能(key, transaction, balance) -> newBalance(transaction, balance) 是从bank-transactions主题中读取的,而该balance功能来自initialBalance上一行。那是对的吗?


尽管它无缝运行,但我在尝试调试该应用程序时也感到困惑。


郎朗坤
浏览 189回答 1
1回答

弑天下

groupByKey 是否按 Serdes.String() 分组,而 jsonSerde 仅对 Steam 数据执行序列化和反序列化?是的,groupByKey 是按键分组,这些键可以反序列化并作为字符串进行比较我的断言是行 (key, transaction, balance) -> newBalance(transaction, balance) 的聚合函数中的关键,交易是从银行交易主题中读取的,余额来自上一行的 initialBalance几乎。初始化器在第一个参数上,是的,但是聚合的结果在应用程序的整个执行过程中都被结转,无休止地聚合。换句话说,您从initialBalancealways开始,然后对于每个相同的键,将其transaction余额添加到该balance键的当前累积值中。如果您还没有看到重复的密钥,那么它才会被添加到初始余额中是的,您的输入主题是由 KStreamsbuilder.stream方法指定的
随时随地看视频慕课网APP

相关分类

Java
我要回答