猿问

让 goroutine 等待 RabbitMQ 返回结果

我是 Go 的新手,我想制作一个管道来翻译我收到的每个请求,方法是将它发送到第一个队列 (TEST),然后从最后一个队列 (RESULT) 中获取最终结果并将其作为响应发回。


我面临的问题是,响应永远不会等到所有结果从队列中返回。这是代码:


func main() {

    requests := []int{3, 4, 5, 6, 7}

    var wg sync.WaitGroup

    wg.Add(1)

    resArr := []string{}

    go func() {

        for _, r := range requests {

            rabbitSend("TEST", r)

            resArr = append(resArr, <-rabbitReceive("RESULT"))

        }

        defer wg.Done()

    }()

    wg.Wait()


    log.Println("Result", resArr)

}

rabbitSend 方法:


func rabbitSend(queueName string, msg int) {

    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

    failOnError(err, "Failed to connect to RabbitMQ")

    defer conn.Close()


    ch, err := conn.Channel()

    failOnError(err, "Failed to open a channel")

    defer ch.Close()


    q, err := ch.QueueDeclare(

        queueName, // name

        true,      // durable

        false,     // delete when unused

        false,     // exclusive

        false,     // no-wait

        nil,       // arguments

    )

    failOnError(err, "Failed to declare a queue")


    body, _ := json.Marshal(msg)

    err = ch.Publish(

        "",     // exchange

        q.Name, // routing key

        false,  // mandatory

        false,  // immediate

        amqp.Publishing{

            ContentType: "application/json",

            Body:        []byte(body),

        })

    log.Printf("[x] Sent %s to %s", body, q.Name)

    failOnError(err, "Failed to publish a message")

}

rabbitReceive 方法:


func rabbitReceive(queueName string) <-chan string {

    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

    failOnError(err, "Failed to connect to RabbitMQ")

    defer conn.Close()


    ch, err := conn.Channel()

    failOnError(err, "Failed to open a channel")

    defer ch.Close()


    q, err := ch.QueueDeclare(

        queueName, // name

        true,      // durable

        false,     // delete when usused

        false,     // exclusive

        false,     // no-waits

        nil,       // arguments

    )

噜噜哒
浏览 114回答 2
2回答

侃侃尔雅

将你的 func rabbitReceive(queueName string) <-chan string 修改如下:

func rabbitReceive(queueName string) <-chan string {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")

    q, err := ch.QueueDeclare(
        queueName, // name
        true,      // durable
        false,     // delete when usused
        false,     // exclusive
        false,     // no-waits
        nil,       // arguments
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    resCh := make(chan string)
    go func() {
        d := <-msgs
        log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
        resCh <- string(d.Body)
        conn.Close()
        ch.Close()
        close(resCh)
    }()
    return resCh
}

之前的代码出问题的原因是 defer ch.Close():函数返回时 ch 就被关闭了,此时响应还没有写入 resCh。

慕哥6287543

补充 @nightfury1204 的精彩回答:你确实在写入 resCh 之前就关闭了 ch。另外还有一点,在 goroutine 中你应该遍历所有消息,所以更好的做法是:

go func() {
    for d := range msgs {
        log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
        resCh <- string(d.Body)
    }
    conn.Close()
    ch.Close()
    close(resCh)
}()
随时随地看视频慕课网APP

相关分类

Go
我要回答