猿问

使用 Goroutines 同时加载大型 CSV 时的未定义行为

我正在尝试使用 Golang 使用 goroutines 加载一个大的 CSV 文件。csv 的维度是 (254882, 100)。但是当我解析 csv 并将其存储到 2D 列表中时使用我的 goroutines,我得到的行小于 254882 并且每次运行的数量都不同。我觉得这是由于 goroutines 而发生的,但似乎无法指出原因。谁能帮帮我吗。我也是 Golang 的新手。下面是我的代码


func loadCSV(csvFile string) (*[][]float64, error) {

    startTime := time.Now()

    var dataset [][]float64

    f, err := os.Open(csvFile)

    if err != nil {

        return &dataset, err

    }

    r := csv.NewReader(bufio.NewReader(f))

    counter := 0

    var wg sync.WaitGroup

    for {

        record, err := r.Read()

        if err == io.EOF {

            break

        }

        if counter != 0 {

            wg.Add(1)

            go func(r []string, dataset *[][]float64) {

                var temp []float64

                for _, each := range record {

                    f, err := strconv.ParseFloat(each, 64)

                    if err == nil {

                        temp = append(temp, f)

                    }

                }

                *dataset = append(*dataset, temp)

                wg.Done()

            }(record, &dataset)

        }

        counter++

    }

    wg.Wait()

    duration := time.Now().Sub(startTime)

    log.Printf("Loaded %d rows in %v seconds", counter, duration)

    return &dataset, nil

}

我的主要功能如下所示


func main() {

    // runtime.GOMAXPROCS(4)

    dataset, err := loadCSV("AvgW2V_train.csv")

    if err != nil {

        panic(err)

    }

    fmt.Println(len(*dataset))

}

如果有人也需要下载 CSV,请单击下面的链接 (485 MB) https://drive.google.com/file/d/1G4Nw6JyeC-i0R1exWp5BtRtGM1Fwyelm/view?usp=sharing


哆啦的时光机
浏览 128回答 2
2回答

鸿蒙传说

