程序使用等待组进入死锁

我正在编写一个程序,它读取名为 orders.csv 的文件中的订单号列表,并将其与文件夹中存在的其他 csv 文件进行比较。


问题是即使使用等待组它也会陷入死锁,我不知道为什么。


出于某种原因,stackoverflow 说我的帖子主要是代码,所以我必须添加这一行,因为如果有人想帮助我调试我遇到的这个问题,整个代码都是必要的。


package main


import (

    "bufio"

    "fmt"

    "log"

    "os"

    "path/filepath"

    "strings"

    "sync"

)


type Files struct {

    filenames []string

}


type Orders struct {

    ID []string

}


var ordersFilename string = "orders.csv"


func main() {

    var (

        ordersFile *os.File

        files       Files

        orders     Orders

        err        error

    )


    mu := new(sync.Mutex)

    wg := &sync.WaitGroup{}

    wg.Add(1)


    if ordersFile, err = os.Open(ordersFilename); err != nil {

        log.Fatalln("Could not open file: " + ordersFilename)

    }


    orders = getOrderIDs(ordersFile)


    files.filenames = getCSVsFromCurrentDir()


    var filenamesSize = len(files.filenames)

    var ch = make(chan map[string][]string, filenamesSize)

    var done = make(chan bool)


    for i, filename := range files.filenames {

        go func(currentFilename string, ch chan<- map[string][]string, i int, orders Orders, wg *sync.WaitGroup, filenamesSize *int, mu *sync.Mutex, done chan<- bool) {

            wg.Add(1)

            defer wg.Done()

            checkFile(currentFilename, orders, ch)

            mu.Lock()

            *filenamesSize--

            mu.Unlock()

            if i == *filenamesSize {

                done <- true

                close(done)

            }

        }(filename, ch, i, orders, wg, &filenamesSize, mu, done)

    }


    select {

    case str := <-ch:

        fmt.Printf("%+v\n", str)

    case <-done:

        wg.Done()

        break

    }


    wg.Wait()

    close(ch)

}



繁星coding
浏览 119回答 3
3回答

梦里花落0921

