固定工人数量模式的竞争条件

我正在玩一些用于学习目的的代码,并且在使用标志时,我得到了一个关于其执行的竞争条件,我想了解原因。该代码启动一组固定的 goroutine,这些 goroutine 充当从通道消耗任务的工作线程,没有固定数量的任务,只要通道接收到工作线程必须继续工作的任务。-race


我在调用函数时遇到争用条件。根据我的理解(看看数据竞争报告),当第一个调用由一个生成的goroutine和主要例程调用同时执行时,就会发生竞争条件。这是对的吗?如果是这样,这意味着我必须始终在主例程上执行对Add的调用,以避免资源上的这种竞争?但是,这也意味着我需要知道工人需要提前处理多少任务,如果我需要代码处理一旦工人运行可能遇到的任何数量的任务,那么哪种任务很糟糕......WaitGroupwg.Addwg.Wait


代码:


func Test(t *testing.T) {

    t.Run("", func(t *testing.T) {

        var wg sync.WaitGroup

        queuedTaskC := make(chan func())

        for i := 0; i < 5; i++ {

            wID := i + 1

            go func(workerID int) {

                for task := range queuedTaskC {

                    wg.Add(1)

                    task()

                }

            }(wID)

        }


        taskFn := func() {

            fmt.Println("executing task...")

            wg.Done()

        }

        queuedTaskC <- taskFn

        queuedTaskC <- taskFn

        queuedTaskC <- taskFn

        queuedTaskC <- taskFn

        queuedTaskC <- taskFn

        queuedTaskC <- taskFn

        queuedTaskC <- taskFn

        queuedTaskC <- taskFn

        queuedTaskC <- taskFn


        wg.Wait()

        close(queuedTaskC)


        fmt.Println(len(queuedTaskC))

    })

}

报告:


==================

WARNING: DATA RACE

Read at 0x00c0001280d8 by goroutine 11:

  internal/race.Read()

      /src/internal/race/race.go:37 +0x206

  sync.(*WaitGroup).Add()

      /src/sync/waitgroup.go:71 +0x219

  workerpool.Test.func1.1()

      /workerpool/workerpool_test.go:36 +0x64


Previous write at 0x00c0001280d8 by goroutine 8:

  internal/race.Write()

      /src/internal/race/race.go:41 +0x125

  sync.(*WaitGroup).Wait()

      /src/sync/waitgroup.go:128 +0x126

  workerpool.Test.func1()

      /workerpool/workerpool_test.go:57 +0x292

  testing.tRunner()

      /src/testing/testing.go:1123 +0x202


Goroutine 11 (running) created at:

  workerpool.Test.func1()

      /workerpool/workerpool_test.go:34 +0xe4

  testing.tRunner()

      /src/testing/testing.go:1123 +0x202


阿波罗的战车
浏览 74回答 1
1回答

繁华开满天机

WaitGroup实现基于内部计数器,该计数器由 和 方法更改。在计数器归零之前,该方法不会返回。也可以重复使用,但在文档中描述的某些条件下:AddDoneWaitWaitGroup// If a WaitGroup is reused to wait for several independent sets of events,// new Add calls must happen after all previous Wait calls have returned.尽管您的代码没有重用,但它能够多次将计数器清零。当在给定时间没有处理任何任务时,就会发生这种情况,这在并发代码中是完全可能的。由于您的代码在调用之前不会等待返回,因此您会收到争用条件错误。wgWaitGroupWaitAdd正如每个人在评论中建议的那样,您应该放弃跟踪任务的想法,转而控制正在运行的戈鲁丁。附加代码建议。WaitGroupfunc Test(t *testing.T) {&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; queuedTaskC := make(chan func(), 10)&nbsp; &nbsp; for i := 0; i < 5; i++ {&nbsp; &nbsp; &nbsp; &nbsp; wID := i + 1&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go func(workerID int) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for task := range queuedTaskC {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; task()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }(wID)&nbsp; &nbsp; }&nbsp; &nbsp; for i := 0; i < 10; i++ {&nbsp; &nbsp; &nbsp; &nbsp; queuedTaskC <- func() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("executing task...")&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; close(queuedTaskC)&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; fmt.Println(len(queuedTaskC))}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go