手记

MQ消息中间件资料详解:新手入门教程

概述

MQ消息中间件是一种软件系统,它位于发送消息的应用程序和接收消息的应用程序之间,提供标准化的消息传递机制。本文将详细介绍MQ消息中间件的作用、优势、常见类型,以及安装、配置和使用教程等详细内容,帮助读者全面了解MQ消息中间件。

MQ消息中间件简介

什么是MQ消息中间件

MQ消息中间件是一种软件系统,它位于发送消息的应用程序和接收消息的应用程序之间,提供了一种标准化的消息传递机制。消息队列(Message Queue)允许应用程序即使在不同的平台或网络上也能相互通信,而不必处理通信细节。

MQ消息中间件的作用和优势

MQ消息中间件的主要作用是提供异步通信机制,解耦应用程序的不同部分,从而提高系统的可扩展性和可靠性。以下是MQ消息中间件的一些主要优势:

  1. 解耦性:消息发送者和接收者不需要同时在线,这使得系统更加灵活,并且更容易扩展。
  2. 负载均衡:通过分发消息到多个消费者,可以有效地分配处理负载,提高性能。
  3. 重试逻辑:通过消息队列可以实现消息的自动重试机制,确保消息不会丢失。
  4. 可靠性:消息中间件通常提供持久化存储和消息确认机制,确保消息的可靠传递。

常见的MQ消息中间件类型

常见的MQ消息中间件包括RabbitMQ、ActiveMQ、Kafka和RocketMQ等。每种消息中间件都有其特定的特点和应用场景:

  • RabbitMQ:一个实现了高级消息队列协议(AMQP)的开源消息代理,支持多种消息传递模式,包括点对点、发布/订阅等。
  • ActiveMQ:Apache软件基金会的一个项目,实现了JMS API,支持多种传输协议,包括TCP、NIO和SSL。
  • Kafka:由LinkedIn开发,后开源,以其高吞吐量和持久化数据的能力而闻名,主要用于日志聚合和流处理。
  • RocketMQ:阿里巴巴开源的一个分布式消息中间件,具有高吞吐量、低延迟和高可用性等特点。
MQ消息中间件的安装与配置

选择合适的MQ消息中间件

选择合适的MQ消息中间件取决于项目的需求。例如,如果需要实时处理大量数据流,Kafka可能是一个好选择;如果需要一个易于使用且功能全面的消息代理,RabbitMQ可能更适合。以下是一些选择MQ消息中间件时需要考虑的因素:

  • 性能要求:消息传递的速度和吞吐量。
  • 功能需求:例如,是否需要持久化消息、事务支持、消息分发策略等。
  • 社区支持:选择一个有活跃社区和广泛支持的中间件。

下载与安装过程详解

这里以RabbitMQ为例,展示如何下载和安装MQ消息中间件。

下载RabbitMQ

  • 访问RabbitMQ官网,选择合适的版本进行下载。
  • 对于Linux系统,可以使用包管理器安装:

    # 对于Ubuntu/Debian系统
    sudo apt-get update
    sudo apt-get install rabbitmq-server
    
    # 对于CentOS/RHEL系统
    sudo yum install rabbitmq-server
  • 对于Windows系统,下载Windows二进制分发包,解压后双击运行rabbitmq-service.bat安装服务。

验证安装

安装完成后,可以通过以下命令验证是否安装成功:

rabbitmqctl status

如果输出显示RabbitMQ正在运行,则说明安装成功。

基本配置步骤

RabbitMQ的配置文件通常位于/etc/rabbitmq/rabbitmq.conf。可以编辑该文件进行配置,例如设置监听端口、启用管理插件等。

启用管理插件

默认情况下,RabbitMQ的管理插件是禁用的。可以通过以下命令启用:

rabbitmq-plugins enable rabbitmq_management

然后,可以通过浏览器访问http://<RabbitMQ服务器IP>:15672来管理RabbitMQ。

下载与安装ActiveMQ

  • 访问Apache ActiveMQ官网下载ActiveMQ。
  • 对于Linux系统,可以使用以下命令安装:

    wget https://downloads.apache.org/activemq/5.16.3/apache-activemq-5.16.3-bin.tar.gz
    tar -xvzf apache-activemq-5.16.3-bin.tar.gz
    cd apache-activemq-5.16.3
    ./bin/activemq start
  • 对于Windows系统,下载Windows安装包,解压后双击运行activemq.bat启动服务。

验证安装

启动后,可以通过浏览器访问http://<ActiveMQ服务器IP>:8161/admin来管理ActiveMQ。

