生产者:
写配置
stream: rocketmq: binder: name-server: 127.0.0.1:9876 #找到borker bindings: output: producer: transactional: true #transactional/group实现stream事务 group: tx-add-bonus-group bindings: output: destination: add-bonus
在这段配置中rockerMQ中的bindings里面是开启stream的事务,而里面的group则是在
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {中定义的group,而在stream下面的bindings下定义的destination则是最后注册到MQ上面;
在事务监听器中,实现事务方法,这里只是编写执行本地事务,回查后续编写
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private ShareService shareService;
@Resource
private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
MessageHeaders headers = message.getHeaders();
String transactionId = headers.get(RocketMQHeaders.TRANSACTION_ID).toString();
Integer share_id = Integer.valueOf(headers.get("share_id").toString());
//转换JSON字符串
String dtoString = (String) headers.get("dto");
ShareAuditDto dto = JSON.parseObject(dtoString, ShareAuditDto.class);
try {
//本地事务成功
this.shareService.auditByIdWithRocketMqLog(share_id, dto,transactionId);
return RocketMQLocalTransactionState.COMMIT;
}catch (Exception e){
//本地事务失败
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}修改shareService,对象以JSON字符串的形式发送
//添加事务
if (AuditStatusEnum.PASS.equals(shareAuditDto.getAuditStatusEnum())) {
//发送半消息
String transactionId = UUID.randomUUID().toString();
//以下为使用stream的情况
this.source.output().send(
MessageBuilder.withPayload(
UserAddBonusMsgDTO.builder()
.id(share.getUserId())
.bonus(50)
.build()
)
//header也有妙用
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
//这里吧DTO直接放到头里面,因为直接传对象会将这个对象直接toString,变成字符串穿进去,所以转换成JSON字符串
.setHeader("dto", JSON.toJSONString(shareAuditDto))
.setHeader("share_id", id)
.build()
);
}消费者:
在配置文件中接收到add-bonus即可
stream: rocketmq: binder: name-server: 127.0.0.1:9876 #找到borker bindings: input: destination: add-bonus group: test-group #rocketMQ:虽然这个group可以随便写但是要设置,不然无法启动 其他MQ:可留空
将业务代码搬迁至service并加上事务
@Transactional(rollbackFor = Exception.class)
public void addBonus(UserAddBonusMsgDTO userAddBonusMsgDTO) {
//当收到消息的时候执行的业务
//1、为用户加积分
Integer id = userAddBonusMsgDTO.getId();
Integer bonus = userAddBonusMsgDTO.getBonus();
User user = this.userMapper.selectByPrimaryKey(id);
user.setBonus(user.getBonus() + bonus);
this.userMapper.updateByPrimaryKeySelective(user);
//2、记录日志到bonus_event_log表里面
this.bonusEventLogMapper.insert(
BonusEventLog.builder()
.userId(id)
.value(bonus)
.event("CONTRIBUTE")
.createTime(new Date())
.description("投稿加积分")
.build()
);
log.info("积分添加完毕");
}再调用即可
package com.itmuch.usercenter.rocketmq;
import com.itmuch.usercenter.dao.bonusEventLog.BonusEventLogMapper;
import com.itmuch.usercenter.dao.user.UserMapper;
import com.itmuch.usercenter.domain.dto.message.messaging.UserAddBonusMsgDTO;
import com.itmuch.usercenter.domain.entity.bonusEventLog.BonusEventLog;
import com.itmuch.usercenter.domain.entity.user.User;
import com.itmuch.usercenter.service.user.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
@Slf4j
@Service
public class AddBonusStreamConsumer {
@Autowired
private UserService userService;
@StreamListener(Sink.INPUT)
public void recevice(UserAddBonusMsgDTO userAddBonusMsgDTO) {
this.userService.addBonus(userAddBonusMsgDTO);
}
}