加载带有和不带有 go-routines 的地图

这是我遇到的一个有趣的情况。在使用 go-routines 进行一些数据操作之后,我需要从文件中读取,并根据我们发现的内容填充地图。这是简化的问题陈述和示例:


生成运行所需的数据gen_data.sh


#!/bin/bash 


rm some.dat || : 

for i in `seq 1 10000`; do 

    echo "$i `date` tx: $RANDOM rx:$RANDOM" >> some.dat

done

如果我使用 将这些行读some.dat入map[int]string没有 go-routines 中loadtoDict.go,它会保持对齐。(因为第一个和第二个词是一样的,见下面的o/p。)


在现实生活中,我确实需要在将线条加载到地图之前对其进行处理(昂贵),使用 go-routines 加快了我的字典创建速度,这是解决实际问题的重要要求。


loadtoDict.go


package main


import (

    "bufio"

    "fmt"

    "log"

    "os"

)


var (

    fileName = "some.dat"

)


func checkerr(err error) {

    if err != nil {

        fmt.Println(err)

        log.Fatal(err)

    }

}


func main() {

    ourDict := make(map[int]string)

    f, err := os.Open(fileName)

    checkerr(err)

    defer f.Close()


    fscanner := bufio.NewScanner(f)


    indexPos := 1


    for fscanner.Scan() {

        text := fscanner.Text()

        //fmt.Println("text", text)

        ourDict[indexPos] = text

        indexPos++


    }


    for i, v := range ourDict {

        fmt.Printf("%d: %s\n", i, v)

    }


}

跑步:


$ ./loadtoDict

...

8676: 8676 Mon Dec 23 15:52:24 PST 2019 tx: 17718 rx:1133

2234: 2234 Mon Dec 23 15:52:20 PST 2019 tx: 13170 rx:15962

3436: 3436 Mon Dec 23 15:52:21 PST 2019 tx: 17519 rx:5419

6177: 6177 Mon Dec 23 15:52:23 PST 2019 tx: 5731 rx:5449

注意第一个和第二个词是如何“对齐”的。但是,如果我使用 go-routines 加载我的地图,这会出错:


async_loadtoDict.go


package main


import (

    "bufio"

    "fmt"

    "log"

    "os"

    "sync"

)


var (

    fileName = "some.dat"

    mu       = &sync.RWMutex{}

    MAX = 9000

)


func checkerr(err error) {

    if err != nil {

        fmt.Println(err)

        log.Fatal(err)

    }

}


大话西游666
浏览 106回答 3
3回答

泛舟湖上清波郎朗

您的信号量sem不起作用,因为您对其进行了深度缓冲。一般来说,这是为此类任务设置地图的错误方法,因为读取文件会很慢。如果你有一个更复杂的任务——例如,读一行,想很多,设置一些东西——你会想要这个作为你的伪代码结构:type workType struct {&nbsp; &nbsp; index int&nbsp; &nbsp; line&nbsp; string}var wg sync.WaitGroupwg.Add(nWorkers)// I made this buffered originally but there's no real point, so// fixing that in an editwork := make(chan workType)for i := 0; i < nWorkers; i++ {&nbsp; &nbsp; go readAndDoWork(work, &wg)}for i := 1; fscanner.Scan(); i++ {&nbsp; &nbsp; work <- workType{index: i, line: fscanner.Text()}}close(work)wg.Wait()... now your dictionary is ready ...工人这样做:func readAndDoWork(ch chan workType, wg *sync.WorkGroup) {&nbsp; &nbsp; for item := range ch {&nbsp; &nbsp; &nbsp; &nbsp; ... do computation ...&nbsp; &nbsp; &nbsp; &nbsp; insertIntoDict(item.index, result)&nbsp; &nbsp; }&nbsp; &nbsp; wg.Done()}抓住互斥锁(以insertIntoDict保护映射从索引到结果)并写入字典。(如果你愿意,你可以内联它。)这里的想法是设置一定数量的工作人员——可能基于可用 CPU 的数量——每个工作人员抓取下一个工作项并处理它。主 goroutine 只是打包工作,然后关闭工作通道——这将导致所有工作人员看到输入结束——然后等待他们发出信号表明他们已经完成了计算。(如果您愿意,您可以再创建一个 goroutine 来读取工作人员计算的结果并将它们放入映射中。这样您就不需要映射本身的互斥体。)

噜噜哒

