猿问

超30min订单自动取消用rabbitmq怎么做跪求!

超30min订单自动取消,RabbitMQ做延迟队列,下单成功把订单号推入RabbitMQ,超30min订单自动进入DLX死信队列,消费端监听死信队列得到超时订单,订单状态置为超时,这个已完成,目前卡在30min内支付(不超时)时如何取消相应的订单?即如何在RabbitMQ中删除指定的消息?消息确认后会清除,怎样确认指定的消息?不知道我的思路有问题还是怎样,目前卡在这了,请各位大神指点一下
相关代码
下单
@Override
@Transactional(isolation=Isolation.READ_COMMITTED,rollbackFor=Exception.class)
publicResultorder(LongproductId,LonguserId){
//redis中商品名
StringproductNameKey="PRODUCT_NAME_"+productId;
//redis中商品库存
StringproductNumberKey="PRODUCT_"+productId;
//redis中已抢购成功的用户ID
StringuserIdKey="USER_"+productId;
StringproductName=redisUtil.get(productNameKey,0);
//缓存中有商品
if(NumberUtils.toInt(redisUtil.get(productNumberKey,0))>0){
//当前用户是否已抢购成功
if(!redisUtil.sismember(userIdKey,userId+"")){
//减缓存
redisUtil.decr(productNumberKey);
//减库存
booleanflag=productMapper.reduceRepertory(productId)==1;
if(flag){
//生成订单
Orderorder=newOrder();
//订单号
StringorderNo="ORDER_"+DATE_TIME_FORMATTER.format(LocalDateTime.now())+"_"+productId+"_"+userId;
order.setOrderNo(orderNo);
order.setOrderPrice(newBigDecimal(100));
order.setOrderStatus(1);
order.setUserId(userId);
save(order);
//建立订单商品关系
ProductOrderproductOrder=newProductOrder();
productOrder.setOrderId(order.getId());
productOrder.setProductId(productId);
productOrderMapper.insert(productOrder);
//缓存userId
redisUtil.sadd(userIdKey,userId+"");
//订单号orderNo加入MQ,30分钟未支付加入死信队列
messageProducer.sendMessage(orderNo,30*60*1000);
returnResult.success("下单成功,订单号"+order.getId());
}else{
Productproduct=productMapper.selectOne(newQueryWrapper().eq("id",productId));
//减库存失败,重置缓存
redisUtil.set(productNumberKey,JacksonUtil.toJson(product.getProductNumber()),0);
log.error("用户{}下单失败,商品名:{}",userId,productName);
returnResult.error("202","用户"+userId+"下单失败,商品名"+productName);
}
}else{
//该用户已抢到商品
log.error("您已抢到商品{},订单已生成,请去收银台支付",productName);
returnResult.error("203","您已抢到商品"+productName+",订单已生成,请去收银台支付");
}
}else{
//商品售罄
log.error("商品{}已被抢光",productName);
returnResult.error("201","商品"+productName+"已被抢光");
}
}
支付,这里如何删除RabbitMQ中指定的消息(下单时把订单都投入RabbitMQ里了)
@Override
publicResultpay(StringorderNo,LonguserId){
Orderorder=orderMapper.selectOne(newQueryWrapper().eq("order_no",orderNo).eq("user_id",userId));
order.setOrderStatus(2);//已支付
orderMapper.updateById(order);
//MQ中删除支付成功的订单TODO
returnResult.success("订单"+orderNo+"支付成功");
}
发消息
@Component
@Slf4j
publicclassMessageProducer{
@Resource
privateRabbitTemplaterabbitTemplate;
/**
*发送消息
*
*@parammessage消息
*@paramttl有效时间
*/
publicvoidsendMessage(Stringmessage,intttl){
CorrelationDatacorrelationId=newCorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TTL,RabbitConfig.ROUTINGKEY_TTL,message,message1->{
MessagePropertiesmessageProperties=message1.getMessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化
messageProperties.setExpiration(ttl+"");//消息TTL,单位毫秒
returnmessage1;
},correlationId);
log.info("发送消息:{},有效时间:{}秒,回调ID:{},当前时间:{}",message,ttl/1000,correlationId,LocalDateTime.now().toString());
}
}
rabbit配置,声明TTL交换器及对应队列,声明AE备份交换器及对应队列绑定到TTL交换器上,保证消息路由失败不丢失,声明DLX死信交换器及队列绑定到TTL交换器上,实现延迟效果
@Configuration
@Slf4j
publicclassRabbitConfig{
@Value("${spring.rabbitmq.addresses}")
privateStringaddresses;
@Value("${spring.rabbitmq.username}")
privateStringusername;
@Value("${spring.rabbitmq.password}")
privateStringpassword;
@Value("${spring.rabbitmq.virtual-host}")
privateStringvirtualHost;
@Value("${spring.rabbitmq.publisher-confirms}")
privateBooleanpublisherConfirms;
@Value("${spring.rabbitmq.publisher-returns}")
privateBooleanpublisherReturns;
/**
*备份交换器
*/
publicstaticfinalStringEXCHANGE_AE="EXCHANGE_AE";
/**
*TTL交换器
*/
publicstaticfinalStringEXCHANGE_TTL="EXCHANGE_TTL";
/**
*死信交换器
*/
publicstaticfinalStringEXCHANGE_DLX="EXCHANGE_DLX";
/**
*备份队列
*/
publicstaticfinalStringQUEUE_AE="QUEUE_AE";
/**
*TTL队列
*/
publicstaticfinalStringQUEUE_TTL="QUEUE_TTL";
/**
*死信队列
*/
publicstaticfinalStringQUEUE_DLX="QUEUE_DLX";
/**
*TTLRouting_Key
*特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
*/
publicstaticfinalStringROUTINGKEY_TTL="routing.ttl.#";
/**
*死信Routing_Key
*/
publicstaticfinalStringROUTINGKEY_DLX="routing.dlx";
@Resource
privatePublishConfirmpublishConfirm;
@Resource
privatePublishReturnCallBackpublishReturnCallBack;
@Bean
publicConnectionFactoryconnectionFactory(){
CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
//消息发送到RabbitMQ交换器后接收ack回调
connectionFactory.setPublisherConfirms(publisherConfirms);
//消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调
connectionFactory.setPublisherReturns(publisherReturns);
returnconnectionFactory;
}
@Bean
//@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)//必须是prototype类型
publicRabbitTemplaterabbitTemplate(){
RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory());
rabbitTemplate.setConfirmCallback(publishConfirm);
rabbitTemplate.setReturnCallback(publishReturnCallBack);
returnrabbitTemplate;
}
/**
*备份队列常用FanoutExchange
*/
@Bean
publicFanoutExchangealternateExchange(){
returnnewFanoutExchange(EXCHANGE_AE);
}
/**
*TTL队列,指定AE
*/
@Bean
publicTopicExchangettlExchange(){
Mapargs=newHashMap<>();
args.put("alternate-exchange",EXCHANGE_AE);
returnnewTopicExchange(EXCHANGE_TTL,true,false,args);
}
/**
*死信交换器
*/
@Bean
publicDirectExchangedeathLetterExchange(){
returnnewDirectExchange(EXCHANGE_DLX);
}
/**
*备份队列
*/
@Bean
publicQueuequeueAE(){
//returnnewQueue(QUEUE_AE,true);//队列持久
returnQueueBuilder.durable(QUEUE_AE).build();
}
/**
*TTL队列,指定DLX
*/
@Bean
publicQueuequeueTTL(){
Mapargs=newHashMap<>();
//x-dead-letter-exchange声明了队列里的死信转发到的DLX名称,
args.put("x-dead-letter-exchange",EXCHANGE_DLX);
//x-dead-letter-routing-key声明了这些死信在转发时携带的routing-key名称。
args.put("x-dead-letter-routing-key",ROUTINGKEY_DLX);
//returnnewQueue(QUEUE_TTL,true,false,false,args);
returnQueueBuilder.durable(QUEUE_TTL).withArguments(args).build();
}
/**
*死信队列
*/
@Bean
publicQueuequeueDLX(){
//returnnewQueue(QUEUE_DLX,true);//队列持久
returnQueueBuilder.durable(QUEUE_DLX).build();
}
/**
*AE绑定队列
*/
@Bean
publicBindingbindingAE(){
returnBindingBuilder.bind(queueAE()).to(alternateExchange());
}
/**
*TTL绑定队列
*/
@Bean
publicBindingbindingTTL(){
returnBindingBuilder.bind(queueTTL()).to(ttlExchange()).with(ROUTINGKEY_TTL);
}
/**
*DLX绑定队列
*/
@Bean
publicBindingbindingDLX(){
returnBindingBuilder.bind(queueDLX()).to(deathLetterExchange()).with(ROUTINGKEY_DLX);
}
监听死信队列,获取超时订单(未实现,目前只是demo)
@RabbitHandler//声明接收方法
@RabbitListener(queues={RabbitConfig.QUEUE_DLX})
publicvoidprocessDLX(Messagemessage,Channelchannel){
Stringpayload=newString(message.getBody());
log.info("接收处理DLX队列当中的消息:{},当前时间:{}",payload,LocalDateTime.now().toString());
try{
//TODO通知MQ消息已被成功消费,可以ACK了
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch(IOExceptione){
//TODO如果报错了,那么我们可以进行容错处理,比如转移当前消息进入其它队列
}
}
另外,百度上找到的一个项目RabbitTemplate有这样的配置(RabbitTemplate必须是prototype类型),其他人项目并没有这个配置,想知道到底怎样才算对,小白一个,望大神多多指点
@Bean
//@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)//必须是prototype类型
publicRabbitTemplaterabbitTemplate(){
RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory());
rabbitTemplate.setConfirmCallback(publishConfirm);
rabbitTemplate.setReturnCallback(publishReturnCallBack);
returnrabbitTemplate;
}
慕仙森
浏览 395回答 2
2回答

慕哥9229398

感觉不需要删除吧。消费超时订单消息的方法内,判断一下这个订单的状态是已支付还是未支付,未支付-删除订单并恢复库存,ack=true,正常消费消息。

慕妹3242003

建议超时订单也不用死信队列,采用普通的队列处理即可;实际中单据真出现在死信队列中,无法分辨,存在二义性;可以考虑采用补偿法,而且生成订单和实际支付应该是分开的步骤,请考虑实际的业务场景在做相应的处理!希望我的回答能帮到你!
随时随地看视频慕课网APP

相关分类

JavaScript
我要回答