猿问

Mosquitto 订阅者收到额外消息

我正在探索 mosquitto 作为一个完整的初学者。我有一个 golang 测试程序,我正在使用它来查看它是如何管理消息的,但是该程序的后续运行显示收到了额外的消息。


import (

    mqtt "github.com/eclipse/paho.mqtt.golang"

    log "github.com/sirupsen/logrus"

    "strconv"

    "time"

)

var sent int

var received int


func Test() {

    sent  = 0

    received = 0

    const TOPIC = "mytopic/test"


    opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883").SetClientID("client1").SetResumeSubs(false).SetAutoReconnect(false).SetCleanSession(true)


    client1 := mqtt.NewClient(opts)

    if token := client1.Connect(); token.Wait() && token.Error() != nil {

        panic(token.Error())

    }

    if token := client1.Subscribe(TOPIC, 2, receiver1); token.Wait() && token.Error() != nil {

        panic(token.Error())

    }


    defer howManySent()

    defer howManyReceived()

    defer client1.Disconnect(1)


    for index:=0;index<3;index++ {

        sendMsg := "client1 message " + strconv.Itoa(index)

        if token := client1.Publish(TOPIC, 2, true, sendMsg); token.Wait() && token.Error() != nil {

            panic(token.Error())

        } else {

            sent++

        }

        time.Sleep(1 * time.Second)

    }

    time.Sleep(1*time.Second)

}


func howManyReceived() {

    log.Warn("RECEIVED:", received)

}


func howManySent() {

    log.Warn("SENT:", sent)

}


func receiver1(client mqtt.Client, msg mqtt.Message) {

    received++

    msg.Ack()

    output := "message id:" + strconv.Itoa(int(msg.MessageID())) + " message = " + string(msg.Payload())

    log.Warn(output)

}

第一次运行的输出:


WARN[0000] message id:1 message = client1 message 0     

WARN[0001] message id:2 message = client1 message 1     

WARN[0002] message id:3 message = client1 message 2     

WARN[0004] RECEIVED:3                                   

WARN[0004] SENT:3      


我究竟做错了什么?


largeQ
浏览 197回答 1
1回答

白衣非少年

发布定义如下:Publish(topic&nbsp;string,&nbsp;qos&nbsp;byte,&nbsp;retained&nbsp;bool,&nbsp;payload&nbsp;interface{})&nbsp;Token因此,当您打电话时,token := client1.Publish(TOPIC, 2, true, sendMsg)您正在发布一条retained设置为 true 的消息。Retain是与消息一起传递给代理的标志,它意味着:如果 RETAIN 标志设置为 1,则在客户端发送给服务器的 PUBLISH 数据包中,服务器必须存储应用程序消息及其 QoS,以便可以将其传递给订阅匹配其主题名称的未来订阅者 [MQTT- 3.3.1-5]。建立新订阅时,必须将每个匹配主题名称上的最后保留消息(如果有)发送给订阅者 [MQTT-3.3.1-6]。每条带有retainset 的新消息都会替换任何以前的消息,因此当您的第一次执行完成时,消息2会被保留。这会在客户端重新连接时发送给客户端(意味着您看到的结果是预期的)。如果您更改对它的调用,token := client1.Publish(TOPIC, 2, false, sendMsg)它将按您预期的方式运行。mosquitto 文档对标志说如下persistence:如果为 true,则连接、订阅和消息数据将写入 mosquitto.db 中由 persistence_location 指定的位置的磁盘。当 mosquitto 重启时,它会重新加载 mosquitto.db 中存储的信息所以这只有在你重启 Mosquitto 时才会产生影响;即使此标志设置为 ,会话信息仍将存储在内存中false。
随时随地看视频慕课网APP

相关分类

Go
我要回答