章节索引 :

消息容器介绍

1. 前言

Hello,大家好。本小节会为同学们介绍 RabbitMQ 在 Spring 生态中的消息容器,消息容器是 RabbitMQ 在 Spring 生态中的第三个核心元素,消息容器指的不是一种特定的容纳消息的容器,而是一系列提供容纳消息、监听消息等其他对消息指标进行监控的工具,接下来就让我们来看看到底什么是消息容器吧。

本节主要内容:

  • 消息容器基础概念概述;

  • 常用消息容器基础配置概述。

2. 消息容器基础概念概述

基础概念:

消息容器,即容纳消息的容器。通过对上述部分小节的学习,我们已经知道,在 原生 RabbitMQ 知识体系中,并没有单独地一个消息容器的概念,只有消息队列的概念。

但是,在 Spring-AMQP 中,消息容器的概念和原生 RabbitMQ 知识体系中的消息队列的概念完全不一样。Spring-AMQP 中的消息容器指的是:可以提供对消息队列、消息、消费者、消息签收模式进行控制,以及消息的全方位监听的一系列工具的统称。

我们知道,在 RabbitMQ 中,充当核心角色的就是我们应用程序中的数据,也就是消息,那么,对消息以及消息队列进行全方位监控的工具,在 Spring-AMQP 中就被称为消息容器。

消息容器的主要作用就是对经过 RabbitTemplate 消息模板发送到 RabbitMQ Server 中的消息进行监听,当然,要确保 RabbitTemplate 和消息容器所使用的是同一个 RabbitAdmin 构造的连接,这样才能对消息进行监控。

在介绍完消息容器的基础概念之后,下面让我们来看一下如何对消息容器进行简单的配置吧。

3. 常用消息容器基础配置概述

还是像上节小节一样,要想在 Spring 中使用消息容器,需要将 Spring-AMQP 和 AMQP-Stater 的依赖先引入进来,方便起见,同学们可以直接拷贝下放代码:

3.1 引入消息容器

以 Maven 引入方式为例,引入代码如下所示:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>3.6.5</version>
</dependency>

在将这两个依赖进行引入之后,我们就可以对消息容器进行配置了。

3.2 常用消息容器基础配置

在 Spring-AMQP 中,消息容器共有这五种:AbstractMessageListenerContainer、DirectMessageListenerContainer、DirectReplyToMessageListenerContainer、SimpleMessageListenerContainer,以及 MessageListenerContainer 接口。

在这五种消息容器中,常用的消息容器只有一种,就是 SimpleMessageListenerContainer ,由于 SimpleMessageListenerContainer 消息容器配置起来简单方便,所支持的配置的属性比较全面,所以该消息容器是所有消息容器中使用频率最高的,也是实际工作中使用最多的一种消息容器。

本节以 SimpleMessageListenerContainer 消息容器为例,来介绍一下 SimpleMessageListenerContainer 消息容器该如何使用吧。

Tips: SimpleMessageListenerContainer 消息容器是众多消息容器中最基本的消息容器,我们把 SimpleMessageListenerContainer 消息容器的基本使用了解之后,其他的消息容器就无师自通了,只不过其他的消息容器所提供的配置属性或多或少罢了。

初始化 SimpleMessageListenerContainer 消息容器

像 RabbitAdmin 和 RabbitTemplate 一样,要想使用消息容器,需要先对消息容器进行初始化,这个初始化过程非常简单,初始化 SimpleMessageListenerContainer 消息容器的代码如下所示:

代码实现:

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
  SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
  return simpleMessageListenerContainer;
}

代码解释:

第 1 行,我们使用 Spring 的 Bean 注解将我们声明的 simpleMessageListenerContainer 方法注入到 Spring 容器中,这样 Spring 容器就可以监听到我们注入的配置。

第 2 行,我们使用 Spring-AMQP 中的 SimpleMessageListenerContainer 类,来声明了一个名为 simpleMessageListenerContainer 的方法,用来对 simpleMessageListenerContainer 消息容器进行初始化。

第 3 行,我们实例化了一个 simpleMessageListenerContainer 实例,该实例是 Spring-AMQP 中对 simpleMessageListenerContainer 消息容器进行初始化的实例,要想使用 SimpleMessageListenerContainer ,就必须要初始化该实例。

第 4 行,我们将初始化好的 simpleMessageListenerContainer 实例进行返回。

SimpleMessageListenerContainer 基本使用

配置 SimpleMessageListenerContainer 消息容器,和之前的配置 RabbitAdmin 以及 RabbitTemplate 不同,我们只需要直接在上述初始化方法中去填充我们需要的配置属性即可,这里以基本常用属性为例,代码如下所示:

simpleMessageListenerContainer.setQueues(new Queue("test_001"), new Queue("test_002"), new Queue("test_003"));
simpleMessageListenerContainer.setConcurrentConsumers(1);
simpleMessageListenerContainer.setMaxConcurrentConsumers(3);
simpleMessageListenerContainer.setDefaultRequeueRejected(false);
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
  @Override
  public String createConsumerTag(String queue) {
  return queue + "_" + "created consumer tag";
  }
});
simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
  @Override
  public void onMessage(Message message, Channel channel) throws Exception {
  String msg = new String(message.getBody());
  // do someting...
  }
  simpleMessageListenerContainer.setAutoStartup(true)
});

代码解释:

