如何协调多个 goroutine 的关闭

说我有一个功能


type Foo struct {}


func (a *Foo) Bar() {

    // some expensive work - does some calls to redis

}

它在我的应用程序的某个时刻在 goroutine 中执行。许多这些可能在任何给定点执行。在应用程序终止之前,我想确保所有剩余的 goroutines 都已完成它们的工作。


我可以做这样的事情吗:


type Foo struct {

    wg sync.WaitGroup

}


func (a *Foo) Close() {

    a.wg.Wait()

}


func (a *Foo) Bar() {

    a.wg.Add(1)

    defer a.wg.Done()


    // some expensive work - does some calls to redis

}

假设 Bar 在 goroutine 中执行,其中许多可能在给定时间运行,并且一旦调用 Close 并且在 sigterm 或 sigint 上调用 Close 时不应调用 Bar。


这有意义吗?


通常我会看到 Bar 函数如下所示:


func (a *Foo) Bar() {

    a.wg.Add(1)


    go func() {

        defer a.wg.Done()

        // some expensive work - does some calls to redis

    }()

}


米脂
浏览 153回答 4
4回答

白衣染霜花

是的,WaitGroup是正确的答案。根据doc ,您可以随时使用WaitGroup.Add计数器大于零。请注意,当计数器为零时发生的具有正增量的调用必须发生在等待之前。具有负增量的调用或在计数器大于零时开始的具有正增量的调用可能随时发生。通常这意味着对 Add 的调用应该在创建 goroutine 或其他要等待的事件的语句之前执行。如果重复使用 WaitGroup 来等待多个独立的事件集,则必须在所有先前的 Wait 调用返回后发生新的 Add 调用。请参阅 WaitGroup 示例。Close但是一个技巧是,在调用之前,您应该始终保持计数器大于零。这通常意味着您应该调用wg.Addin NewFoo(或类似的东西)并wg.Donein Close. 并且为了防止多次调用Done破坏等待组,你应该包装Close成sync.Once. 您可能还想防止Bar()调用 new。

杨魅力

我认为无限期地等待所有 go 例程完成不是正确的方法。如果其中一个 go routines 被阻塞或说它由于某种原因挂起并且从未成功终止,应该发生什么情况 kill 进程或等待 go routines 完成?相反,无论所有例程是否已完成,您都应该等待一段时间并终止应用程序。上下文包可用于向所有 go 例程发送信号以处理 kill 信号。appCtx, cancel := context.WithCancel(context.Background())这里 appCtx 必须传递给所有的 go 例程。在退出信号调用cancel()。作为 go 例程运行的函数可以处理如何处理取消上下文。

慕容森

WaitGroup是一种方式,但是,Go 团队errgroup完全针对您的用例引入了。leaf bebop 的回答中最不方便的部分是忽视错误处理。错误处理是存在的原因errgroup。惯用的 go 代码不应该吞下错误。但是,保留结构的签名Foo(装饰性的除外workerNumber)——并且没有错误处理——我的建议如下所示:package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "math/rand"&nbsp; &nbsp; "time"&nbsp; &nbsp; "golang.org/x/sync/errgroup")type Foo struct {&nbsp; &nbsp; errg errgroup.Group}func NewFoo() *Foo {&nbsp; &nbsp; foo := &Foo{&nbsp; &nbsp; &nbsp; &nbsp; errg: errgroup.Group{},&nbsp; &nbsp; }&nbsp; &nbsp; return foo}func (a *Foo) Bar(workerNumber int) {&nbsp; &nbsp; a.errg.Go(func() error {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; // simulates the long running clals&nbsp; &nbsp; &nbsp; &nbsp; case <-time.After(time.Second * time.Duration(rand.Intn(10))):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(fmt.Sprintf("worker %d completed its work", workerNumber))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return nil&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; })}func (a *Foo) Close() {&nbsp; &nbsp; a.errg.Wait()}func main() {&nbsp; &nbsp; foo := NewFoo()&nbsp; &nbsp; for i := 0; i < 10; i++ {&nbsp; &nbsp; &nbsp; &nbsp; foo.Bar(i)&nbsp; &nbsp; }&nbsp; &nbsp; <-time.After(time.Second * 5)&nbsp; &nbsp; fmt.Println("Waiting for workers to complete...")&nbsp; &nbsp; foo.Close()&nbsp; &nbsp; fmt.Println("Done.")}这里的好处是,如果你在你的代码中引入错误处理(你应该),你只需要稍微修改这段代码:简而言之,将返回errg.Wait()第一个 redis 错误,并且Close()可以通过堆栈向上传播它(到 main,在这种情况下)。也可以使用该context.Context包,如果调用失败,您还可以立即取消任何正在运行的 redis 调用。文档中有这方面的示例errgroup。

largeQ

我经常使用的模式是:https ://play.golang.org/p/ibMz36TS62zpackage mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time")type response struct {&nbsp; &nbsp; message string}func task(i int, done chan response) {&nbsp; &nbsp; time.Sleep(1 * time.Second)&nbsp; &nbsp; done <- response{fmt.Sprintf("%d done", i)}}func main() {&nbsp; &nbsp; responses := GetResponses(10)&nbsp; &nbsp; fmt.Println("all done", len(responses))}func GetResponses(n int) []response {&nbsp; &nbsp; donequeue := make(chan response)&nbsp; &nbsp; wg := sync.WaitGroup{}&nbsp; &nbsp; for i := 0; i < n; i++ {&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go func(value int) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; task(value, donequeue)&nbsp; &nbsp; &nbsp; &nbsp; }(i)&nbsp; &nbsp; }&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; &nbsp; &nbsp; close(donequeue)&nbsp; &nbsp; }()&nbsp; &nbsp; responses := []response{}&nbsp; &nbsp; for result := range donequeue {&nbsp; &nbsp; &nbsp; &nbsp; responses = append(responses, result)&nbsp; &nbsp; }&nbsp; &nbsp; return responses}这也使得节流变得容易:https ://play.golang.org/p/a4MKwJKj634package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time")type response struct {&nbsp; &nbsp; message string}func task(i int, done chan response) {&nbsp; &nbsp; time.Sleep(1 * time.Second)&nbsp; &nbsp; done <- response{fmt.Sprintf("%d done", i)}}func main() {&nbsp; &nbsp; responses := GetResponses(10, 2)&nbsp; &nbsp; fmt.Println("all done", len(responses))}func GetResponses(n, concurrent int) []response {&nbsp; &nbsp; throttle := make(chan int, concurrent)&nbsp; &nbsp; for i := 0; i < concurrent; i++ {&nbsp; &nbsp; &nbsp; &nbsp; throttle <- i&nbsp; &nbsp; }&nbsp; &nbsp; donequeue := make(chan response)&nbsp; &nbsp; wg := sync.WaitGroup{}&nbsp; &nbsp; for i := 0; i < n; i++ {&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; <-throttle&nbsp; &nbsp; &nbsp; &nbsp; go func(value int) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throttle <- 1&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; task(value, donequeue)&nbsp; &nbsp; &nbsp; &nbsp; }(i)&nbsp; &nbsp; }&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; &nbsp; &nbsp; close(donequeue)&nbsp; &nbsp; }()&nbsp; &nbsp; responses := []response{}&nbsp; &nbsp; for result := range donequeue {&nbsp; &nbsp; &nbsp; &nbsp; responses = append(responses, result)&nbsp; &nbsp; }&nbsp; &nbsp; return responses}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go