手记

手写RocketMQ:从入门到实践的简单教程

概述

本文将详细介绍RocketMQ的基本概念、特点和应用场景,并指导如何手写搭建RocketMQ环境、创建消息生产者和消费者。同时,文章深入讲解RocketMQ的消息模型与消息队列管理,提供实践案例及常见问题的解决方案,帮助读者更好地理解和实现RocketMQ。

RocketMQ简介
RocketMQ的基本概念

RocketMQ是由阿里巴巴开源的一款高吞吐量的分布式消息中间件,设计用于大规模分布式系统中的消息发布与订阅。RocketMQ具备低延迟、高可用和高并发的特点,在海量消息堆积和高并发场景下稳定运行。RocketMQ兼容JMS、JMX、JDBC等多种协议,支持无缝接入现有应用架构。

RocketMQ的特点和优势
  • 高吞吐量:RocketMQ能够支持每秒数万消息的吞吐量,适用于大规模分布式系统。
  • 高性能:RocketMQ具备极低的消息延迟,可以达到毫秒级别的消息推送。
  • 高可用性:RocketMQ采用主从复制机制,确保消息可靠传输。当主节点发生故障时,可以从从节点接管任务。
  • 灵活的消息模型:RocketMQ支持多种消息模型,包括顺序消息、定时消息和批量消息等。
  • 丰富的客户端工具:RocketMQ提供了多种语言的客户端,如Java、Python、C++等,方便不同语言的应用集成。
RocketMQ的应用场景
  • 电商交易:在订单创建、支付通知和物流更新等场景中,RocketMQ确保消息的可靠传输。
  • 金融交易:在支付和证券交易等高并发场景中,RocketMQ的高性能和高可用性尤为重要。
  • 日志采集:在日志采集和集中处理场景中,RocketMQ能够高效传输大量日志数据。
  • 实时计算:在实时计算与数据流处理场景中,RocketMQ支持实时消息的高效传输。
  • 系统解耦:在系统解耦场景中,RocketMQ可以作为消息总线,实现不同服务之间的松耦合。
环境搭建
操作系统和Java环境的要求
  • 操作系统:RocketMQ支持Windows、Linux、macOS等多种操作系统。
  • Java环境:RocketMQ需要Java 1.8及以上版本。确保Java环境已正确安装,并且java -version命令可以正常输出Java版本信息。
下载RocketMQ源码
  1. 访问RocketMQ的GitHub仓库页面:https://github.com/apache/rocketmq
  2. 点击“Code”按钮,选择“Download ZIP”功能,下载压缩包。
  3. 解压压缩包后,进入解压后的目录,例如:
    cd rocketmq
搭建开发环境
  1. 编译源码:使用Maven编译RocketMQ源码。
    mvn clean install -DskipTests
  2. 启动NameServer:RocketMQ的消息路由中心,NameServer负责维护Broker的元数据信息。
    nohup sh bin/mqnamesrv &
  3. 启动Broker:RocketMQ的消息存储和传输节点,需要启动至少一个Broker实例。
    nohup sh bin/mqbroker -n localhost:9876 &
  4. 验证启动:可以使用以下命令验证NameServer和Broker是否启动成功。
    ps aux | grep mqnamesrv
    ps aux | grep mqbroker

    如果输出中包含相关进程,则启动成功。

  5. 日志查看:RocketMQ的日志输出在logs目录下,可以查看详细日志信息。
    tail -f logs/rocketmq.log
手写消息生产者
创建消息生产者实例

消息生产者负责向特定主题发送消息,首先需要创建一个消息生产者实例。

