在项目中,程序通过websocket接收数据。此数据需要由 n 种算法处理。算法的数量可以动态变化。
我的尝试是创建一些发布/订阅模式,可以在其中启动和取消订阅。事实证明,这比预期的更具挑战性。
以下是我想出的(基于 https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/):
package pubsub
import (
"context"
"sync"
"time"
)
type Pubsub struct {
sync.RWMutex
subs []*Subsciption
closed bool
}
func New() *Pubsub {
ps := &Pubsub{}
ps.subs = []*Subsciption{}
return ps
}
func (ps *Pubsub) Publish(msg interface{}) {
ps.RLock()
defer ps.RUnlock()
if ps.closed {
return
}
for _, sub := range ps.subs {
// ISSUE1: These goroutines apparently do not exit properly...
go func(ch chan interface{}) {
ch <- msg
}(sub.Data)
}
}
func (ps *Pubsub) Subscribe() (context.Context, *Subsciption, error) {
ps.Lock()
defer ps.Unlock()
// prep channel
ctx, cancel := context.WithCancel(context.Background())
sub := &Subsciption{
Data: make(chan interface{}, 1),
cancel: cancel,
ps: ps,
}
// prep subsciption
ps.subs = append(ps.subs, sub)
return ctx, sub, nil
}
正如评论中提到的,这(至少)有两个问题:
问题 1:
在实现中运行了一段时间后,我得到了一些这样的错误:
goroutine 120624 [runnable]: bm/internal/pubsub.(*Pubsub).Publish.func1(0x8586c0, 0xc00b44e880, 0xc008617740) /home/X/Projects/bm/internal/pubsub/pubsub.go:30created by bookmaker/internal/pubsub.(*Pubsub).Publish /home/X/Projects/bm/internal/pubsub/pubsub.go:30 +0xbb
在没有真正理解这一点的情况下,在我看来,在中创建的goroutines确实会累积/泄漏。这是正确的吗,我在这里做错了什么?Publish()
问题 2:
当我通过它结束订阅时,它试图写入关闭的通道并恐慌。为了缓解这种情况,我创建了一个 goroutine 来关闭延迟通道。这感觉真的偏离了最佳实践,但我无法找到适当的解决方案。什么是确定性的方法?Unsubscribe()
Publish()
这里有一个小操场供您测试:https://play.golang.org/p/K-L8vLjt7_9
侃侃无极
相关分类