章节索引 :

RabbitMQ 中消息确认与返回机制概述

1. 前言

Hello,大家好。本小节会为大家介绍 RabbitMQ 中的消息确认机制与消息返回机制,这两个机制是针对消息来说的不同场景而诞生的,其目的就是为了保证 RabbitMQ 中消息的可靠性投递,以及消息在发生特定异常时的补偿策略。

这两种机制是 RabbitMQ 自带的补偿机制,在我们开发应用程序时,如果遇到了对应的业务场景,我们直接来用就可以了,就不需要去使用额外的工具来弥补了。

本节主要内容:

  • 消息确认机制概述;

  • 消息返回机制概述;

2. 消息确认机制概述

基础概念:

消息确认机制,是描述消息与 RabbitMQ Server 之间的关系的一种保障机制,其主要内容就是用来监听,当我们应用程序中的数据,即消息,被发送到 RabbitMQ Server 中之后返回给生产端的一种消息监听机制。消息确认机制描述了一种消息是否已经被发送到 RabbitMQ Server 中以及 RabbitMQ Server 与生产端之间的关系。

从上述消息确认机制的基本概念可以得出,消息确认机制的作用就是:监听生产端的消息是否已经发送到了 RabbitMQ Server 中,如果消息没有被发送到 RabbitMQ Server 中,则消息确认机制不会给生产端返回任何确认应答,相反,如果消息被成功发送到了 RabbitMQ Server 中,则消息确认机制会给生产端返回一个确认应答,以通知生产端,消息已经发送到了 RabbitMQ Server 中,概念图如下所示:

根据上图,消息在被成功发送到 RabbitMQ Server 中之后,RabbitMQ Server 就会给生产端返回一个确认应答,这个确认应答会包含两种结果,一种就是消息发送到了 RabbitMQ Server ,RabbitMQ Server 收到了该消息,这时会给生产者返回 ack 的确认应答, 表示消息已经被接收。

另一种就是消息没有发送到 RabbitMQ Server ,RabbitMQ Server 没有收到该消息,这时会给生产者返回一个 nack 的确认应答,即 no ack , 表示没有接收到该消息。

我们在了解了消息确认机制的基础概念和作用之后,我们还需要了解在 RabbitMQ 中,如何通过代码来实现 RabbitMQ 的消息确认机制。

代码实现:

实现消息确认机制,只需要在生产端进行配置即可,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("xx");
connectionFactory.setPort("5672");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChanel();
channel.confirmSelect();
channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes());
channel.addConfirmListener(new ConfirmListener(){
    @Override
    public void handleAck(long l, boolean b) throws IOException{
        // do something...
    }

    @Override
    public void handleNack(long l, boolean b) throws IOException{
        // do something...
    }
});

代码解释:

第 1-5 行,我们使用 ConnectionFactory 创建了一个客户端连接 RabbitMQ Server 的连接。

第 6 行,我们使用建立好的连接,来创建了一个频道 channel 。

第 7 行,我们使用 channel 的 basicPublish 方法来将我们的消息发送到 RabbitMQ Server 中。

第 8 行,我们为 channel 绑定了一个消息确认机制的监听器,即 addConfirmListener ,且我们通过 new ConfirmListener 匿名内部类的方式,来重写了消息确认监听器中的 handleAck 方法和 handleNack 方法,其中,handleAck 方法表示消息已经被 RabbitMQ Server 接收可以返回 ack 的确认应答,handleNack 方法则表示方法没有被 RabbitMQ Server 接收可以返回 nack 的确认应答。

Tips: 1. 配置消息确认机制我们需要先配置 confirmSelect 方法来声明消息确认机制,接着我们需要为 channel 添加 addConfirmListener 消息确认监听器,并重写其中的 handleAck 和 handleNack 方法,最后需要根据 RabbitMQ Server 返回的确认应答在上述两个方法中完成我们需要处理的业务逻辑;
2. 如果需要启用消息确认机制,那么我们就不能自主的去关闭频道 channel 和 连接 connection,因为消息确认机制的返回结果是异步返回的,如果我们在向 RabbitMQ Server 发送了消息之后,就关闭了对应的 channel 和 connection ,那么我们就收不到任何消息确认的结果了。

3. 消息返回机制概述

基础概念:

消息返回机制,是描述不可达的消息与生产者之间的一种保障策略,其主要内容就是用来监听,RabbitMQ Server 中是否存在不可达的消息,并根据监听结果返回给生产端的一种监听机制,即消息返回机制描述了一种 RabbitMQ Server 中的不可达消息与生产端的关系。

