Go-micro rabbit mq 插件 - 优先发布消息

由于支持 RabbitMQ 版本 3.5.0 优先级队列 - https://www.rabbitmq.com/priority.html


如果在队列创建期间传递了 x-max-priority 参数,则可以声明队列。


我可以成功声明一个优先支持的队列


brkrSub := broker.NewSubscribeOptions(

        broker.DisableAutoAck(),

        rabbitmq.QueueArguments(map[string]interface{}{"x-max-priority": 10}),

    )


    service.Server().Subscribe(

        service.Server().NewSubscriber(

            "mytopic",

            h.Handle,

            server.SubscriberQueue("mytopic.hello"),

            server.SubscriberContext(brkrSub.Context),

        ),

    )

但是如何发布指定优先级的消息?


    body := &message.MyTestMessage{

        Message: fmt.Sprintf("Message number %d", counter),

    }


    msg := client.NewMessage(

        topic,

        body,

        // TODO: Priority

    )

    if err := client.Publish(ctx, msg); err != nil {

        fmt.Printf("Cannot publish message: ", err.Error())

        return

    }

我找不到将优先级作为 MessageOption 或 PublishOption 传递的直接方法,但是,似乎有一种方法可以在 client.Publish 上下文中指定其他选项。我是否正在寻找正确的方向,如果是这样,你能帮我一点吗?


编辑:我能够执行以下操作而不会导致任何编译时错误。尽管如此,优先级仍然被忽略,并且消息以通常的方式出现



func setPriority(ctx context.Context, priority int) client.PublishOption {

    return func(o *client.PublishOptions) {

        o.Context = context.WithValue(ctx, "priority", priority)

    }

}


func publish(ctx context.Context, priority int, counter int) {

    //body := fmt.Sprintf("hello, I am a message %d", counter)

    body := &message.MyTestMessage{

        Message: fmt.Sprintf("Message number %d", counter),

    }


    msg := client.NewMessage(

        topic,

        body,

    )

    if err := client.Publish(ctx, msg, setPriority(ctx, priority)); err != nil {

        fmt.Printf("Cannot publish message: ", err.Error())

        return

    }


    fmt.Printf("Published message %d to %s \n", counter, topic)

}


温温酱
浏览 165回答 2
2回答

芜湖不芜

尝试这样的事情:func publishMessageToChan(queue *amqp.Queue, channel *amqp.Channel, messageToQueue string) error {&nbsp; &nbsp; return channel.Publish(&nbsp; &nbsp; &nbsp; &nbsp; "<exchange>", // exchange&nbsp; &nbsp; &nbsp; &nbsp; "<queue>",&nbsp; &nbsp; // routing key&nbsp; &nbsp; &nbsp; &nbsp; false,&nbsp; &nbsp; &nbsp; &nbsp; // mandatory&nbsp; &nbsp; &nbsp; &nbsp; false,&nbsp; &nbsp; &nbsp; &nbsp; // immediate&nbsp; &nbsp; &nbsp; &nbsp; amqp.Publishing{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Timestamp:&nbsp; &nbsp;time.Now(),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ContentType: "text/plain",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Body:&nbsp; &nbsp; &nbsp; &nbsp; []byte(messageToQueue),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Priority:&nbsp; &nbsp; 0, // <-- Priority here < 0 to 9>&nbsp; &nbsp; &nbsp; &nbsp; })}使用库“github.com/streadway/amqp”

慕田峪7331174

var brokerOpts broker.PublishOptionsrabbitmq.Priority(uint8(10))(&brokerOpts)event.Publish(ctx, payload, client.PublishContext(brokerOpts.Context))
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go