Go数据竞争检测器您的结果未定义,因为您有数据竞争。~/gopath/src$ go run -race racer.go==================WARNING: DATA RACEWrite at 0x00c00008a060 by goroutine 6:  runtime.mapassign_faststr()      /home/peter/go/src/runtime/map_faststr.go:202 +0x0  main.main.func2()      /home/peter/gopath/src/racer.go:16 +0x6aPrevious write at 0x00c00008a060 by goroutine 5:  runtime.mapassign_faststr()      /home/peter/go/src/runtime/map_faststr.go:202 +0x0  main.main.func1()      /home/peter/gopath/src/racer.go:11 +0x6aGoroutine 6 (running) created at:  main.main()      /home/peter/gopath/src/racer.go:14 +0x88Goroutine 5 (running) created at:  main.main()      /home/peter/gopath/src/racer.go:9 +0x5b==================fatal error: concurrent map writes==================WARNING: DATA RACEWrite at 0x00c00009a088 by goroutine 6:  main.main.func2()      /home/peter/gopath/src/racer.go:16 +0x7fPrevious write at 0x00c00009a088 by goroutine 5:  main.main.func1()      /home/peter/gopath/src/racer.go:11 +0x7fGoroutine 6 (running) created at:  main.main()      /home/peter/gopath/src/racer.go:14 +0x88Goroutine 5 (running) created at:  main.main()      /home/peter/gopath/src/racer.go:9 +0x5b==================goroutine 34 [running]:runtime.throw(0x49e156, 0x15)    /home/peter/go/src/runtime/panic.go:608 +0x72 fp=0xc000094718 sp=0xc0000946e8 pc=0x44b342runtime.mapassign_faststr(0x48ace0, 0xc00008a060, 0x49c9c3, 0x8, 0xc00009a088)    /home/peter/go/src/runtime/map_faststr.go:211 +0x46c fp=0xc000094790 sp=0xc000094718 pc=0x43598cmain.main.func1(0x49c9c3, 0x8)    /home/peter/gopath/src/racer.go:11 +0x6b fp=0xc0000947d0 sp=0xc000094790 pc=0x47ac6bruntime.goexit()    /home/peter/go/src/runtime/asm_amd64.s:1340 +0x1 fp=0xc0000947d8 sp=0xc0000947d0 pc=0x473061created by main.main    /home/peter/gopath/src/racer.go:9 +0x5cgoroutine 1 [sleep]:time.Sleep(0x5f5e100)    /home/peter/go/src/runtime/time.go:105 +0x14amain.main()    /home/peter/gopath/src/racer.go:19 +0x96goroutine 35 [runnable]:main.main.func2(0x49c9c3, 0x8)    /home/peter/gopath/src/racer.go:16 +0x6bcreated by main.main    /home/peter/gopath/src/racer.go:14 +0x89exit status 2~/gopath/src$ racer.go:package mainimport (    "bufio"    "encoding/csv"    "fmt"    "io"    "log"    "os"    "strconv"    "sync"    "time")func loadCSV(csvFile string) (*[][]float64, error) {    startTime := time.Now()    var dataset [][]float64    f, err := os.Open(csvFile)    if err != nil {        return &dataset, err    }    r := csv.NewReader(bufio.NewReader(f))    counter := 0    var wg sync.WaitGroup    for {        record, err := r.Read()        if err == io.EOF {            break        }        if counter != 0 {            wg.Add(1)            go func(r []string, dataset *[][]float64) {                var temp []float64                for _, each := range record {                    f, err := strconv.ParseFloat(each, 64)                    if err == nil {                        temp = append(temp, f)                    }                }                *dataset = append(*dataset, temp)                wg.Done()            }(record, &dataset)        }        counter++    }    wg.Wait()    duration := time.Now().Sub(startTime)    log.Printf("Loaded %d rows in %v seconds", counter, duration)    return &dataset, nil}func main() {    // runtime.GOMAXPROCS(4)    dataset, err := loadCSV("/home/peter/AvgW2V_train.csv")    if err != nil {        panic(err)    }    fmt.Println(len(*dataset))}

狐的传说

没有必要使用,*[][]float64因为那将是一个双指针。我对你的程序做了一些小的修改。dataset可用于新的 goroutine,因为它是在它上面的代码块中声明的。similarlyrecord也是可用的,但是由于recordvariable 是不时变化的,我们需要将它传递给新的 goroutine。虽然不需要传递dataset,因为它没有改变,而这正是我们想要的,这样我们就可以将 temp 附加到dataset.但是当多个 goroutines 试图附加到同一个变量时,就会发生竞争条件,即多个 goroutines 试图写入同一个变量。所以我们需要确保在任何时候只有一个 can goroutine 可以添加。所以我们使用锁来进行顺序追加。package mainimport (    "bufio"    "encoding/csv"    "fmt"    "os"    "strconv"    "sync")func loadCSV(csvFile string) [][]float64 {    var dataset [][]float64    f, _ := os.Open(csvFile)    r := csv.NewReader(f)    var wg sync.WaitGroup    l := new(sync.Mutex) // lock    for record, err := r.Read(); err == nil; record, err = r.Read() {        wg.Add(1)        go func(record []string) {            defer wg.Done()            var temp []float64            for _, each := range record {                if f, err := strconv.ParseFloat(each, 64); err == nil {                    temp = append(temp, f)                }            }            l.Lock() // lock before writing            dataset = append(dataset, temp) // write            l.Unlock() // unlock        }(record)    }    wg.Wait()    return dataset}func main() {    dataset := loadCSV("train.csv")    fmt.Println(len(dataset))}有些错误没有得到处理以使其最小化,但您应该处理错误。
随时随地看视频慕课网APP

相关分类

Go
我要回答