猿问

Golang - 为到不同服务器的多个连接扩展 websocket 客户端

我有一个 websocket 客户端。实际上,它比下面显示的基本代码复杂得多。我现在需要扩展此客户端代码以打开到多个服务器的连接。最终,从服务器接收到消息时需要执行的任务是相同的。处理这个问题的最佳方法是什么?正如我上面所说,接收消息时执行的实际代码比示例中显示的要复杂得多。


package main


import (

        "flag"

        "log"

        "net/url"

        "os"

        "os/signal"

        "time"


        "github.com/gorilla/websocket"

)


var addr = flag.String("addr", "localhost:1234", "http service address")


func main() {

        flag.Parse()

        log.SetFlags(0)


        interrupt := make(chan os.Signal, 1)

        signal.Notify(interrupt, os.Interrupt)


        // u := url.URL{Scheme: "ws", Host: *addr, Path: "/echo"}

        u := url.URL{Scheme: "ws", Host: *addr, Path: "/"}

        log.Printf("connecting to %s", u.String())


        c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)

        if err != nil {

                log.Fatal("dial:", err)

        }

        defer c.Close()


        done := make(chan struct{})


        go func() {

                defer close(done)

                for {

                        _, message, err := c.ReadMessage()

                        if err != nil {

                                log.Println("read:", err)

                                return

                        }

                        log.Printf("recv: %s", message)

                }

        }()


        ticker := time.NewTicker(time.Second)

        defer ticker.Stop()


        for {

                select {

                case <-done:

                        return

                case t := <-ticker.C:

                        err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))

                        if err != nil {

                                log.Println("write:", err)

                                return

                        }



胡子哥哥
浏览 219回答 2
2回答

眼眸繁星

修改中断处理以在中断时关闭通道。这允许多个 goroutines 通过等待通道关闭来等待事件。shutdown := make(chan struct{})interrupt := make(chan os.Signal, 1)signal.Notify(interrupt, os.Interrupt)go func() {    <-interrupt    log.Println("interrupt")    close(shutdown)}()将每个连接代码移动到一个函数中。这段代码是从问题中复制粘贴的,有两个变化:中断通道被关闭通道替换;该函数在函数完成时通知 sync.WaitGroup。func connect(u string, shutdown chan struct{}, wg *sync.WaitGroup) {    defer wg.Done()    log.Printf("connecting to %s", u)    c, _, err := websocket.DefaultDialer.Dial(u, nil)    if err != nil {        log.Fatal("dial:", err)    }    defer c.Close()    done := make(chan struct{})    go func() {        defer close(done)        for {            _, message, err := c.ReadMessage()            if err != nil {                log.Println("read:", err)                return            }            log.Printf("recv: %s", message)        }    }()    ticker := time.NewTicker(time.Second)    defer ticker.Stop()    for {        select {        case <-done:            return        case t := <-ticker.C:            err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))            if err != nil {                log.Println("write:", err)                return            }        case <-shutdown:            // Cleanly close the connection by sending a close message and then            // waiting (with timeout) for the server to close the connection.            err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))            if err != nil {                log.Println("write close:", err)                return            }            select {            case <-done:            case <-time.After(time.Second):            }            return        }    }}在中声明一个sync.WaitGroupmain()。对于要连接到的每个 websocket 端点,递增 WaitGroup 并启动 goroutine 以连接该端点。启动 goroutine 后,在 WaitGroup 上等待 goroutine 完成。var wg sync.WaitGroupfor _, u := range endpoints { // endpoints is []string                               // where elements are URLs                               // of endpoints to connect to.    wg.Add(1)    go connect(u, shutdown, &wg)}wg.Wait()

开满天机

与每个不同服务器的通信是否完全独立于其他服务器?如果是的话,我会以这样的方式四处走动:在main中创建一个带有取消函数的上下文在 main 中创建一个等待组来跟踪启动的 goroutines对于每个服务器,添加到等待组,从传递上下文和等待组引用的主函数启动一个新的 goroutinemain进入一个 for/select 循环,监听信号,如果信号到达,调用 cancelfunc 并等待等待组。main还可以监听来自 goroutines 的结果 chan,并可能自己打印结果,因为 goroutines 不应该直接这样做。正如我们所说,每个goroutine都有 wg 的引用、上下文和可能的 chan 以返回结果。现在,如果 goroutine 必须只做一件事,或者它是否需要做一系列事情,这个方法就会分裂。对于第一种方法如果只有一件事要做,我们会遵循此处描述的方法(观察到异步他会依次启动一个新的 goroutine 来执行 DoSomething() 步骤,该步骤将在通道上返回结果)这允许它能够随时接受取消信号。由您决定您想要的非阻塞程度以及您想要响应取消信号的速度。此外,将关联的上下文传递给 goroutines 的好处是您可以调用 Context enabled大多数库函数的版本。例如,如果您希望您的拨号超时为 1 分钟,您可以创建一个新的上下文,该上下文从传递的超时开始,然后使用该超时创建 DialContext。这允许拨号从超时或父(您在 main 中创建的)上下文的 cancelfunc 被调用时停止。如果需要做更多的事情,我通常更喜欢用 goroutine 做一件事,让它调用一个新的 goroutine 来执行下一步(将所有引用传递到管道中)然后退出。这种方法可以很好地扩展取消,并能够在任何步骤停止流水线,并且可以轻松地为可能需要太长时间的步骤支持带有 dealines 的上下文。
随时随地看视频慕课网APP

相关分类

Go
我要回答