在回调中发送记录时如何在spring-kafka中修复“在xxx ms后更新元数据失败”

spring-kafka 无法在回调中发送记录


    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);

    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override

        public void onFailure(Throwable ex) {

            log.error("log error...");

        }


        @Override

        public void onSuccess(SendResult<String, String> result) {

            kafkaTemplate.send("anotherTopic", "key", "data");

        }

    });

当我在 onSuccess() 中调用 kafkaTemplate.send() 时,Kafka 抛出“更新元数据失败”,这是意料之中的


幕布斯6054654
浏览 99回答 1
1回答

莫回无

看起来您无法在回调线程上执行生产者操作kafka-producer-network-thread- 可能是生产者代码中的一些死锁 - 等待获取将使用同一线程的元数据,因此它超时。您可能需要第二个KafkaTemaplate(和生产者工厂,因为默认工厂总是返回相同的生产者)。或者只是在不同的线程上执行第二次发送......@SpringBootApplicationpublic class So54492871Application {&nbsp; &nbsp; private static final ExecutorService exec = Executors.newSingleThreadExecutor();&nbsp; &nbsp; public static void main(String[] args) {&nbsp; &nbsp; &nbsp; &nbsp; SpringApplication.run(So54492871Application.class, args);&nbsp; &nbsp; }&nbsp; &nbsp; @Bean&nbsp; &nbsp; public NewTopic topic1() {&nbsp; &nbsp; &nbsp; &nbsp; return new NewTopic("so54492871-1", 1, (short) 1);&nbsp; &nbsp; }&nbsp; &nbsp; @Bean&nbsp; &nbsp; public NewTopic topic2() {&nbsp; &nbsp; &nbsp; &nbsp; return new NewTopic("so54492871-2", 1, (short) 1);&nbsp; &nbsp; }&nbsp; &nbsp; @Bean&nbsp; &nbsp; public ApplicationRunner runner(KafkaTemplate<String, String> template) {&nbsp; &nbsp; &nbsp; &nbsp; return args -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ListenableFuture<SendResult<String, String>> future = template.send("so54492871-1", "foo");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; future.addCallback(result -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println(Thread.currentThread().getName() + ":" + result);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; exec.execute(() -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ListenableFuture<SendResult<String, String>> future2 = template.send("so54492871-2", "bar");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; future2.addCallback(result2 -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println(Thread.currentThread().getName() + ":" + result2);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }, ex -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println(ex.getMessage());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }, ex -> {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println(ex.getMessage());&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; });&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.in.read();&nbsp; &nbsp; &nbsp; &nbsp; };&nbsp; &nbsp; }}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java