不使用互斥锁的 Go PubSub?

我将在网站后端实现通知系统:每次页面访问都会为用户订阅页面上显示的一些数据,当系统发生变化时,用户会收到通知。例如,有人正在查看包含新闻文章的页面,当发布新文章时,我想通知该用户,以便他可以通过 js 或重新加载页面(手动或自动)来获取这些新文章。

为了实现这一点,我将以发布/订阅的方式使用频道。例如,会有一个“新闻”频道。创建新文章时,该频道将收到该文章的 id。当用户打开一个页面并订阅“新闻”频道(可能通过 websocket)时,必须有一个此新闻频道的订阅者列表,他将作为订阅者添加到该列表中以得到通知。

就像是:

// Channel sketches one pub/sub topic: ids arrive on ingres and are meant to
// be copied to every subscriber in subs.
type Channel struct {
	// ingres is the receive-only source of news article ids.
	ingres <-chan int
	// subs holds one send-only channel per subscriber.
	subs []chan<- int
	// mx is the lock the question proposes for protecting subs while
	// subscribers are added, removed or notified.
	mx sync.Mutex
}

这些中的每一个都将有一个 goroutine 将进入的内容分发到 subs 列表中。

现在我看到的问题(可能是过早的优化)是:会有很多频道,以及很多来来去去的订阅者。这意味着互斥锁会造成大量“全局暂停”(stop-the-world)式的阻塞。例如,如果有 10 000 个在线用户订阅了新闻频道,goroutine 必须发送 1 万个通知,期间 subs 切片一直被锁定,因此新订阅者不得不等待互斥锁解锁。现在将其乘以 100 个频道,我认为这就成了问题。

因此,我正在寻找一种方法来添加和删除订阅者,而不会阻止其他订阅者被添加或删除,或者至少让所有订阅者都能在可接受的时间内收到通知。

“等待所有订阅者接收”的问题可以这样解决:为每个订阅者启动一个带超时的 goroutine,这样在收到 id 之后,会创建 1 万个 goroutine,互斥锁可以立即解锁。但是,当频道很多时,这些开销仍然会累加起来。


当年话下
浏览 72回答 1
1回答

慕容森

