RabbitMQ:如何将失败的消息从一个队列移动到另一个队列?

我有两个队列:

https://img1.mukewang.com/651926eb0001939423480486.jpg

当我运行时同样可见rabbitmqadmin list queues vhost name node messages message_stats.publish_details.rate -u admin -p admin:


我得到:


+-------+-------------------------+-------------------------+----------+------------------------------------+

| vhost |          name           |          node           | messages | message_stats.publish_details.rate |

+-------+-------------------------+-------------------------+----------+------------------------------------+

| /     | high_priority           | rabbit@server-rabbitmq  | 5        | 0.0                                |

| /     | high_priority_secondary | rabbit@server-rabbitmq  | 0        | 0.0                                |

+-------+-------------------------+-------------------------+----------+------------------------------------+

我的交流(rabbitmqadmin -V / list exchanges -u admin -p admin)如下:


+-------------------------+---------+

|          name           |  type   |

+-------------------------+---------+

|                         | direct  |

| amq.direct              | direct  |

| amq.fanout              | fanout  |

| amq.headers             | headers |

| amq.match               | headers |

| amq.rabbitmq.trace      | topic   |

| amq.topic               | topic   |

| high_priority           | direct  |

| high_priority_secondary | direct  |

| low_priority            | direct  |

+-------------------------+---------+

队列和整个相关逻辑是在 PHP / Symfony 中实现的,但是我想通过在终端中使用rabbitmqadmin或命令来使用本机逻辑(如果可能) 。rabbitmqctl


如果消息失败high_priority,我希望 RabbitMQ 自动将其移至队列,high_priority_secondary而无需任何 PHP 参与。这可能吗?我已经开始阅读有关死信交换的内容,但我不知道如何解决这个问题。


我已经为辅助队列创建了一个消费者,因此一旦消息移动到那里,它就会被处理。


仅在 CLI 中可以实现此目的吗?


仅供参考:SO 上有一些建议的帖子已经涵盖了这个问题,但没有一个解决方案是纯粹的 CLI 解决方案。


森栏
浏览 168回答 2
2回答

largeQ

好吧,虽然我不必修改任何 PHP 代码,但我确实必须更改yaml框架级别的配置,因为我希望我的解决方案得以保留并成为代码库的一部分。在你的app/config/services/rabbitmq.yaml:定义生产者:high_priority:    connection: default    class: Foo\Infrastructure\RabbitMQ\SuppressedProducer    exchange_options:        name: 'high_priority'        type: directhigh_priority_secondary:    connection: default    class: Foo\Infrastructure\RabbitMQ\SuppressedProducer    exchange_options:        name: 'high_priority_secondary'        type: directmessage_hospital:    connection: default    class: Foo\Infrastructure\RabbitMQ\SuppressedProducer    exchange_options:        name: 'message_hospital'        type: direct定义消费者:high_priority:    connection: default    exchange_options:        name: 'high_priority'        type: direct    queue_options:        name: 'high_priority'        arguments:            x-dead-letter-exchange: ['S', 'high_priority_secondary']    qos_options:        prefetch_size: 0        prefetch_count: 1        global: false    callback: foo.task_bus.consumerhigh_priority_secondary:    connection: default    exchange_options:        name: 'high_priority_secondary'        type: direct    queue_options:        name: 'high_priority_secondary'        arguments:            x-dead-letter-exchange: ['S', 'message_hospital']    qos_options:        prefetch_size: 0        prefetch_count: 1        global: false    callback: foo.task_bus.consumermessage_hospital:    connection: default    exchange_options:        name: 'message_hospital'        type: direct    queue_options:        name: 'message_hospital'    qos_options:        prefetch_size: 0        prefetch_count: 1        global: false    callback: foo.task_bus.consumer现在队列看起来像:由于 DLX 属性,消息一旦在之前的消息中失败,就会立即进入医院队列。

RISEBY

high_priority_secondary队列应该绑定到high_priority_secondary交换器。 high_priority队列应该绑定到high_priority交换并且应该用 声明x-dead-letter-exchange = high_priority_secondary。因此队列应该用死信交换来声明。要测试这一点,只需在从队列中使用消息时通过重新排队拒绝该消息即可high_priority。
打开App,查看更多内容
随时随地看视频慕课网APP