第一个问题是wg.Addalways 必须在它所代表的 goroutine(s) 之外。如果不是,则&nbsp;wg.Wait可能会在 goutine 实际开始运行(并被调用wg.Add)之前调用该调用,因此会“认为”没有什么可等待的。代码的第二个问题是它等待例程完成的方式有多种。有渠道WaitGroup,有done渠道。仅使用其中之一。哪一个还取决于如何使用 goroutines 的结果。在这里,我们来到下一个问题。第三个问题是收集结果。目前,该代码仅打印/使用来自 goroutine 的单个结果。for { ... }在选择周围放置一个循环,如果通道关闭,则使用它return来跳出循环。done(请注意,您不需要在done频道上发送任何内容,关闭它就足够了。)改进版 0.0.1所以这里的第一个版本(包括其他一些“代码清理”)带有done用于关闭和WaitGroup删除的通道:func main() {&nbsp; &nbsp; ordersFile, err := os.Open(ordersFilename)&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Fatalln("Could not open file: " + ordersFilename)&nbsp; &nbsp; }&nbsp; &nbsp; orders := getOrderIDs(ordersFile)&nbsp; &nbsp; files := Files{&nbsp; &nbsp; &nbsp; &nbsp; filenames: getCSVsFromCurrentDir(),&nbsp; &nbsp; }&nbsp; &nbsp; var (&nbsp; &nbsp; &nbsp; &nbsp; mu = new(sync.Mutex)&nbsp; &nbsp; &nbsp; &nbsp; filenamesSize = len(files.filenames)&nbsp; &nbsp; &nbsp; &nbsp; ch = make(chan map[string][]string, filenamesSize)&nbsp; &nbsp; &nbsp; &nbsp; done = make(chan bool)&nbsp; &nbsp; )&nbsp; &nbsp; for i, filename := range files.filenames {&nbsp; &nbsp; &nbsp; &nbsp; go func(currentFilename string, ch chan<- map[string][]string, i int, orders Orders, filenamesSize *int, mu *sync.Mutex, done chan<- bool) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; checkFile(currentFilename, orders, ch)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; mu.Lock()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; *filenamesSize--&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; mu.Unlock()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // TODO: This also accesses filenamesSize, so it also needs to be protected with the mutex:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if i == *filenamesSize {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; done <- true&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; close(done)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }(filename, ch, i, orders, &filenamesSize, mu, done)&nbsp; &nbsp; }&nbsp; &nbsp; // Note: closing a channel is not really needed, so you can omit this:&nbsp; &nbsp; defer close(ch)&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case str := <-ch:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("%+v\n", str)&nbsp; &nbsp; &nbsp; &nbsp; case <-done:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}改进版 0.0.2但是,在您的情况下,我们有一些优势。我们确切地知道我们启动了多少个 goroutine,因此也知道我们期望有多少结果。(当然,如果每个 goroutine 返回一个当前代码所做的结果。)这为我们提供了另一种选择,因为我们可以使用另一个具有相同迭代次数的 for 循环来收集结果:func main() {&nbsp; &nbsp; ordersFile, err := os.Open(ordersFilename)&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Fatalln("Could not open file: " + ordersFilename)&nbsp; &nbsp; }&nbsp; &nbsp; orders := getOrderIDs(ordersFile)&nbsp; &nbsp; files := Files{&nbsp; &nbsp; &nbsp; &nbsp; filenames: getCSVsFromCurrentDir(),&nbsp; &nbsp; }&nbsp; &nbsp; var (&nbsp; &nbsp; &nbsp; &nbsp; // Note: a buffered channel helps speed things up. The size does not need to match the size of the items that will&nbsp; &nbsp; &nbsp; &nbsp; //&nbsp; &nbsp;be passed through the channel. A fixed, small size is perfect here.&nbsp; &nbsp; &nbsp; &nbsp; ch = make(chan map[string][]string, 5)&nbsp; &nbsp; )&nbsp; &nbsp; for _, filename := range files.filenames {&nbsp; &nbsp; &nbsp; &nbsp; go func(filename string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // orders and channel are not variables of the loop and can be used without copying&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; checkFile(filename, orders, ch)&nbsp; &nbsp; &nbsp; &nbsp; }(filename)&nbsp; &nbsp; }&nbsp; &nbsp; for range files.filenames {&nbsp; &nbsp; &nbsp; &nbsp; str := <-ch&nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("%+v\n", str)&nbsp; &nbsp; }}简单很多,不是吗?希望有帮助!

慕的地8271018

这段代码有很多错误。您使用 WaitGroup 错误。Add 必须在主 goroutine 中调用,否则有可能在所有 Add 调用完成之前调用 Wait。在初始化与 Done() 调用不匹配的 WaitGroup 之后,有一个无关的 Add(1) 调用,因此 Wait 永远不会返回(假设上面的点是固定的)。您同时使用 WaitGroup 和 done 通道来表示完成。这充其量是多余的。您正在读取 filenamesSize 而没有持有锁(在if i == *filenamesSize语句中)。这是一个竞争条件。首先,这种i == *filenamesSize情况毫无意义。Goroutines 以任意顺序执行,所以你不能确定 i == 0 的 goroutine 是最后一个减少 filenamesSize这可以通过去掉大部分同步原语并在所有 goroutine 完成后简单地关闭 ch 通道来简化:func main() {&nbsp;&nbsp; &nbsp; ch := make(chan map[string][]string)&nbsp; &nbsp; var wg WaitGroup&nbsp; &nbsp; for _, filename := range getCSVsFromCurrentDir() {&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; filename := filename // capture loop var&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go func() {&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; checkFile(filename, orders, ch)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; }()&nbsp; &nbsp; }&nbsp;&nbsp; &nbsp; go func() {&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait() // after all goroutines are done...&nbsp; &nbsp; &nbsp; &nbsp; close(ch) // let range loop below exit&nbsp; &nbsp; }()&nbsp; &nbsp; for str := range ch {&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; // ...&nbsp; &nbsp; }&nbsp;}

慕少森

不是答案,而是一些不适合评论框的评论。在这部分代码中func main() {&nbsp; &nbsp; var (&nbsp; &nbsp; &nbsp; &nbsp; ordersFile *os.File&nbsp; &nbsp; &nbsp; &nbsp; files&nbsp; &nbsp; &nbsp; &nbsp;Files&nbsp; &nbsp; &nbsp; &nbsp; orders&nbsp; &nbsp; &nbsp;Orders&nbsp; &nbsp; &nbsp; &nbsp; err&nbsp; &nbsp; &nbsp; &nbsp; error&nbsp; &nbsp; )&nbsp; &nbsp; mu := new(sync.Mutex)&nbsp; &nbsp; wg := &sync.WaitGroup{}&nbsp; &nbsp; wg.Add(1)最后一条语句是对 wg.Add 的调用,它看起来是悬空的。我的意思是我们很难理解什么会触发所需的 wg.Done 对应部分。在没有 wg.Done 的情况下调用 wg.Add 是一个错误,如果不以这样的方式编写它们很容易出错,我们无法立即找到它们。在代码的那部分,显然是错误的&nbsp; &nbsp; go func(currentFilename string, ch chan<- map[string][]string, i int, orders Orders, wg *sync.WaitGroup, filenamesSize *int, mu *sync.Mutex, done chan<- bool) {&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()考虑到当例程执行时,您将 1 添加到等待组,父例程继续执行。看这个例子: https: //play.golang.org/p/N9Chaqkv4bd 主程序不等待等待组,因为它没有时间递增。还有更多要说的,但我发现很难理解你的代码的目的,所以我不确定如何在不重写它的情况下进一步帮助你。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go