如何模拟 KafkaTemplate 的结果

我有一种发送 kafka 消息的方法是这样的:


@Async

public void sendMessage(String topicName, Message message) {

    ListenableFuture<SendResult<String, Message >> future = kafkaTemplate.send(topicName, message);


    future.addCallback(new ListenableFutureCallback<>() {


        @Override

        public void onSuccess(SendResult<String, Message > result) {

            //do nothing

        }


        @Override

        public void onFailure(Throwable ex) {

            log.error("something wrong happened"!);

        }

    });

}

现在我正在为它编写单元测试。我还想测试这两个回调方法onSuccess和onFailure方法,所以我的想法是模拟 KafkaTemplate,例如:


KafkaTemplate kafkaTemplate = Mockito.mock(KafkaTemplate.class);

但是现在我陷入了这两种情况的模拟结果:


when(kafkaTemplate.send(anyString(), any(Message.class))).thenReturn(????);

thenReturn我应该在案例成功和案例失败的方法中输入什么?有人有想法吗?非常感谢!


Qyouu
浏览 179回答 1
1回答

holdtom

您可以模拟模板,但最好模拟界面。&nbsp; &nbsp; Sender sender = new Sender();&nbsp; &nbsp; KafkaOperations template = mock(KafkaOperations.class);&nbsp; &nbsp; SettableListenableFuture<SendResult<String, String>> future = new SettableListenableFuture<>();&nbsp; &nbsp; when(template.send(anyString(), any(Message.class))).thenReturn(future);&nbsp; &nbsp; sender.setTemplate(template);&nbsp; &nbsp; sender.send(...);&nbsp; &nbsp; future.set(new SendResult<>(...));&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; ...or...&nbsp; &nbsp; future.setException(...编辑更新为CompletableFuture(Apache Kafka 3.0.x 及更高版本的 Spring)...public class Sender {&nbsp; &nbsp; private&nbsp; KafkaOperations<String, String> template;&nbsp; &nbsp; public void setTemplate(KafkaOperations<String, String> template) {&nbsp; &nbsp; &nbsp; &nbsp; this.template = template;&nbsp; &nbsp; }&nbsp; &nbsp; public void send(String topic, Message<?> data) {&nbsp; &nbsp; &nbsp; &nbsp; CompletableFuture<SendResult<String, String>> future = this.template.send(data);&nbsp; &nbsp; &nbsp; &nbsp; future.whenComplete((result, ex) -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (ex == null) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println(result);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println(ex.getClass().getSimpleName() + "(" + ex.getMessage() + ")");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; }}@ExtendWith(OutputCaptureExtension.class)public class So57475464ApplicationTests {&nbsp; &nbsp; @Test&nbsp; &nbsp; public void test(CapturedOutput captureOutput) {&nbsp; &nbsp; &nbsp; &nbsp; Message message = new GenericMessage<>("foo");&nbsp; &nbsp; &nbsp; &nbsp; Sender sender = new Sender();&nbsp; &nbsp; &nbsp; &nbsp; KafkaOperations template = mock(KafkaOperations.class);&nbsp; &nbsp; &nbsp; &nbsp; CompletableFuture<SendResult<String, String>> future = new CompletableFuture<>();&nbsp; &nbsp; &nbsp; &nbsp; given(template.send(any(Message.class))).willReturn(future);&nbsp; &nbsp; &nbsp; &nbsp; sender.setTemplate(template);&nbsp; &nbsp; &nbsp; &nbsp; sender.send("foo", message);&nbsp; &nbsp; &nbsp; &nbsp; future.completeExceptionally(new RuntimeException("foo"));&nbsp; &nbsp; &nbsp; &nbsp; assertThat(captureOutput).contains("RuntimeException(foo)");&nbsp; &nbsp; }}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java