将数据从一个戈鲁丁发送到多个其他戈鲁丁

在项目中,程序通过websocket接收数据。此数据需要由 n 种算法处理。算法的数量可以动态变化。


我的尝试是创建一些发布/订阅模式,可以在其中启动和取消订阅。事实证明,这比预期的更具挑战性。


以下是我想出的(基于 https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/):


package pubsub


import (

    "context"

    "sync"

    "time"

)


type Pubsub struct {

    sync.RWMutex

    subs   []*Subsciption

    closed bool

}


func New() *Pubsub {

    ps := &Pubsub{}

    ps.subs = []*Subsciption{}

    return ps

}


func (ps *Pubsub) Publish(msg interface{}) {

    ps.RLock()

    defer ps.RUnlock()


    if ps.closed {

        return

    }


    for _, sub := range ps.subs {

        // ISSUE1: These goroutines apparently do not exit properly... 

        go func(ch chan interface{}) {

            ch <- msg

        }(sub.Data)

    }

}


func (ps *Pubsub) Subscribe() (context.Context, *Subsciption, error) {

    ps.Lock()

    defer ps.Unlock()


    // prep channel

    ctx, cancel := context.WithCancel(context.Background())

    sub := &Subsciption{

        Data:   make(chan interface{}, 1),

        cancel: cancel,

        ps:     ps,

    }


    // prep subsciption

    ps.subs = append(ps.subs, sub)

    return ctx, sub, nil

}


正如评论中提到的,这(至少)有两个问题:

问题 1:

在实现中运行了一段时间后,我得到了一些这样的错误:

goroutine 120624 [runnable]:
bm/internal/pubsub.(*Pubsub).Publish.func1(0x8586c0, 0xc00b44e880, 0xc008617740)
    /home/X/Projects/bm/internal/pubsub/pubsub.go:30created by bookmaker/internal/pubsub.(*Pubsub).Publish
    /home/X/Projects/bm/internal/pubsub/pubsub.go:30 +0xbb

在没有真正理解这一点的情况下,在我看来,在中创建的goroutines确实会累积/泄漏。这是正确的吗,我在这里做错了什么?Publish()

问题 2:

当我通过它结束订阅时,它试图写入关闭的通道并恐慌。为了缓解这种情况,我创建了一个 goroutine 来关闭延迟通道。这感觉真的偏离了最佳实践,但我无法找到适当的解决方案。什么是确定性的方法?Unsubscribe()Publish()

这里有一个小操场供您测试:https://play.golang.org/p/K-L8vLjt7_9


小怪兽爱吃肉
浏览 51回答 1
1回答

侃侃无极

在深入研究您的解决方案及其问题之前,让我再次推荐此答案中介绍的另一种 Broker 方法:如何使用通道广播消息现在进入您的解决方案。每当你启动 goroutine 时,请始终考虑它将如何结束,并确保如果 goroutine 不应该在应用的生命周期内运行,请确保它确实如此。// ISSUE1: These goroutines apparently do not exit properly...&nbsp;go func(ch chan interface{}) {&nbsp; &nbsp; ch <- msg}(sub.Data)此 goroutine 尝试在 上发送值。这可能是一个阻塞操作:如果 的缓冲区已满并且 上没有现成的接收器,它将阻塞。这是脱离了发射的goroutine的控制,也脱离了对包装的控制。在某些情况下,这可能很好,但这已经给软件包的用户带来了负担。尽量避免这些。尝试创建易于使用且难以滥用的 API。chchchpubsub此外,仅仅为了在频道上发送价值而启动 goroutine 是一种资源浪费(goroutine 既便宜又轻便,但你不应该尽可能地向它们发送垃圾邮件)。你这样做是因为你不想被阻止。为避免阻塞,您可以使用具有“合理”高缓冲器的缓冲通道。是的,这并不能解决阻塞问题,只能帮助“慢速”客户端从通道接收。要“真正”避免在不启动 goroutine 的情况下阻塞,您可以使用非阻塞发送:select {case ch <- msg:default:&nbsp; &nbsp; // ch's buffer is full, we cannot deliver now}如果发送可以继续,它将发生。如果没有,则立即选择分支。你必须决定该怎么做。“丢失”消息是否可以接受?等到“放弃”可以接受一段时间吗?或者是否可以启动一个goroutine来执行此操作(但随后您将回到我们在这里尝试解决的问题)?或者,在客户端可以从通道接收之前,是否可以被阻止...chdefault选择合理的高缓冲区,如果遇到它仍然变满的情况,在客户端可以前进并从消息接收之前,阻止可能是可以接受的。如果不能,则整个应用可能处于不可接受的状态,并且“挂起”或“崩溃”可能是可以接受的。// ISSUE2: close the channel async with a delay to ensure// nothing will be written to the channel anymore// via a pending goroutine from Publish()go func(ch chan interface{}) {&nbsp; &nbsp; time.Sleep(500 * time.Millisecond)&nbsp; &nbsp; close(ch)}(s.Data)关闭通道是向接收器发出的信号,表示通道上不会发送更多值。因此,关闭通道始终是发送者的工作(和责任)。启动 goroutine 以关闭通道,您将该工作和责任“移交给”另一个不会与发送方同步的“实体”(goroutine)。这可能很容易导致死机(在闭合通道上发送是运行时死机,有关其他公理,请参阅未初始化的通道如何工作?)。别这样。是的,这是必要的,因为您启动了goroutines来发送。如果你不这样做,那么你可以“就地”关闭,而不启动goroutine,因为这样发送者和关闭者将是同一个实体:它本身,其发送和关闭操作受互斥锁保护。因此,解决第一个问题自然而然地解决了第二个问题。Pubsub通常,如果一个通道有多个发送方,则必须协调关闭通道。必须有一个实体(通常不是任何发送方)等待所有发送方完成,实际上使用 一个 ,然后该单个实体可以安全地关闭通道。请参阅关闭长度未知的通道。sync.WaitGroup
打开App,查看更多内容
随时随地看视频慕课网APP