尝试编写工作方法池时出现死锁

在下面的代码中,我不明白为什么“Worker”方法似乎退出而不是从输入通道“in”中提取值并处理它们。


我曾假设他们只会在消耗来自输入通道“in”的所有输入并处理它们之后才会返回


package main


import (

    "fmt"

    "sync"

)


type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)


type Result struct {

    i   int

    val int

}


func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {

    for item := range in {

        item *= item // returns the square of the input value

        fmt.Printf("=> %d: %d\n", id, item)

        out <- Result{item, id}

    }

    wg.Done()

    fmt.Printf("%d exiting ", id)

}


func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {

    wg := sync.WaitGroup{}

    for id := 0; id < n_workers; id++ {

        fmt.Printf("Starting : %d\n", id)

        wg.Add(1)

        go Worker(in, out, id, &wg)

    }

    wg.Wait()  // wait for all workers to complete their tasks

    close(out) // close the output channel when all tasks are completed

}


const (

    NW = 4

)


func main() {

    in := make(chan int)

    out := make(chan Result)


    go func() {

        for i := 0; i < 100; i++ {

            in <- i

        }

        close(in)

    }()

    Run_parallel(NW, in, out, Worker)


    for item := range out {

        fmt.Printf("From out : %d: %d", item.i, item.val)

    }

}



输出是


Starting : 0

Starting : 1

Starting : 2

Starting : 3

=> 3: 0

=> 0: 1

=> 1: 4

=> 2: 9

fatal error: all goroutines are asleep - deadlock!


慕森卡
浏览 66回答 3
3回答

繁华开满天机

致命错误:所有 goroutine 都处于休眠状态 - 死锁!完整的错误显示了每个 goroutine “卡住”的位置。&nbsp;如果你在操场上运行它,它甚至会显示你的行号。这让我很容易诊断。您Run_parallel在maingroutine 中运行,因此在main可以读取之前out,Run_parallel必须返回。在Run_parallel可以返回之前,它必须wg.Wait()。但在工人打电话之前wg.Done(),他们必须写信给out。这就是导致僵局的原因。一种解决方案很简单:只需Run_parallel在自己的 Goroutine 中并发运行。&nbsp;&nbsp;&nbsp;&nbsp;go&nbsp;Run_parallel(NW,&nbsp;in,&nbsp;out,&nbsp;Worker)现在,mainrange over&nbsp;out,等待outs 关闭以发出完成信号。&nbsp;Run_parallel等待工人与wg.Wait(),工人将范围内in。所有的工作都会完成,并且在完成之前程序不会结束。(https://go.dev/play/p/oMrgH2U09tQ)

暮色呼如

解决方案 :Run_parallel 必须在它自己的 goroutine 中运行:package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "sync")type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)type Result struct {&nbsp; &nbsp; id&nbsp; int&nbsp; &nbsp; val int}func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {&nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; for item := range in {&nbsp; &nbsp; &nbsp; &nbsp; item *= 2 // returns the double of the input value (Bogus handling of data)&nbsp; &nbsp; &nbsp; &nbsp; out <- Result{id, item}&nbsp; &nbsp; }}func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {&nbsp; &nbsp; wg := sync.WaitGroup{}&nbsp; &nbsp; for id := 0; id < n_workers; id++ {&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go Worker(in, out, id, &wg)&nbsp; &nbsp; }&nbsp; &nbsp; wg.Wait()&nbsp; // wait for all workers to complete their tasks&nbsp; &nbsp; close(out) // close the output channel when all tasks are completed}const (&nbsp; &nbsp; NW = 8)func main() {&nbsp; &nbsp; in := make(chan int)&nbsp; &nbsp; out := make(chan Result)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for i := 0; i < 10; i++ {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; in <- i&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; close(in)&nbsp; &nbsp; }()&nbsp; &nbsp; go Run_parallel(NW, in, out, Worker)&nbsp; &nbsp; for item := range out {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("From out [%d]: %d\n", item.id, item.val)&nbsp; &nbsp; }&nbsp; &nbsp; println("- - - All done - - -")}

绝地无双

解决方案的替代配方:在那个替代公式中,没有必要将 Run_parallel 作为 goroutine 启动(它会触发自己的 goroutine)。我更喜欢第二种解决方案,因为它自动执行 Run_parallel() 必须与主函数并行运行的事实。此外,出于同样的原因,它更安全,更不容易出错(无需记住使用 go 关键字运行 Run_parallel)。package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "sync")type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)type Result struct {&nbsp; &nbsp; id&nbsp; int&nbsp; &nbsp; val int}func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {&nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; for item := range in {&nbsp; &nbsp; &nbsp; &nbsp; item *= 2 // returns the double of the input value (Bogus handling of data)&nbsp; &nbsp; &nbsp; &nbsp; out <- Result{id, item}&nbsp; &nbsp; }}func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; wg := sync.WaitGroup{}&nbsp; &nbsp; &nbsp; &nbsp; defer close(out) // close the output channel when all tasks are completed&nbsp; &nbsp; &nbsp; &nbsp; for id := 0; id < n_workers; id++ {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go Worker(in, out, id, &wg)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait() // wait for all workers to complete their tasks *and* trigger the -differed- close(out)&nbsp; &nbsp; }()}const (&nbsp; &nbsp; NW = 8)func main() {&nbsp; &nbsp; in := make(chan int)&nbsp; &nbsp; out := make(chan Result)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; defer close(in)&nbsp; &nbsp; &nbsp; &nbsp; for i := 0; i < 10; i++ {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; in <- i&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; Run_parallel(NW, in, out, Worker)&nbsp; &nbsp; for item := range out {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("From out [%d]: %d\n", item.id, item.val)&nbsp; &nbsp; }&nbsp; &nbsp; println("- - - All done - - -")}
打开App,查看更多内容
随时随地看视频慕课网APP