生产者:
写配置
stream: rocketmq: binder: name-server: 127.0.0.1:9876 #找到borker bindings: output: producer: transactional: true #transactional/group实现stream事务 group: tx-add-bonus-group bindings: output: destination: add-bonus
在这段配置中rockerMQ中的bindings里面是开启stream的事务,而里面的group则是在
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group") public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
中定义的group,而在stream下面的bindings下定义的destination则是最后注册到MQ上面;
在事务监听器中,实现事务方法,这里只是编写执行本地事务,回查后续编写
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group") public class AddBonusTransactionListener implements RocketMQLocalTransactionListener { @Autowired private ShareService shareService; @Resource private RocketmqTransactionLogMapper rocketmqTransactionLogMapper; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { MessageHeaders headers = message.getHeaders(); String transactionId = headers.get(RocketMQHeaders.TRANSACTION_ID).toString(); Integer share_id = Integer.valueOf(headers.get("share_id").toString()); //转换JSON字符串 String dtoString = (String) headers.get("dto"); ShareAuditDto dto = JSON.parseObject(dtoString, ShareAuditDto.class); try { //本地事务成功 this.shareService.auditByIdWithRocketMqLog(share_id, dto,transactionId); return RocketMQLocalTransactionState.COMMIT; }catch (Exception e){ //本地事务失败 return RocketMQLocalTransactionState.ROLLBACK; } } }
修改shareService,对象以JSON字符串的形式发送
//添加事务 if (AuditStatusEnum.PASS.equals(shareAuditDto.getAuditStatusEnum())) { //发送半消息 String transactionId = UUID.randomUUID().toString(); //以下为使用stream的情况 this.source.output().send( MessageBuilder.withPayload( UserAddBonusMsgDTO.builder() .id(share.getUserId()) .bonus(50) .build() ) //header也有妙用 .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) //这里吧DTO直接放到头里面,因为直接传对象会将这个对象直接toString,变成字符串穿进去,所以转换成JSON字符串 .setHeader("dto", JSON.toJSONString(shareAuditDto)) .setHeader("share_id", id) .build() ); }
消费者:
在配置文件中接收到add-bonus即可
stream: rocketmq: binder: name-server: 127.0.0.1:9876 #找到borker bindings: input: destination: add-bonus group: test-group #rocketMQ:虽然这个group可以随便写但是要设置,不然无法启动 其他MQ:可留空
将业务代码搬迁至service并加上事务
@Transactional(rollbackFor = Exception.class) public void addBonus(UserAddBonusMsgDTO userAddBonusMsgDTO) { //当收到消息的时候执行的业务 //1、为用户加积分 Integer id = userAddBonusMsgDTO.getId(); Integer bonus = userAddBonusMsgDTO.getBonus(); User user = this.userMapper.selectByPrimaryKey(id); user.setBonus(user.getBonus() + bonus); this.userMapper.updateByPrimaryKeySelective(user); //2、记录日志到bonus_event_log表里面 this.bonusEventLogMapper.insert( BonusEventLog.builder() .userId(id) .value(bonus) .event("CONTRIBUTE") .createTime(new Date()) .description("投稿加积分") .build() ); log.info("积分添加完毕"); }
再调用即可
package com.itmuch.usercenter.rocketmq; import com.itmuch.usercenter.dao.bonusEventLog.BonusEventLogMapper; import com.itmuch.usercenter.dao.user.UserMapper; import com.itmuch.usercenter.domain.dto.message.messaging.UserAddBonusMsgDTO; import com.itmuch.usercenter.domain.entity.bonusEventLog.BonusEventLog; import com.itmuch.usercenter.domain.entity.user.User; import com.itmuch.usercenter.service.user.UserService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Date; @Slf4j @Service public class AddBonusStreamConsumer { @Autowired private UserService userService; @StreamListener(Sink.INPUT) public void recevice(UserAddBonusMsgDTO userAddBonusMsgDTO) { this.userService.addBonus(userAddBonusMsgDTO); } }