正如我在评论中提到的,您无法控制 goroutine 的执行顺序,因此不应从它们内部更改索引。这是一个示例,其中与地图的交互在单个 goroutine 中,而您在其他 goroutine 中进行处理:package mainimport (&nbsp; &nbsp; "bufio"&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "log"&nbsp; &nbsp; "os"&nbsp; &nbsp; "sync")var (&nbsp; &nbsp; fileName = "some.dat"&nbsp; &nbsp; MAX&nbsp; &nbsp; &nbsp; = 9000)func checkerr(err error) {&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(err)&nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; }}type result struct {&nbsp; &nbsp; index int&nbsp; &nbsp; data string}func main() {&nbsp; &nbsp; ourDict := make(map[int]string)&nbsp; &nbsp; f, err := os.Open(fileName)&nbsp; &nbsp; checkerr(err)&nbsp; &nbsp; defer f.Close()&nbsp; &nbsp; fscanner := bufio.NewScanner(f)&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; sem := make(chan struct{}, MAX) // Use empty structs for semaphores as they have no allocation&nbsp; &nbsp; defer close(sem)&nbsp; &nbsp; out := make(chan result)&nbsp; &nbsp; defer close(out)&nbsp; &nbsp; indexPos := 1&nbsp; &nbsp; for fscanner.Scan() {&nbsp; &nbsp; &nbsp; &nbsp; text := fscanner.Text()&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; sem <- struct{}{}&nbsp; &nbsp; &nbsp; &nbsp; go func(index int, data string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Defer the release of your resources, otherwise if any error occur in your goroutine&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // you'll have a deadlock&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer func() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <-sem&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Process your data&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out <- result{index, data}&nbsp; &nbsp; &nbsp; &nbsp; }(indexPos, text) // Pass in the data that will change on the iteration, go optimizer will move it around better&nbsp; &nbsp; &nbsp; &nbsp; indexPos++&nbsp; &nbsp; }&nbsp; &nbsp; // The goroutine is the only one to write to the dict, so no race condition&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if entry, ok := <-out; ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ourDict[entry.index] = entry.data&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return // Exit goroutine when channel closes&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; for i, v := range ourDict {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("%d: %s\n", i, v)&nbsp; &nbsp; }}

元芳怎么了

好的,我已经弄清楚了。通过复制给goroutine一个值来挂起,似乎有效。改变:for fscanner.Scan() {&nbsp; &nbsp; text := fscanner.Text()&nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; sem <- 1&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; mu.Lock()&nbsp; &nbsp; &nbsp; &nbsp; defer mu.Unlock()&nbsp; &nbsp; &nbsp; &nbsp; ourDict[indexPos] = text&nbsp; &nbsp; &nbsp; &nbsp; indexPos++&nbsp; &nbsp; &nbsp; &nbsp; <- sem&nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; }()}到for fscanner.Scan() {&nbsp; &nbsp; &nbsp; &nbsp; text := fscanner.Text()&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; sem <- 1&nbsp; &nbsp; &nbsp; &nbsp; go func(mypos int) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; mu.Lock()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer mu.Unlock()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ourDict[mypos] = text&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; <-sem&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; }(indexPos)&nbsp; &nbsp; &nbsp; &nbsp; indexPos++}完整代码: https: //play.golang.org/p/dkHaisPHyHz使用工人池,package mainimport (&nbsp; &nbsp; "bufio"&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "log"&nbsp; &nbsp; "os"&nbsp; &nbsp; "sync")const (&nbsp; &nbsp; MAX&nbsp; &nbsp; &nbsp; = 10&nbsp; &nbsp; fileName = "some.dat")type gunk struct {&nbsp; &nbsp; line string&nbsp; &nbsp; id&nbsp; &nbsp;int}func main() {&nbsp; &nbsp; ourDict := make(map[int]string)&nbsp; &nbsp; wg := sync.WaitGroup{}&nbsp; &nbsp; mu := sync.RWMutex{}&nbsp; &nbsp; cha := make(chan gunk)&nbsp; &nbsp; for i := 0; i < MAX; i++ {&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go func(id int) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; textin, ok := <-cha&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if !ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; mu.Lock()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ourDict[textin.id] = textin.line&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; mu.Unlock()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }(i)&nbsp; &nbsp; }&nbsp; &nbsp; f, err := os.Open(fileName)&nbsp; &nbsp; checkerr(err)&nbsp; &nbsp; defer f.Close()&nbsp; &nbsp; fscanner := bufio.NewScanner(f)&nbsp; &nbsp; indexPos := 1&nbsp; &nbsp; for fscanner.Scan() {&nbsp; &nbsp; &nbsp; &nbsp; text := fscanner.Text()&nbsp; &nbsp; &nbsp; &nbsp; thisgunk := gunk{line: text, id: indexPos}&nbsp; &nbsp; &nbsp; &nbsp; cha <- thisgunk&nbsp; &nbsp; &nbsp; &nbsp; indexPos++&nbsp; &nbsp; }&nbsp; &nbsp; close(cha)&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; for i, v := range ourDict {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("%d: %s\n", i, v)&nbsp; &nbsp; }}func checkerr(err error) {&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(err)&nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; }}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go