猿问

如何解决生产者和消费者代码中的死锁

当我运行下面的程序时,出现错误


davecheney      tweets about golang

beertocode      does not tweet about golang

ironzeb         tweets about golang

beertocode      tweets about golang

vampirewalk666  tweets about golang

fatal error: all goroutines are asleep - deadlock!


goroutine 1 [semacquire]:

sync.runtime_Semacquire(0xc000010260?)

        /usr/local/go/src/runtime/sema.go:56 +0x25

sync.(*WaitGroup).Wait(0x100c000058058?)

        /usr/local/go/src/sync/waitgroup.go:136 +0x52

main.main()

        /home/joe/go/src/github.com/go-concurrency-exercises/1-producer-consumer/main.go:53 +0x14f

死锁从何而来,如何改进程序以避免死锁?


package main


import (

    "fmt"

    "sync"

    "time"

)


func producer(stream Stream, tweetChan chan *Tweet) {

    for {

        tweet, err := stream.Next()

        if err == ErrEOF {

            close(tweetChan)

            return

        }

        tweetChan <- tweet

        //tweets = append(tweets, tweet)

    }

}


func consumer(tweetChan chan *Tweet) {

    for t := range tweetChan {

        if t.IsTalkingAboutGo() {

            fmt.Println(t.Username, "\ttweets about golang")

        } else {

            fmt.Println(t.Username, "\tdoes not tweet about golang")

        }

    }

}


// main starts a producer goroutine and a consumer goroutine that share an
// unbuffered channel, waits on a WaitGroup, and then prints the elapsed time.
//
// NOTE(review): the WaitGroup counter is set to 2 below, but neither producer
// nor consumer receives wg or calls wg.Done, so the counter never reaches
// zero and wg.Wait cannot return. After both goroutines have finished, the
// runtime aborts with "all goroutines are asleep - deadlock!".
func main() {

    start := time.Now()

    stream := GetMockStream()


    var wg sync.WaitGroup

    // Unbuffered: every send blocks until the consumer receives it.
    tweetChan := make(chan *Tweet)

    // Producer

    //tweets := producer(stream)

    // Registers two pending units of work; nothing in this file marks them done.
    wg.Add(2)

    go producer(stream, tweetChan)

    // Consumer

    //consumer(tweets)

    go consumer(tweetChan)


    // Blocks until the counter is zero (see the NOTE above: it never is).
    wg.Wait()


    fmt.Printf("Process took %s\n", time.Since(start))

}

如果需要看mockstream.go,参考 https://github.com/loong/go-concurrency-exercises/tree/master/1-producer-consumer


我的程序是在原程序 main.go 的基础上修改得到的并发版本


呼啦一阵风
浏览 146回答 1
1回答

墨色风雨

wg.Wait() 会一直阻塞,直到 WaitGroup 的计数器归零;但程序中没有任何 goroutine 调用 wg.Done() 来递减计数器,所以它永远不会返回。解决办法是把 WaitGroup 的指针传给每个 goroutine 函数,并在函数返回之前调用 wg.Done():

func producer(wg *sync.WaitGroup, stream Stream, tweetChan chan *Tweet) {
    defer wg.Done()
    for {
        tweet, err := stream.Next()
        if err == ErrEOF {
            close(tweetChan)
            return
        }
        tweetChan <- tweet
    }
}

func consumer(wg *sync.WaitGroup, tweetChan chan *Tweet) {
    defer wg.Done()
    for t := range tweetChan {
        if t.IsTalkingAboutGo() {
            fmt.Println(t.Username, "\ttweets about golang")
        } else {
            fmt.Println(t.Username, "\tdoes not tweet about golang")
        }
    }
}

func main() {
    start := time.Now()
    stream := GetMockStream()
    var wg sync.WaitGroup
    tweetChan := make(chan *Tweet)
    wg.Add(2)
    go producer(&wg, stream, tweetChan)
    go consumer(&wg, tweetChan)
    wg.Wait()
    fmt.Printf("Process took %s\n", time.Since(start))
}
随时随地看视频慕课网APP

相关分类

Go
我要回答