Spring Integration: publisher confirm timeout?

This is my current setup:

queue1 and queue2 are each wired by an integration flow into channel1:


@Bean
public IntegrationFlow q1f() {
    return IntegrationFlows
            .from(queue1InboundAdapter())
            ...
            .channel(amqpInputChannel())
            .get();
}

@Bean
public IntegrationFlow q2f() {
    return IntegrationFlows
            .from(queue2InboundAdapter())
            ...
            .channel(amqpInputChannel())
            .get();
}

Then everything is aggregated, and the original messages are acked once the aggregated message has been confirmed by RabbitMQ:


@Bean
public IntegrationFlow aggregatingFlow() {
    return IntegrationFlows
            .from(amqpInputChannel())
            .aggregate(...
                    .expireGroupsUponCompletion(true)
                    .sendPartialResultOnExpiry(true)
                    .groupTimeout(TimeUnit.SECONDS.toMillis(10))
                    .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(200, TimeUnit.SECONDS.toMillis(10)))
            )
            .handle(amqpOutboundEndpoint())
            .get();
}

@Bean
public AmqpOutboundEndpoint amqpOutboundEndpoint() {
    AmqpOutboundEndpoint outboundEndpoint = new AmqpOutboundEndpoint(ackTemplate());
    // Confirms (acks) from the broker are delivered to this channel.
    outboundEndpoint.setConfirmAckChannel(manualAckChannel());
    // The whole outbound message is used as the confirm correlation data.
    outboundEndpoint.setConfirmCorrelationExpressionString("#root");
    outboundEndpoint.setExchangeName(RABBIT_PREFIX + "ix.archiveupdate");
    outboundEndpoint.setRoutingKeyExpression(routingKeyExpression()); // forward using partition id as routing key
    return outboundEndpoint;
}

ackTemplate() is set up with a connection factory that has springFactory.setPublisherConfirms(true);.


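The problem I see is that roughly once every 10 days, some messages get stuck in the unacknowledged state in RabbitMQ.

My guess is that the publish is somehow waiting for RabbitMQ to send the publisher confirm, but it never arrives and the wait times out? In that case I never ACK the messages in queue1. Is this possible?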


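So, to go through the workflow once more:

[two queues -> direct channel -> aggregator (which keeps the channel and delivery tag values) -> publish to RabbitMQ -> RabbitMQ returns an ACK via publisher confirms -> Spring acks all the messages on the channel, using the tag values it kept in memory for the aggregated message]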
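To make the failure mode in this workflow concrete, here is a minimal broker-free sketch in plain Java. It assumes the aggregator keeps the delivery tags of the original messages next to the aggregated message and acks them only when the publisher confirm arrives. The names BatchAckSketch, PendingBatch and AckTarget are hypothetical and are not Spring or RabbitMQ types.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchAckSketch {

    /** Hypothetical stand-in for the consumer channel; not a Spring or RabbitMQ type. */
    interface AckTarget {
        void ack(long deliveryTag);
    }

    /** Delivery tags of the original messages folded into one aggregated message. */
    static final class PendingBatch {
        private final List<Long> deliveryTags = new ArrayList<>();
        private boolean acked;

        void add(long deliveryTag) {
            deliveryTags.add(deliveryTag);
        }

        /** Acks every original once the aggregate is confirmed; returns how many were acked. */
        int onPublisherConfirm(boolean ack, AckTarget target) {
            if (!ack || acked) {
                return 0;
            }
            acked = true;
            for (long tag : deliveryTags) {
                target.ack(tag);
            }
            return deliveryTags.size();
        }

        /** Originals that the broker would still show as unacknowledged. */
        int unacked() {
            return acked ? 0 : deliveryTags.size();
        }
    }

    public static void main(String[] args) {
        List<Long> ackedTags = new ArrayList<>();
        AckTarget channel = tag -> ackedTags.add(tag);

        PendingBatch confirmed = new PendingBatch();
        confirmed.add(1L);
        confirmed.add(2L);
        confirmed.onPublisherConfirm(true, channel);

        PendingBatch neverConfirmed = new PendingBatch();
        neverConfirmed.add(3L);
        // No confirm ever arrives for this batch, so nothing acks tag 3.

        System.out.println("acked tags: " + ackedTags);
        System.out.println("still unacked: " + neverConfirmed.unacked());
    }
}
```

In this model, a confirm that never arrives leaves the originals unacknowledged indefinitely, which matches the symptom described; whether that is what actually happens in the real flow would still need to be verified.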


莫回无
2 answers

FFIVE

In Spring AMQP version 2.1 (Spring Integration 5.1), we added a Future<?> and the returned message to CorrelationData to assist with this kind of thing. If you are using an older version, you can subclass CorrelationData (and you would have to handle setting the future and the returned message in your own code).

This, together with a scheduled task, can detect missing acks...

@SpringBootApplication
@EnableScheduling
public class Igh2755Application {

    public static void main(String[] args) {
        SpringApplication.run(Igh2755Application.class, args);
    }

    private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            SuccessCallback<? super Confirm> successCallback = confirm -> {
                System.out.println((confirm.isAck() ? "A" : "Na") + "ck received");
            };
            FailureCallback failureCallback = throwable -> {
                System.out.println(throwable.getMessage());
            };

            // Good - ack
            CorrelationData correlationData = new CorrelationData("good");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("", "foo", "data", correlationData);

            // Missing exchange nack, no return
            correlationData = new CorrelationData("missing exchange");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("missing exchange", "foo", "data", correlationData);

            // Missing queue ack, with return
            correlationData = new CorrelationData("missing queue");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("", "missing queue", "data", correlationData);
        };
    }

    @Scheduled(fixedDelay = 5_000)
    public void checkForMissingAcks() {
        System.out.println("Checking pending acks");
        CorrelationData correlationData = this.futures.poll();
        while (correlationData != null) {
            try {
                if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                    if (correlationData.getReturnedMessage() == null) {
                        System.out.println("Ack received OK for " + correlationData.getId());
                    }
                    else {
                        System.out.println("Message returned for " + correlationData.getId());
                    }
                }
                else {
                    System.out.println("Nack received for " + correlationData.getId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted");
            }
            catch (ExecutionException e) {
                System.out.println("Failed to get an ack " + e.getCause().getMessage());
            }
            catch (TimeoutException e) {
                System.out.println("Timed out waiting for ack for " + correlationData.getId());
            }
            correlationData = this.futures.poll();
        }
        System.out.println("No pending acks, exiting");
    }

}

Output:

Checking pending acks
Ack received OK for good
Nack received for missing exchange
Message returned for missing queue
No pending acks, exiting

With Spring Integration there is a confirmCorrelationExpression that can be used to create the CorrelationData instance.

EDIT

With Spring Integration...

@SpringBootApplication
@EnableScheduling
public class Igh2755Application {

    public static void main(String[] args) {
        SpringApplication.run(Igh2755Application.class, args);
    }

    private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();

    public interface Gate {

        void send(@Header("exchange") String exchange, @Header("rk") String rk, String payload);

    }

    @Bean
    @DependsOn("flow")
    public ApplicationRunner runner(Gate gate) {
        return args -> {
            gate.send("", "foo", "good");
            gate.send("junque", "rk", "missing exchange");
            gate.send("", "junque", "missing queue");
        };
    }

    @Bean
    public IntegrationFlow flow(RabbitTemplate template) {
        return IntegrationFlows.from(Gate.class)
                .handle(Amqp.outboundAdapter(template)
                        .confirmCorrelationExpression("@correlationCreator.create(#root)")
                        .exchangeNameExpression("headers.exchange")
                        .routingKeyExpression("headers.rk")
                        .returnChannel(returns())
                        .confirmAckChannel(acks())
                        .confirmNackChannel(acks()))
                .get();
    }

    @Bean
    public MessageChannel acks() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel returns() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow ackFlow() {
        return IntegrationFlows.from("acks")
                /*
                 * Work around a bug because the correlation data is wrapped and so the
                 * wrong future is completed.
                 */
                .handle(m -> {
                    System.out.println(m);
                    if (m instanceof ErrorMessage) { // NACK
                        NackedAmqpMessageException nme = (NackedAmqpMessageException) m.getPayload();
                        CorrelationData correlationData = (CorrelationData) nme.getCorrelationData();
                        correlationData.getFuture().set(new Confirm(false, "Message was returned"));
                    }
                    else {
                        ((CorrelationData) m.getPayload()).getFuture().set(new Confirm(true, null));
                    }
                })
                .get();
    }

    @Bean
    public IntegrationFlow retFlow() {
        return IntegrationFlows.from("returns")
                .handle(System.out::println)
                .get();
    }

    @Bean
    public CorrelationCreator correlationCreator() {
        return new CorrelationCreator(this.futures);
    }

    public static class CorrelationCreator {

        private final BlockingQueue<CorrelationData> futures;

        public CorrelationCreator(BlockingQueue<CorrelationData> futures) {
            this.futures = futures;
        }

        public CorrelationData create(Message<String> message) {
            CorrelationData data = new CorrelationData(message.getPayload());
            this.futures.add(data);
            return data;
        }

    }

    @Scheduled(fixedDelay = 5_000)
    public void checkForMissingAcks() {
        System.out.println("Checking pending acks");
        CorrelationData correlationData = this.futures.poll();
        while (correlationData != null) {
            try {
                if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                    if (correlationData.getReturnedMessage() == null
                            && !correlationData.getId().equals("Message was returned")) {
                        System.out.println("Ack received OK for " + correlationData.getId());
                    }
                    else {
                        System.out.println("Message returned for " + correlationData.getId());
                    }
                }
                else {
                    System.out.println("Nack received for " + correlationData.getId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted");
            }
            catch (ExecutionException e) {
                System.out.println("Failed to get an ack " + e.getCause().getMessage());
            }
            catch (TimeoutException e) {
                System.out.println("Timed out waiting for ack for " + correlationData.getId());
            }
            correlationData = this.futures.poll();
        }
        System.out.println("No pending acks, exiting");
    }

}

紫衣仙女

You can declare the connection factory as a bean:

@Bean
public ConnectionFactory createConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("/");
    connectionFactory.setPublisherReturns(true);
    connectionFactory.setPublisherConfirmType(ConfirmType.SIMPLE);
    return connectionFactory;
}

Then the RabbitTemplate as:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setConfirmCallback(callback);
    return rabbitTemplate;
}

where callback is an implementation of the ConfirmCallback interface.

When sending, you can wait for the confirm:

System.out.println("Sending message...");
rabbitTemplate.convertAndSend(rabbitMQProperties.getEXCHANGENAME(),
        rabbitMQProperties.getQUEUENAME(), "hello from rabbit");
rabbitTemplate.waitForConfirms(1);

The waitForConfirms timeout is in milliseconds. I set it to 1 for testing purposes.