GoLang pubsub 服务器停止监听新频道订阅

我的 go websocket 应用程序中有一个 redis pubsub 连接,因此每当客户端连接并订阅频道时,它都会监听并发送消息。但是,假设 Client 1 订阅了 channel X,pubsub 开始监听并接收来自它的消息。


现在,客户端 1 也订阅了频道Y,因此服务器也应该收听来自该频道的消息,但是它停止收听X并且只收听Y.


    for {

        switch v := gPubSubConn.Receive().(type) {

        case redis.Message:

            log.Printf("Received message from %s", v.Channel)

            subscriptions := ps.GetSubscriptions(v.Channel, nil)

            for _, sub := range subscriptions {

                if v.Channel == types.TaskResults {

                    go sendTaskResultMessage(v.Data, sub)

                } else if v.Channel == types.TaskCount {

                    go sendTaskCountMessage(v.Data, sub)

                }

            }

        case redis.Subscription:

            log.Printf("Subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count)

        case error:

            log.Println("Error pub/sub, delivery stopped")

            return

        }

这是一个示例日志输出


go-1  | New Client is connected, total:  1

go-1  | 2022/02/16 17:36:03 signature is invalid

go-1  | 2022/02/16 17:36:03 Subscription message: task_count: subscribe 1

go-1  | 2022/02/16 17:36:06 Received message from task_count

go-1  | 2022/02/16 17:36:06 Received message from task_count

go-1  | New Client is connected, total:  2

go-1  | 2022/02/16 17:36:14 signature is invalid

go-1  | 2022/02/16 17:36:14 Subscription message: task_results: subscribe 1

go-1  | 2022/02/16 17:36:16 Received message from task_count

go-1  | 2022/02/16 17:36:16 Received message from task_results

go-1  | 2022/02/16 17:36:16 Received message from task_results

go-1  | 2022/02/16 17:36:21 Received message from task_results

go-1  | 2022/02/16 17:36:21 Received message from task_results

go-1  | 2022/02/16 17:36:26 Received message from task_results

go-1  | 2022/02/16 17:36:26 Received message from task_results

go-1  | 2022/02/16 17:36:31 Received message from task_results

go-1  | 2022/02/16 17:36:31 Received message from task_results

有什么想法吗?


qq_花开花谢_0
浏览 59回答 1
1回答

Helenr

直接的问题是由以下行引起的websocketHandler:gPubSubConn = &redis.PubSubConn{Conn: gRedisConn.Get()}此行将当前的 pubsub 连接替换为新连接。新连接没有任何订阅。之前的连接泄露了。在应用程序启动时创建一次 pubsub 连接。该应用程序至少有一个数据竞争。使用竞赛检测器运行应用程序并修复报告的问题。
打开App,查看更多内容
随时随地看视频慕课网APP