使用此代码(Paho MQTT)作为 GoRoutine 并通过通道传递消息以通过

作为标准代码,我用来发布消息以进行测试:


func main() {


    opts := MQTT.NewClientOptions().AddBroker("tcp://127.0.0.1:1883")

    opts.SetClientID("myclientid_")

    opts.SetDefaultPublishHandler(f)

    opts.SetConnectionLostHandler(connLostHandler)


    opts.OnConnect = func(c MQTT.Client) {

        fmt.Printf("Client connected, subscribing to: test/topic\n")


        if token := c.Subscribe("logs", 0, nil); token.Wait() && token.Error() != nil {

            fmt.Println(token.Error())

            os.Exit(1)

        }

    }


    c := MQTT.NewClient(opts)

    if token := c.Connect(); token.Wait() && token.Error() != nil {

        panic(token.Error())

    }



    for i := 0; i < 5; i++ {

        text := fmt.Sprintf("this is msg #%d!", i)

        token := c.Publish("logs", 0, false, text)

        token.Wait()

    }


    time.Sleep(3 * time.Second)


    if token := c.Unsubscribe("logs"); token.Wait() && token.Error() != nil {

        fmt.Println(token.Error())

        os.Exit(1)

    }


    c.Disconnect(250)

}

这个效果很好!但是在执行高延迟任务时大量传递消息,我的程序性能会很低,所以我必须使用 goroutine 和 Channel。

这段代码正是我想要的!

但作为 Golang 中的菜鸟,我不知道如何START()在主函数中运行函数以及要传递什么参数!

特别是,我将如何使用通道将消息传递给工作人员(发布者)?!

我们将不胜感激您的帮助!



侃侃无极
浏览 131回答 2
2回答

慕斯王

为什么不将消息发送给一群工作人员呢?像这样的东西:...&nbsp; &nbsp; const workerPoolSize = 10 // the number of workers you want to have&nbsp; &nbsp; wg := &sync.WaitGroup{}&nbsp; &nbsp; wCh := make(chan string)&nbsp; &nbsp; wg.Add(workerPoolSize) // you want to wait for 10 workers to finish the job&nbsp; &nbsp; // run workers in goroutines&nbsp; &nbsp; for i := 0; i < workerPoolSize; i++ {&nbsp; &nbsp; &nbsp; &nbsp; go func(wch <-chan string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // get the data from the channel&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for text := range wch {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; c.Publish("logs", 0, false, text)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; token.Wait()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done() // worker says that he finishes the job&nbsp; &nbsp; &nbsp; &nbsp; }(wCh)&nbsp; &nbsp; }&nbsp; &nbsp; for i := 0; i < 5; i++ {&nbsp; &nbsp; &nbsp; &nbsp; // put the data to the channel&nbsp; &nbsp; &nbsp; &nbsp; wCh <- fmt.Sprintf("this is msg #%d!", i)&nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; close(wCh)&nbsp; &nbsp; wg.Wait() // wait for all workers to finish...

慕姐8265434

当您说“在执行高延迟任务时大量传递消息”时,我假设您的意思是您想要异步发送消息(因此消息由与主代码运行不同的 go 例程处理)。如果是这种情况,那么对您的初始示例进行非常简单的更改将为您提供:for i := 0; i < 5; i++ {        text := fmt.Sprintf("this is msg #%d!", i)        token := c.Publish("logs", 0, false, text)        // comment out... token.Wait()    }注意:您的示例代码可能会在消息实际发送之前退出;添加 time.Sleep(10 * time.Second) 会给它时间让它们熄灭;请参阅下面的代码了解处理此问题的另一种方法您的初始代码在消息发送之前停止的唯一原因是您调用了 token.Wait()。如果您不关心错误(并且您不检查错误,所以我假设您不关心),那么调用 token.Wait() 就没有什么意义(它只是等待消息发送;消息将消失无论你是否调用 token.Wait() )。如果您想记录任何错误,您可以使用类似以下内容:for i := 0; i < 5; i++ {        text := fmt.Sprintf("this is msg #%d!", i)        token := c.Publish("logs", 0, false, text)        go func(){            token.Wait()            err := token.Error()            if err != nil {                fmt.Printf("Error: %s\n", err.Error()) // or whatever you want to do with your error            }        }()    }请注意,如果消息传递至关重要(但由于您没有检查错误,我假设它不是),您还需要做一些其他事情。就您找到的代码而言;我怀疑这会增加您不需要的复杂性(并且需要更多信息才能解决此问题;例如,MqttProtocol 结构未在您粘贴的位中定义)。额外的一点......在您的评论中您提到“发布的消息必须排序”。如果这是必要的(因此您想等到每条消息都已送达后再发送另一条消息),那么您需要类似以下内容:msgChan := make(chan string, 200) // Allow a queue of up to 200 messagesvar wg sync.WaitGroupwg.Add(1)go func(){ // go routine to send messages from channel    for msg := range msgChan {        token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital        token.Wait()        // should check for errors here    }    wg.Done()}()for i := 0; i < 5; i++ {        text := fmt.Sprintf("this is msg #%d!", i)        msgChan <- text    }close(msgChan) // this will stop the goroutine (when all messages processed)wg.Wait() // Wait for all messages to be sent before exiting (may wait for ever is mqtt broker down!)注意:这与 Ilya Kaznacheev 的解决方案类似(如果将workerPoolSize设置为1并使通道缓冲)正如您的评论表明等待组使这一点难以理解,这里是另一种可能更清晰的等待方式(等待组通常在您等待多件事情完成时使用;在这个例子中,我们只等待一件事情,所以可以使用更简单的方法)msgChan := make(chan string, 200) // Allow a queue of up to 200 messagesdone := make(chan struct{}) // channel used to indicate when go routine has finnishedgo func(){ // go routine to send messages from channel    for msg := range msgChan {        token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital        token.Wait()        // should check for errors here    }    close(done) // let main routine know we have finnished}()for i := 0; i < 5; i++ {        text := fmt.Sprintf("this is msg #%d!", i)        msgChan <- text    }close(msgChan) // this will stop the goroutine (when all messages processed)<-done // wait for publish go routine to complete
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go