我想测试与rabbitmq服务器的重启连接。在写了小脚本来测试。 http://play.golang.org/p/l3ZWzG0Qqb 但它不起作用。
在步骤 10 中,我关闭了通道和连接。并再次打开它们。并重新创建 chan amqp.Confirmation ( :75) 。并继续循环。但在那之后,从陈确认没有任何回报。
UPD:代码在这里。
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"os"
"time"
)
const SERVER = "amqp://user:pass@localhost:5672/"
const EXCHANGE_NAME = "publisher.test.1"
const EXCHANGE_TYPE = "direct"
const ROUTING_KEY = "publisher.test"
var Connection *amqp.Connection
var Channel *amqp.Channel
func setup(url string) (*amqp.Connection, *amqp.Channel, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, nil, err
}
ch, err := conn.Channel()
if err != nil {
return nil, nil, err
}
return conn, ch, nil
}
func main() {
url := SERVER
Connection, Channel, err := setup(url)
if err != nil {
fmt.Println("err publisher setup:", err)
return
}
confirms := Channel.NotifyPublish(make(chan amqp.Confirmation, 1))
if err := Channel.Confirm(false); err != nil {
log.Fatalf("confirm.select destination: %s", err)
}
for i := 1; i <= 3000000; i++ {
log.Println(i)
if err != nil {
fmt.Println("err consume:", err)
return
}
if err := Channel.Publish(EXCHANGE_NAME, ROUTING_KEY, false, false, amqp.Publishing{
Body: []byte(fmt.Sprintf("%d", i)),
}); err != nil {
fmt.Println("err publish:", err)
log.Printf("%+v", err)
os.Exit(1)
return
}
// only ack the source delivery when the destination acks the publishing
confirmed := <-confirms
if confirmed.Ack {
log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
} else {
log.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag)
// TODO. Reconnect will be here
}
翻过高山走不出你
相关分类