等待多个回调,超时,无需忙于等待或轮询

在 Go 中,我有两个最终不会触发的回调。


registerCb(func() {...})

registerCb(func() {...})

/* Wait for both func to execute with timeout */

我想等待它们,但如果其中一个没有执行,则会超时。


同步。WaitGroup 不起作用,因为它是阻塞的,而不是基于通道的。此外,您调用 WaitGroup.Done() 而不会在回调之外出现恐慌的风险。


我目前的解决方案是只使用两个布尔值和一个繁忙的等待循环。但这并不令人满意。是否有任何不使用轮询或忙碌等待的惯用方法?


更新:


下面是一些代码,演示了一个繁忙的等待解决方案,但应该在两个回调都触发后立即返回,或者在超时之后返回,而不使用轮询


package main


import (

    "fmt"

    "log"

    "sync"

    "time"

)



var cbOne func()

var cbTwo func()


func registerCbOne(cb func()) {

    cbOne = cb

}


func registerCbTwo(cb func()) {

    cbTwo = cb

}


func executeCallbacks() {

    <-time.After(1 * time.Second)

    cbOne()


    // Might never happen

    //<-time.After(1 * time.Second)

    //cbTwo()

}


func main() {


    // Some process in background will execute our callbacks

    go func() {

        executeCallbacks()

    }()


    err := WaitAllOrTimeout(3 * time.Second)

    if err != nil {

        fmt.Println("Error: ", err.Error())

    }



    fmt.Println("Hello, playground")

}


func WaitAllOrTimeout(to time.Duration) error {


    cbOneDoneCh := make(chan bool, 1)

    cbTwoDoneCh := make(chan bool, 1)

    cbOneDone := false

    cbTwoDone := false

    


    registerCbOne(func() {

        fmt.Println("cb One");

        cbOneDoneCh <- true

    })


    registerCbTwo(func() {

        fmt.Println("cb Two");

        cbTwoDoneCh <- true

    })


    // Wait for cbOne and cbTwo to be executed or a timeout

    

    // Busywait solution

    for {

        select {

             case <-time.After(to):

                 if cbOneDone && cbTwoDone {

                     fmt.Println("Both CB executed (we could poll more often)")

                     return nil

                 }

                 fmt.Println("Timeout!")

                 return fmt.Errorf("Timeout")

             case <-cbOneDoneCh:

                 cbOneDone = true

             case <-cbTwoDoneCh:

                 cbTwoDone = true

        }

    }

}


阿晨1998
浏览 127回答 3
3回答

饮歌长啸

