手记

stream(3)stream+rocketMQ重构生产者、消费者

生产者:

写配置

    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876 #找到borker
        bindings:
          output:
            producer:
              transactional: true #transactional/group实现stream事务
              group: tx-add-bonus-group
      bindings:
        output:
            destination:  add-bonus

在这段配置中rockerMQ中的bindings里面是开启stream的事务,而里面的group则是在

@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {

中定义的group,而在stream下面的bindings下定义的destination则是最后注册到MQ上面;


在事务监听器中,实现事务方法,这里只是编写执行本地事务,回查后续编写

@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {

    @Autowired
    private ShareService shareService;

    @Resource
    private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {


        MessageHeaders headers = message.getHeaders();

        String transactionId = headers.get(RocketMQHeaders.TRANSACTION_ID).toString();
        Integer share_id = Integer.valueOf(headers.get("share_id").toString());

        //转换JSON字符串
        String dtoString = (String) headers.get("dto");
        ShareAuditDto dto = JSON.parseObject(dtoString, ShareAuditDto.class);
        try {
            //本地事务成功
            this.shareService.auditByIdWithRocketMqLog(share_id, dto,transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            //本地事务失败
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

修改shareService,对象以JSON字符串的形式发送

        //添加事务
        if (AuditStatusEnum.PASS.equals(shareAuditDto.getAuditStatusEnum())) {
            //发送半消息
            String transactionId = UUID.randomUUID().toString();
            //以下为使用stream的情况
            this.source.output().send(
                    MessageBuilder.withPayload(
                            UserAddBonusMsgDTO.builder()
                                    .id(share.getUserId())
                                    .bonus(50)
                                    .build()
                    )
                            //header也有妙用
                            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                            //这里吧DTO直接放到头里面,因为直接传对象会将这个对象直接toString,变成字符串穿进去,所以转换成JSON字符串
                            .setHeader("dto", JSON.toJSONString(shareAuditDto))
                            .setHeader("share_id", id)
                            .build()
            );
        }

消费者:

在配置文件中接收到add-bonus即可

    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876 #找到borker
      bindings:
        input:
       destination: add-bonus
          group: test-group #rocketMQ:虽然这个group可以随便写但是要设置,不然无法启动  其他MQ:可留空

将业务代码搬迁至service并加上事务

@Transactional(rollbackFor = Exception.class)
public void addBonus(UserAddBonusMsgDTO userAddBonusMsgDTO) {
    //当收到消息的时候执行的业务
    //1、为用户加积分
    Integer id = userAddBonusMsgDTO.getId();
    Integer bonus = userAddBonusMsgDTO.getBonus();
    User user = this.userMapper.selectByPrimaryKey(id);
    user.setBonus(user.getBonus() + bonus);
    this.userMapper.updateByPrimaryKeySelective(user);
    //2、记录日志到bonus_event_log表里面
    this.bonusEventLogMapper.insert(
            BonusEventLog.builder()
                    .userId(id)
                    .value(bonus)
                    .event("CONTRIBUTE")
                    .createTime(new Date())
                    .description("投稿加积分")
                    .build()
    );
    log.info("积分添加完毕");
}

再调用即可

package com.itmuch.usercenter.rocketmq;

import com.itmuch.usercenter.dao.bonusEventLog.BonusEventLogMapper;
import com.itmuch.usercenter.dao.user.UserMapper;
import com.itmuch.usercenter.domain.dto.message.messaging.UserAddBonusMsgDTO;
import com.itmuch.usercenter.domain.entity.bonusEventLog.BonusEventLog;
import com.itmuch.usercenter.domain.entity.user.User;
import com.itmuch.usercenter.service.user.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Date;

@Slf4j
@Service
public class AddBonusStreamConsumer {

    @Autowired
    private UserService userService;

    @StreamListener(Sink.INPUT)
    public void recevice(UserAddBonusMsgDTO userAddBonusMsgDTO) {
        this.userService.addBonus(userAddBonusMsgDTO);
    }
}







0人推荐
随时随地看视频
慕课网APP