章节索引 :

RabbitMQ 中消费者ACK与重回队列机制

1. 前言

Hello,大家好。本小节会为大家介绍 RabbitMQ 中的消费者 ACK 与消息的重回队列机制。消费者 ACK 与我们上节介绍的消息确认机制类似,都是针对消息而言的,而消息的重回队列机制确是针对特殊的消息类型而来。

消费者 ACK 与消息重回队列机制和上节中介绍的消息确认机制与消息返回机制意义相同,都属于 RabbitMQ 自带的补偿机制,只不过他们是针对于不同的消息来说的,下面就让我们来看一下究竟什么是 RabbitMQ 中的消费者 ACK 与重回队列机制。

本节主要内容:

  • 什么是消费者 ACK;

  • 消息重回队列机制概述;

2. 什么是消费者 ACK

基础概念:

消费者 ACK ,是描述消息与消费者之间的一种确认关系,其主要内容就是用来监听,消息是否已经被消费者成功消费了。

我们都知道,当消息成功被发送到 RabbitMQ Server 中,并且经交换机和频道,被路由到了相应的消息队里之后,这些消息就需要等待合适的消费者来接收这些消息,并最终将这些消息进行消费。消费者 ACK 就是在这个过程中间充当了一种监听器的作用,主要就是用来监听这些消息被消费者进行消费的结果,并将消息消费的结果返回给消费者。

从上述基础概念中,可以很清晰的得出消费者 ACK 的概念图,如下图所示:

根据上图,消息在被成功发送到 RabbitMQ Server 中之后,消费端从相应的消息队列中获取到了消息,并在消息被消费之后,给消费端返回了 ACK 信号,这个 ACK 信号的返回一共分为两种结果。

第一种 ACK 信号返回结果就是消息已经被成功消费了,这个时候返回给消费端的是 ack 信号,即消息消费成功的确认信号;另一种 ACK 信号返回结果是消息没有被消费成功,这个时候返回给消费端的是 nack 信号,即消息没有被消费者消费成功。

对于这两种返回信号来说,第一种 ACK 返回信息是 RabbitMQ Server 中的消息一旦被成功消费后就会返回给消费端,这个过程是自动的;而另一种 ACK 信号,由于消息在没有被成功消息后,RabbitMQ 会有自带的解决措施,所以此种 ACK 信号需要我们手动来选择,到底是使用 RabbitMQ 自带的解决措施,还是使用消费者 ACK 机制。

而无论 RabbitMQ Server 返回给了消费端哪种信号,都需要我们手动来获取消息队列中的消息,并且手动对这些消息进行消费。

我们在了解了消费者 ACK 机制的基础概念和作用之后,我们还需要了解在 RabbitMQ 中,如何通过代码来实现 RabbitMQ 的消费者 ACK 机制。

代码实现:

实现消费者 ACK 机制,只需要在消费端进行配置即可,代码如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("xx");
connectionFactory.setPort("5672");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChanel();
DefaultConsumer defaultConsumer = new DefaultCnsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
};

代码解释:

第 1-5 行,我们使用 ConnectionFactory 创建了一个客户端连接 RabbitMQ Server 的连接。

第 6 行,我们使用建立好的连接,来创建了一个频道 channel 。

第 7-10 行,我们使用手动创建 DefaultConsumer 的方式来手动从消息队列中获取消息并进行消费,其中,此种手动创建 Consumer 的方式需要通过匿名内部类的方式来实现,在类的内部,还需要重写 handleDelivery 方法,该方法的第一个参数,是消费者的标签,第二个参数是当前消费者所处的环境信息,第三个参数和第四个参数分别为消息的 properties 参数和消息体。

第 12 行,我们使用了 channel 的 basicAck 方法,来对消息进行消费 ACK ,即一旦消息被成功消费,就会通过 basicAck 方法将第一种 ACK 信号返回给消费端。

Tips: 1.handleDeliery 方法是进行消息有没有被消费的方法,我们只有重写了该方法,才能手动监听到消息有没有被消费;
2. channel 的 basicAck 方法的第一个参数就是当前消费者的标签,第二个参数表示是否启用 RabbitMQ 的消费者自动 ACK ,如果要想手动对消费端进行 ACK ,那么这个属性一定不要开启,即将该属性的值设置为 false ,关闭消费端的 autoAck 才行。

3. 消息重回队列机制概述

基础概念:

消息重回队列机制,是描述那些没有被成功消费的消息与消费端之间的一种保障策略, 其主要目的就是为了存储那些没有被消费者成功消费掉的消息,即在 RabbitMQ 诸多的消息队列中,专门用来存储那些没有被消费者成功消费掉的消息的队列,这种队列就被称为重回队列。