这是我的评论的后续内容,在您添加示例解决方案后添加。为了比我在注释中更清晰,您的示例代码实际上并没有那么糟糕。以下是您的原始示例:// Busywait solutionfor {&nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;case <-time.After(to):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;if cbOneDone && cbTwoDone {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;fmt.Println("Both CB executed (we could poll more often)")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;return nil&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;fmt.Println("Timeout!")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;return fmt.Errorf("Timeout")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;case <-cbOneDoneCh:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;cbOneDone = true&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;case <-cbTwoDoneCh:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;cbTwoDone = true&nbsp; &nbsp; }}这不是一个“繁忙的等待”,但它确实有几个错误(包括你需要一个只为已完成的通道发送语义的事实,或者可能更容易,至少同样好,在完成时只关闭它们一次,也许使用)。我们想做的是:sync.Once启动计时器 作为超时。to使用计时器的通道和两个“完成”通道输入选择循环。我们希望在发生以下第一个事件时退出&nbsp;select 循环:计时器触发,或双“完成”通道已发出信号。如果我们要转到两个已完成的通道,我们也希望清除变量(设置为 ),以便选择不会旋转 - 这将变成真正的忙碌等待 - 但目前让我们假设我们在回调时只发送一次,否则只会泄漏通道, 以便我们可以按照编写的方式使用您的代码,因为这些选择只会返回一次。以下是更新后的代码:closeChnilt := timer.NewTimer(to)for !cbOneDone || !cbTwoDone {&nbsp; &nbsp; select {&nbsp; &nbsp; case <-t.C:&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("Timeout!")&nbsp; &nbsp; &nbsp; &nbsp; return fmt.Errorf("timeout")&nbsp; &nbsp; }&nbsp; &nbsp; case <-cbOneDoneCh:&nbsp; &nbsp; &nbsp; &nbsp; cbOneDone = true&nbsp; &nbsp; case <-cbTwoDoneCh:&nbsp; &nbsp; &nbsp; &nbsp; cbTwoDone = true&nbsp; &nbsp; }}// insert t.Stop() and receive here to drain t.C if desiredfmt.Println("Both CB executed")return nil请注意,我们最多将经历两次循环:如果我们从两个&nbsp;Done 通道接收到每个通道,则循环停止而不会超时。没有旋转/忙碌等待:我们从未收到过任何东西。我们返回零(无错误)。t.C如果我们从一个 Done 通道接收,循环将恢复,但会阻塞等待计时器或另一个 Done 通道。如果我们从 接收到 ,则表示我们尚未收到两个回调。我们可能有一个,但有一个暂停,我们选择放弃,这是我们的目标。我们返回一个错误,而不通过循环返回。t.C一个真正的版本需要更多的工作来正确清理并避免泄漏“完成”通道(以及计时器通道及其goroutine;参见评论),但这是一般的想法。您已经将回调转换为通道操作,并且已经具有其通道的计时器。

撒科打诨

