RabbitMQ 队列长度始终为 0

我正在编写一个应用程序,我遇到了这个问题,一遍又一遍地查看代码,似乎没有任何问题,用下面的基本代码片段测试,问题是可重现的...... RabbitMQ 说队列总是空的不是。


下面的 Golang 代码片段显示了生产者发送消息的频率高于消费者消费消息的频率。消费者始终处于活动状态,但睡眠时间更长,以使队列在其积压中具有消息。结果?消费者每次尝试都会获取消息,但是 API 总是说没有消息 -> 消息计数为 0。


package main


import (

    "encoding/json"

    "fmt"

    "github.com/streadway/amqp"

    "io/ioutil"

    "net/http"

    "testing"

    "time"

)

func main() {


    username := "guest"

    password := "guest"

    scheme := "amqp"

    rabbitMqHost := "localhost"

    port := "5672"


    connectionString := fmt.Sprintf("%s://%s:%s@%s:%s/", scheme, username, password, rabbitMqHost, port)


    conn, err := amqp.Dial(connectionString)


    if err != nil {

        panic(err)

    }


    ch, err := conn.Channel()

    if err != nil {

        panic(err)

    }


    exchangeName := "my-exchange"

    // Declare exchange

    err = ch.ExchangeDeclare(

        exchangeName, // name

        "fanout",     // type

        true,         // durable

        true,         // auto-deleted

        false,        // internal

        false,        // no-wait

        nil,          // arguments

    )


    if err != nil {

        panic(err)

    }


    // Create first Queue

    queueName := "my-queue"

    q, err := ch.QueueDeclare(

        queueName, // name

        true,      // durable

        true,      // delete when unsused

        false,     // exclusive

        false,     // no-wait

        nil,       // arguments

    )


    if err != nil {

        panic(err)

    }


    // Bind Exchange to Queue

    err = ch.QueueBind(

        q.Name,       // queue name

        "",           // routing key

        exchangeName, // exchange

        false,

        nil,

    )


    // Listen

    eventQueue, err := ch.Consume(

        q.Name, // queue

        "",     // consumer

        true,   // auto-ack

        false,  // exclusive

        false,  // no-local

        false,  // no-wait

        nil,    // args

    )


    if err != nil {

        panic(err)

    }


}

您可以使用以下 RabbitMQ 服务器进行测试:


docker run --rm --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management


万千封印
浏览 189回答 2
2回答

四季花海

该字段q2.Messages不可靠,它是未等待确认的消息的计数,即已确认的消息。你的消费者被声明为autoAck = true——即noAck——,这意味着不需要确认,这意味着已经确认了零个消息。当您注释掉消费者时,确认消息的数量可能取决于发布者缓冲区。使用 AMQP 0.9.1 以编程方式在给定队列上获取精确数量的消息基本上是不可能的。您可以改用message_stats管理 API 中的字段:http://localhost:15672/api/queues/vhost/queue_name

汪汪一只猫

接受的解决方案将是黑绿色的。证明是下面的替换,只需将问题部分中的消费者和发布者代码替换为:// Listen&nbsp; &nbsp; eventQueue, err := ch.Consume(&nbsp; &nbsp; &nbsp; &nbsp; q.Name, // queue&nbsp; &nbsp; &nbsp; &nbsp; "",&nbsp; &nbsp; &nbsp;// consumer&nbsp; &nbsp; &nbsp; &nbsp; false,&nbsp; // auto-ack <-- Difference&nbsp; &nbsp; &nbsp; &nbsp; false,&nbsp; // exclusive&nbsp; &nbsp; &nbsp; &nbsp; false,&nbsp; // no-local&nbsp; &nbsp; &nbsp; &nbsp; false,&nbsp; // no-wait&nbsp; &nbsp; &nbsp; &nbsp; nil,&nbsp; &nbsp; // args&nbsp; &nbsp; )&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; panic(err)&nbsp; &nbsp; }&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for a := range eventQueue {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; err = ch.Ack(a.DeliveryTag, false) // <-- Difference&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; panic(err)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("Received Event %s\n", string(a.Body))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.Sleep(time.Second * 4)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; count := 0&nbsp; &nbsp; &nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; err = ch.Publish(exchangeName, "", false, false, amqp.Publishing{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ContentType: "application/json",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Body:&nbsp; &nbsp; &nbsp; &nbsp; []byte(fmt.Sprintf("Message %d", count)),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; })&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("Sent Message %d\n", count)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; count++&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; panic(err)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if count >= 20 { // <-- Difference&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.Sleep(time.Second * 2)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()输出:.... The increase in the queue lengthSent Message 13Queue Len: 8.000000 - 0Queue Len: 8.000000 - 0Received Event Message 4Sent Message 14Queue Len: 8.000000 - 0Queue Len: 9.000000 - 0Sent Message 15Queue Len: 9.000000 - 0Queue Len: 9.000000 - 0Received Event Message 5Sent Message 16Queue Len: 9.000000 - 0Queue Len: 9.000000 - 0Sent Message 17Queue Len: 11.000000 - 0Queue Len: 11.000000 - 0Received Event Message 6Sent Message 18Queue Len: 11.000000 - 0Queue Len: 11.000000 - 0Sent Message 19Queue Len: 11.000000 - 0Queue Len: 12.000000 - 0Received Event Message 7Queue Len: 12.000000 - 0Queue Len: 12.000000 - 0Queue Len: 12.000000 - 0Queue Len: 12.000000 - 0Received Event Message 8Queue Len: 12.000000 - 0Queue Len: 12.000000 - 0Queue Len: 12.000000 - 0Queue Len: 12.000000 - 0Received Event Message 9Queue Len: 12.000000 - 0Queue Len: 11.000000 - 0Queue Len: 11.000000 - 0Queue Len: 11.000000 - 0Received Event Message 10Queue Len: 11.000000 - 0Queue Len: 11.000000 - 0Queue Len: 10.000000 - 0Queue Len: 10.000000 - 0Received Event Message 11Queue Len: 10.000000 - 0Queue Len: 10.000000 - 0Queue Len: 10.000000 - 0Queue Len: 9.000000 - 0Received Event Message 12....As publisher exits it decreases, the consumer catches up and message len decreases:Received Event Message 16Queue Len: 5.000000 - 0Queue Len: 5.000000 - 0Queue Len: 5.000000 - 0Queue Len: 4.000000 - 0Received Event Message 17Queue Len: 4.000000 - 0Queue Len: 4.000000 - 0Queue Len: 4.000000 - 0Queue Len: 4.000000 - 0Received Event Message 18Queue Len: 2.000000 - 0Queue Len: 2.000000 - 0Queue Len: 2.000000 - 0Queue Len: 2.000000 - 0Received Event Message 19Queue Len: 2.000000 - 0Queue Len: 1.000000 - 0
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go