关闭和发送到通道之间的竞争条件

我正在尝试使用工作池构建通用管道库。我为源、管道和接收器创建了一个接口。您会看到,管道的工作是从输入通道接收数据,对其进行处理,然后将结果输出到通道上。这是它的预期行为:

  1. 从输入通道接收数据。

  2. 将数据委托给可用的工作人员。

  3. 工作人员将结果发送到输出通道。

  4. 所有工作人员完成后关闭输出通道。

func (p *pipe) Process(in chan interface{}) (out chan interface{}) {

    var wg sync.WaitGroup

    out = make(chan interface{}, 100)

    go func() {

        for i := 1; i <= 100; i++ {

            go p.work(in, out, &wg)

        }

        wg.Wait()

        close(out)

    }()


    return

}


func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {

    for j := range jobs {

        func(j Job) {

            defer wg.Done()

            wg.Add(1)


            res := doSomethingWith(j)


            out <- res

        }(j)

    }

}


但是,运行它可能会在不处理所有输入的情况下退出,或者会出现一条send on closed channel消息恐慌。使用标志构建源会在和-race之间发出数据竞争警告。close(out)out <- res


以下是我认为可能发生的情况。一旦许多工人完成了他们的工作,就会有一瞬间wg的计数器达到零。因此,wg.Wait()完成并且程序继续进行close(out)。同时,job 通道还没有完成数据生成,这意味着一些工作人员仍在另一个 goroutine 中运行。由于out通道已经关闭,因此会导致恐慌。


等待组是否应该放在其他地方?还是有更好的方法来等待所有工人完成?


holdtom
浏览 109回答 2
2回答

慕尼黑8549860

目前尚不清楚为什么每个工作需要一名工人,但如果你这样做了,你可以重组你的外循环设置(见下面未经测试的代码)。这种方式首先消除了对工作池的需求。但是,总是在剥离任何工人wg.Add 之前做一个。在这里,你正在剥离 100 个工人:var wg sync.WaitGroupout = make(chan interface{}, 100)go func() {&nbsp; &nbsp; for i := 1; i <= 100; i++ {&nbsp; &nbsp; &nbsp; &nbsp; go p.work(in, out, &wg)&nbsp; &nbsp; }&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; close(out)}()因此,您可以这样做:var wg sync.WaitGroupout = make(chan interface{}, 100)go func() {&nbsp; &nbsp; wg.Add(100)&nbsp; // ADDED - count the 100 workers&nbsp; &nbsp; for i := 1; i <= 100; i++ {&nbsp; &nbsp; &nbsp; &nbsp; go p.work(in, out, &wg)&nbsp; &nbsp; }&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; close(out)}()请注意,您现在可以将wg自己向下移动到剥离工作人员的 goroutine 中。如果你放弃让每个工作人员将工作作为新的 goroutine 分拆的想法,这可以让事情变得更干净。但是,如果每个 worker 都将衍生出另一个 goroutine,那么该 worker 本身也必须使用wg.Add,如下所示:for j := range jobs {&nbsp; &nbsp; wg.Add(1)&nbsp; // ADDED - count the spun-off goroutines&nbsp; &nbsp; func(j Job) {&nbsp; &nbsp; &nbsp; &nbsp; res := doSomethingWith(j)&nbsp; &nbsp; &nbsp; &nbsp; out <- res&nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; // MOVED (for illustration only, can defer as before)&nbsp; &nbsp; }(j)}wg.Done() // ADDED - our work in `p.work` is now done也就是说,每个匿名函数都是通道的另一个用户,因此在启动wg.Add(1)新的 goroutine 之前增加用户的通道计数 ()。当你读完输入通道jobs后,调用wg.Done()(可能是通过之前的defer,但我在最后展示了它)。考虑这一点的关键是计算此时可以写入通道wg的活动 goroutine 的数量。只有当没有goroutine 打算再写时,它才会变为零。 这样可以安全地关闭通道。考虑使用更简单的(但未经测试):func (p *pipe) Process(in chan interface{}) (out chan interface{}) {&nbsp; &nbsp; out = make(chan interface{})&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; defer close(out)&nbsp; &nbsp; &nbsp; &nbsp; for j := range in {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go func(j Job) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; res := doSomethingWith(j)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out <- res&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }(j)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; }()&nbsp; &nbsp; return out}您现在有一个 goroutine 正在in尽可能快地读取通道,并在运行时分拆作业。每个传入的工作都会得到一个 goroutine,除非他们提前完成工作。没有池,每个作业只有一个工人(与您的代码相同,只是我们淘汰了没有做任何有用的池)。或者,因为只有一些可用的 CPU,所以在开始时像以前一样分拆一些 goroutine,但让每个 goroutine 运行一个作业以完成,并交付其结果,然后返回阅读下一个作业:func (p *pipe) Process(in chan interface{}) (out chan interface{}) {&nbsp; &nbsp; out = make(chan interface{})&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; defer close(out)&nbsp; &nbsp; &nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; &nbsp; &nbsp; ncpu := runtime.NumCPU()&nbsp; // or something fancier if you like&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(ncpu)&nbsp; &nbsp; &nbsp; &nbsp; for i := 0; i < ncpu; i++ {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for j := range in {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out <- doSomethingWith(j)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }()&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; }&nbsp; &nbsp; return out}通过使用runtime.NumCPU(),我们只能获得与运行作业的 CPU 一样多的工作人员读取作业。那些是游泳池,他们一次只做一项工作。如果输出通道读取器结构良好(即,不会导致管道阻塞),通常不需要缓冲输出通道。如果不是,则此处的缓冲深度会限制您可以“提前”完成多少工作,而无论谁正在使用结果。根据“提前工作”执行此操作的有用程度来设置它 - 不一定是 CPU 的数量,或预期作业的数量,或其他任何东西。

慕妹3146593

作业完成的速度可能与发送它们的速度一样快。在这种情况下,即使有更多的项目要处理,WaitGroup 也将浮动在零附近。对此的一种解决方法是在发送作业之前添加一个,并在发送完所有作业后减少一个,有效地将发件人视为“作业”之一。在这种情况下,如果我们wg.Add在发件人中执行以下操作会更好:func (p *pipe) Process(in chan interface{}) (out chan interface{}) {&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; out = make(chan interface{}, 100)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for i := 1; i <= 100; i++ {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go p.work(in, out, &wg)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; &nbsp; &nbsp; close(out)&nbsp; &nbsp; }()&nbsp; &nbsp; return}func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {&nbsp; &nbsp; for j := range jobs {&nbsp; &nbsp; &nbsp; &nbsp; func(j Job) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; res := doSomethingWith(j)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out <- res&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; }(j)&nbsp; &nbsp; }}我在代码中注意到的一件事是每个作业都会启动一个 goroutine。同时,每个作业都jobs在循环中处理通道,直到清空/关闭。两者似乎都没有必要。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go