下面的代码有两个变体,第一个是常规模式,没有什么花哨的,它做了工作,做得很好。您将回调启动到例程中,使它们推送到接收器,收听该接收器以获取结果或超时。注意接收器通道的初始容量,为了防止泄漏例程,它必须与回调次数匹配。第二个工厂将同步机制分解成小函数进行组装,提供两种等待方法,waitAll 和 waitOne。写起来不错,但效率肯定更低,分配更多,渠道更多来回,推理更复杂,更微妙。package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "log"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time")func main() {&nbsp; &nbsp; ExampleOne()&nbsp; &nbsp; ExampleTwo()&nbsp; &nbsp; ExampleThree()&nbsp; &nbsp; fmt.Println("Hello, playground")}func ExampleOne() {&nbsp; &nbsp; log.Println("start reg")&nbsp; &nbsp; errs := make(chan error, 2)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; fn := callbackWithOpts("reg: so slow", 2*time.Second, nil)&nbsp; &nbsp; &nbsp; &nbsp; errs <- fn()&nbsp; &nbsp; }()&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; fn := callbackWithOpts("reg: too fast", time.Millisecond, fmt.Errorf("broke!"))&nbsp; &nbsp; &nbsp; &nbsp; errs <- fn()&nbsp; &nbsp; }()&nbsp; &nbsp; select {&nbsp; &nbsp; case err := <-errs: // capture only one result,&nbsp; &nbsp; &nbsp; &nbsp; // the fastest to finish.&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Println(err)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; case <-time.After(time.Second): // or wait that many amount of time,&nbsp; &nbsp; &nbsp; &nbsp; // in case they are all so slow.&nbsp; &nbsp; }&nbsp; &nbsp; log.Println("done reg")}func ExampleTwo() {&nbsp; &nbsp; log.Println("start wait")&nbsp; &nbsp; errs := waitAll(&nbsp; &nbsp; &nbsp; &nbsp; withTimeout(time.Second,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; callbackWithOpts("waitAll: so slow", 2*time.Second, nil),&nbsp; &nbsp; &nbsp; &nbsp; ),&nbsp; &nbsp; &nbsp; &nbsp; withTimeout(time.Second,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; callbackWithOpts("waitAll: too fast", time.Millisecond, nil),&nbsp; &nbsp; &nbsp; &nbsp; ),&nbsp; &nbsp; )&nbsp; &nbsp; for err := range trim(errs) {&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Println(err)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; log.Println("done wait")}func ExampleThree() {&nbsp; &nbsp; log.Println("start waitOne")&nbsp; &nbsp; errs := waitOne(&nbsp; &nbsp; &nbsp; &nbsp; withTimeout(time.Second,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; callbackWithOpts("waitOne: so slow", 2*time.Second, nil),&nbsp; &nbsp; &nbsp; &nbsp; ),&nbsp; &nbsp; &nbsp; &nbsp; withTimeout(time.Second,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; callbackWithOpts("waitOne: too fast", time.Millisecond, nil),&nbsp; &nbsp; &nbsp; &nbsp; ),&nbsp; &nbsp; )&nbsp; &nbsp; for err := range trim(errs) {&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Println(err)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; log.Println("done waitOne")}// a configurable callback for playingfunc callbackWithOpts(msg string, tout time.Duration, err error) func() error {&nbsp; &nbsp; return func() error {&nbsp; &nbsp; &nbsp; &nbsp; <-time.After(tout)&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(msg)&nbsp; &nbsp; &nbsp; &nbsp; return err&nbsp; &nbsp; }}// withTimeout return a function that returns first error or times out and return nilfunc withTimeout(tout time.Duration, h func() error) func() error {&nbsp; &nbsp; return func() error {&nbsp; &nbsp; &nbsp; &nbsp; d := make(chan error, 1)&nbsp; &nbsp; &nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; d <- h()&nbsp; &nbsp; &nbsp; &nbsp; }()&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case err := <-d:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return err&nbsp; &nbsp; &nbsp; &nbsp; case <-time.After(tout):&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; return nil&nbsp; &nbsp; }}// wait launches all func() and return their errors into the returned error channel; (merge)// It is the caller responsability to drain the output error channel.func waitAll(h ...func() error) chan error {&nbsp; &nbsp; d := make(chan error, len(h))&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; for i := 0; i < len(h); i++ {&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go func(h func() error) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; d <- h()&nbsp; &nbsp; &nbsp; &nbsp; }(h[i])&nbsp; &nbsp; }&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; &nbsp; &nbsp; close(d)&nbsp; &nbsp; }()&nbsp; &nbsp; return d}// wait launches all func() and return the first error into the returned error channel// It is the caller responsability to drain the output error channel.func waitOne(h ...func() error) chan error {&nbsp; &nbsp; d := make(chan error, len(h))&nbsp; &nbsp; one := make(chan error, 1)&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; for i := 0; i < len(h); i++ {&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go func(h func() error) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; d <- h()&nbsp; &nbsp; &nbsp; &nbsp; }(h[i])&nbsp; &nbsp; }&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for err := range d {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; one <- err&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; close(one)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; &nbsp; &nbsp; close(d)&nbsp; &nbsp; }()&nbsp; &nbsp; return one}func trim(err chan error) chan error {&nbsp; &nbsp; out := make(chan error)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for e := range err {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out <- e&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; close(out)&nbsp; &nbsp; }()&nbsp; &nbsp; return out}

慕斯王

func wait(ctx context.Context, wg *sync.WaitGroup) error {&nbsp; &nbsp; done := make(chan struct{}, 1)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; &nbsp; &nbsp; done <- struct{}{}&nbsp; &nbsp; }()&nbsp; &nbsp; select {&nbsp; &nbsp; case <-done:&nbsp; &nbsp; &nbsp; &nbsp; // Counter is 0, so all callbacks completed.&nbsp; &nbsp; &nbsp; &nbsp; return nil&nbsp; &nbsp; case <-ctx.Done():&nbsp; &nbsp; &nbsp; &nbsp; // Context cancelled.&nbsp; &nbsp; &nbsp; &nbsp; return ctx.Err()&nbsp; &nbsp; }}或者,您可以传递 a 和 块而不是 on ,但我认为使用上下文更习惯用语。time.Duration<-time.After(d)<-ctx.Done()
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go