猿问

Go - 为什么调度 goroutine 后台工作人员也需要自己的 goroutine?

我正在研究 Go 的一些并发模式。我查看了使用 goroutine 和输入/输出通道来实现后台工作者,并注意到当我将新作业发送到接收通道(基本上是将新作业排队)时,我必须在 goroutine 中执行它,否则调度会被搞砸。意义:


这会崩溃:

for _, jobData := range(dataSet) {

    input <- jobData

}

这有效:

go func() {

    for _, jobData := range(dataSet) {

        input <- jobData

    }

}()

对于更具体的事情,我玩了一些无意义的代码(这里是在 go playground 中):


package main


import (

    "log"

    "runtime"

)


func doWork(data int) (result int) {

    // ... some 'heavy' computation

    result = data * data

    return

}


// do the processing of the input and return

// results on the output channel

func Worker(input, output chan int) {

    for data := range input {

        output <- doWork(data)

    }

}


func ScheduleWorkers() {


    input, output := make(chan int), make(chan int)


    for i := 0 ; i < runtime.NumCPU() ; i++ {

        go Worker(input, output)

    }


    numJobs := 20


    // THIS DOESN'T WORK

    // and crashes the program

    /*

    for i := 0 ; i < numJobs ; i++ {

        input <- i

    }

    */


    // THIS DOES

    go func() {

        for i := 0 ; i < numJobs ; i++ {

            input <- i

        }

    }()


    results := []int{}

    for i := 0 ; i < numJobs ; i++ {

        // read off results

        result := <-output

        results = append(results, result)

        // do stuff...

    }


    log.Printf("Result: %#v\n", results)

}


func main() {

    ScheduleWorkers()

}

我正在努力解决这个微妙的差异 - 感谢您的帮助。谢谢。


哈士奇WWW
浏览 219回答 3
3回答

智慧大石

您的ScheduleWorks函数在主协程(即运行该main()函数的协程,程序在该协程中启动)中通过input.&nbsp;AWorker接收它,并通过 发送另一个值output。但是此时没有人接收output,因此程序无法继续,主 goroutine 将下一个值发送到另一个Worker.对每个 Worker 重复这个推理。你有runtime.NumCPU()工人,这可能少于numJobs.&nbsp;比方说runtime.NumCPU() == 4,所以你有 4 个工人。最后,您已成功发送 4 个值,每个值都是一对一的Worker。由于没有人从 读取output,所有 Worker 都忙于尝试发送,因此它们无法通过 接受更多数据input,因此第五个input <- i将挂起。此时每个 goroutine 都在等待;这就是僵局。您会注意到,如果您启动 20 个或更多的 Worker 而不是&nbsp;runtime.NumCPU(),则该程序可以运行。那是因为主 goroutine 可以通过 发送它想要的所有内容input,因为有足够的工作人员来接收它们。如果不是所有这些,而是将input <- i循环放在另一个 goroutine 中,就像在您成功的示例中一样,maingoroutine(在其中ScheduleWorks运行)可以继续并从output.&nbsp;所以,每次这个新的 goroutine 发送一个值时,worker 发送另一个值,output主 goroutine 得到这个输出,worker 可以接收另一个值。没有人等待,程序成功了。

有只小跳蛙

这是因为 Go 中的所有内容默认都是阻塞的。当您在无缓冲通道上发送第一个值时,它会阻塞,直到接收器从通道中取出该值。可以通过添加“容量”来缓冲通道。例如:make(chan&nbsp;int,&nbsp;20)&nbsp;//&nbsp;Make&nbsp;a&nbsp;buffered&nbsp;channel&nbsp;of&nbsp;int&nbsp;with&nbsp;capacity&nbsp;20从Go 规范:容量(以元素数为单位)设置通道中缓冲区的大小。如果容量大于零,则通道是异步的:如果缓冲区未满(发送)或非空(接收),则通信操作成功而不会阻塞,并且元素按发送顺序接收。如果容量为零或不存在,则只有当发送方和接收方都准备好时,通信才能成功。您可以通过使用缓冲通道而不是非缓冲通道来使原始函数工作,但是将函数调用包装在 goroutine 中可能是更好的方法,因为它实际上是并发的。来自Effective Go(完整阅读此文档!它可能是 Stack Overflow 上 Go 答案中链接最多的文档):接收器总是阻塞直到有数据要接收。如果通道未缓冲,则发送方将阻塞,直到接收方收到该值。如果通道有缓冲区,则发送方只会阻塞,直到值被复制到缓冲区;如果缓冲区已满,这意味着等待某个接收器检索到一个值。如果您使用缓冲通道,那么您只是填充通道,继续前进,然后再次排空。不能同时进行。例子:改变input,&nbsp;output&nbsp;:=&nbsp;make(chan&nbsp;int),&nbsp;make(chan&nbsp;int)到input,&nbsp;output&nbsp;:=&nbsp;make(chan&nbsp;int,&nbsp;20),&nbsp;make(chan&nbsp;int,&nbsp;20)
随时随地看视频慕课网APP

相关分类

Go
我要回答