我正在尝试使用工作池构建通用管道库。我为源、管道和接收器创建了一个接口。您会看到,管道的工作是从输入通道接收数据,对其进行处理,然后将结果输出到通道上。这是它的预期行为:
从输入通道接收数据。
将数据委托给可用的工作人员。
工作人员将结果发送到输出通道。
所有工作人员完成后关闭输出通道。
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
for i := 1; i <= 100; i++ {
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
return
}
func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {
for j := range jobs {
func(j Job) {
defer wg.Done()
wg.Add(1)
res := doSomethingWith(j)
out <- res
}(j)
}
}
但是,运行它可能会在不处理所有输入的情况下退出,或者会出现一条send on closed channel消息恐慌。使用标志构建源会在和-race之间发出数据竞争警告。close(out)out <- res
以下是我认为可能发生的情况。一旦许多工人完成了他们的工作,就会有一瞬间wg的计数器达到零。因此,wg.Wait()完成并且程序继续进行close(out)。同时,job 通道还没有完成数据生成,这意味着一些工作人员仍在另一个 goroutine 中运行。由于out通道已经关闭,因此会导致恐慌。
等待组是否应该放在其他地方?还是有更好的方法来等待所有工人完成?
慕尼黑8549860
慕妹3146593
相关分类