aluckdog
你看过 Kafka Streams 单元测试 [1] 吗?这是关于输入数据并使用模拟处理器检查最终结果。例如对于以下流连接: stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); joined = stream1.outerJoin( stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())); joined.process(supplier);然后,您可以开始将输入项通过管道传输到第一个或第二个主题中,并检查每个连续的输入管道,处理器可以检查什么:// push two items to the primary stream; the other window is empty // w1 = {} // w2 = {} // --> w1 = { 0:A0, 1:A1 } // w2 = {} for (int i = 0; i < 2; i++) { inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]); } processor.checkAndClearProcessResult(EMPTY); // push two items to the other stream; this should produce two items // w1 = { 0:A0, 1:A1 } // w2 = {} // --> w1 = { 0:A0, 1:A1 } // w2 = { 0:a0, 1:a1 } for (int i = 0; i < 2; i++) { inputTopic2.pipeInput(expectedKeys[i], "a" + expectedKeys[i]); } processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0), new KeyValueTimestamp<>(1, "A1+a1", 0));我希望这有帮助。参考文献:[1] https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java#L279