只是为了澄清,我是卡夫卡的新手,很抱歉,如果我的问题似乎没有记录,我正在阅读教程、文档和我能理解的一切。
我试图从 GlobalStore 读取所有值以更新其值,然后使用已存在的 StateStore 来放置这些新的更新值。
我正在尝试这样做,因为当我这样做时:
this.stateStore.all();
我只有1/10的数据,如果我理解正确的话,这是因为我有10个分区,而ss,只读取一个(虽然我不太明白为什么)
这是我的全局表:
public StreamsBuilder declareTopology(StreamsBuilder builder) {
logger.debug("Building topology : input topic ~ {} ; output topics ~ {}, {}",
getInputTopic(),
getDataTopic(),
getToEsTopic());
builder.globalTable(
getDataTopic(),
Consumed.with(Serdes.String(), fooSerdes)
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),
Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(
"foosktable")
.withKeySerde(Serdes.String())
.withValueSerde(fooSerdes)
.withLoggingEnabled(new HashMap<>()));
...
这是 addStateStore,我无法删除它,因为它在代码的其他地方使用:
...
builder.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("foosktable"),
Serdes.String(),
fooSerdes));
...
return builder;
}
因此,从理论上讲,我想做的是删除也使用相同主题的 StateStore,并使用我的 data.process 主题之一放置我的数据,问题是该处理器使用此 StateStore 执行其他操作,所以我不能用核武器攻击它。
我在这里迷路了,任何光都会有很大帮助。谢谢 !
胡说叔叔
相关分类