第 1-2 行,我们使用 simpleMessageListenerContainer 的 setQueues 方法,来设置我们需要进行监听的队列,这里新建了三个队列,名称分别为:test_001 ,test_002 ,test_003。

第 3-4 行,我们分别使用了 simpleMessageListenerContainer 的 setConcurrentConsumers 方法和 setMaxConcurrentConsumers 方法,来分别设置当前用于消费消息的消费者个数,以及最大允许用于消费消息的消费者个数,这里分别被设置成了 1 和 3 ,表示:当前用于消费消息的消费者个数为 1 个,最大允许进行消费消息的消费者个数为 3 个。

第 5 行,我们使用 simpleMessageListenerContainer 的 setDefaultRequeueRejected 方法,来设置是否开启重回队列, 关于什么是重回队列,在前面小节中都有详细介绍,这里不再赘述。当 setDefaultRequeueRejected 方法的值为 true 时,表示开启重回队列,当为 false 时,表示关闭重回队列,这里表示不适使用重回队列,即该方法的值为 false。

第 6 行,我们使用 simpleMessageListenerContainer 的 setAcknowledgeMode 方法,来设置当前消息的接收模式, 即对应原生 RabbitMQ 中的消息签收模式,是自动签收还是手动签收,这里,AcknowledgeMode.AUTO 表示自动签收消息。

第 7-10 行,我们使用 simpleMessageListenerContainer 的 setConsumerTagStrategy 方法,来为消费者设置一个标签,设置标签需要通过 new ConsumerTagStrategy 匿名内部类的方式来实现,通过重写其中的 createConsumerTag 方法来设置消费者的 Tag 标签。

第 13-17 行,我们使用 simpleMessageListenerContainer 的 setMessageListener 方法,来添加一种监听器, 添加监听器的方式也是通过匿名内部类实现;new ChannelAwareMessageListener 监听器监听的是,当消息经过 channel 时的情况,实现该监听器需要重写其中的 onMessage 方法,我们可以将业务逻辑填充在 onMessage 方法中。

setMessageListener 方法可以添加任意一种 Spring-AMQP 中支持的监听器,不单单只有 ChannelAwareMessageListener 监听器这一种,其他的监听器就交给同学们自行探索吧。

第 19 行,我们使用 simpleMessageListenerContainer 的 setAutoStartup 方法,来设置 simpleMessageListenerContainer 消息容器的启动模式,这里被设置为了 true ,表示,当 Spring 容器启动时,simpleMessageListenerContainer 消息容器也随之启动。

Tips: 1. setQueues 方法中的参数只有一个,就是消息队列,其参数类型为 Queue… ,该参数理论上可以支持无限个队列被添加进去,从而对这些队列进行监听;
2. AcknowledgeMode 的消息签收类型,不只有 AUTO 这一种,还有 NONE 和 MANUAL ,分别表示不设置和手动监听。
3. simpleMessageListenerContainer 消息容器是支持热加载的一款消息容器,即当我们的应用程序处于运行状态时,我们手动改变了某一配置属性的值,这种改动是立即生效的,不需要我们重启我们的应用程序,这点同学们注意。

4. 小结

本小节详细为同学们介绍了 Spring 生态中的消息容器这一概念,并通过结合常用的 SimpleMessageListenerContainer 消息容器,来介绍了消息容器的基本配置方法和常用配置属性,希望同学们可以对 Spring-AMQP 中的常用消息容器 SimpleMessageListenerContainer 有一个基础的认知,这样,我们以后在使用其他消息容器时,才会做到融会贯通。

RabbitMQ 简介
RabbitMQ 简介
RabbitMQ 基础
Win环境-SpringBoot集成MQ Mac OS环境下RabbitMQ的安装与集成 Linux环境下RabbitMQ安装与服务命令实操 RabbitMQ 核心基础概念详解 RabbitMQ 基础核心配置文件介绍 RabbitMQ 消息发送原理概述 RabbitMQ 消息发送模式详解 RabbitMQ 交换机详解 RabbitMQ 消息监控平台介绍
RabbitMQ 基础特性与进阶
RabbitMQ的幂等性概念 RabbitMQ中消息确认与返回机制 RabbitMQ中消费者ACK与重回队列机制 RabbitMQ中的TTL消息是什么 死信队列基础概念详解与配置
RabbitMQ 整合 Spring 生态链
RabbitAdmin基础概念详解与配置 RabbitTemplate基础概念详解与配置 消息容器介绍 消息适配器概念讲解与基本属性介绍 消息适配器应用实操 消息转换器概念讲解与基本属性介绍 消息转换器应用实操
RabbitMQ 集群基础
Warren模式与Shovel模式介绍 Mirror模式与Federation模式介绍 RabbitMQ集群配置文件概述 KeepAlived组件基础属性介绍 HaProxy组件基础属性介绍 RabbitMQ集群故障排查与恢复概述
RabbitMQ 实战
消息发送模式实战之直接模式与主题模式 消息发送模式实战之发布订阅模式 消息发送模式实战之普通队列模式与工作队列模式 使用RabbitMQ优化用户登录功能 使用RabbitMQ优化用户注册功能 RabbitMQ集成KeepAlived组件实操 RabbitMQ集群集成HaProxy组件实操 使用RabbitMQ打造扛得住的高并发环境(一) 使用RabbitMQ打造扛得住的高并发环境(二) 使用RabbitMQ打造扛得住的高并发环境(三)