那么为什么被称为重回队列呢?因为,在 RabbitMQ 中,如果一条消息没有被消费者成功消费掉,那么这条消息除了会被放到重回队列中之外,RabbitMQ Server 还会从该重回队列中获取到这些消息,并且将这些没有被消费成功的消息重新发送到 RabbitMQ Server 中,再次由消费者进行消费,直到这些消息成功地被消费者消费。

从上述消息重回队列的基础概念可以得出,消息重回队列的原理图,如下图所示:

我们只看上图的消费端部分,当消息队列中的消息没有被消费者成功消费后,首先 RabbitMQ Server 会返回给消费端一个 nack 的信号;其次,该条没有被消费的消息会被放入重回队列中;最后,RabbitMQ Server 会从这个重回队列中获取该条消息,并重新发送到 RabbitMQ Server 中等待消费。

上述过程就是我们的重回队列机制处理未被消费的消息的全过程,同学们需要对此有所了解才行。

接下来我们来看一下,消息重回队列如何来进行代码配置。

代码实现:

实现消息重回队列机制,也是只需要在消费端进行配置即可,代码如下:

// 省略客户端连接 RabbitMQ Server 的过程
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChanel();
DefaultConsumer defaultConsumer = new DefaultCnsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        if (properties.getHeaders().get("type") != "utf-8") {
            channel.basicNack(envelope.getDeliveryTag(), false, true)
        }
    }
};

代码解释:

第 1-2 行,我们创建了客户端连接 RabbitMQ Server 的连接,并且创建了一个 channel 。

第 8-11 行,我们在 handleDelivery 方法的内部,添加了一个判断条件,当满足该条件时,意味着该条消息没有被成功消费,返回给消费端 nack 信号,这个过程是通过 channel 的 basicNack 方法实现。

Tips: 1. 在 channel 的 basicNack 方法中,方法的第二个参数表示是否批量确认,这里设置为了 false ,表示不进行批量确认,第三个参数表示是否启用消息重回队列,这里设置为了 true ,表示启用重回队列,默认情况下,该属性是 false ,即不启用重回队列;
2. 在实际工作中,我们一般也会关闭消息重回队列,使用 RabbitMQ 自带的措施和打印日志的方式来进行补偿,RabbitMQ 默认会将那些没有被成功消费的消息删除,不会放入重回队列中。

4. 小结

本小节为同学们介绍了 RabbitMQ 中的消费者 ACK ,以及消息重回队里机制。从消费者 ACK 与消息重回队列机制的基础概念开始,到不同机制的代码实现结束,详细介绍了什么是消费者 ACK、什么是重回队列,以及消息重回队列机制,且通过不同机制的代码实现,分别阐述了如何通过代码来对两种机制进行配置。希望同学们可以完全理解两种机制的基础概念和实现方式。

RabbitMQ 简介
RabbitMQ 简介
RabbitMQ 基础
Win环境-SpringBoot集成MQ Mac OS环境下RabbitMQ的安装与集成 Linux环境下RabbitMQ安装与服务命令实操 RabbitMQ 核心基础概念详解 RabbitMQ 基础核心配置文件介绍 RabbitMQ 消息发送原理概述 RabbitMQ 消息发送模式详解 RabbitMQ 交换机详解 RabbitMQ 消息监控平台介绍
RabbitMQ 基础特性与进阶
RabbitMQ的幂等性概念 RabbitMQ中消息确认与返回机制 RabbitMQ中消费者ACK与重回队列机制 RabbitMQ中的TTL消息是什么 死信队列基础概念详解与配置
RabbitMQ 整合 Spring 生态链
RabbitAdmin基础概念详解与配置 RabbitTemplate基础概念详解与配置 消息容器介绍 消息适配器概念讲解与基本属性介绍 消息适配器应用实操 消息转换器概念讲解与基本属性介绍 消息转换器应用实操
RabbitMQ 集群基础
Warren模式与Shovel模式介绍 Mirror模式与Federation模式介绍 RabbitMQ集群配置文件概述 KeepAlived组件基础属性介绍 HaProxy组件基础属性介绍 RabbitMQ集群故障排查与恢复概述
RabbitMQ 实战
消息发送模式实战之直接模式与主题模式 消息发送模式实战之发布订阅模式 消息发送模式实战之普通队列模式与工作队列模式 使用RabbitMQ优化用户登录功能 使用RabbitMQ优化用户注册功能 RabbitMQ集成KeepAlived组件实操 RabbitMQ集群集成HaProxy组件实操 使用RabbitMQ打造扛得住的高并发环境(一) 使用RabbitMQ打造扛得住的高并发环境(二) 使用RabbitMQ打造扛得住的高并发环境(三)