我正在尝试动态更改 mqtt 客户端处理程序和证书,当订阅者和发布者连接时,这会导致订阅者 EOF
这就是我正在尝试做的,
1] 我正在初始化订阅者/发布者(使用 firstPubHandler、firstConnectHandler 和默认证书)
2] 使用发布者在服务器上发送注册消息以获取新证书详细信息
3] 服务器将返回证书详细信息,该响应将由 firstConnectHandler 在主题.../id/Certificate上处理以下载证书。
4] firstPubHandler 将处理服务器的响应并重新初始化发布者/订阅者(使用 messagePubHandler、connectHandler 和新下载的证书),connectHandler 将侦听所有主题/id/+
一切正常,除了当我重新初始化订阅者/发布者时,订阅者不断断开连接并出现错误“EOF”
我在这里做错什么了吗?或者有没有更好的方法来做到这一点?任何帮助表示赞赏
- 主功能
var opt Params
var publisher mqtt.Client
var subscriber mqtt.Client
func main() {
InitializeBroker(firstPubHandler, firstConnectHandler)
//Ultimately it will trigger message on ".../id/Certificate" topic which will be handled byfirstConnectHandler
PublishRegistrationMessage(publisher)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGTERM)
go func() {
for {
}
}()
<-done
<-c
DisconnectBrocker()
}
-- 处理程序
// First handlers
var firstPubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
DownloadCertificates(msg.Payload())
InitializeBroker(messagePubHandler, connectHandler)
}
var firstConnectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {
if token := c.Subscribe(opt.SubClientId+"/id/Certificate", 0, firstPubHandler); token.Wait() && token.Error() != nil {
log.Error(token.Error())
}
}
// Second handlers
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
ProcessMessage(msg.Payload())
}
var connectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {
if token := c.Subscribe(opt.SubClientId+"/id/+", 0, messagePubHandler); token.Wait() && token.Error() != nil {
log.Error(token.Error())
}
}
// Common handler
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
log.Info(err)
}
摇曳的蔷薇
相关分类