import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class MessageProducer {
    public void createMessageProducer() {
        // 创建生产者实例,参数为ProducerGroup名称
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");

        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");

        // 启动生产者实例
        try {
            producer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

参数 ProducerGroup 是用于标识一组生产者的标识符,用于控制生产和消息的分发策略。

创建消息发送者

创建消息发送者实例后,需要配置消息的详细信息,如主题、消息体等。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class MessageProducer {
    public void sendMessage() {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
        producer.setNamesrvAddr("localhost:9876");

        try {
            producer.start();

            // 创建消息实体
            Message msg = new Message("TestTopic", // topic
                                      "TagA",      // tag
                                      "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // body

            // 发送消息
            SendResult sendResult = producer.send(msg);
            System.out.println("消息发送成功:" + sendResult);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭生产者实例
            producer.shutdown();
        }
    }

    public static void main(String[] args) {
        MessageProducer producer = new MessageProducer();
        producer.sendMessage();
    }
}
消息发送的实现

通过调用producer.send()方法,将消息发送到指定的主题。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class MessageProducer {
    public void sendMessage() {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
        producer.setNamesrvAddr("localhost:9876");

        try {
            producer.start();

            Message msg = new Message("TestTopic", // topic
                                      "TagA",      // tag
                                      "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // body

            SendResult sendResult = producer.send(msg);
            System.out.println("消息发送成功:" + sendResult);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }

    public static void main(String[] args) {
        MessageProducer producer = new MessageProducer();
        producer.sendMessage();
    }
}
手写消息消费者
创建消息消费者实例

消息消费者负责接收和处理消息,首先需要创建一个消息消费者实例。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class MessageConsumer {
    public void createMessageConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");

        // 设置NameServer地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅指定主题的消息
        consumer.subscribe("TestTopic", "*");

        // 设置从队列头部开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("接收到新消息: %s %s", msg.getTopic(), msg.getBody());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者实例
        try {
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

参数 ConsumerGroup 是用于标识一组消费者订阅者的标识符,用于控制消费和消息的分发策略。

创建消息监听器

消息监听器是消息处理的核心逻辑所在,负责处理接收到的消息。

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class MessageConsumer {
    public void createMessageListener() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TestTopic", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("接收到消息: %s %s", msg.getTopic(), msg.getBody());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        try {
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        MessageConsumer consumer = new MessageConsumer();
        consumer.createMessageListener();
    }
}
消息接收的实现

通过调用consumer.start()方法,启动消费者实例,并开始接收消息。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class MessageConsumer {
    public void startConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TestTopic", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("接收到消息: %s %s", msg.getTopic(), msg.getBody());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        try {
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        MessageConsumer consumer = new MessageConsumer();
        consumer.startConsumer();
    }
}
消息模型与消息队列管理
消息模型介绍

RocketMQ支持多种消息模型,包括单向消息、发布/订阅消息和顺序消息等。

  • 单向消息:消息发送后,不等待确认消息返回,是一种尽力送达的消息模式。
  • 发布/订阅消息:支持多个消费者订阅,实现负载均衡。
  • 顺序消息:消息按照发送顺序进行消费,确保消息的顺序性。
消息队列的创建与管理

消息队列的创建与管理是RocketMQ的重要功能之一,可以通过以下步骤来操作:

  1. 创建主题:通过调用CreateTopicRequest创建新的主题。
  2. 查询主题:通过调用QueryTopicRequest查询现有主题信息。
  3. 删除主题:通过调用DeleteTopicRequest删除已创建的主题。
  4. 修改主题:通过调用UpdateTopicRequest修改主题的属性。
    
    import org.apache.rocketmq.client.admin.ConsumeStats;
    import org.apache.rocketmq.client.admin.SendStats;
    import org.apache.rocketmq.client.admin.SubscriptionGroupStats;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.admin.ConsumeStats;
    import org.apache.rocketmq.common.admin.SendStats;
    import org.apache.rocketmq.common.admin.SubscriptionGroupStats;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class TopicManagement {
public void manageTopic() {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");

    try {
        producer.start();

        // 创建主题
        CreateTopicRequest request = new CreateTopicRequest();
        request.setTopic("TestTopic");
        request.setTopicSysFlag(0);
        request.setReadQueueNums(8);
        request.setWriteQueueNums(8);
        request.setPerm(0);
        request.setTopicFilterType(MessageModel.BROADCASTING);

        // 查询主题
        TopicList topicList = new TopicList();
        topicList.setTopicName("TestTopic");
        topicList.setBrokerName("BrokerName");

        // 删除主题
        DeleteTopicRequest deleteRequest = new DeleteTopicRequest();
        deleteRequest.setTopic("TestTopic");

        // 修改主题
        UpdateTopicRequest updateRequest = new UpdateTopicRequest();
        updateRequest.setTopic("TestTopic");
        updateRequest.setTopicSysFlag(0);
        updateRequest.setReadQueueNums(16);
        updateRequest.setWriteQueueNums(16);
        updateRequest.setTopicFilterType(MessageModel.BROADCASTING);

        // 查询消费状态
        ConsumeStats result = producer.getDefaultMQProducerImpl().getAdmin().queryConsumeStats("TestTopic");
        System.out.println(result);

        // 查询发送状态
        SendStats sendResult = producer.getDefaultMQProducerImpl().getAdmin().querySendStats("TestTopic");
        System.out.println(sendResult);

        // 查询订阅组状态
        SubscriptionGroupStatsResult statsResult = producer.getDefaultMQProducerImpl().getAdmin().querySubscriptionGroupStats("ConsumerGroup");
        System.out.println(statsResult);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        producer.shutdown();
    }
}

public static void main(String[] args) {
    TopicManagement management = new TopicManagement();
    management.manageTopic();
}

}


## 消息路由与负载均衡
RocketMQ的消息路由与负载均衡机制保证了消息的可靠传输与高效消费。
- **消息路由**:消息路由主要由NameServer和Broker实现,NameServer负责维护Broker的元数据信息,Broker负责消息的存储与传输。
- **负载均衡**:通过消息队列的分发与负载均衡机制,确保消息在多个消费者之间均匀分布。

# 实践案例与常见问题解决
## 实际开发中的案例讲解
在实际开发中,RocketMQ经常用于分布式系统的消息传输,如订单系统、支付系统等。
```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class OrderProducer {
    public void sendOrderMessage() {
        DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup");
        producer.setNamesrvAddr("localhost:9876");

        try {
            producer.start();

            Message msg = new Message("OrderTopic", // topic
                                      "TagOrder",   // tag
                                      "订单消息".getBytes(RemotingHelper.DEFAULT_CHARSET)); // body

            SendResult sendResult = producer.send(msg);
            System.out.println("订单消息发送成功:" + sendResult);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }

    public static void main(String[] args) {
        OrderProducer producer = new OrderProducer();
        producer.sendOrderMessage();
    }
}
常见问题与解决方案
  • 生产者启动失败:检查Java环境是否正确安装,确保NameServer地址配置正确。
  • 消息未被消费:检查消费者是否启动成功,确认消费者是否订阅了正确的主题。
  • 消息发送失败:确认Broker是否正常运行,检查网络连接是否通畅。
性能优化技巧
  • 批处理发送:使用批量发送消息,减少网络交互次数,提高发送效率。
  • 异步发送:使用异步发送消息,避免阻塞主线程,提升系统响应速度。
  • 消息压缩:对消息内容进行压缩,减少传输开销,提高网络传输效率。
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;

public class PerformanceOptimization {
public void optimizedSendMessage() {
DefaultMQProducer producer = new DefaultMQProducer("OptimizedProducerGroup");
producer.setNamesrvAddr("localhost:9876");

    try {
        producer.start();

        // 批量发送消息
        Message[] msgs = new Message[10];
        for (int i = 0; i < 10; i++) {
            msgs[i] = new Message("OptimizedTopic", // topic
                                  "TagOptimized",  // tag
                                  ("优化消息 " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // body
        }

        // 异步发送消息
        SendResult[] results = producer.send(msgs);
        for (SendResult result : results) {
            System.out.println("消息发送成功:" + result);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        producer.shutdown();
    }
}

public static void main(String[] args) {
    PerformanceOptimization optimization = new PerformanceOptimization();
    optimization.optimizedSendMessage();
}

}

0人推荐
随时随地看视频
慕课网APP