猿问

并发运行速度不快

我已经写了一个代码,试图使用并发,但它无助于更快的运行。我该如何改进?


package main


import (

    "bufio"

    "fmt"

    "os"

    "strings"

    "sync"

)


var wg sync.WaitGroup


func checkerr(e error) {


    if e != nil {

        fmt.Println(e)

    }

}


func readFile() {


    file, err := os.Open("data.txt")

    checkerr(err)

    fres, err := os.Create("resdef.txt")

    checkerr(err)


    defer file.Close()

    defer fres.Close()


    scanner := bufio.NewScanner(file)

    for scanner.Scan() {

        wg.Add(1)

        go func() {

            words := strings.Fields(scanner.Text())

            shellsort(words)

            writeToFile(fres, words)

            wg.Done()

        }()

        wg.Wait()

    }

}


func shellsort(words []string) {


    for inc := len(words) / 2; inc > 0; inc = (inc + 1) * 5 / 11 {

        for i := inc; i < len(words); i++ {

            j, temp := i, words[i]

            for ; j >= inc && strings.ToLower(words[j-inc]) > strings.ToLower(temp); j -= inc {

                words[j] = words[j-inc]

            }

            words[j] = temp

        }

    }

}


func writeToFile(f *os.File, words []string) {


    datawriter := bufio.NewWriter(f)

    for _, s := range words {

        datawriter.WriteString(s + " ")

    }

    datawriter.WriteString("\n")

    datawriter.Flush()

}


func main() {

    readFile()

}


一切都很好,除了在没有并发的情况下,做所有事情需要相同的时间。


阿晨1998
浏览 135回答 3
3回答

狐的传说

