猿问

Go - 如何知道输出通道何时完成

我尝试遵循 Rob Pike 在“并发不是并行”的演讲中的例子,并做了这样的事情:我正在启动许多 go 例程作为从输入通道读取的工作人员,执行一些处理,然后通过输出通道发送结果.

然后我开始另一个 go 例程,它从某个源读取数据并通过他们的输入通道将其发送给工作人员。最后,我想遍历输出通道中的所有结果并对其进行处理。问题是,由于工作在工作人员之间分配,我不知道所有工作人员何时完成,因此我可以停止向输出通道询问更多结果,并且我的程序可以正常结束。

了解工作人员何时完成将结果发送到输出通道的最佳做法是什么?


慕的地6264312
浏览 157回答 3
3回答

阿晨1998

var Z = "Z"func Loop() {&nbsp; &nbsp; sc := make(chan *string)&nbsp; &nbsp; ss := make([]string, 0)&nbsp; &nbsp; done := make(chan struct{}, 1)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; //1 QUERY&nbsp; &nbsp; &nbsp; &nbsp; slice1 := []string{"a", "b", "c"}&nbsp; &nbsp; &nbsp; &nbsp; //2 WG INIT&nbsp; &nbsp; &nbsp; &nbsp; var wg1 sync.WaitGroup&nbsp; &nbsp; &nbsp; &nbsp; wg1.Add(len(slice1))&nbsp; &nbsp; &nbsp; &nbsp; //3 LOOP->&nbsp; &nbsp; &nbsp; &nbsp; loopSlice1(slice1, sc, &wg1)&nbsp; &nbsp; &nbsp; &nbsp; //7 WG WAIT<-&nbsp; &nbsp; &nbsp; &nbsp; wg1.Wait()&nbsp; &nbsp; &nbsp; &nbsp; sc <- &Z&nbsp; &nbsp; &nbsp; &nbsp; done <- struct{}{}&nbsp; &nbsp; }()&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; var cc *string&nbsp; &nbsp; &nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; cc = <-sc&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Infof("<-sc %s", *cc)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if *cc == Z {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ss = append(ss, *cc)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; <-done&nbsp; &nbsp; log.Infof("FUN: %#v", ss)}func loopSlice1(slice1 []string, sc chan *string, wg1 *sync.WaitGroup) {&nbsp; &nbsp; for i, x := range slice1 {&nbsp; &nbsp; &nbsp; &nbsp; //4 GO&nbsp; &nbsp; &nbsp; &nbsp; go func(n int, v string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //5 WG DONE&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg1.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //6 DOING&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //[1 QUERY&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; slice2 := []string{"X", "Y", "Z"}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //[2 WG INIT&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; var wg2 sync.WaitGroup&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg2.Add(len(slice2))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //[3 LOOP ->&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; loopSlice2(n, v, slice2, sc, &wg2)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //[7 WG WAIT <-&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg2.Wait()&nbsp; &nbsp; &nbsp; &nbsp; }(i, x)&nbsp; &nbsp; }}func loopSlice2(n1 int, v1 string, slice2 []string, sc chan *string, wg2 *sync.WaitGroup) {&nbsp; &nbsp; for j, y := range slice2 {&nbsp; &nbsp; &nbsp; &nbsp; //[4 GO&nbsp; &nbsp; &nbsp; &nbsp; go func(n2 int, v2 string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //[5 WG DONE&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg2.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //[6 DOING&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; r := fmt.Sprintf("%v%v %v,%v", n1, n2, v1, v2)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; sc <- &r&nbsp; &nbsp; &nbsp; &nbsp; }(j, y)&nbsp; &nbsp; }}

不负相思意

请我首先澄清您的术语:对渠道末端的误解可能会导致以后出现问题。您询问“输出通道”和“输入通道”。哪有这回事; 只有渠道。每个通道都有两端:输出(写入)端和输入(读取)端。我会假设这就是你的意思。现在回答你的问题。以最简单的情况为例:您只有一个发送方 goroutine 写入通道,并且只有一个工作 goroutine 从另一端读取,并且通道的缓冲为零。发送方 goroutine 将在写入每个项目时阻塞,直到该项目被消耗。通常,这在第一次发生时很快。一旦第一个项目传递给工作人员,工作人员就会很忙,发件人必须等待才能传递第二项内容。因此,乒乓效应如下:作者或读者会忙,但不会两者都忙。在 Rob Pike 所描述的意义上,goroutine 将是并发的,但实际上并不总是并行执行。如果您有许多从通道读取的工作程序 goroutine(并且其输入端由所有人共享),则发送方最初可以将一个项目分发给每个工作程序,但随后必须等待它们工作(类似于上面描述的乒乓球案例)。最后,当所有项目都由发送方发送后,它的工作就完成了。然而,读者可能还没有完成他们的工作。有时我们关心发件人是否提前完成,有时我们不关心。知道何时发生这种情况最容易通过 WaitGroup 完成(参见Not_a_Golfer的回答和我对相关问题的回答)。还有一个稍微复杂一点的替代方法:您可以使用返回通道来完成信号传输,而不是使用WaitGroup. 这并不难做到,但WaitGroup在这种情况下是首选,更简单。相反,如果通道包含缓冲区,则发送方发送最后一项的时间点会更快发生。在通道每个 worker 有一个缓冲区空间的极限情况下;这将允许发件人非常快速地完成,然后,可能继续处理其他事情。(任何比这更多的缓冲都是浪费)。发送者的这种解耦允许完全异步的行为模式,深受使用其他技术堆栈(Node-JS 和 JVM 的人)的人的喜爱。与它们不同的是,Go不需要您这样做,但您可以选择。早在 90 年代初期,作为批量同步并行 (BSP) 策略工作的副作用,Leslie Valiant 证明有时非常简单的同步策略可能很便宜。关键因素是需要足够的并行松弛(也称为过度并行)来保持处理器内核忙碌。这意味着必须有足够多的其他工作要做,以便任何特定的 goroutine 被阻塞一段时间都无关紧要。奇怪的是,这可能意味着使用较少数量的 goroutine 可能比使用较大数量的 goroutine 需要更多的小心。了解过度并行的影响是有用的:如果整个网络具有过度并行,通常没有必要付出额外的努力来使所有内容异步,因为无论哪种方式,CPU 内核都会很忙。因此,虽然知道如何等待发件人完成很有用,但更大的应用程序可能不需要您以同样的方式关注。作为最后一个脚注,WaitGroup是BSP 中使用的意义上的障碍。通过结合障碍和渠道,您可以同时使用 BSP 和 CSP。

aluckdog

我个人喜欢为此使用 a sync.WaitGroup。等待组是一个同步计数器,它具有三种方法 - Wait()、Done()和Add()。您要做的是增加等待组的计数器,将其传递给工作人员,并让Done()他们在完成后调用。然后,您只需阻塞另一端的等待组并在它们全部完成后关闭输出通道,从而导致输出处理器退出。基本上:// create the wait groupwg := sync.WaitGroup{}// this is the output channeloutchan := make(chan whatever)// start the workersfor i := 0; i < N; i++ {&nbsp; &nbsp;wg.Add(1) //we increment by one the waitgroup's count&nbsp; &nbsp;//the worker pushes data onto the output channel and calls wg.Done() when done&nbsp; &nbsp;go work(&wg, outchan)}// this is our "waiter" - it blocks until all workers are done and closes the channelgo func() {&nbsp; wg.Wait()&nbsp; close(outchan)}()//this loop will exit automatically when outchan is closedfor item := range outchan {&nbsp; &nbsp;workWithIt(item)}// TADA!
随时随地看视频慕课网APP

相关分类

Go
我要回答