森栏
编辑:正如评论中所建议的,还需要另外三个步骤:Transformer必须将其状态显式地保存在状态存储中。它从ProcessorContext中获取对状态存储的引用,而ProcessorContext是在init方法中传入的。状态存储必须在StreamsBuilder中注册。调用transform方法时必须传入状态存储的名称。在这个例子中,存储我们看到的最后一条消息就足够了。我们为此使用KeyValueStore,它在每个时间点都有零个或一个条目。public class PairTransformerSupplier<K,V> implements TransformerSupplier<K, V, KeyValue<K, Pair<V,V>>> { private String storeName; public PairTransformerSupplier(String storeName) { this.storeName = storeName; } @Override public Transformer<K, V, KeyValue<K, Pair<V, V>>> get() { return new PairTransformer<>(storeName); }}public class PairTransformer<K,V> implements Transformer<K, V, KeyValue<K, Pair<V, V>>> { private ProcessorContext context; private String storeName; private KeyValueStore<Integer, V> stateStore; public PairTransformer(String storeName) { this.storeName = storeName; } @Override public void init(ProcessorContext context) { this.context = context; stateStore = (KeyValueStore<Integer, V>) context.getStateStore(storeName); } @Override public KeyValue<K, Pair<V, V>> transform(K key, V value) { // 1. Update the store to remember the last message seen. if (stateStore.get(1) == null) { stateStore.put(1, value); return null; } KeyValue<K, Pair<V,V>> result = KeyValue.pair(key, new Pair<>(stateStore.get(1), value)); stateStore.put(1, null); return result; } @Override public void close() { }}public KStream<String, String> sampleStream(StreamsBuilder builder) { KStream<String, String> messages = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())); // 2. Create the state store and register it with the streams builder. KeyValueBytesStoreSupplier store = Stores.persistentKeyValueStore(stateStoreName); StoreBuilder storeBuilder = new KeyValueStoreBuilder<>( store, new Serdes.IntegerSerde(), new Serdes.StringSerde(), Time.SYSTEM ); builder.addStateStore(storeBuilder); transformToPairs(messages); return messages;}private void transformToPairs(KStream<String, String> messages) { // 3. 
reference the name of the state store when calling transform(...) KStream<String, Pair<String, String>> pairs = messages.transform( new PairTransformerSupplier<>(), stateStoreName ); KStream<String, Pair<String, String>> filtered = pairs.filter((key, value) -> value != null); KStream<String, String> serialized = filtered.mapValues(Pair::toString); serialized.to(outputTopic);}可以使用控制台消费者观察状态存储的变化:./bin/kafka-console-consumer --topic <changelog-topic-name> --bootstrap-server localhost:9092 完整源代码在这里:https://github.com/1123/spring-kafka-stream-with-state-store 原答案:org.apache.kafka.streams.kstream.ValueMapper接口的JavaDoc声明它用于无状态的逐条记录转换,而另一方面,org.apache.kafka.streams.kstream.Transformer接口则用于将输入记录有状态地映射为零个、一个或多个新的输出记录。因此我猜这个Transformer接口是收集消息对的合适选择。这可能仅在流应用程序出现故障和重新启动的情况下才有意义,以便它们可以从 Kafka 恢复状态。因此,这是基于org.apache.kafka.streams.kstream.Transformer接口的另一种解决方案:class PairTransformerSupplier<K,V> implements TransformerSupplier<K, V, KeyValue<K, Pair<V,V>>> { @Override public Transformer<K, V, KeyValue<K, Pair<V, V>>> get() { return new PairTransformer<>(); }}public class PairTransformer<K,V> implements Transformer<K, V, KeyValue<K, Pair<V, V>>> { private V left; @Override public void init(ProcessorContext context) { left = null; } @Override public KeyValue<K, Pair<V, V>> transform(K key, V value) { if (left == null) { left = value; return null; } KeyValue<K, Pair<V,V>> result = KeyValue.pair(key, new Pair<>(left, value)); left = null; return result; } @Override public KeyValue<K, Pair<V, V>> punctuate(long timestamp) { return null; } public void close() { }}然后按如下方式使用 PairTransformerSupplier:private void accumulateTwo(KStream<String, String> messages) { messages.transform(new PairTransformerSupplier<>()) .filter((key, value) -> value != null) .mapValues(Pair::toString) .to("pairs");}但是,在具有单个分区的主题的单个进程中尝试这两种解决方案会产生完全相同的结果。我还没有尝试过具有多个分区和多个流消费者的主题。