您必须在循环后放置:wg.Wait()for&nbsp; &nbsp; for condition {&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // a concurrent job here&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; }()&nbsp; &nbsp; &nbsp; &nbsp;&nbsp; &nbsp; }&nbsp; &nbsp; wg.Wait()注意:工作本身应具有并发性质。这是我测试的解决方案 - 按顺序从输入文件中读取,然后执行并发任务,最后按顺序写入输出文件,请尝试以下操作:npackage mainimport (&nbsp; &nbsp; "bufio"&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "log"&nbsp; &nbsp; "os"&nbsp; &nbsp; "runtime"&nbsp; &nbsp; "sort"&nbsp; &nbsp; "strings"&nbsp; &nbsp; "sync")type sortQueue struct {&nbsp; &nbsp; index int&nbsp; &nbsp; data&nbsp; []string}func main() {&nbsp; &nbsp; n := runtime.NumCPU()&nbsp; &nbsp; a := make(chan sortQueue, n)&nbsp; &nbsp; b := make(chan sortQueue, n)&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; for i := 0; i < n; i++ {&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go parSort(a, b, &wg)&nbsp; &nbsp; }&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; file, err := os.Open("data.txt")&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; defer file.Close()&nbsp; &nbsp; &nbsp; &nbsp; scanner := bufio.NewScanner(file)&nbsp; &nbsp; &nbsp; &nbsp; i := 0&nbsp; &nbsp; &nbsp; &nbsp; for scanner.Scan() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; a <- sortQueue{index: i, data: strings.Fields(scanner.Text())}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; i++&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; close(a)&nbsp; &nbsp; &nbsp; &nbsp; err = scanner.Err()&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; fres, err := os.Create("resdef.txt")&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; }&nbsp; &nbsp; defer fres.Close()&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; &nbsp; &nbsp; close(b)&nbsp; &nbsp; }()&nbsp; &nbsp; writeToFile(fres, b, n)}func writeToFile(f *os.File, b chan sortQueue, n int) {&nbsp; &nbsp; m := make(map[int][]string, n)&nbsp; &nbsp; order := 0&nbsp; &nbsp; for v := range b {&nbsp; &nbsp; &nbsp; &nbsp; m[v.index] = v.data&nbsp; &nbsp; &nbsp; &nbsp; var slice []string&nbsp; &nbsp; &nbsp; &nbsp; exist := true&nbsp; &nbsp; &nbsp; &nbsp; for exist {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; slice, exist = m[order]&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if exist {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; delete(m, order)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; order++&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; s := strings.Join(slice, " ")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(s)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _, err := f.WriteString(s + "\n")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}func parSort(a, b chan sortQueue, wg *sync.WaitGroup) {&nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; for q := range a {&nbsp; &nbsp; &nbsp; &nbsp; sort.Slice(q.data, func(i, j int) bool { return q.data[i] < q.data[j] })&nbsp; &nbsp; &nbsp; &nbsp; b <- q&nbsp; &nbsp; }}data.txt文件:1 2 0 3a 1 b d 0 c&nbsp;aa cc bb输出:0 1 2 30 1 a b c daa bb cc

一只名叫tom的猫

您没有并行化任何内容,因为对于每个调用,您都有匹配的调用。这是一对一的:你生成一个Go例程,然后立即阻止主要的Go例程,等待新生成的例程完成。wg.Add(1)wg.Wait()a 的要点是等待许多事情完成,只需调用一次,即可生成所有 Go 例程。WaitGroupwg.Wait()但是,除了将呼叫固定为 ,还需要控制对扫描仪的并发访问。一种方法可能是使用一个通道,让扫描仪向等待的 Go 例程发出文本行:wg.Wait&nbsp; &nbsp; lines := make(chan string)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for line := range lines {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go func(line string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; words := strings.Fields(line)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; shellsort(words)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; writeToFile(fres, words)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }(line)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; scanner := bufio.NewScanner(file)&nbsp; &nbsp; for scanner.Scan() {&nbsp; &nbsp; &nbsp; &nbsp; lines <- scanner.Text()&nbsp; &nbsp; }&nbsp; &nbsp; close(lines)请注意,这可能会导致文件中出现乱码输出,因为您有许多并发的 Go 例程同时写入其结果。您可以通过第二个通道控制输出:&nbsp; &nbsp; lines := make(chan string)&nbsp; &nbsp; out := make(chan []string)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for line := range lines {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go func(line string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; words := strings.Fields(line)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; shellsort(words)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out <- words&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }(line)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for words := range out {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; writeToFile(fres, words)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; scanner := bufio.NewScanner(file)&nbsp; &nbsp; for scanner.Scan() {&nbsp; &nbsp; &nbsp; &nbsp; lines <- scanner.Text()&nbsp; &nbsp; }&nbsp; &nbsp; close(lines)&nbsp; &nbsp; close(out)此时,您可以重构为“读取器”、“处理器”和“写入器”,它们形成通过通道进行通信的管道。读取器和写入器使用单个 go 例程来防止对资源的并发访问,而处理器生成许多 go 例程(当前未绑定)以跨多个处理器“扇出”工作:package mainimport (&nbsp; &nbsp; "bufio"&nbsp; &nbsp; "os"&nbsp; &nbsp; "strings")func main() {&nbsp; &nbsp; lines := reader()&nbsp; &nbsp; out := processor(lines)&nbsp; &nbsp; writer(out)}func reader() chan<- string {&nbsp; &nbsp; lines := make(chan string)&nbsp; &nbsp; file, err := os.Open("data.txt")&nbsp; &nbsp; checkerr(err)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; scanner := bufio.NewScanner(file)&nbsp; &nbsp; &nbsp; &nbsp; for scanner.Scan() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; lines <- scanner.Text()&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; close(lines)&nbsp; &nbsp; }()&nbsp; &nbsp; return lines}func processor(lines chan<- string) chan []string {&nbsp; &nbsp; out := make(chan []string)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for line := range lines {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go func(line string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; words := strings.Fields(line)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; shellsort(words)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out <- words&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }(line)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; close(out)&nbsp; &nbsp; }()&nbsp; &nbsp; return out}func writer(out chan<- []string) {&nbsp; &nbsp; fres, err := os.Create("resdef.txt")&nbsp; &nbsp; checkerr(err)&nbsp; &nbsp; for words := range out {&nbsp; &nbsp; &nbsp; &nbsp; writeToFile(fres, words)&nbsp; &nbsp; }}

喵喵时光机

正如其他答案所说,通过等待每次循环迭代,您将并发性限制为1(无并发)。有很多方法可以解决这个问题,但什么是正确的完全取决于什么需要时间,而这个问题还没有表现出来。并发不会神奇地使事情变得更快;它只是让事情同时发生,这只会让事情变得更快,如果花费大量时间的事情可以同时发生。WaitGroup据推测,在您的代码中,需要很长时间的事情是排序。如果是这种情况,您可以执行如下操作:results := make(chan []string)for scanner.Scan() {&nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; go func(line string) {&nbsp; &nbsp; &nbsp; &nbsp; words := strings.Fields(line)&nbsp; &nbsp; &nbsp; &nbsp; shellsort(words)&nbsp; &nbsp; &nbsp; &nbsp; result <- words&nbsp; &nbsp; }(scanner.Text())}go func() {&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; close(results)}()for words := range results {&nbsp; &nbsp; writeToFile(fres, words)}这会将 移动到应有的位置,并避免并发使用扫描程序和写入器。这应该比串行处理更快,如果排序花费了大量的处理时间。Wait
随时随地看视频慕课网APP

相关分类

Go
我要回答