并发处理程序正在阻塞

我们发现一个工作不正常。在处理程序中,我们将过滤即将到来的消息,然后将有效事件传递给一个 func 进行处理。该功能实现如下:mqtt.MessageHandler


func processEvent(i models.Foo) (string, error) {

    var wg sync.WaitGroup

    quit := make(chan bool)

    errc := make(chan error)

    done := make(chan error)


    err := func1()

    if err != nil {

        return err

    }


    switch strings.ToUpper(i.Status) {

    case "OK":

        wg.Add(1)

        go func() {

            defer wg.Done()

            err = longTimeTask1()

            ch := done

            if err != nil {

                log.Error("%s", err.Error())

                ch = errc

            }

            select {

            case ch <- err:

                return

            case <-quit:

                return

            }

        }()


        wg.Add(1)

        go func() {

            defer wg.Done()

            err = longTimeTask2()

            ch := done

            if err != nil {

                ch = errc

            }

            select {

            case ch <- err:

                return

            case <-quit:

                return

            }

        }()


        result := "processed"

        count := 0

        for {

            select {

            case err := <-errc:

                close(quit)

                log.Info("event: %s, %s", "", err.Error())

                return "", err

            case <-done:

                count++

                if count == 4 { // why 4???

                    return result, nil

                }

            }

        }


        wg.Wait()


        if err != nil {

            log.Info("event: %s, %s", result, err.Error())

            return result, err

        }

        close(quit)

        close(errc)

        close(done)

        return result, nil

    default:

        return "", nil

    }


    return "", nil

}

我明白了,它正试图同步和长时间任务2()。但对我来说,理解起来相当复杂。计数和计数 == 4 的目的是什么?为什么在最后收盘?代码提示无法访问 。在此之前,这个功能运行良好。但最近或可能返回一些错误,这会破坏代码,这个fuc似乎被完全阻止了。你能帮我理解代码,找到潜在的问题并重构这部分吗?longTimeTask1()wg.Wait()longTimeTask1()longTimeTask2()


阿波罗的战车
浏览 53回答 2
2回答

茅侃侃

查看 ,代码似乎期望从通道接收四条消息。但是,此代码最多可以从两个 goroutine 生成两个这样的消息,因此这是一个错误。countdone此外,如果任何一个 goroutine 返回错误,它就不会写入通道,所以这是另一个错误。done另一种写法可能是:...result := "processed"for {&nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp;case err := <-errc:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; close(quit) // Tell the goroutines to terminate&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Info("event: %s, %s", "", err.Error())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Wait() // Wait for them to finish&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return "", err&nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp;case <-done:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; count++&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if count == 2 {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return result, nil&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp;&nbsp;}

肥皂起泡泡

这正是 errgroup 包设计用于的分叉和联接并发类型:func processEvent(ctx context.Context, i models.Foo) (string, error) {&nbsp; &nbsp; err := func1()&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; return "", err&nbsp; &nbsp; }&nbsp; &nbsp; g, ctx := errgroup.WithContext(ctx)&nbsp; &nbsp; if strings.ToUpper(i.Status) != "OK" {&nbsp; &nbsp; &nbsp; &nbsp; return "", nil&nbsp; &nbsp; }&nbsp; &nbsp; g.Go(func() error { return longTimeTask1(ctx) })&nbsp; &nbsp; g.Go(func() error { return longTimeTask2(ctx) })&nbsp; &nbsp; if err := g.Wait(); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Printf("event: %v", err)&nbsp; &nbsp; &nbsp; &nbsp; return "", err&nbsp; &nbsp; }&nbsp; &nbsp; return "processed", nil}(https://play.golang.org/p/JNMKftQTLGs)
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go