Redis golang 客户端定期丢弃错误的 PubSub 连接(EOF)

我做了什么:

我正在使用golang来自 的 Redis 库github.com/go-redis/redis。我的客户端监听一个名为“control”的 PubSub 通道。每当消息到达时,我都会处理它并继续接收下一条消息。我没完没了地听着,信息经常出现,有时好几天都没有。

我的期望:

我希望 Redis 通道能够无限地保持打开状态并在发送消息时接收消息。

我的经历:

通常它会运行几天,但偶尔client.Receive()会返回EOF错误。发生此错误后,客户端不再在该通道上接收消息。在内部,redis 客户端向 stdout 打印以下消息:

redis: 2019/08/29 14:18:57 pubsub.go:151: redis: 丢弃坏的 PubSub 连接: EOF

免责声明:我不确定这个错误是导致我停止接收消息的原因,它只是看起来相关。

附加问题:

我想了解为什么会发生这种情况,如果这是正常的,并且client.Subscribe()每当我遇到这种行为时重新连接到频道是否是一个很好的补救措施,或者我应该解决根本问题,无论它是什么。

代码:

这是处理我的客户端的完整代码(连接到 redis、订阅频道、无休止地接收消息):

func InitAndListenAsync(log *log.Logger, sseHandler func(string, string) error) error {

    rootLogger = log.With(zap.String("component", "redis-client"))


    host := env.RedisHost

    port := env.RedisPort

    pass := env.RedisPass

    addr := fmt.Sprintf("%s:%s", host, port)

    tlsCfg := &tls.Config{}

    client = redis.NewClient(&redis.Options{

        Addr:      addr,

        Password:  pass,

        TLSConfig: tlsCfg,

    })


    if _, err := client.Ping().Result(); err != nil {

        return err

    }


    go func() {

        controlSub := client.Subscribe("control")

        defer controlSub.Close()

        for {

            in, err := controlSub.Receive()  // *** SOMETIMES RETURNS EOF ERROR ***

            if err != nil {

                rootLogger.Error("failed to get feedback", zap.Error(err))

                break

            }

            switch in.(type) {

            case *redis.Message:

                cm := comm.ControlMessageEvent{}

                payload := []byte(in.(*redis.Message).Payload)

                if err := json.Unmarshal(payload, &cm); err != nil {

                    rootLogger.Error("failed to parse control message", zap.Error(err))

                } else if err := handleIncomingEvent(&cm); err != nil {

                    rootLogger.Error("failed to handle control message", zap.Error(err))

                }



狐的传说
浏览 224回答 3
3回答

慕容708150

pubsub.Channel()我通过遍历从而不是返回的通道来解决断开连接Receive()。这是新代码:func listenToControlChannel(client *redis.Client) {    pubsub := client.Subscribe("control")    defer pubsub.Close()    if _, err := pubsub.Receive(); err != nil {        rootLogger.Error("failed to receive from control PubSub", zap.Error(err))        return    }    controlCh := pubsub.Channel()    fmt.Println("start listening on control PubSub")    // Endlessly listen to control channel,    for msg := range controlCh {        cm := ControlMessageEvent{}        payload := []byte(msg.Payload)        if err := json.Unmarshal(payload, &cm); err != nil {            fmt.Printf("failed to parse control message: %s\n", err.Error())        } else if err := handleIncomingEvent(&cm); err != nil {            fmt.Printf("failed to handle control message: %s\n", err.Error())        }    }}

繁星点点滴滴

我不知道如果这是正确的方法,但在创建新的 Redis 客户端时,将ReadTimeout属性设置为-1解决了我的问题。redisClient := redis.NewClient(&redis.Options{    Addr:        addr,    Password:    redisConf.Password,    DB:          0, // Default DB    ReadTimeout: -1,})注意:我使用的是 go-redis/v9

冉冉说

我的看法是,如果 Redis 认为客户端空闲,它可能会断开你的客户端的连接。解决这个问题的方法似乎是这样的:使用ReceiveTimeout而不是Receive.如果操作超时,则发出Ping并等待回复。冲洗,重复。这样,您就可以确保连接上存在一些流量,无论是否实际发布了任何数据。
打开App,查看更多内容
随时随地看视频慕课网APP