
Kafka Streams: How to collect pairs of messages and write them to a new topic

This is a beginner's question about Kafka Streams.


How would you use the Java Kafka Streams library to collect pairs of messages and write them to a new output topic?


I was thinking of something like this:


private void accumulateTwo(KStream<String, String> messages) {
    Optional<String> accumulator = Optional.empty();
    messages.mapValues(value -> {
        if (accumulator.isPresent()) {
            String tmp = accumulator.get();
            accumulator = Optional.empty();
            return Optional.of(new Tuple<>(tmp, value));
        }
        else {
            accumulator = Optional.of(value);
            return Optional.empty();
        }
    }).filter((key, value) -> value.isPresent()).to("pairs");
}

However, this doesn't work, because local variables used inside a Java lambda expression must be final (or effectively final).
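The compile error comes from reassigning `accumulator` inside the lambda: a lambda may only capture local variables that are final or effectively final. Changing the contents of an object that such a variable points to is allowed, so a holder such as `java.util.concurrent.atomic.AtomicReference` makes the same idea compile. The plain-Java sketch below shows this without Kafka; `LambdaPairing` and `pairUp` are made-up names for illustration. In Kafka Streams this state would still live only in process memory and would not be restored after a restart, which is the concern the answers below address.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

class LambdaPairing {

    // Pairs consecutive values; a trailing unpaired value is held back and not emitted.
    static List<String> pairUp(List<String> values) {
        // `pending` itself is never reassigned, so it is effectively final and the
        // lambda may capture it; only the value stored inside the holder changes.
        final AtomicReference<String> pending = new AtomicReference<>();

        Function<String, Optional<String>> pairer = value -> {
            String previous = pending.getAndSet(null);
            if (previous == null) {
                pending.set(value); // first half of a pair: remember it, emit nothing
                return Optional.empty();
            }
            return Optional.of("(" + previous + ", " + value + ")");
        };

        List<String> pairs = new ArrayList<>();
        for (String v : values) {
            pairer.apply(v).ifPresent(pairs::add);
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Prints [(a, b), (c, d)]; "e" is still waiting for a partner.
        System.out.println(pairUp(Arrays.asList("a", "b", "c", "d", "e")));
    }
}
```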


Any ideas?


哆啦的时光机
2 Answers

森栏

Edit: As suggested in the comments, three additional steps are needed:

1. The Transformer must explicitly keep its state in a state store. It obtains a reference to the store from the ProcessorContext that is passed to its init method.
2. The state store must be registered with the StreamsBuilder.
3. The name of the state store must be passed to the transform method.

In this example it is enough to store the last message we have seen. We use a KeyValueStore for this, which holds zero or one entry at any point in time.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
// Pair is not a Kafka class: any simple two-value holder with a (left, right)
// constructor and a readable toString() works here.

public class PairTransformerSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<K, Pair<V, V>>> {
    private final String storeName;

    public PairTransformerSupplier(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public Transformer<K, V, KeyValue<K, Pair<V, V>>> get() {
        return new PairTransformer<>(storeName); // a new instance on every call
    }
}

public class PairTransformer<K, V> implements Transformer<K, V, KeyValue<K, Pair<V, V>>> {
    private ProcessorContext context;
    private final String storeName;
    private KeyValueStore<Integer, V> stateStore;

    public PairTransformer(String storeName) {
        this.storeName = storeName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        stateStore = (KeyValueStore<Integer, V>) context.getStateStore(storeName);
    }

    @Override
    public KeyValue<K, Pair<V, V>> transform(K key, V value) {
        // 1. Update the store to remember the last message seen.
        V left = stateStore.get(1);
        if (left == null) {
            stateStore.put(1, value); // first half of a pair: remember it, emit nothing
            return null;
        }
        stateStore.delete(1); // the store is empty again until the next message
        return KeyValue.pair(key, new Pair<>(left, value));
    }

    @Override
    public void close() { }
}

// inputTopic, outputTopic and stateStoreName are String fields of the enclosing class.
public KStream<String, String> sampleStream(StreamsBuilder builder) {
    KStream<String, String> messages = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
    // 2. Create the state store and register it with the streams builder.
    // Stores.keyValueStoreBuilder is the public factory; KeyValueStoreBuilder is an internal class.
    KeyValueBytesStoreSupplier store = Stores.persistentKeyValueStore(stateStoreName);
    StoreBuilder<KeyValueStore<Integer, String>> storeBuilder =
            Stores.keyValueStoreBuilder(store, Serdes.Integer(), Serdes.String());
    builder.addStateStore(storeBuilder);
    transformToPairs(messages);
    return messages;
}

private void transformToPairs(KStream<String, String> messages) {
    // 3. Reference the name of the state store when calling transform(...).
    KStream<String, Pair<String, String>> pairs = messages.transform(
            new PairTransformerSupplier<>(stateStoreName),
            stateStoreName
    );
    // transform() forwards nothing when it returns null, so this filter is only a safeguard.
    KStream<String, Pair<String, String>> filtered = pairs.filter((key, value) -> value != null);
    KStream<String, String> serialized = filtered.mapValues(Pair::toString);
    serialized.to(outputTopic);
}

Changes to the state store can be observed with the console consumer:

./bin/kafka-console-consumer --topic <changelog-topic-name> --bootstrap-server localhost:9092

The complete source code is here: https://github.com/1123/spring-kafka-stream-with-state-store

Original answer:

The JavaDoc of the org.apache.kafka.streams.kstream.ValueMapper interface states that it is meant for stateless, record-by-record transformations, whereas the org.apache.kafka.streams.kstream.Transformer interface is meant for the stateful mapping of an input record to zero, one, or more new output records. So I guess the Transformer interface is the appropriate choice for collecting pairs of messages. This may only matter when a stream application fails and restarts, so that it can recover its state from Kafka.

Hence, here is another solution based on the org.apache.kafka.streams.kstream.Transformer interface:

class PairTransformerSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<K, Pair<V, V>>> {
    @Override
    public Transformer<K, V, KeyValue<K, Pair<V, V>>> get() {
        return new PairTransformer<>();
    }
}

public class PairTransformer<K, V> implements Transformer<K, V, KeyValue<K, Pair<V, V>>> {
    // Kept only in memory, so it is lost when the application restarts.
    private V left;

    @Override
    public void init(ProcessorContext context) {
        left = null;
    }

    @Override
    public KeyValue<K, Pair<V, V>> transform(K key, V value) {
        if (left == null) {
            left = value;
            return null;
        }
        KeyValue<K, Pair<V, V>> result = KeyValue.pair(key, new Pair<>(left, value));
        left = null;
        return result;
    }

    // Only old Kafka versions declare Transformer#punctuate(long); without @Override
    // this method also compiles against newer versions, where it is simply unused.
    public KeyValue<K, Pair<V, V>> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() { }
}

Then use the PairTransformerSupplier as follows:

private void accumulateTwo(KStream<String, String> messages) {
    messages.transform(new PairTransformerSupplier<>())
            .filter((key, value) -> value != null)
            .mapValues(Pair::toString)
            .to("pairs");
}

However, running both solutions in a single process against a topic with a single partition produces exactly the same results. I have not yet tried a topic with multiple partitions and multiple stream consumers.
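The two `PairTransformer` versions in the answer above only behave differently after a restart. The sketch below models that in plain Java, without Kafka: a `HashMap<Integer, String>` stands in for the `KeyValueStore` (in a real application Kafka restores a persistent store from its changelog topic, named `<application.id>-<store name>-changelog` by default), and creating a new object stands in for a restarted task. `PairingStep`, `StoreBackedPairer`, `FieldBackedPairer` and `RestartDemo` are hypothetical names, and pairs are plain Strings instead of `Pair`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for Transformer#transform: returns a pair, or null for "no output".
interface PairingStep {
    String transform(String value);
}

// Models the state-store version: the pending value lives in an external map that
// plays the role of the KeyValueStore, always under the fixed key 1.
final class StoreBackedPairer implements PairingStep {
    private final Map<Integer, String> store;

    StoreBackedPairer(Map<Integer, String> store) {
        this.store = store;
    }

    @Override
    public String transform(String value) {
        String left = store.get(1);
        if (left == null) {
            store.put(1, value);
            return null;
        }
        store.remove(1); // the store holds zero or one entry at any time
        return "(" + left + ", " + value + ")";
    }
}

// Models the original answer: the pending value lives only in an instance field.
final class FieldBackedPairer implements PairingStep {
    private String left;

    @Override
    public String transform(String value) {
        if (left == null) {
            left = value;
            return null;
        }
        String result = "(" + left + ", " + value + ")";
        left = null;
        return result;
    }
}

final class RestartDemo {

    // Feeds `before` to one instance, simulates a restart by asking the factory for a
    // fresh instance, then feeds `after`; nulls are dropped like filter(value != null).
    static List<String> run(Supplier<PairingStep> factory, List<String> before, List<String> after) {
        List<String> out = new ArrayList<>();
        feed(factory.get(), before, out);
        feed(factory.get(), after, out); // new instance = restarted task
        return out;
    }

    private static void feed(PairingStep step, List<String> values, List<String> out) {
        for (String v : values) {
            String pair = step.transform(v);
            if (pair != null) {
                out.add(pair);
            }
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> durableStore = new HashMap<>(); // survives the "restart"
        List<String> before = Arrays.asList("a", "b", "c");
        List<String> after = Arrays.asList("d", "e");
        // Prints [(a, b), (c, d)]: "c" was kept in the store across the restart.
        System.out.println(run(() -> new StoreBackedPairer(durableStore), before, after));
        // Prints [(a, b), (d, e)]: "c" was lost together with the old instance.
        System.out.println(run(FieldBackedPairer::new, before, after));
    }
}
```

Since each stream task gets its own transformer and its own shard of the state store, either version should only pair consecutive records from the same partition, which may be relevant for the untested multi-partition case.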

慕妹3146593

You should be able to write an accumulator class:

class Accumulator implements ValueMapper<String, Optional<Tuple<String>>> {
    private String key;

    @Override
    public Optional<Tuple<String>> apply(String item) { // ValueMapper's single method is apply(...)
        if (key == null) {
            key = item;
            return Optional.empty();
        }
        Optional<Tuple<String>> result = Optional.of(new Tuple<>(key, item));
        key = null;
        return result;
    }
}

and then process the stream with:

messages.mapValues(new Accumulator())
        .filter((key, value) -> value.isPresent()) // KStream#filter takes a (key, value) predicate
        .mapValues(value -> value.get().toString()) // unwrap, assuming the default value serde is String
        .to("pairs");
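One consequence of the `ValueMapper` approach is easy to miss: `mapValues(new Accumulator())` hands a single mapper object to the topology, and as far as I can tell the Kafka Streams DSL reuses that one object for every stream task, so records from different partitions (and, with several stream threads, concurrent calls) all go through the same `key` field. The plain-Java sketch below simulates that with two hypothetical partitions; `SharedMapperDemo` and the `p0`/`p1` records are made up for illustration, `java.util.function.Function` stands in for `ValueMapper`, and pairs are plain Strings instead of `Tuple`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Plain-Java analogue of the Accumulator: Function#apply has the same
// one-value-in, one-value-out shape as ValueMapper#apply.
final class Accumulator implements Function<String, Optional<String>> {
    private String pending;

    @Override
    public Optional<String> apply(String item) {
        if (pending == null) {
            pending = item;
            return Optional.empty();
        }
        Optional<String> result = Optional.of("(" + pending + ", " + item + ")");
        pending = null;
        return result;
    }
}

final class SharedMapperDemo {

    // Applies the mapper and keeps only present results, like mapValues + filter + unwrap.
    static List<String> pipeline(Function<String, Optional<String>> mapper, List<String> input) {
        List<String> out = new ArrayList<>();
        for (String v : input) {
            mapper.apply(v).ifPresent(out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical records from two partitions (p0, p1) in one possible processing order.
        List<String> interleaved = Arrays.asList("p0-a", "p1-x", "p0-b", "p1-y");

        // One shared instance: prints [(p0-a, p1-x), (p0-b, p1-y)], pairs cross partitions.
        System.out.println(pipeline(new Accumulator(), interleaved));

        // One instance per partition: prints [(p0-a, p0-b)] and [(p1-x, p1-y)].
        System.out.println(pipeline(new Accumulator(), Arrays.asList("p0-a", "p0-b")));
        System.out.println(pipeline(new Accumulator(), Arrays.asList("p1-x", "p1-y")));
    }
}
```

A `TransformerSupplier`, by contrast, is expected to return a new `Transformer` from every `get()` call, which is one practical reason the JavaDoc steers stateful logic toward `Transformer`.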