手记

【九月打卡】第9天 创建生产者和消费者

一、课程介绍
【课程名称】SpringBoot 2.x 实战仿B站高性能后端项目。
【章节】第三章3.27、3.28 生产者和消费者(一)(二)
【讲师】HELLOSTAR

二、课程内容
1.生产者和消费者的配置文件的编写

  • 生产者:

    ----字段:producerGroup
    生产者的分组名称。相同的分组名称表明生产者实例在概念上归属于同一分组。这对事务消息十分重要,如果原始生产者在事务之后崩溃,那么broker可以联系同一生产者分组的不同生产者实例来提交或回滚事务。
    ----设置服务器地址 setNamesrvAddr
    集群环境多个nameserver用;分割

  • 消费者
    ----字段:producerGroup
    消费者的分组名称
    ----设置服务器地址 setNamesrvAddr
    集群环境多个nameserver用;分割
    ----订阅topic主题 subscribe
    ----注册消息监听器

//生产者
    @Bean("momentsProducer")
    public DefaultMQProducer momentsProducer() throws Exception{
        DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_MOMENTS);
        producer.setNamesrvAddr(nameServerAddr);
        producer.start();
        return producer;
    }
    //消费者
    @Bean("momentsConsumer")
    public DefaultMQPushConsumer momentsConsumer() throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_MOMENTS);
        consumer.setNamesrvAddr(nameServerAddr);
        consumer.subscribe(UserMomentsConstant.TOPIC_MOMENTS, "*");
        //消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
                for (MessageExt msg:msgs){
                    System.out.println(msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        return consumer;
    }

其中MQ服务器地址的配置可以使用@value注解在spring配置文件中保存

//MQ服务器地址
    @Value("${rocketmq.name.server.address}")
    private String nameServerAddr;

2.实现发送消息工具的MQ工具类

  • 同步发送消息
    只有消息被成功接收并且被固化完成后才会收到反馈。 内置有重发机制, producer将会重试。
  • 异步发送消息
    消息发送后,立即返回。broker处理返程后, 触发sendCallback回调方法。发送成功和失败做相应处理。
//同步发送消息
    public static void syncSendMsg(DefaultMQProducer producer, Message msg) throws Exception{
        SendResult result = producer.send(msg);
        System.out.println(result);
    }
    //异步发送消息
    public static void asyncSendMsg(DefaultMQProducer producer, Message msg) throws Exception{
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                Logger logger = LoggerFactory.getLogger(RocketMQUtil.class);
                logger.info("异步发送消息成功,消息id:" + sendResult.getMsgId());
            }
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });
    }

三、课程收获
生产者和消费者的初始化分别使用DefaultMQProducer和DefaultMQPushConsumer(自动推送消费者);
消息发送分为同步和异步发送,异步发送需要处理回调方法。
生产者发送消息给消费者,消费者通过消息监听器监听到消息后做进一步处理。并返回状态。

四、学习过程
·

0人推荐
随时随地看视频
慕课网APP