下载与安装Kafka

  • 访问Kafka官网下载Kafka。
  • 对于Linux系统,可以使用以下命令安装:
    wget https://downloads.apache.org/kafka/2.13.0/kafka_2.13-2.8.0.tgz
    tar -xvzf kafka_2.13-2.8.0.tgz
    cd kafka_2.13-2.8.0
  • 对于Windows系统,下载Windows安装包,解压后双击运行bin\windows\kafka-server-start.bat启动服务。

验证安装

启动后,可以通过以下命令验证是否安装成功:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

下载与安装RocketMQ

  • 访问RocketMQ官网下载RocketMQ。
  • 对于Linux系统,可以使用以下命令安装:

    wget https://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.9.3/apache-rocketmq-4.9.3-bin.tar.gz
    tar -xvzf apache-rocketmq-4.9.3-bin.tar.gz
    cd apache-rocketmq-4.9.3
    sh bin/mqadmin.sh startbroker -n localhost:9876
  • 对于Windows系统,下载Windows安装包,解压后双击运行bin\mqadmin.bat启动服务。

验证安装

启动后,可以通过以下命令验证是否安装成功:

sh bin/mqadmin.sh brokerList -n localhost:9876
MQ消息中间件的核心概念

消息队列与消息主题

消息队列(Message Queue)是消息中间件的核心概念之一,用于存储和传输消息。生产者将消息发送到队列,消费者从队列中获取并处理消息。

消息主题(Message Topic)通常与发布/订阅模型相关,生产者将消息发送到主题,多个订阅者可以订阅该主题并接收消息。

生产者与消费者的角色

  • 生产者:负责生成消息并将其发送到消息队列或主题。
  • 消费者:从消息队列或主题中接收消息并处理这些消息。

以下是一个简单的RabbitMQ示例,展示了生产者和消费者的交互:

# 生产者代码示例
import pika

def send_message():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
    print(" [x] Sent 'Hello World!'")
    connection.close()

send_message()

# 消费者代码示例
import pika

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

def consume_message():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')
    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
    print('Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

consume_message()

同样的概念在ActiveMQ中:

# 生产者代码示例
import jms

connection = jms.Connection('tcp://localhost:61616')
connection.start()
session = connection.create_session()
destination = session.create_queue('hello')
producer = session.create_producer(destination)
producer.send(jms.TextMessage('Hello, ActiveMQ!'))
connection.close()

# 消费者代码示例
import jms

connection = jms.Connection('tcp://localhost:61616')
connection.start()
session = connection.create_session()
destination = session.create_queue('hello')
consumer = session.create_consumer(destination)
while True:
    message = consumer.receive()
    if message:
        print(f"Received message: {message.text}")
    else:
        break
connection.close()

同样的概念在Kafka中:

# 生产者代码示例
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send('hello', b'Hello, Kafka!')
future.get()
producer.close()

# 消费者代码示例
from kafka import KafkaConsumer

consumer = KafkaConsumer('hello', bootstrap_servers='localhost:9092')
for message in consumer:
    print(f"Received message: {message.value}")
    break
consumer.close()

同样的概念在RocketMQ中:

# 生产者代码示例
from rocketmq import Producer, Message

producer = Producer('ProducerGroup')
producer.set_name_server_address('localhost:9876')
producer.start()
message = Message('TopicTest', 'TagTest', 'Hello RocketMQ')
producer.send_sync(message)
producer.shutdown()

# 消费者代码示例
from rocketmq import Consumer, MessageModel

consumer = Consumer('ConsumerGroup')
consumer.set_name_server_address('localhost:9876')
consumer.subscribe('TopicTest', 'TagTest', callback)
consumer.start()
while True:
    pass

事务处理与消息确认

事务处理确保消息在发送到队列后才被标记为已发送,防止消息丢失。消息确认机制确保消息只有在消费者成功处理后才从队列中移除。

以下是一个使用事务处理和消息确认的示例:

# 生产者代码示例(事务处理)
import pika

def send_message():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')
    channel.tx_select()  # 开始事务
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
    channel.tx_commit()  # 提交事务
    print(" [x] Sent 'Hello World!'")
    connection.close()

send_message()

# 消费者代码示例(消息确认)
import pika

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag) . # 确认消息

