我正在尝试使用工作池构建通用管道库。我为源、管道和接收器创建了一个接口。您会看到,管道的工作是从输入通道接收数据,对其进行处理,然后将结果输出到通道上。这是它的预期行为:
从输入通道接收数据。
将数据委托给可用的工作人员。
Worker 将结果发送到输出通道。
所有工作人员完成后关闭输出通道。
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
for i := 1; i <= 100; i++ {
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
return
}
func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {
for j := range jobs {
func(j Job) {
defer wg.Done()
wg.Add(1)
res := doSomethingWith(j)
out <- res
}(j)
}
}
但是,运行它可能会退出而不处理所有输入,或者出现错误并显示send on closed channel消息。使用该标志构建源会在和-race之间发出数据争用警告。close(out)out <- res
我认为可能会发生以下情况。一旦一些工人完成了工作,wg计数器就会瞬间归零。因此,wg.Wait()完成并且程序继续进行close(out)。与此同时,作业通道尚未完成数据生成,这意味着一些工作人员仍在另一个 goroutine 中运行。由于out通道已经关闭,因此会导致恐慌。
等待组应该放在其他地方吗?或者有没有更好的方法来等待所有工人完成?
撒科打诨
一只萌萌小番薯
相关分类