对使用 kstream 连接的 kafka 拓扑进行单元测试

我有一个做两个 kstream 连接的拓扑,我面临的问题是当尝试使用 TopologyTestDriver 进行单元测试时,发送几个带有 pipeInput 然后 readOutput 的 ConsumerRecords。它似乎不起作用。

我认为这可能是因为联接在我们在测试中不使用的实际 kafka 中使用了内部的 Rocksdb。

所以我一直在寻找解决方案,但找不到任何解决方案。

注意:这种测试方法在移除 kstream-kstream 连接时效果很好。


DIEA
浏览 149回答 2
2回答

ITMISS

我有一个做两个 kstream 连接的拓扑,我面临的问题是当尝试使用 TopologyTestDriver 进行单元测试时,发送几个带有 pipeInput 然后 readOutput 的 ConsumerRecords。它似乎不起作用。按照设计,但不幸的是,在您的情况下,这TopologyTestDriver并不是 Kafka Streams 引擎在运行时如何工作的 100% 准确模型。值得注意的是,新的传入事件的处理顺序存在一些差异。例如,在尝试测试某些连接时,这确实会导致问题,因为这些操作依赖于特定的处理顺序(例如,在流表连接中,表应该在流之前已经有一个键 'alice' 的条目- 'alice' 的辅助事件到达,否则流端 'alice' 的连接输出将不包含任何表端数据)。所以我一直在寻找解决方案,但找不到任何解决方案。我建议使用启动嵌入式 Kafka 集群的测试,然后使用“真正的”Kafka Streams 引擎(即,不是TopologyTestDriver. 实际上,这意味着您正在将测试从单元测试更改为集成/系统测试:您的测试将启动一个成熟的 Kafka Streams 拓扑,该拓扑与与您的测试在同一台机器上运行的嵌入式 Kafka 集群通信。请参阅 Apache Kafka 项目中的 Kafka Streams 集成测试,其中EmbeddedKafkaCluster和IntegrationTestUtils是工具的核心部分。连接的具体测试示例是StreamTableJoinIntegrationTest(有一些与连接相关的集成测试)及其父级AbstractJoinIntegrationTest。(对于它的价值,在https://github.com/confluentinc/kafka-streams-examples#examples-integration-tests有进一步的集成测试示例,其中包括在使用 Apache Avro 作为您的数据格式等)但是,除非我弄错了,否则集成测试及其工具不包含在 Kafka Streams 的测试实用程序工件中(即,org.apache.kafka:kafka-streams-test-utils)。因此,您必须将一些复制粘贴到您自己的代码库中。

aluckdog

你看过 Kafka Streams 单元测试 [1] 吗?这是关于输入数据并使用模拟处理器检查最终结果。例如对于以下流连接:&nbsp; &nbsp; &nbsp; &nbsp; stream1 = builder.stream(topic1, consumed);&nbsp; &nbsp; &nbsp; &nbsp; stream2 = builder.stream(topic2, consumed);&nbsp; &nbsp; &nbsp; &nbsp; joined = stream1.outerJoin(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; stream2,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; MockValueJoiner.TOSTRING_JOINER,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; JoinWindows.of(ofMillis(100)),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));&nbsp; &nbsp; &nbsp; &nbsp; joined.process(supplier);然后,您可以开始将输入项通过管道传输到第一个或第二个主题中,并检查每个连续的输入管道,处理器可以检查什么:// push two items to the primary stream; the other window is empty&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // w1 = {}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // w2 = {}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // --> w1 = { 0:A0, 1:A1 }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //&nbsp; &nbsp; &nbsp;w2 = {}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for (int i = 0; i < 2; i++) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; processor.checkAndClearProcessResult(EMPTY);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // push two items to the other stream; this should produce two items&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // w1 = { 0:A0, 1:A1 }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // w2 = {}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // --> w1 = { 0:A0, 1:A1 }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //&nbsp; &nbsp; &nbsp;w2 = { 0:a0, 1:a1 }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for (int i = 0; i < 2; i++) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; inputTopic2.pipeInput(expectedKeys[i], "a" + expectedKeys[i]);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new KeyValueTimestamp<>(1, "A1+a1", 0));我希望这有帮助。参考文献:[1] https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java#L279
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java