从上述消息返回机制的基本概念可以得出,消息返回机制的作用就是:监听生产端发动到 RabbitMQ Server 中的消息是否可达,如果消息不可达,则返回一个信号通知生产端,相反,如果消息可达,则不会通知生产端任何信号,概念图如下所示:

根据上图,消息在被成功发送到 RabbitMQ Server 中之后,如果消息在经过当前配置的 exchangeName 或 routingKey 没有找到指定的交换机,或没有匹配到对应的消息队列,那么这个消息就被称为不可达的消息,如果此时配置了消息返回机制,那么此时 RabbitMQ Server 会返回给生产端一个信号,信号中包括消息不可达的原因,以及消息本身的内容。

我们在了解了消息返回机制的基础概念和作用之后,我们还需要了解在 RabbitMQ 中,如何通过代码来实现 RabbitMQ 的消息返回机制。

代码实现:

实现消息返回机制,也是只需要在生产端进行配置即可,代码如下:

// 省略客户端连接 RabbitMQ Server 的过程
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChanel();
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
        // do something...
    );
    }
});
channel.basicPublish(exchangeName, routingKey + "01", true, null, msg.getBytes());

代码解释:

第 1-2 行,我们创建了客户端连接 RabbitMQ Server 的连接,并且创建了一个 channel 。

第 3 行,我们为 channel 添加了 addReturnListener 消息返回的监听器,并且通过 new ReturnListener 匿名内部类的方式来重写 handleReturn 方法。

第 12 行,我们通过 channel 的 basicPublish 方法,将我们的消息发送出去,其中,在 basicPublish 方法中,我们只需要了解前三个参数即可:第一个参数表示交换机的名称,第二个参数表示路由 Key 的名称,第三个参数是 mandatory 属性,表示是否开启消息返回机制,如果这个属性被置为 false ,则消息返回监听器就不会生效。

Tips: 1. 要使用消息返回机制,就一定要配置 basicPublish 方法中的第三个参数的值为 true ,否则,即使添加了 addReturnListener 监听器,不可达的消息也不会被监听到;
2. 如果我们没有配置第三个参数的属性为 true ,那么,当 RabbitMQ Server 中存在不可达消息时,RabbitMQ 就会自动将该消息删除。

4. 小结

本小节为各位同学介绍了 RabbitMQ 中的消息确认机制,以及消息返回机制。从消息确认机制和消息返回机制的基础概念开始,到不同机制的代码实现结束,详细介绍了什么是消息确认机制,以及什么是消息返回机制,且通过不同机制的代码实现,分别阐述了如何通过代码来对两种机制进行配置。希望同学们可以完全理解两种机制的基础概念和实现方式。

消息确认机制和消息返回机会的基本内容就全部介绍完毕了,但是有一种现象同学们可以课下尝试一下,那就是当两种机制都进行配置之后,如果我们的消息不可达了,那么消息确认的监听器还会监听到吗?

RabbitMQ 简介
RabbitMQ 简介
RabbitMQ 基础
Win环境-SpringBoot集成MQ Mac OS环境下RabbitMQ的安装与集成 Linux环境下RabbitMQ安装与服务命令实操 RabbitMQ 核心基础概念详解 RabbitMQ 基础核心配置文件介绍 RabbitMQ 消息发送原理概述 RabbitMQ 消息发送模式详解 RabbitMQ 交换机详解 RabbitMQ 消息监控平台介绍
RabbitMQ 基础特性与进阶
RabbitMQ的幂等性概念 RabbitMQ中消息确认与返回机制 RabbitMQ中消费者ACK与重回队列机制 RabbitMQ中的TTL消息是什么 死信队列基础概念详解与配置
RabbitMQ 整合 Spring 生态链
RabbitAdmin基础概念详解与配置 RabbitTemplate基础概念详解与配置 消息容器介绍 消息适配器概念讲解与基本属性介绍 消息适配器应用实操 消息转换器概念讲解与基本属性介绍 消息转换器应用实操
RabbitMQ 集群基础
Warren模式与Shovel模式介绍 Mirror模式与Federation模式介绍 RabbitMQ集群配置文件概述 KeepAlived组件基础属性介绍 HaProxy组件基础属性介绍 RabbitMQ集群故障排查与恢复概述
RabbitMQ 实战
消息发送模式实战之直接模式与主题模式 消息发送模式实战之发布订阅模式 消息发送模式实战之普通队列模式与工作队列模式 使用RabbitMQ优化用户登录功能 使用RabbitMQ优化用户注册功能 RabbitMQ集成KeepAlived组件实操 RabbitMQ集群集成HaProxy组件实操 使用RabbitMQ打造扛得住的高并发环境(一) 使用RabbitMQ打造扛得住的高并发环境(二) 使用RabbitMQ打造扛得住的高并发环境(三)