从 Redis 退订似乎不起作用

我正在尝试在 Redis 中使用 pub-sub。我要做的是打开两个redis-cli. 第一个我发出命令flushall以确保以绿色启动。


然后在另一个终端中,我打开MONITOR以打印来自 Golang 示例客户端的所有命令(代码如下)。


这是我从 MONITOR 打印的内容:


1590207069.340860 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"

1590207069.341380 [0 127.0.0.1:58910] "sadd" "user:Ali:rooms" "New"

1590207069.345266 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"

1590207069.353706 [0 127.0.0.1:58910] "sadd" "user:Ali:rooms" "Old"

1590207069.354219 [0 127.0.0.1:58912] "subscribe" "New"

1590207069.354741 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"

1590207069.355444 [0 127.0.0.1:58912] "unsubscribe" "New" "Old"

1590207069.356754 [0 127.0.0.1:58910] "sadd" "user:Ali:rooms" "OldPlusPlus"

1590207069.357206 [0 127.0.0.1:58914] "subscribe" "New" "Old"

1590207069.357656 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"

1590207069.358362 [0 127.0.0.1:58912] "unsubscribe" "OldPlusPlus" "New" "Old"

1590207069.361030 [0 127.0.0.1:58916] "subscribe" "OldPlusPlus" "New" "Old"

我试图让客户端对随着时间的推移打开的所有通道都有一个连接。而不是一个连接/线程来处理 Redis 的每个通道。因此,每当需要新的订阅请求时,我都会尝试从客户端删除所有以前的订阅,并对旧频道和新频道进行新订阅。


但似乎该unsubscribe命令没有按预期工作(或者我遗漏了一些东西)!


因为当我尝试获取每个频道的客户端数量时,从第一个终端:


127.0.0.1:6379> pubsub numsub OldPlusPlus New Old

1) "OldPlusPlus"

2) (integer) 1

3) "New"

4) (integer) 2

5) "Old"

6) (integer) 2

除了当我尝试向“新”频道发送消息时,我的 go 客户端收到了两次消息!


繁星点点滴滴
浏览 137回答 1
1回答

冉冉说

问题与订阅频道的 *redis.PubSub 类型的对象不是用于取消订阅频道的对象有关。因此,我必须维护对此类对象的引用,然后使用该引用取消订阅所有频道。这是修改和工作的代码:package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "github.com/go-redis/redis/v7"&nbsp; &nbsp; "log")type user struct {&nbsp; &nbsp; name&nbsp; &nbsp; &nbsp; &nbsp; string&nbsp; &nbsp; rooms&nbsp; &nbsp; &nbsp; &nbsp;[]string&nbsp; &nbsp; stopRunning chan bool&nbsp; &nbsp; running&nbsp; &nbsp; &nbsp;bool&nbsp; &nbsp; roomsPubsub map[string]*redis.PubSub}func (u *user) connect(rdb *redis.Client) error {&nbsp; &nbsp; // get all user rooms (from DB) and start subscribe&nbsp; &nbsp; r, err := rdb.SMembers(fmt.Sprintf("user:%s:rooms", u.name)).Result()&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; return err&nbsp; &nbsp; }&nbsp; &nbsp; u.rooms = r&nbsp; &nbsp; if len(u.rooms) == 0 {&nbsp; &nbsp; &nbsp; &nbsp; return nil&nbsp; &nbsp; }&nbsp; &nbsp; u.doSubscribe("", rdb)&nbsp; &nbsp; return nil}func (u *user) subscribe(room string, rdb *redis.Client) error {&nbsp; &nbsp; // check if already subscribed&nbsp; &nbsp; for i := range u.rooms {&nbsp; &nbsp; &nbsp; &nbsp; if u.rooms[i] == room {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return nil&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; // add room to user&nbsp; &nbsp; userRooms := fmt.Sprintf("user:%s:rooms", u.name)&nbsp; &nbsp; if err := rdb.SAdd(userRooms, room).Err(); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; return err&nbsp; &nbsp; }&nbsp; &nbsp; // get all user rooms (from DB) and start subscribe&nbsp; &nbsp; r, err := rdb.SMembers(userRooms).Result()&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; return err&nbsp; &nbsp; }&nbsp; &nbsp; u.rooms = r&nbsp; &nbsp; if u.running {&nbsp; &nbsp; &nbsp; &nbsp; u.stopRunning <- true&nbsp; &nbsp; }&nbsp; &nbsp; u.doSubscribe(room, rdb)&nbsp; &nbsp; return nil}func (u *user) doSubscribe(room string, rdb *redis.Client) {&nbsp; &nbsp; pubSub := rdb.Subscribe(u.rooms...)&nbsp; &nbsp; if len(room) > 0 {&nbsp; &nbsp; &nbsp; &nbsp; u.roomsPubsub[room] = pubSub&nbsp; &nbsp; }&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; u.running = true&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("starting the listener for user:", u.name, "on rooms:", u.rooms)&nbsp; &nbsp; &nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; case msg, ok := <-pubSub.Channel():&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if !ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(msg.Payload, msg.Channel)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; case <-u.stopRunning:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("Stop listening for user:", u.name, "on old rooms")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for k, v := range u.roomsPubsub {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if err := v.Unsubscribe(); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("unable to unsubscribe", err)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; delete(u.roomsPubsub, k)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()}func (u *user) unsubscribe(room string, rdb *redis.Client) error {&nbsp; &nbsp; return nil}var rdb *redis.Clientfunc main() {&nbsp; &nbsp; rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379"})&nbsp; &nbsp; u := &user{&nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; &nbsp; &nbsp; "Wael",&nbsp; &nbsp; &nbsp; &nbsp; stopRunning: make(chan bool),&nbsp; &nbsp; &nbsp; &nbsp; roomsPubsub: make(map[string]*redis.PubSub),&nbsp; &nbsp; }&nbsp; &nbsp; if err := u.connect(rdb); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; }&nbsp; &nbsp; if err := u.subscribe("New", rdb); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; }&nbsp; &nbsp; if err := u.subscribe("Old", rdb); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; }&nbsp; &nbsp; if err := u.subscribe("OldPlusPlus", rdb); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; }&nbsp; &nbsp; select {}}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go