猿问

Golang-Paho MQTT 订阅者在重新初始化订阅者后不断断开连接并出现错误 EOF

我正在尝试动态更改 mqtt 客户端处理程序和证书,当订阅者和发布者连接时,这会导致订阅者 EOF


这就是我正在尝试做的,


1] 我正在初始化订阅者/发布者(使用 firstPubHandler、firstConnectHandler 和默认证书)


2] 使用发布者在服务器上发送注册消息以获取新证书详细信息


3] 服务器将返回证书详细信息,该响应将由 firstConnectHandler 在主题.../id/Certificate上处理以下载证书。


4] firstPubHandler 将处理服务器的响应并重新初始化发布者/订阅者(使用 messagePubHandler、connectHandler 和新下载的证书),connectHandler 将侦听所有主题/id/+


一切正常,除了当我重新初始化订阅者/发布者时,订阅者不断断开连接并出现错误“EOF”


我在这里做错什么了吗?或者有没有更好的方法来做到这一点?任何帮助表示赞赏


- 主功能


var opt Params

var publisher mqtt.Client

var subscriber mqtt.Client


func main() {

    InitializeBroker(firstPubHandler, firstConnectHandler)


    //Ultimately it will trigger message on ".../id/Certificate" topic which will be handled byfirstConnectHandler 

    PublishRegistrationMessage(publisher)


    c := make(chan os.Signal, 1)

    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    done := make(chan os.Signal, 1)

    signal.Notify(done, os.Interrupt, syscall.SIGTERM)

    go func() {

        for {

        }

    }()

    <-done

    <-c

    DisconnectBrocker()

}

-- 处理程序


// First handlers

var firstPubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {

    DownloadCertificates(msg.Payload())

    InitializeBroker(messagePubHandler, connectHandler)

}


var firstConnectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {

    if token := c.Subscribe(opt.SubClientId+"/id/Certificate", 0, firstPubHandler); token.Wait() && token.Error() != nil {

        log.Error(token.Error())

    }

}


// Second handlers

var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {

    ProcessMessage(msg.Payload())

}


var connectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {

    if token := c.Subscribe(opt.SubClientId+"/id/+", 0, messagePubHandler); token.Wait() && token.Error() != nil {

        log.Error(token.Error())

    }

}


// Common handler

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {

    log.Info(err)

}


肥皂起泡泡
浏览 927回答 1
1回答

摇曳的蔷薇

基于对您的代码的快速审查,这就是我认为正在发生的事情(因为您没有提供所有代码,所以涉及一些猜测):main()调用InitializeBroker创建两个到代理的连接。默认发布处理程序设置为firstPubHandler并在OnConnect您订阅的处理程序中SubClientId+"/id/Certificate当接收到消息 ( ) 时,您从消息中获取证书并使用它与使用相同客户端 ID 但不同/default 发布处理程序firstPubHandler的代理建立一组新连接。OnConnect因此,在第 2 点之后,您实际上有两组独立的代理连接(总共 4 个连接)。但是MQTT-3.1.4-2(参见规范)指出:如果 ClientId 代表一个已经连接到服务器的客户端,那么服务器必须断开现有的客户端。因此,当建立第二组连接时,代理将丢弃第一组连接。这是您看到的“EOF”断开连接。第二组连接仍将启动。当您connectLostHandler对第一组和第二组连接使用相同的连接时,您无法在日志中看到哪个连接被终止。总之,我相信您的代码确实有效。但是,您可能应该调用c.Disconnect(),firstConnectHandler以便在建立第二组连接之前完全关闭初始连接。您还需要将其存储在publisher某处,以便同时断开该连接。注意:我很难理解你为什么要这样做。建立初始连接以检索证书似乎会降低系统的整体安全性。标准方法是给每个客户端一个唯一的证书,然后使用代理上的 ACL 来应用任何必要的限制。对于许多代理,您可以在 ACL 中使用证书公用名(从而消除对第二个连接的需要)。
随时随地看视频慕课网APP

相关分类

Go
我要回答