1. 前言
RabbitMQ经常被用于服务模块之间的解耦以及高并发削峰场景,之前的章节讨论了不同服务模式的特点,但是在生产环境中,因为机器以及网络设备的不可靠,保证消息的可靠是待解决的问题。在特定场景下消息可能存在丢失风险,本文将介绍如何预防这类的风险。
2. RabbitMQ消息丢失的场景
面试官提问:RabbitMQ 消息队列,在哪些场景下可能会丢失消息?
题目解析:
我们可以将 RabbitMQ 消息处理的过程分为三个步骤:
(1)生产阶段:生产者生产消息并且发送到消息队列;
(2)储存阶段:消息队列存储和处理消息;
(3)消费阶段:消息队列将消息转发到消费者。
上述每个步骤都有消息丢失的风险,候选人需要按顺序分别解释不同场景可能丢失的原因以及解决方案。
2.1 生产者生产消息并且发送到消息队列
可能发生消息丢失的场景:网络故障。网络环境的不可靠导致消息发送失败,例如网络丢包、网络故障。数据在网络中传输会经过诸多网络设备,只要其中一个网络链接在数据抵达前已经流量满载,新到的数据将会阻塞一段时间段。另外比较少见的例子是施工挖断光纤或者其他原因导致硬件层面的长时间不可用。
参考解决方案是使用AMQP协议的事务机制。生产者在发出消息之后,消息是否到达RabbitMQ服务器是默认不可知的,所以在生产者发送消息之前,调用channel.txSelect
语句开启事务,如果消息发送失败,那么调用channel.txRollback
回滚事务,重新发送一条消息;如果消息发送成功,那么调用channel.txCommit
提交事务。
采用事务的缺点是增加耗时,会降低RabbitMQ的吞吐性能。
所以RabbitMQ还有一种性能改进方案,即Confirm机制,步骤如下:
(1)生产者调用channel.confirmSelect
将通信方式设置为confirm
模式;
(2)生产者发送的所有消息都会被分配一个唯一 ID;
(3)当生产者发送的消息成功投递到队列之后,RabbitMQ会发送一个确认给生产者,生产者即得知这条消息已经成功发送。
2.2 消息队列存储和处理消息
可能发生消息丢失的场景:服务器宕机。消息存储在 RabbitMQ 队列中,如果队列没有持久化,RabbitMQ 服务器重启会导致消息丢失。
参考解决方案是对消息队列持久化,分为三个步骤:
(1)Exchange 持久化:以 Direct 模式为例,将 durable 参数设置为 true。示例:
@Bean
DirectExchange testExchange() {
return new DirectExchange(Constants.EXCHANGE_NAME, true, false);
}
(2)Queue 持久化:将 durable 参数设置为 true,但是这样只能保证持久化 Queue 的元数据,但是不会持久化 Queue 里存储的消息。示例:
@Bean
public Queue testQueue() {
return new Queue(Constants.QUEUE_NAME);
}
(3)消息持久化:发送消息的时候将deliveryMode设置为2,SpringBoot中的rabbitTemplate默认设置消息是持久化,所以我们不需要手动配置,具体原因可参考源码,示例:
public enum MessageDeliveryMode {
NON_PERSISTENT,
PERSISTENT;
private MessageDeliveryMode() {
}
public static int toInt(MessageDeliveryMode mode) {
switch(mode) {
//非持久化模式
case NON_PERSISTENT:
return 1;
//持久化模式
case PERSISTENT:
return 2;
default:
return -1;
}
}
2.3 消息队列将消息转发到消费者
可能发生消息丢失的场景:消费者在收到消息之后,还没来得及处理消息的消费逻辑,所在机器就宕机了,导致内存中的消息丢失。
参考解决方案是在消费端开启手动 ACK 模式。RabbitMQ 默认采用自动 ACK 机制,在没有处理业务逻辑之前,消费者就会告知消息队列已经成功收到消息,这种方式并不符合我们的预期。
以 SpringBoot 的配置方式为例,有两种配置手动 ACK 的方式:
(1)yml文件修改全局确认模式,示例:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
(2)在自动注入 RabbitListenerContainerFactory 时开启手动ACK,示例:
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
//1. 创建工厂
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//2. 设置手动ACK模式,即AcknowledgeMode.MANUAL
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
3. 小结
本章节介绍了 RabbitMQ 作为消息队列,容易产生消息丢失的三种场景,以及针对每种场景的关键解决方案,从性质上可以分为持久化和消息确认机制。抛开题目本身来说,建议候选人通过本地环境实战来体验每种解决方案的具体编码,加强对方案的理解。