猿问

在 RabbitMQ 和 Spring 中是否可以选择创建只接受高优先级消息的消费者?

我正在创建一个应用程序,该应用程序使用 RabbitMQ 向消费者发送消息以进行耗时的处理。但是,我需要对消息进行优先级排序。当高优先级的消息到达时,即使所有消费者实例都在处理其他消息,也必须处理该消息。


AFAIK 在 Spring Boot 和 RabbitMQ 中不可能抢占处理低优先级消息并切换到处理高优先级消息。


是否可以创建仅接受高优先级消息的消费者,或者在所有其他繁忙且高优先级消息到达时动态运行额外的消费者组?


我尝试使用 x-max-priority=10 标志添加队列并增加消费者数量,但这并不能解决我的问题。


想象一下,我们运行 50 个消费者并发送 50 条低优先级消息。当执行耗时的处理时,一条新消息以高优先级到达,但无法立即处理,因为所有 50 个消费者都忙。


配置中有一部分设置了消费者的数量


@Bean

public SimpleRabbitListenerContainerFactory

rabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,

                               @Qualifier("rabbitConnectionFactory") ConnectionFactory connectionFactory) {

  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

  configurer.configure(factory, connectionFactory);

  factory.setConcurrentConsumers(50);

  factory.setMaxConcurrentConsumers(100);

  return factory;

}

有没有办法创建一组接受消息高优先级消息(例如高于0)的消费者或为高优先级消息动态创建消费者?


开满天机
浏览 157回答 2
2回答

手掌心

我不知道实施您所描述的先发制人策略的方法,但是您可以考虑许多其他选择。优先级设定首先要考虑的是priorityRabbitMQ 本身的支持。考虑一下 Gavin M. Roy 的《RabbitMQ in Depth》中的摘录:“从 RabbitMQ 3.5.0 开始,优先级字段已按照 AMQP 规范实现。它被定义为一个整数,可能的值为 0 到 9,用于队列中的消息优先级排序。根据规定,如果发布了优先级为 9 的消息,随后又发布了优先级为 0 的消息,则新连接的消费者将先于优先级为 9 的消息接收到优先级为 0 的消息。例如rabbitTemplate.convertAndSend("Hello World!", message -> {  MessageProperties properties = MessagePropertiesBuilder.newInstance()                                                         .setPriority(0)                                                         .build();  return MessageBuilder.fromMessage(message)                       .andProperties(properties)                       .build();});基于优先级的交换第二种选择是定义主题交换并定义考虑您的优先级的路由键。例如,考虑events使用模式的路由密钥进行交换,EventName.Priority例如OrderPlaced.High、OrderPlaced.Normal或OrderPlaced.Low。基于此,您可以有一个队列仅绑定到高优先级的订单,即OrderPlaced.High和多个专用于该队列的消费者。例如String routingKey = String.format("%s.%s", event.name(), event.priority());rabbit.convertAndSend(routingKey, event);使用如下所示的侦听器,其中队列使用路由键high-priority-orders绑定到events事件OrderPlaced和优先级的交换器。HighOrderPlaced.High@Component@RabbitListener(queues = "high-priority-orders", containerFactory="orders")public class HighPriorityOrdersListener { @RabbitHandler public void onOrderPlaced(OrderPlacedEvent orderPlaced) {   //... }}显然,您将需要一个专用的线程池(在orders上面的容器工厂中)来处理高优先级请求。

慕少森

AMQP 协议中没有从队列中“选择”消息的机制。您可能需要考虑使用具有专用消费者的离散队列。顺便说一句,这与 spring 无关;有关 RabbitMQ 的一般问题应直接咨询rabbitmq-users Google 群组。
随时随地看视频慕课网APP

相关分类

Java
我要回答