def consume_message():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')
    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=False)
    print('Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

consume_message()

在ActiveMQ中:

# 生产者代码示例
import jms

connection = jms.Connection('tcp://localhost:61616')
connection.start()
session = connection.create_session()
destination = session.create_queue('hello')
producer = session.create_producer(destination)
producer.send(jms.TextMessage('Hello, ActiveMQ!'))
connection.commit()

# 消费者代码示例
import jms

connection = jms.Connection('tcp://localhost:61616')
connection.start()
session = connection.create_session()
destination = session.create_queue('hello')
consumer = session.create_consumer(destination)
message = consumer.receive()
if message:
    print(f"Received message: {message.text}")
connection.commit()

在Kafka中:

# 生产者代码示例
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send('hello', b'Hello, Kafka!')
future.get()
producer.flush()

# 消费者代码示例
from kafka import KafkaConsumer

consumer = KafkaConsumer('hello', bootstrap_servers='localhost:9092')
for message in consumer:
    print(f"Received message: {message.value}")
    break
consumer.close()

在RocketMQ中:

# 生产者代码示例
from rocketmq import Producer

producer = Producer('ProducerGroup')
producer.set_name_server_address('localhost:9876')
producer.start()
message = Message('TopicTest', 'TagTest', 'Hello RocketMQ')
producer.send(message)
producer.shutdown()

# 消费者代码示例
from rocketmq import Consumer

consumer = Consumer('ConsumerGroup')
consumer.set_name_server_address('localhost:9876')
consumer.subscribe('TopicTest', 'TagTest', callback)
consumer.start()
while True:
    pass
MQ消息中间件的简单使用教程

创建并发送消息

创建并发送消息的基本步骤如下:

  1. 连接到消息中间件:建立与消息中间件的连接。
  2. 创建或声明队列:确保队列已经存在,如果没有就创建新的队列。
  3. 发送消息:将消息发送到队列。

以下是一个使用Python发送消息到RabbitMQ的示例:

import pika

def send_message(message):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test_queue')
    channel.basic_publish(exchange='', routing_key='test_queue', body=message)
    print(f"Sent message: {message}")
    connection.close()

send_message("Hello, RabbitMQ!")

同样的步骤在ActiveMQ中:

# 发送消息到ActiveMQ
import jms

connection = jms.Connection('tcp://localhost:61616')
connection.start()
session = connection.create_session()
destination = session.create_queue('test_queue')
producer = session.create_producer(destination)
producer.send(jms.TextMessage('Hello, ActiveMQ!'))
connection.close()

同样的步骤在Kafka中:

# 发送消息到Kafka
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send('test_queue', b'Hello, Kafka!')
future.get()
producer.close()

同样的步骤在RocketMQ中:

# 发送消息到RocketMQ
from rocketmq import Producer, Message

producer = Producer('ProducerGroup')
producer.set_name_server_address('localhost:9876')
producer.start()
message = Message('TopicTest', 'TagTest', 'Hello RocketMQ')
producer.send_sync(message)
producer.shutdown()

接收并处理消息

接收并处理消息的基本步骤如下:

  1. 连接到消息中间件:建立与消息中间件的连接。
  2. 声明队列:确保队列已经存在。
  3. 接收消息:从队列中接收消息并处理。

以下是一个使用Python接收并处理消息的示例:

import pika

def callback(ch, method, properties, body):
    print(f"Received message: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

def consume_message():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test_queue')
    channel.basic_consume(queue='test_queue', on_message_callback=callback, auto_ack=False)
    print('Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

consume_message()

同样的步骤在ActiveMQ中:

# 接收并处理消息
import jms

connection = jms.Connection('tcp://localhost:61616')
connection.start()
session = connection.create_session()
destination = session.create_queue('test_queue')
consumer = session.create_consumer(destination)
while True:
    message = consumer.receive()
    if message:
        print(f"Received message: {message.text}")
    else:
        break
connection.close()

同样的步骤在Kafka中:

# 接收并处理消息
from kafka import KafkaConsumer

consumer = KafkaConsumer('test_queue', bootstrap_servers='localhost:9092')
for message in consumer:
    print(f"Received message: {message.value}")
    break
consumer.close()

同样的步骤在RocketMQ中:

# 接收并处理消息
from rocketmq import Consumer, MessageModel

consumer = Consumer('ConsumerGroup')
consumer.set_name_server_address('localhost:9876')
consumer.subscribe('TopicTest', 'TagTest', callback)
consumer.start()
while True:
    pass

异常处理与调试技巧

在处理消息时,可能会遇到各种异常情况,如网络错误、消息格式错误等。异常处理和调试技巧对于确保系统稳定性非常重要。

以下是一个处理异常的示例:

import pika

def callback(ch, method, properties, body):
    try:
        print(f"Received message: {body.decode()}")
    except Exception as e:
        print(f"Error processing message: {e}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

def consume_message():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test_queue')
    channel.basic_consume(queue='test_queue', on_message_callback=callback, auto_ack=False)
    print('Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

consume_message()

同样的处理方式在ActiveMQ中:

# 异常处理
import jms

connection = jms.Connection('tcp://localhost:61616')
connection.start()
session = connection.create_session()
destination = session.create_queue('test_queue')
consumer = session.create_consumer(destination)
try:
    while True:
        message = consumer.receive()
        if message:
            print(f"Received message: {message.text}")
except Exception as e:
    print(f"Error processing message: {e}")
connection.close()

同样的处理方式在Kafka中:

# 异常处理
from kafka import KafkaConsumer

consumer = KafkaConsumer('test_queue', bootstrap_servers='localhost:9092')
try:
    for message in consumer:
        print(f"Received message: {message.value}")
        break
except Exception as e:
    print(f"Error processing message: {e}")
consumer.close()

同样的处理方式在RocketMQ中:

# 异常处理
from rocketmq import Consumer, MessageModel

def callback(message, context):
    try:
        print(f"Received message: {message}")
    except Exception as e:
        print(f"Error processing message: {e}")

consumer = Consumer('ConsumerGroup')
consumer.set_name_server_address('localhost:9876')
consumer.subscribe('TopicTest', 'TagTest', callback)
consumer.start()
while True:
    pass

调试技巧包括使用日志记录、断点调试等方法来追踪和分析问题。

MQ消息中间件的性能优化

资源分配与负载均衡

资源分配和负载均衡是提高MQ消息中间件性能的关键。合理地分配资源可以确保系统在高负载下也能稳定运行。

  • 增加队列:为不同的消息类型创建不同的队列,实现消息的分类处理。
  • 负载均衡:通过将消息分发到多个消费者,实现负载均衡。

在RabbitMQ中:

rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all"}'

在ActiveMQ中:

  • 增加队列
    <queue physicalName="myQueue"/>
  • 负载均衡
    <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryManager>
                    <memoryUsage>
                        <percentage>50</percentage>
                    </memoryUsage>
                </memoryManager>
            </memoryUsage>
        </systemUsage>
    </systemUsage>

在Kafka中:

  • 增加队列
    num.partitions=10
  • 负载均衡
    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 10

在RocketMQ中:

  • 增加队列
    topic=TopicTest
    queueNum=10
  • 负载均衡
    brokerRole=ASYNC_MASTER

消息持久化与备份

消息持久化确保即使在系统崩溃后,消息也不会丢失。备份则确保数据的安全性。

持久化消息

在RabbitMQ中,可以通过设置队列的durable属性来实现消息持久化:

channel.queue_declare(queue='test_queue', durable=True)

备份

可以通过配置定时备份日志文件、使用磁盘冗余等方式实现备份。

在ActiveMQ中:

  • 持久化消息
    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry queue="myQueue">
                    <pendingQueueMaxSize>20000</pendingQueueMaxSize>
                    <pendingQueueMemoryLimit>100mb</pendingQueueMemoryLimit>
                    <memoryLimit>1mb</memoryLimit>
                </policyEntry>
            </policyEntries>
        </policyMap>
    </destinationPolicy>
  • 备份
    bin/activemq backup

在Kafka中:

  • 持久化消息
    log.retention.hours=72
  • 备份
    bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --topic test

在RocketMQ中:

  • 持久化消息
    persistent=true
  • 备份
    bin/mqadmin brokerList -n localhost:9876 | grep "backup"

性能监控与调优策略

性能监控是调优的基础。通过监控关键指标,如消息传递速率、队列长度等,可以及时发现并解决问题。

监控工具

RabbitMQ提供了多种监控工具,如rabbitmqctl命令行工具、Web管理界面等。

调优策略

  • 调整队列参数:如设置合适的队列大小。
  • 优化网络配置:减少网络延迟,提高网络传输效率。
  • 使用消息压缩:减少传输的数据量。

在ActiveMQ中:

  • 监控工具
    • Web控制台http://<ActiveMQ服务器IP>:8161
    • JMX监控器jconsole
  • 调优策略
    • 调整队列参数
      <destinationPolicy>
          <policyMap>
              <policyEntries>
                  <policyEntry queue="myQueue">
                      <pendingQueueMaxSize>20000</pendingQueueMaxSize>
                      <pendingQueueMemoryLimit>100mb</pendingQueueMemoryLimit>
                      <memoryLimit>1mb</memoryLimit>
                  </policyEntry>
              </policyEntries>
          </policyMap>
      </destinationPolicy>
    • 优化网络配置
      <transportConnectors>
          <transportConnector name="openwire" uri="tcp://localhost:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
      </transportConnectors>

在Kafka中:

  • 监控工具
    • Kafka自带监控脚本bin/kafka-run-class.sh kafka.tools.JmxTool
    • Kafka Managerhttps://github.com/yahoo/kafka-manager
  • 调优策略
    • 调整队列参数
      log.retention.hours=72
    • 优化网络配置
      socket.timeout.ms=30000

在RocketMQ中:

  • 监控工具
    • RocketMQ控制台http://<RocketMQ服务器IP>:8080
    • JMX监控器jconsole
  • 调优策略
    • 调整队列参数
      topic=TopicTest
      queueNum=10
    • 优化网络配置
      brokerRole=ASYNC_MASTER
常见问题与解决方案

常见错误及其解决方法

常见的错误包括连接失败、消息丢失等。以下是一些解决方法:

  • 连接失败:检查网络配置,确保MQ消息中间件服务正在运行。
  • 消息丢失:检查消息持久化设置,确保消息被正确地保存到磁盘。

在RabbitMQ中:

  • 连接失败
    • 确保RabbitMQ服务正在运行。
    • 检查网络设置,确保网络连接正常。
  • 消息丢失
    • 检查队列的durable属性是否设置为true
    • 确保消息持久化设置正确。

在ActiveMQ中:

  • 连接失败
    • 确保ActiveMQ服务正在运行。
    • 检查网络设置,确保网络连接正常。
  • 消息丢失
    • 确保持久化设置正确。
    • 确保消息队列的配置正确。

在Kafka中:

  • 连接失败
    • 确保Kafka服务正在运行。
    • 检查网络设置,确保网络连接正常。
  • 消息丢失
    • 确保消息持久化设置正确。
    • 确保消息队列的配置正确。

在RocketMQ中:

  • 连接失败
    • 确保RocketMQ服务正在运行。
    • 检查网络设置,确保网络连接正常。
  • 消息丢失
    • 确保消息持久化设置正确。
    • 确保消息队列的配置正确。

高可用性与容错性设计

高可用性设计确保系统在单点故障的情况下仍能正常运行。容错性设计则是在发生错误时能够及时检测并恢复。

  • 故障转移:实现主从模式,主节点失败后自动切换到从节点。
  • 心跳检测:定期检测节点状态,确保节点之间的通信正常。

在RabbitMQ中:

  • 故障转移
    rabbitmq-queues -p / test
  • 心跳检测
    rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all"}'

在ActiveMQ中:

  • 故障转移
    bin/activemq failover
  • 心跳检测
    bin/activemq heartbeat

在Kafka中:

  • 故障转移
    bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move kafka --new-topology
  • 心跳检测
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

在RocketMQ中:

  • 故障转移
    bin/mqadmin brokerList -n localhost:9876 | grep "slave"
  • 心跳检测
    bin/mqadmin brokerList -n localhost:9876 | grep "heartBeat"

安全性与权限管理

安全性是MQ消息中间件的重要方面。通过设置用户权限、限制访问等措施,可以提高系统的安全性。

设置用户权限

在RabbitMQ中,可以通过以下命令设置用户权限:

rabbitmqctl add_user myuser mypassword
rabbitmqctl set_permissions myuser ".*" ".*" ".*"

限制访问

通过配置防火墙规则,限制对外部网络的访问。

在ActiveMQ中:

  • 设置用户权限
    bin/activemq add-user myuser
  • 限制访问
    bin/activemq start --user=myuser --password=mypassword

在Kafka中:

  • 设置用户权限
    bin/kafka-acls.sh --add --allow-principal User:myuser --operation All --topic test
  • 限制访问
    bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name test --alter --add-config 'access.control.rules=type:deny, pattern:.*, host:*, operation:*, name:myuser'

在RocketMQ中:

  • 设置用户权限
    bin/mqadmin adduser -n myuser -p mypassword
  • 限制访问
    bin/mqadmin brokerList -n localhost:9876 | grep "security"

以上是对MQ消息中间件从入门到进阶的全面介绍,包括安装、配置、核心概念、使用教程、性能优化以及常见问题的解决方案。希望这些内容能够帮助您更好地理解和使用MQ消息中间件。

0人推荐
随时随地看视频
慕课网APP