手记

老司机带你入门RabbitMQ消息中间件

ActiveMQ(性能一般)

  • ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线,并且它一个完全支持JMS规范的消息中间件。
  • 其丰富的API、多种集群构建模式使得他成为业界老牌消息中间件,在中小型企业中应用广泛!
  • MQ衡量指标:服务性能(一般,如果对并发要求不是特别大,可以考虑使用)、数据存储、集群架构
  • 特点:
    - 1、支持多种语言编写客户端
    - 2、对spring的支持,很容易和spring整合
    - 3、支持多种传输协议:TCP,SSL,NIO,UDP等
    - 4、支持AJAX
  • 消息形式:
    - 1、点对点(queue)
    - 2、一对多(topic)

Kafka(高性能)

Kafka是Linkedln开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
kafka使用page cache进行文件存储,进而实现高性能读写。关注性能,可靠性关注不高

RockerMQ(收费)

RocketMQ是阿里开源的消息中间件,目前也已经孵化为Apache顶级项目,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景

  • 维护是一个痛点,需要专门的团队
  • 特点
  • 可以保证消息顺序性
  • 提供丰富的消息拉取和推送的模式
  • 高效的水平扩展
  • 多种架构模式可供选择:双Master,Msater-Slave,2m2s,多主多从
  • 同步双写,异步复制,存储方式,零拷贝
  • 分布式事务,主从自动切换

RabbitMQ

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

RabbitMQ与AMQP协议(Advanced Message Queuing Protocol高级消息队列协议)

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。
Erlang语言最初在于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能是非常优秀的
Erlang的优点:Erlang有着和原生Socket一样的延迟

  • 可靠性消息投递模式(confirm),返回模式(return)
  • 与SpringAMQP完美的整合、API丰富
  • 集群模式丰富,表达式配置,HA模式,镜像队列模型
  • 保证数据不丢失的前提做到高可靠性、可用性

Erlang语言最初在于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能是非常优秀的
Erlang的优点:Erlang有着和原生Socket一样的延迟


AMQP定义:是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

核心概念

Server:又称Broker,接受客户端的连接,实现AMQP实体服务
Connection:连接,应用程序与Broker的网络连接
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务。
Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容。
Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个VirtualHost 里面不能有相同名称的Exchange或Queue
Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列

Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key

Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息

Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

消息生产与消费

交换机详解

队列,绑定,虚拟主机,消息

高级特性

迷思?

  • 消息如何保障100%的投递成功?
  • 幂等性概念详解
  • 在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
  • Confirm确认消息,Return返回消息

什么是生产端的可靠性投递?

  • 保障消息的成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Broker)确认应答
  • 完善的消息进行补偿机制
    方案一:

对于大规模并发场景下,加事务不是很好的方案

缺点:两次持久化操作,业务入库就可以,消息入库确实没有必要(性能优先)
失败:快速失败机制。当消息发送失败,可以设置时间间隔和重发次数然后进行重发机制或者是记录错误状态。
方案二:消息的延迟投递,做二次确认,回调检查

  • 数据库操作完成(入库)再发送消息
  • 互联网大厂不加任何事务
  • 第四步是生成的新的消息,不是使用旧的数据
  • Callback作为一个补偿服务

幕等性

我们可以借鉴数据库的乐观锁机制:
比如我们执行一条更新库存的SQL语句

UPDATE T REPS SET COUNT=COUNT-,1,VERSION=
VERSION+1
WHERE VERSION=1

消费端幕等性保障

消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息

业界主流的幂等性操作:

  • 唯一ID+指纹码机制,利用数据库主键去重
    • SELECT COUNT(1)FROM T ORDER WHERE ID=唯一ID+指纹码
    • 好处:实现简单
    • 坏处:高并发下有数据库写入的性能瓶颈
    • 解决方案:跟进ID进行分库分表进行算法路由
  • 利用Redis的原子性去实现
    • 使用Redis进行幂等,需要考虑的问题
    • 第一:我们是否要进行数据落库,如果落库的话,关键解决的
    • 问题是数据库和缓存如何做到原子性?
    • 第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?

Confirm 确认消息

消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。
生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障!

  • 第一步:在channel上开启确认模式:channel.confirmSelect()
  • 第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!

Reutrn消息机制

  • Return Listener 用于处理一些不可路由的消息!
  • 我们的消息生产者,通过指定一个Exchange和Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!
  • 但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener!
  • 在基础API中有一个关键的配置项:
  • Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!

消费端自定义监听

我们一般就是在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理!
但是我们使用自定义的Consumer更加的方便,解耦性更加的强,也是在实际工作中最常用的使用方式!

消费端限流

假设一个场景,首先,我们Rabbitmq服务器有上万条未处理的消息,
我们随便打开一个消费者客户端,会出现下面情况:
巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这
么多数据!


RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。

void BasicQos(uint prefetchSize(消息大小),ushort prefetchCount(一次处理多少消息,一般为1),bool global(限流在什么时候应用的)); 
  • prefetchSize:0
  • prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
  • global:true\false 是否将上面设置应用于channel简单点说,就是上面限制是channel级别的还是consumer级别
  • prefetchSize 和globa这两项,rabbitmq没有实现,暂且不研究prefetch_count在no_ask=false的情况下生效,即在自动应答的情况下这两个值是不生效的。

消息的ACK与重回队列

消息的限流

TTL消息

死信队列

6人推荐
随时随地看视频
慕课网APP

热门评论

 期待更新...

继续写呀 大佬

查看全部评论