手记

rocketMq(3)分布式事务

当处理数据库插入后,往rocketmq发送消息,再将一部分数据存入缓存,当存入缓存这一步出了问题,可是消息却已经发送了,这样普通的事务

@Transactional(rollbackFor = Exception.class)

就无法完成事务的回滚了,需要用到rocketmq的分布式事务



1、生产者会先向MQ Server发送一条半消息,打了标记(commit/rollback),半消息不会向消费者发送或被消费者获取;

//添加事务
if (AuditStatusEnum.PASS.equals(shareAuditDto.getAuditStatusEnum())) {
    //发送半消息
    String transactionId = UUID.randomUUID().toString();
    this.rocketMQTemplate.sendMessageInTransaction(
            "tx-add-bonus-group",
            "add-bonus",
            MessageBuilder.withPayload(
                    UserAddBonusMsgDTO.builder()
                            .id(share.getUserId())
                            .bonus(50)
                            .build()
            )
                    //header也有妙用
                    .setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId)
                    .setHeader("share_id",id)
                    .build(),
            //这里有大用处
            shareAuditDto

    );
}


2-3、当半消息发送成功之后,生产者会去执行本地事务 ;

@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id,ShareAuditDto shareAuditDto){
    Share share = Share.builder()
            .id(id)
            .auditStatus(shareAuditDto.getAuditStatusEnum().toString())
            .reason(shareAuditDto.getReason())
            .build();
    this.shareMapper.updateByPrimaryKeySelective(share);
}

    在发送半消息后,我们写一个监听器,负责用于执行本地事务和第4步

    **txProducerGroup = "tx-add-bonus-group"是半消息当中的txProducerGroup

    **Message message 是半消息当中的消息体

    ** Object o 是半消息当中最后一个参数

package com.itmuch.usercenter.rocketmq;

import com.itmuch.usercenter.dao.rocketmqTransactionLog.RocketmqTransactionLogMapper;
import com.itmuch.usercenter.domain.dto.ShareAuditDto;
import com.itmuch.usercenter.domain.entity.rocketmqTransactionLog.RocketmqTransactionLog;
import com.itmuch.usercenter.service.content.ShareService;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

import javax.annotation.Resource;

@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {

    @Autowired
    private ShareService shareService;

    @Resource
    private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    @Override//用来执行本地事务
    public RocketMQLocalTransactionState executeLocalTransaction(Message message,Object o) {
        //回去shareService
        //Message message -> MessageBuilder.withPayload(
        //                            UserAddBonusMsgDTO.builder()
        //                                    .id(share.getUserId())
        //                                    .bonus(50)
        //                                    .build()
        //                    )
        //Object o -> shareAuditDto

        MessageHeaders headers = message.getHeaders();

        String transactionId = headers.get(RocketMQHeaders.TRANSACTION_ID).toString();
        Integer share_id = Integer.valueOf(headers.get("share_id").toString());

        try {
            //本地事务成功
            this.shareService.auditByIdWithRocketMqLog(share_id, (ShareAuditDto) o,transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            //本地事务失败
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override//用于回查本地事务
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        String transactionId = headers.get(RocketMQHeaders.TRANSACTION_ID).toString();

        //查找rocketmq_transaction_log表中ID为transactionId的数据
        RocketmqTransactionLog rocketmqTransactionLog = this.rocketmqTransactionLogMapper.selectOne(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .build()
        );

        if (rocketmqTransactionLog == null){
            //找不到这个日志,证明本地事务没成功呢
            return RocketMQLocalTransactionState.ROLLBACK;
        }else{
            //找到该日志,本地事务已成功,只是二次确认没返回,现在这里返回
            return RocketMQLocalTransactionState.COMMIT;
        }
    }
}

4、生产者根据本地事务的执行状态(成功与否)会向MQ Server发送二次确认请求,如果执行状态成功,则半消息会转状态commit,向消费者投递消息,否则rollback将消息丢弃;

5、如果第4步的二次确认没发送成功到MQ Server,消息状态改变为UNKNOWN,MQ Server会过一段时间会回查(这里有一条半消息为什么一直没有二次确认?)生产者;

        **二次确认没收到,可能是二次确认准备发送的一瞬间断网啥的,中断这个操作。

6-7、生产者接到回查后会去本地事务进行确认事务信息,将结果状态返回给MQ Server中的半消息状态,确定该消息是commit还是rollback;

总而言之:生产者先向MQ Server发送一条消息,但是打了标记不然消费者进行消费,待生产者完成本地事务后再确定让不让消费者消费该信息;







0人推荐
随时随地看视频
慕课网APP