当处理数据库插入后,往rocketmq发送消息,再将一部分数据存入缓存,当存入缓存这一步出了问题,可是消息却已经发送了,这样普通的事务
@Transactional(rollbackFor = Exception.class)
就无法完成事务的回滚了,需要用到rocketmq的分布式事务
1、生产者会先向MQ Server发送一条半消息,打了标记(commit/rollback),半消息不会向消费者发送或被消费者获取;
//添加事务
if (AuditStatusEnum.PASS.equals(shareAuditDto.getAuditStatusEnum())) {
//发送半消息
String transactionId = UUID.randomUUID().toString();
this.rocketMQTemplate.sendMessageInTransaction(
"tx-add-bonus-group",
"add-bonus",
MessageBuilder.withPayload(
UserAddBonusMsgDTO.builder()
.id(share.getUserId())
.bonus(50)
.build()
)
//header也有妙用
.setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId)
.setHeader("share_id",id)
.build(),
//这里有大用处
shareAuditDto
);
}2-3、当半消息发送成功之后,生产者会去执行本地事务 ;
@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id,ShareAuditDto shareAuditDto){
Share share = Share.builder()
.id(id)
.auditStatus(shareAuditDto.getAuditStatusEnum().toString())
.reason(shareAuditDto.getReason())
.build();
this.shareMapper.updateByPrimaryKeySelective(share);
}在发送半消息后,我们写一个监听器,负责用于执行本地事务和第4步
**txProducerGroup = "tx-add-bonus-group"是半消息当中的txProducerGroup
**Message message 是半消息当中的消息体
** Object o 是半消息当中最后一个参数
package com.itmuch.usercenter.rocketmq;
import com.itmuch.usercenter.dao.rocketmqTransactionLog.RocketmqTransactionLogMapper;
import com.itmuch.usercenter.domain.dto.ShareAuditDto;
import com.itmuch.usercenter.domain.entity.rocketmqTransactionLog.RocketmqTransactionLog;
import com.itmuch.usercenter.service.content.ShareService;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import javax.annotation.Resource;
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private ShareService shareService;
@Resource
private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
@Override//用来执行本地事务
public RocketMQLocalTransactionState executeLocalTransaction(Message message,Object o) {
//回去shareService
//Message message -> MessageBuilder.withPayload(
// UserAddBonusMsgDTO.builder()
// .id(share.getUserId())
// .bonus(50)
// .build()
// )
//Object o -> shareAuditDto
MessageHeaders headers = message.getHeaders();
String transactionId = headers.get(RocketMQHeaders.TRANSACTION_ID).toString();
Integer share_id = Integer.valueOf(headers.get("share_id").toString());
try {
//本地事务成功
this.shareService.auditByIdWithRocketMqLog(share_id, (ShareAuditDto) o,transactionId);
return RocketMQLocalTransactionState.COMMIT;
}catch (Exception e){
//本地事务失败
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override//用于回查本地事务
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
String transactionId = headers.get(RocketMQHeaders.TRANSACTION_ID).toString();
//查找rocketmq_transaction_log表中ID为transactionId的数据
RocketmqTransactionLog rocketmqTransactionLog = this.rocketmqTransactionLogMapper.selectOne(
RocketmqTransactionLog.builder()
.transactionId(transactionId)
.build()
);
if (rocketmqTransactionLog == null){
//找不到这个日志,证明本地事务没成功呢
return RocketMQLocalTransactionState.ROLLBACK;
}else{
//找到该日志,本地事务已成功,只是二次确认没返回,现在这里返回
return RocketMQLocalTransactionState.COMMIT;
}
}
}4、生产者根据本地事务的执行状态(成功与否)会向MQ Server发送二次确认请求,如果执行状态成功,则半消息会转状态commit,向消费者投递消息,否则rollback将消息丢弃;
5、如果第4步的二次确认没发送成功到MQ Server,消息状态改变为UNKNOWN,MQ Server会过一段时间会回查(这里有一条半消息为什么一直没有二次确认?)生产者;
**二次确认没收到,可能是二次确认准备发送的一瞬间断网啥的,中断这个操作。
6-7、生产者接到回查后会去本地事务进行确认事务信息,将结果状态返回给MQ Server中的半消息状态,确定该消息是commit还是rollback;
总而言之:生产者先向MQ Server发送一条消息,但是打了标记不然消费者进行消费,待生产者完成本地事务后再确定让不让消费者消费该信息;