猿问

WaitGroup.Wait() 超时

WaitGroup.Wait()分配超时的惯用方法是什么?

我想这样做的原因是为了保护我的“调度程序”免于永远等待错误的“工人”。这会导致一些哲学问题(即,一旦出现错误的工人,系统如何可靠地继续运行?),但我认为这超出了这个问题的范围。

我有一个我会提供的答案。现在我已经把它写下来了,它看起来并没有那么糟糕,但它仍然比它应该的更令人费解。我想知道是否有一些更简单、更惯用的方法,甚至是不使用 WaitGroups 的替代方法。

塔。


jeck猫
浏览 305回答 3
3回答

温温酱

大多数情况下,您在下面发布的解决方案都尽可能好。改进它的几个技巧:或者,您可以关闭通道以发出完成信号,而不是在其上发送值,关闭通道上的接收操作始终可以立即进行。最好使用defer语句来表示完成,即使函数突然终止,它也会执行。此外,如果只有一个“作业”要等待,您可以完全省略WaitGroup并在作业完成时发送一个值或关闭通道(与您在select语句中使用的通道相同)。指定持续1秒很简单,只要:timeout := time.Second。例如,指定 2 秒是:timeout := 2 * time.Second。您不需要转换,time.Second它已经是 type time.Duration,将它与一个无类型常量相乘2也会产生一个 type 值time.Duration。我还将创建一个包装此功能的助手/实用程序函数。请注意,WaitGroup必须作为指针传递,否则副本将不会收到WaitGroup.Done()调用的“通知” 。就像是:// waitTimeout waits for the waitgroup for the specified max timeout.// Returns true if waiting timed out.func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {&nbsp; &nbsp; c := make(chan struct{})&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; defer close(c)&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; }()&nbsp; &nbsp; select {&nbsp; &nbsp; case <-c:&nbsp; &nbsp; &nbsp; &nbsp; return false // completed normally&nbsp; &nbsp; case <-time.After(timeout):&nbsp; &nbsp; &nbsp; &nbsp; return true // timed out&nbsp; &nbsp; }}使用它:if waitTimeout(&wg, time.Second) {&nbsp; &nbsp; fmt.Println("Timed out waiting for wait group")} else {&nbsp; &nbsp; fmt.Println("Wait group finished")}在Go Playground上试一试。

慕桂英4014372

我是这样做的:http : //play.golang.org/p/eWv0fRlLECgo func() {&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; c <- struct{}{}}()timeout := time.Duration(1) * time.Secondfmt.Printf("Wait for waitgroup (up to %s)\n", timeout)select {case <-c:&nbsp; &nbsp; fmt.Printf("Wait group finished\n")case <-time.After(timeout):&nbsp; &nbsp; fmt.Printf("Timed out waiting for wait group\n")}fmt.Printf("Free at last\n")它工作正常,但这是最好的方法吗?

慕尼黑5688855

大多数现有答案都表明存在泄漏 goroutines。为WaitGroup.Wait分配超时的惯用方法是使用底层同步/原子包原语。我从@icza 答案中获取了代码并使用atomic包重写了它,并添加了上下文取消,因为这是通知超时的惯用方式。package mainimport (&nbsp; &nbsp; "context"&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "sync/atomic"&nbsp; &nbsp; "time")func main() {&nbsp; &nbsp; var submitCount int32&nbsp; &nbsp; // run this instead of wg.Add(1)&nbsp; &nbsp; atomic.AddInt32(&submitCount, 1)&nbsp; &nbsp; // run this instead of wg.Done()&nbsp; &nbsp; // atomic.AddInt32(&submitCount, -1)&nbsp; &nbsp; timeout := time.Second&nbsp; &nbsp; ctx, cancel := context.WithTimeout(context.Background(), timeout)&nbsp; &nbsp; defer cancel()&nbsp; &nbsp; fmt.Printf("Wait for waitgroup (up to %s)\n", timeout)&nbsp; &nbsp; waitWithCtx(ctx, &submitCount)&nbsp; &nbsp; fmt.Println("Free at last")}// waitWithCtx returns when passed counter drops to zero// or when context is cancelledfunc waitWithCtx(ctx context.Context, counter *int32) {&nbsp; &nbsp; ticker := time.NewTicker(10 * time.Millisecond)&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case <-ctx.Done():&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; case <-ticker.C:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if atomic.LoadInt32(counter) == 0 {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}
随时随地看视频慕课网APP

相关分类

Go
我要回答