根据链接的评论,我想出了这段代码:package notifimport (&nbsp; &nbsp; "context"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time"&nbsp; &nbsp; "unsafe")type Client struct {&nbsp; &nbsp; recv&nbsp; &nbsp;chan interface{}&nbsp; &nbsp; ch&nbsp; &nbsp; &nbsp;*Channel&nbsp; &nbsp; o&nbsp; &nbsp; &nbsp; sync.Once&nbsp; &nbsp; ctx&nbsp; &nbsp; context.Context&nbsp; &nbsp; cancel context.CancelFunc}// will be nil if this client is write-onlyfunc (c *Client) Listen() <-chan interface{} {&nbsp; &nbsp; return c.recv}func (c *Client) Close() {&nbsp; &nbsp; select {&nbsp; &nbsp; case <-c.ctx.Done():&nbsp; &nbsp; case c.ch.unsubscribe <- c:&nbsp; &nbsp; }}func (c *Client) Done() <-chan struct{} {&nbsp; &nbsp; return c.ctx.Done()}func (c *Client) doClose() {&nbsp; &nbsp; c.o.Do(func() {&nbsp; &nbsp; &nbsp; &nbsp; c.cancel()&nbsp; &nbsp; &nbsp; &nbsp; if c.recv != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; close(c.recv)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; })}func (c *Client) send(msg interface{}) {&nbsp; &nbsp; // write-only clients will not handle any messages&nbsp; &nbsp; if c.recv == nil {&nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; }&nbsp; &nbsp; t := time.NewTimer(c.ch.sc)&nbsp; &nbsp; select {&nbsp; &nbsp; case <-c.ctx.Done():&nbsp; &nbsp; case c.recv <- msg:&nbsp; &nbsp; case <-t.C:&nbsp; &nbsp; &nbsp; &nbsp; // time out/slow consumer, close the connection&nbsp; &nbsp; &nbsp; &nbsp; c.Close()&nbsp; &nbsp; }}func (c *Client) Broadcast(payload interface{}) bool {&nbsp; &nbsp; select {&nbsp; &nbsp; case <-c.ctx.Done():&nbsp; &nbsp; &nbsp; &nbsp; return false&nbsp; &nbsp; default:&nbsp; &nbsp; &nbsp; &nbsp; c.ch.Broadcast() <- &envelope{Message: payload, Sender: uintptr(unsafe.Pointer(c))}&nbsp; &nbsp; &nbsp; &nbsp; return true&nbsp; &nbsp; }}type envelope struct {&nbsp; &nbsp; Message interface{}&nbsp; &nbsp; Sender&nbsp; uintptr}// leech is channel-blocking so goroutine should be called internally to make it non-blocking// this is to ensure proper order of leeched messages.func 
NewChannel(ctx context.Context, name string, slowConsumer time.Duration, emptyCh chan string, leech func(interface{})) *Channel {&nbsp; &nbsp; return &Channel{&nbsp; &nbsp; &nbsp; &nbsp; name:&nbsp; &nbsp; &nbsp; &nbsp; name,&nbsp; &nbsp; &nbsp; &nbsp; ingres:&nbsp; &nbsp; &nbsp; make(chan interface{}, 1000),&nbsp; &nbsp; &nbsp; &nbsp; subscribe:&nbsp; &nbsp;make(chan *Client, 1000),&nbsp; &nbsp; &nbsp; &nbsp; unsubscribe: make(chan *Client, 1000),&nbsp; &nbsp; &nbsp; &nbsp; aud:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;make(map[*Client]struct{}, 1000),&nbsp; &nbsp; &nbsp; &nbsp; ctx:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;ctx,&nbsp; &nbsp; &nbsp; &nbsp; sc:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; slowConsumer,&nbsp; &nbsp; &nbsp; &nbsp; empty:&nbsp; &nbsp; &nbsp; &nbsp;emptyCh,&nbsp; &nbsp; &nbsp; &nbsp; leech:&nbsp; &nbsp; &nbsp; &nbsp;leech,&nbsp; &nbsp; }}type Channel struct {&nbsp; &nbsp; name&nbsp; &nbsp; &nbsp; &nbsp; string&nbsp; &nbsp; ingres&nbsp; &nbsp; &nbsp; chan interface{}&nbsp; &nbsp; subscribe&nbsp; &nbsp;chan *Client&nbsp; &nbsp; unsubscribe chan *Client&nbsp; &nbsp; aud&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;map[*Client]struct{}&nbsp; &nbsp; ctx&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;context.Context&nbsp; &nbsp; sc&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.Duration&nbsp; &nbsp; empty&nbsp; &nbsp; &nbsp; &nbsp;chan string&nbsp; &nbsp; leech&nbsp; &nbsp; &nbsp; &nbsp;func(interface{})}func (ch *Channel) Id() string {&nbsp; &nbsp; return ch.name}// subscription is read-write by default. 
by providing "writeOnly=true", it can be switched into write-only mode// in which case the client will not be disconnected for being slow reader.func (ch *Channel) Subscribe(writeOnly ...bool) *Client {&nbsp; &nbsp; c := &Client{&nbsp; &nbsp; &nbsp; &nbsp; ch: ch,&nbsp; &nbsp; }&nbsp; &nbsp; if len(writeOnly) == 0 || writeOnly[0] == false {&nbsp; &nbsp; &nbsp; &nbsp; c.recv = make(chan interface{})&nbsp; &nbsp; }&nbsp; &nbsp; c.ctx, c.cancel = context.WithCancel(ch.ctx)&nbsp; &nbsp; ch.subscribe <- c&nbsp; &nbsp; return c}func (ch *Channel) Broadcast() chan<- interface{} {&nbsp; &nbsp; return ch.ingres}// returns once context is cancelledfunc (ch *Channel) Start() {&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case <-ch.ctx.Done():&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for cl := range ch.aud {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; delete(ch.aud, cl)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; cl.doClose()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; case cl := <-ch.subscribe:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch.aud[cl] = struct{}{}&nbsp; &nbsp; &nbsp; &nbsp; case cl := <-ch.unsubscribe:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; delete(ch.aud, cl)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; cl.doClose()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if len(ch.aud) == 0 {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch.signalEmpty()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; case msg := <-ch.ingres:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; e, ok := msg.(*envelope)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; msg = e.Message&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for cl := range ch.aud {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if ok == false || uintptr(unsafe.Pointer(cl)) != 
e.Sender {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go cl.send(e.Message)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if ch.leech != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch.leech(msg)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}func (ch *Channel) signalEmpty() {&nbsp; &nbsp; if ch.empty == nil {&nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; }&nbsp; &nbsp; select {&nbsp; &nbsp; case ch.empty <- ch.name:&nbsp; &nbsp; default:&nbsp; &nbsp; }}type subscribeRequest struct {&nbsp; &nbsp; name string&nbsp; &nbsp; recv chan *Client&nbsp; &nbsp; wo&nbsp; &nbsp;bool}type broadcastRequest struct {&nbsp; &nbsp; name string&nbsp; &nbsp; recv chan *Channel}type brokeredChannel struct {&nbsp; &nbsp; ch&nbsp; &nbsp; &nbsp;*Channel&nbsp; &nbsp; cancel context.CancelFunc}type brokerLeech interface {&nbsp; &nbsp; Match(string) func(interface{})}func NewBroker(ctx context.Context, slowConsumer time.Duration, leech brokerLeech) *Broker {&nbsp; &nbsp; return &Broker{&nbsp; &nbsp; &nbsp; &nbsp; chans:&nbsp; &nbsp; &nbsp;make(map[string]*brokeredChannel, 100),&nbsp; &nbsp; &nbsp; &nbsp; subscribe: make(chan *subscribeRequest, 10),&nbsp; &nbsp; &nbsp; &nbsp; broadcast: make(chan *broadcastRequest, 10),&nbsp; &nbsp; &nbsp; &nbsp; ctx:&nbsp; &nbsp; &nbsp; &nbsp;ctx,&nbsp; &nbsp; &nbsp; &nbsp; sc:&nbsp; &nbsp; &nbsp; &nbsp; slowConsumer,&nbsp; &nbsp; &nbsp; &nbsp; empty:&nbsp; &nbsp; &nbsp;make(chan string, 10),&nbsp; &nbsp; &nbsp; &nbsp; leech:&nbsp; &nbsp; &nbsp;leech,&nbsp; &nbsp; }}type Broker struct {&nbsp; &nbsp; chans&nbsp; &nbsp; &nbsp;map[string]*brokeredChannel&nbsp; &nbsp; subscribe chan *subscribeRequest&nbsp; &nbsp; broadcast chan *broadcastRequest&nbsp; &nbsp; ctx&nbsp; &nbsp; &nbsp; &nbsp;context.Context&nbsp; &nbsp; sc&nbsp; &nbsp; &nbsp; &nbsp; time.Duration&nbsp; &nbsp; empty&nbsp; 
&nbsp; &nbsp;chan string&nbsp; &nbsp; leech&nbsp; &nbsp; &nbsp;brokerLeech}// returns once context is cancelledfunc (b *Broker) Start() {&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case <-b.ctx.Done():&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; case req := <-b.subscribe:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch, ok := b.chans[req.name]&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if ok == false {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ctx, cancel := context.WithCancel(b.ctx)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var l func(interface{})&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if b.leech != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; l = b.leech.Match(req.name)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch = &brokeredChannel{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch:&nbsp; &nbsp; &nbsp;NewChannel(ctx, req.name, b.sc, b.empty, l),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; cancel: cancel,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; b.chans[req.name] = ch&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go ch.ch.Start()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; req.recv <- ch.ch.Subscribe(req.wo)&nbsp; &nbsp; &nbsp; &nbsp; case req := <-b.broadcast:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch, ok := b.chans[req.name]&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if ok == false {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ctx, cancel := context.WithCancel(b.ctx)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var l func(interface{})&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if b.leech != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; l = b.leech.Match(req.name)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch = &brokeredChannel{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch:&nbsp; &nbsp; &nbsp;NewChannel(ctx, req.name, b.sc, b.empty, l),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; cancel: cancel,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; b.chans[req.name] = ch&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go ch.ch.Start()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; req.recv <- ch.ch&nbsp; &nbsp; &nbsp; &nbsp; case name := <-b.empty:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if ch, ok := b.chans[name]; ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch.cancel()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; delete(b.chans, name)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}// subscription is read-write by default. by providing "writeOnly=true", it can be switched into write-only mode// in which case the client will not be disconnected for being slow reader.func (b *Broker) Subscribe(name string, writeOnly ...bool) *Client {&nbsp; &nbsp; req := &subscribeRequest{&nbsp; &nbsp; &nbsp; &nbsp; name: name,&nbsp; &nbsp; &nbsp; &nbsp; recv: make(chan *Client),&nbsp; &nbsp; &nbsp; &nbsp; wo:&nbsp; &nbsp;len(writeOnly) > 0 && writeOnly[0] == true,&nbsp; &nbsp; }&nbsp; &nbsp; b.subscribe <- req&nbsp; &nbsp; c := <-req.recv&nbsp; &nbsp; close(req.recv)&nbsp; &nbsp; return c}func (b *Broker) Broadcast(name string) chan<- interface{} {&nbsp; &nbsp; req := &broadcastRequest{name: name, recv: make(chan *Channel)}&nbsp; &nbsp; b.broadcast <- req&nbsp; &nbsp; ch := <-req.recv&nbsp; &nbsp; close(req.recv)&nbsp; &nbsp; return ch.ingres}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go