Goroutines 节流示例

我正在通过 Udemy 课程学习基本的围棋。在 goroutines 部分,有一个节流的例子,它让我理解了等待组是如何工作的。


package main


import (

    "fmt"

    "math/rand"

    "sync"

    "time"

)


func main() {

    c1 := make(chan int)

    c2 := make(chan int)


    go populate(c1)


    go fanOutIn(c1, c2)


    for v := range c2 {

        fmt.Println(v)

    }


    fmt.Println("about to exit")

}


func populate(c chan int) {

    for i := 0; i < 100; i++ {

        c <- i

    }

    close(c)

}


func fanOutIn(c1, c2 chan int) {

    var wg sync.WaitGroup

    const goroutines = 10

    wg.Add(goroutines) 

    for i := 0; i < goroutines; i++ {

        go func() {

            for v := range c1 {

                func(v2 int) {

                    c2 <- timeConsumingWork(v2)

                }(v)

            }

            wg.Done()

        }()

    }

    wg.Wait()

    close(c2)

}


func timeConsumingWork(n int) int {

    time.Sleep(time.Microsecond * time.Duration(rand.Intn(500)))

    return n + rand.Intn(1000)

}

不符合我理解的部分是在fanOutIn我们设置WaitGroup, 和Add(10).


为什么我打印出 100 个值?只能将单个值 ( i := 0) 放入c1,并且该值永远不会从通道中显式删除。然后代码命中wg.Done(),等待组队列减少到 9,依此类推。


根据我目前的理解,我希望看到 10 个值0 + rand.Intn(1000)。


慕慕森
浏览 129回答 1
1回答

一只甜甜圈

分拆出来的函数如下所示(包括go前面的 和调用它的括号):go func() {&nbsp; &nbsp; for v := range c1 {&nbsp; &nbsp; &nbsp; &nbsp; func(v2 int) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; c2 <- timeConsumingWork(v2)&nbsp; &nbsp; &nbsp; &nbsp; }(v)&nbsp; &nbsp; }&nbsp; &nbsp; wg.Done()}()这段代码有点奇怪和离奇。让我们进一步缩小它,丢弃wg.Done并只保留for循环本身:for v := range c1 {&nbsp; &nbsp; func(v2 int) {&nbsp; &nbsp; &nbsp; &nbsp; c2 <- timeConsumingWork(v2)&nbsp; &nbsp; }(v)}有一个内部未命名的函数在这里非常没用;我们可以在不改变程序行为的情况下丢弃它,得到:for v := range c1 {&nbsp; &nbsp; c2 <- timeConsumingWork(v)}这最后是一个简单的循环。现在的一个关键问题是:您期望从这个循环中进行多少次迭代? 注意:它不一定是任何常数。或许更好的方式来表达这个问题是:这个循环什么时候结束?for循环读取一个通道。这种循环在从通道读取指示没有更多数据时结束,即通道已关闭且其队列为空。(请参阅循环的 Go 规范部分for。)所以这个最里面的循环,for v := range c1直到通道c1关闭并且队列中没有更多数据时才会终止。该频道是使用以下内容创建的:c1 := make(chan int)所以它没有队列,所以我们甚至不需要考虑这一点:它在 aclose(c1)关闭它后终止。 您现在应该寻找一个closecloses c1。我们的近在哪里?这是关闭的地方c1:func populate(c chan int) {&nbsp; &nbsp; for i := 0; i < 100; i++ {&nbsp; &nbsp; &nbsp; &nbsp; c <- i&nbsp; &nbsp; }&nbsp; &nbsp; close(c)}我们称之为 withc1作为它的参数,所以它的 final close(c)closes c1。现在你可以问:我们什么时候接到这个close电话? 答案很明显:i >= 100在循环之后,即,在我们将 100 个值(分别从 0 到 99)发送到 channel 之后c1。什么fanOutIn是剥离 10 个 goroutine。10 个 goroutine 中的每一个都运行我上面引用的第一个匿名函数。该匿名函数有一个循环,运行次数不定,一直重复直到通道c1关闭。循环中的每次行程都会获取通道的值,因此最初,如果十个 goroutine 都设法在有任何可用值之前启动,那么所有十个 goroutine 都将等待值。当生产者函数将一个值放入通道时,十个等待的 goroutine 中的一个将获取它并开始使用它。如果该 goroutine 需要很长时间才能回到自己for循环的顶部,则另一个 goroutine 将获取下一个生成的值。所以这里发生的情况是,多达 10 个生成的值通过通道传播到多达 10 个 goroutine。1 这些(最多 10 个)goroutine 中的每一个都花费了一些重要的时间来使用它的值,然后将最终产品值发送到通道c2并返回到它自己的无限for循环的顶部。只有当生产者关闭了它的通道c(这里是我们的c1)时,十个 goroutine 才会看到一个关闭的通道空队列,从而允许它们退出for循环。当他们退出他们的for循环时,他们每个人都会调用wg.Done()(一次)并终止。因此,一旦close(c1)发生(通过close(c)in populate),最终所有十个匿名 goroutine 都将调用wg.Done(). 届时,wg.Wait()infanOutIn将返回。这将调用close(c2)并从 中返回fanOutIn,同时终止该goroutine。同时,在 中main,我们使用for v := range c2从通道中读取c2。当十个 goroutine 中的任何一个for写入值时,此循环将运行。c2只有当它自己关闭时它才会退出c2(它的队列也必须是空的,但又c2是一个零长度队列)。所以在关闭之前main不会继续通过for循环,直到返回才会发生,直到发生十次调用才会发生,直到通道关闭才会发生。c2wg.Wait()wg.Done()c1这意味着在调用之前main无法通过自己的for循环,并且只有在生成恰好 100 个值之后才会发生这种情况。populateclose(c)
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go