golang中的并行算法对向量中的元素求和

我正在 Golang 中实施一些并行算法作为练习。现在我正在尝试对向量中的所有元素求和,但为此,我需要一个障碍。我四处搜索,但找不到任何可以帮助我的东西。


这是我的代码的样子:


package main


import (

    "fmt"

    "math"

    "sync"

)


func main() {

    var wg sync.WaitGroup


    sumWorkerFunc := func(k int, a []int, br *sync.WaitGroup) {

        bound := int(math.Ceil(math.Log2(float64(k))))

        for i := 1; i < bound; i++ {

            if k%int(math.Pow(2, float64(i))) == 0 {

                a[k] = a[k-int(math.Pow(2, float64(i-1)))] + a[k]

            }


            /* barrier here */

        }


        wg.Done()

    }


    a := []int{0, 1, 2, 3, 4, 5, 6, 7}


    fmt.Println("Before:")

    fmt.Println(a)


    workers := 8

    wg.Add(workers)


    for k := 0; k < workers; k++ {

        go sumWorkerFunc(k, a, br)

    }

    wg.Wait()


    fmt.Println("After:")

    fmt.Println(a)

}

在开始下一次迭代之前,我需要等待所有工作人员完成,因为他们需要下一次迭代的结果。这就是我试图做的:


package main


import (

    "fmt"

    "math"

    "sync"

)


func main() {

    var wg sync.WaitGroup


    sumWorkerFunc := func(k int, a []int, br *sync.WaitGroup) {

        bound := int(math.Ceil(math.Log2(float64(k))))

        for i := 1; i < bound; i++ {

            if k%int(math.Pow(2, float64(i))) == 0 {

                a[k] = a[k-int(math.Pow(2, float64(i-1)))] + a[k]

            }


            br.Done()

            br.Wait() // this should not be here

            br.Add(1)

        }


        wg.Done()

    }


    a := []int{0, 1, 2, 3, 4, 5, 6, 7}


    fmt.Println("Before:")

    fmt.Println(a)


    workers := 8

    wg.Add(workers)


    var barrier sync.WaitGroup

    barrier.Add(workers)


    for k := 0; k < workers; k++ {

        go sumWorkerFunc(k, a, &barrier)

    }

    wg.Wait()


    fmt.Println("After:")

    fmt.Println(a)

}

但是我不能在那里放置一个 Wait() 因为它会被所有的工人调用。在那里实施障碍的正确方法是什么?我开始认为这个问题可能更多地针对可能不适合 Golang 的共享内存模型。


谢谢!


至尊宝的传说
浏览 149回答 2
2回答

茅侃侃

您需要的不仅仅是一个 WaitGroup 来协调此例程。看看这个图案:func main() {&nbsp; &nbsp; const size = 100&nbsp; &nbsp; var (&nbsp; &nbsp; &nbsp; &nbsp; wg sync.WaitGroup&nbsp; &nbsp; &nbsp; &nbsp; a&nbsp; [size]int&nbsp; &nbsp; )&nbsp; &nbsp; // Fill array a with all ones&nbsp; &nbsp; for i := 0; i < size; i++ {&nbsp; &nbsp; &nbsp; &nbsp; go func(x int) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; a[x] = 1&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; }(i)&nbsp; &nbsp; }&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; fmt.Println(a)}每个工作人员在开始工作之前将自己添加到 WaitGroup 中,然后在完成后将自己删除。 你看到问题了吗?尝试自己运行几次并查看输出。wg.Wait()仅当您在调用所有预期的方法之后wg.Add(x)调用它时才有效。由于wg.Add(1)它们在 goroutine 中,没有任何其他同步,我们不确定到 时添加了多少mainworker wg.Wait()。例如,有可能在 100 名工人中,有 50 人都打电话了wg.Add(1),然后wg.Done()在剩下的 50 人做任何事情之前。因此,wg.Wait()在 50 名工人仍未完成时继续进行。这一切就是说wg.Add(x)必须同步!var barrier sync.WaitGroupbarrier.Add(workers)for k := 0; k < workers; k++ {&nbsp; &nbsp; go sumWorkerFunc(k, a, &barrier)}在这种情况下,Add是同步的,因为它发生在工作人员开始执行之前。br.Done()br.Wait()br.Add(1)让你所有的工人都打电话不一定有问题Wait,问题Add是不同步。您不能在此处使用 WaitGroup 来同步自身。您需要一些额外的功能(另一个 WaitGroup、一个锁、一个通道等)来在轮次之间创建跨工作线程的同步。这是我能想到的最简单的解决方案,它使每一轮都有一个 WaitGroup:func main() {&nbsp; &nbsp; const (&nbsp; &nbsp; &nbsp; &nbsp; workers = 3&nbsp; &nbsp; &nbsp; &nbsp; rounds&nbsp; = 5&nbsp; &nbsp; )&nbsp; &nbsp; work := func(i int, roundWgs []sync.WaitGroup) {&nbsp; &nbsp; &nbsp; &nbsp; for r := 0; r < rounds; r++ {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // This is the "work" we do each round&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("round: %v, worker %v\n", r, i)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // We are finished the current round, and will wait for the group.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; roundWgs[r].Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; roundWgs[r].Wait()&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; // Each round of work has it's own WaitGroup, in which each worker must finish.&nbsp; &nbsp; var roundWgs = make([]sync.WaitGroup, rounds)&nbsp; &nbsp; for i := 0; i < rounds; i++ {&nbsp; &nbsp; &nbsp; &nbsp; roundWgs[i].Add(workers)&nbsp; &nbsp; }&nbsp; &nbsp; // wg is our outermost WaitGroup, which waits until all work is done.&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; wg.Add(workers)&nbsp; &nbsp; for i := 0; i < workers; i++ {&nbsp; &nbsp; &nbsp; &nbsp; go func(j int) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; work(j, roundWgs)&nbsp; &nbsp; &nbsp; &nbsp; }(i)&nbsp; &nbsp; }&nbsp; &nbsp; wg.Wait()}输出:round: 0, worker 2round: 0, worker 0round: 0, worker 1round: 1, worker 1round: 1, worker 0round: 1, worker 2round: 2, worker 2round: 2, worker 1round: 2, worker 0round: 3, worker 0round: 3, worker 2round: 3, worker 1round: 4, worker 1round: 4, worker 2round: 4, worker 0同样,Adds 是同步的,因为它们都发生在任何 worker 开始之前。

一只甜甜圈

我认为你正在尝试做的更像是一个共享内存场景。最好不要延迟等待,而是将其放在启动 goroutine 的循环之后。并且您应该使用互斥锁来确保互斥。而且我认为这个问题的顺序无关紧要。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go