手记

消息中间件之ActiveMQ

1、JMS(Java Message Service,Java消息服务)

1.1 定义

Java消息服务(Java Message Service,即JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM) 的API, 用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的APl。

1.2 JMS的对象模型

名称 描述
ConnectionFactory 连接工厂
Connection 连接
Session 会话
Destination 目的
MessageProducer 生产者
MessageConsumer 消费者
Message 消息
Broker 消息中间件的实例(ActiveMQ)

1.3 JMS消息模型

  1. Point-to-Point (P2P) /点对点

  2. Publish/Subscribe (Pub/Sub) /主题(发布订阅)

1.4 JMS的消息结构

消息头、消息属性、消息体

  1. 消息头

  2. 消息属性:可以理解为消息的附加消息头,属性名可以自定义

  3. 消息体

2、ActiveMQ概念

2.1 定义

ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。

ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

2.2 特性

  1. 支持多种编程语言
  2. 支持多种传输协议
  3. 有多种持久化方式

2.3 ActiveMQ支持哪些协议

  1. ActiveMQ支持多种协议传输和传输方式,允许客户端使用多种协议连接。
  2. ActiveMQ支持的协议: AUTO、 OpenWire、 AMQP、Stomp、 MQTT等。
  3. ActiveMQ支持的基础传输方式: VM、 TCP、SSL、 UDP 、Peer、 Multicast、 HTTP(S)等,以及更高级的Failover、Fanout、 Discovery、 ZeroConf方式。

2.4 OpenWire协议

2.4.1 OpenWire协议是什么

OpenWire是Apache的一种跨语言的协议,允许从不同的语言和平台访问ActiveMQ,是ActiveMQ 4.x以后的版本默认的传输协议。

2.4.2 OpenWire协议如何使用

OpenWire 支持TCP、SSL、 NIO、UDP、VM等传输方式,直接配置这些连接,就是使用的OpenWire协议,OpenWire有 自己的配置参数,客户端和服务器端配置的参数名都通过缀“wireFormat.” 表示。

示例

OpenWire的配置参数说明

属性 默认值 描述
stackTraceEnabled true 是否应该把已经发生并且跟踪到的堆栈异常,通过Broker发送到客户端
tcpNoDelayEnabled true socket的NoDelay参数
cacheEnabled true 如果不断重复的值进行缓存,以便少编组(马上要进行的发送)发生
tightEncodingEnabled true 根据CPU使用情况,自动调整传输内容大小(压缩比例)
prefixPacketSize true 在每个包被编组前(马上要被发送),每个包的大小是否应该作为前缀连接的最大空闲时间,以毫秒为单位。
maxInactivityDuration 30000 broker服务会根据配置关闭超时的连接。同时也可以通过心跳机制来保持连接。值<=0则禁用活动连接的监测。
maxlnactivityDurationInitalDelay 10000 连接建立之后,多久开始进行超时检测
cacheSize 1024 如果能被缓存,那么这个规定了缓存的最大数量。此属性中在ActiveMQ的4.1中开始添加使用可发送最大帧大小
maxFrameSize MAX_ LONG 可以帮助防止OOM DOS攻击

2.5 为什么使用MQTT协议

MQTT的结构简单,相对于其它消息协议,它更加轻量级。适合在计算能力有限、低带宽、不可靠的网络环境使用。

2.5.1 MQTT的发布订阅模型

2.5.2 MQTT服务质量

服务质量(QoS) 级别是一种关于发送者和接收者之间信息投递的保证协议。MQTT中有三种QoS级别:

  1. 至多一次(0)
  2. 至少一次(1)
  3. 只有一次(2)

QoS是MQTT的一个主要功能,它使得在不可靠的网络下进行通信变得更为简单,因为即便是在非常不可靠的网络下,协议也可以掌控是否需要重发消息并保证消息到达。它也能帮助客户端根据网络环境和程序逻辑来自由选择QoS.

2.5.3 ActiveMQ中如何使用MQTT协议

ActiveMQ 服务器端配置

<transportConnectors>
	<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
	<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize= 104857600"/>
</transportConnectors>

MQTT配置参数说明

属性 默认值 描述
maxFrameSize 268435456 (v5.12.0)可以发送的最大帧大小。协议限制为256MB,其值不能设置得更高。可以帮助防止OOM DOS攻击

配置示例

<transportConnector name="mqtt" uri="mtt://localhost:1883?wireFormat.maxFrameSize=100000"/>

MQTT使用NIO

<transportConnector name="mgtt+nio" uri="mtt+nio://localhost:1883"/>

MQTT使用NIO + SSL

<transportConnector name="mqtt+nio" uri="mqtt+niossl://localhost:1883"/>

2.5.4 Spring使用MQTT

Spring Integration提供了MQTT协议的支持,通过Maven添加依赖即可使用。

<dependency>
    <groupld>org.springframework.integration</groupld>
    <artifactld>spring-integration-mqtt</artifactld>
    <version>5.1.1.RELEASE</version>
</dependency>

2.6 AUTO协议是什么

AUTO自动检测协议从ActiveMQ 5.13.0开始,ActiveMQ 开始支持协议格式检测,可以自动检测OpenWire、STOMP、 AMQP和MQTT。允许这4种类型的客户端共享一个传输

AUTO使用TCP

<transportConnector name="auto" uri="auto://localhost:5671"/>

AUTO使用SSL

<transportConnector name="auto+ssl" uri="autssl://localhost:5671"/>

AUTO使用NIO

<transportConnector name="auto+nio" uri="auto+nio://localhost:5671"/>

AUTO使用NIO+SSL

<transportConnector name="auto+nio+ssl" uri="auto+nioss://localhost:5671"/>

3、ActiveMQ高可用集群方案

3.1 ActiveMQ有哪些集群部署方式

3.2 Master-Slave部署方式

共享同一个文件系统

共享同一个数据库

3.3 Broker-Cluster部署方式

3.4 Master-Slave与Broker-Cluster相结合的部署方式

4、ActiveMQ持久化机制

4.1 Queue类型的持久化机制

4.2 Topic类型的持久化机制

4.3 存储方式

4.3.1 JDBC方式

将消息存储到数据库中,例如: Mysql、 SQL Server、Oracle、 DB2等

优点 缺点
方便管理 性能低
可以支持强一致性 /

4.3.2 AMQ方式

基于文件的存储方式,它具有写入速度快和容易恢复的特点,但是由于其重建索引时间过长,而且索引文件占用磁盘空间过大,所以已经不推荐使用。

优点 缺点
性能高于JDBC 索引占用磁盘空间量大
/ 重建索引速度非常慢

4.3.3 LevelDB方式

LevelDB是Google开发的一套用于持久化数据的高性能类库.LevelDB并不是一-种服务,用户需要自行实现Server。是单进程的服务,能够处理十亿级别规模Key-Value型数据,占用内存小。

特点

  1. 基于K-V存储
  2. Key值有序存储
  3. 操作接口简单
  4. 支持数据快照
  5. 支持数据压缩

L evelDB的结构

ActiveMQ配置LevelDB:修改配置文件${ACTIVEMQ_ HOME}/conf/activemq.xml

<persistenceAdapter>
	<levelDB directory="${activemq.data}/activemq-data"/>
</persistenceAdapter>

5、ActiveMQ事务实现机制

ActiveMQ事务实现的是最终一致性

生产者端实现机制如下:

消费者端实现机制如下:

0人推荐
随时随地看视频
慕课网APP