Goroutines管道中的死锁

我需要你的帮助来理解为什么我的函数会导致死锁。当我注释掉像下面这样的行时,它可以正常工作(因此我知道问题在这里)。readFromWorker


整体就在这里 https://play.golang.org/p/-0mRDAeD2tr


我真的非常感谢你的帮助


func readFromWorker(inCh <-chan *data, wg *sync.WaitGroup) {

    defer func() {

        wg.Done()

    }()


    //stageIn1 := make(chan *data)

    //stageOut1 := make(chan *data)


    for v := range inCh {

        fmt.Println("v", v)


        //stageIn1 <- v

    }


    //go stage1(stageIn1, stageOut1)

    //go stage2(stageOut1)

}


千万里不及你
浏览 159回答 1
1回答

慕田峪4524236

我已经评论了你做错的相关部分。另外,我建议考虑一个更好的模式。请记住,在通道上不会停止循环,除非为它正在循环的同一通道调用。此外,关闭通道的经验法则是,发送到通道的发送方也必须关闭它,因为发送到关闭的通道会导致 。for rangeclosepanic此外,使用无缓冲和缓冲通道时要非常小心。对于无缓冲的信道,发送方和接收方必须准备就绪,否则您的情况中也会发生死锁。package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "sync")type data struct {&nbsp; &nbsp; id&nbsp; &nbsp; int&nbsp; &nbsp; url&nbsp; &nbsp;string&nbsp; &nbsp; field int}type job struct {&nbsp; &nbsp; id&nbsp; int&nbsp; &nbsp; url string}func sendToWorker(id int, inCh <-chan job, outCh chan<- *data, wg *sync.WaitGroup) {&nbsp; &nbsp; // wg.Done() is itself a function call, no need to wrap it inside&nbsp; &nbsp; // an anonymous function just to use defer.&nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; for v := range inCh {&nbsp; &nbsp; &nbsp; &nbsp; // some pre process stuff and then pass to pipeline&nbsp; &nbsp; &nbsp; &nbsp; outCh <- &data{id: v.id, url: v.url}&nbsp; &nbsp; }}func readFromWorker(inCh <-chan *data, wg *sync.WaitGroup) {&nbsp; &nbsp; // wg.Done() is itself a function call, no need to wrap it inside&nbsp; &nbsp; // an anonymous function just to use defer.&nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; var (&nbsp; &nbsp; &nbsp; &nbsp; stageIn1&nbsp; = make(chan *data)&nbsp; &nbsp; &nbsp; &nbsp; stageOut1 = make(chan *data)&nbsp; &nbsp; )&nbsp; &nbsp; // Spawn the goroutines so that there's no deadlock&nbsp; &nbsp; // as the sender and receiver both should be ready&nbsp; &nbsp; // when using unbuffered channels.&nbsp; &nbsp; go stage1(stageIn1, stageOut1)&nbsp; &nbsp; go stage2(stageOut1)&nbsp; &nbsp; for v := range inCh {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("v", v)&nbsp; &nbsp; &nbsp; &nbsp; stageIn1 <- v&nbsp; &nbsp; }&nbsp; &nbsp; close(stageIn1)}func stage1(in <-chan *data, out chan<- *data) {&nbsp; &nbsp; for s := range in {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("stage1 = ", s)&nbsp; &nbsp; &nbsp; &nbsp; out <- s&nbsp; &nbsp; }&nbsp; &nbsp; // Close the out channel&nbsp; &nbsp; close(out)}func stage2(out <-chan *data) {&nbsp; &nbsp; // Loop until close&nbsp; &nbsp; for s := range out {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("stage2 = ", s)&nbsp; &nbsp; }}func main() {&nbsp; &nbsp; const chanBuffer = 1&nbsp; &nbsp; var (&nbsp; &nbsp; &nbsp; &nbsp; inputsCh&nbsp; = make(chan job, chanBuffer)&nbsp; &nbsp; &nbsp; &nbsp; resultsCh = make(chan *data, chanBuffer)&nbsp; &nbsp; &nbsp; &nbsp; wgInput&nbsp; sync.WaitGroup&nbsp; &nbsp; &nbsp; &nbsp; wgResult sync.WaitGroup&nbsp; &nbsp; )&nbsp; &nbsp; for i := 1; i <= 4; i++ {&nbsp; &nbsp; &nbsp; &nbsp; wgInput.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go sendToWorker(i, inputsCh, resultsCh, &wgInput)&nbsp; &nbsp; }&nbsp; &nbsp; wgResult.Add(1)&nbsp; &nbsp; go readFromWorker(resultsCh, &wgResult)&nbsp; &nbsp; for j := 1; j <= 10; j++ {&nbsp; &nbsp; &nbsp; &nbsp; inputsCh <- job{id: j, url: "google.com"}&nbsp; &nbsp; }&nbsp; &nbsp; close(inputsCh)&nbsp; &nbsp; wgInput.Wait()&nbsp; &nbsp; close(resultsCh)&nbsp; &nbsp; wgResult.Wait()}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go