猿问

如何等待执行

我有一个大型日志文件,您想对其进行并行分析。


我有这个代码:


package main


import (

    "bufio"

    "fmt"

    "os"

    "time"

)


func main() {

    filename := "log.txt"

    threads := 10


    // Read the  file

    file, err := os.Open(filename)

    if err != nil {

        fmt.Println("Could not open file with the database.")

        os.Exit(1)

    }

    defer file.Close()


    scanner := bufio.NewScanner(file)


    // Channel for strings

    tasks := make(chan string)


    // Run the threads that catch events from the channel and understand one line of the log file

    for i := 0; i < threads; i++ {

        go parseStrings(tasks)

    }


    // Start a thread load lines from a file into the channel

    go getStrings(scanner, tasks)


    // At this point I have to wait until all of the threads executed

    // For example, I set the sleep

    for {

        time.Sleep(1 * time.Second)

    }

}


func getStrings(scanner *bufio.Scanner, tasks chan<- string) {

    for scanner.Scan() {

        s := scanner.Text()

        tasks <- s

    }

}


func parseStrings(tasks <-chan string) {

    for {

        s := <-tasks

        event := parseLine(s)

        fmt.Println(event)

    }

}


func parseLine(line string) string {

    return line

}

实际上,当我等待所有线程结束时?有人建议我创建一个单独的线程,在其中添加结果,但如何添加?


忽然笑
浏览 154回答 3
3回答

繁华开满天机

使用管道模式和“扇出/扇入”模式:package mainimport (&nbsp; &nbsp; "bufio"&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "strings"&nbsp; &nbsp; "sync")func main() {&nbsp; &nbsp; file := "here is first line\n" +&nbsp; &nbsp; &nbsp; &nbsp; "here is second line\n" +&nbsp; &nbsp; &nbsp; &nbsp; "here is line 3\n" +&nbsp; &nbsp; &nbsp; &nbsp; "here is line 4\n" +&nbsp; &nbsp; &nbsp; &nbsp; "here is line 5\n" +&nbsp; &nbsp; &nbsp; &nbsp; "here is line 6\n" +&nbsp; &nbsp; &nbsp; &nbsp; "here is line 7\n"&nbsp; &nbsp; scanner := bufio.NewScanner(strings.NewReader(file))&nbsp; &nbsp; // all lines onto one channel&nbsp; &nbsp; in := getStrings(scanner)&nbsp; &nbsp; // FAN OUT&nbsp; &nbsp; // Multiple functions reading from the same channel until that channel is closed&nbsp; &nbsp; // Distribute work across multiple functions (ten goroutines) that all read from in.&nbsp; &nbsp; xc := fanOut(in, 10)&nbsp; &nbsp; // FAN IN&nbsp; &nbsp; // multiplex multiple channels onto a single channel&nbsp; &nbsp; // merge the channels from c0 through c9 onto a single channel&nbsp; &nbsp; for n := range merge(xc) {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(n)&nbsp; &nbsp; }}func getStrings(scanner *bufio.Scanner) <-chan string {&nbsp; &nbsp; out := make(chan string)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for scanner.Scan() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out <- scanner.Text()&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; close(out)&nbsp; &nbsp; }()&nbsp; &nbsp; return out}func fanOut(in <-chan string, n int) []<-chan string {&nbsp; &nbsp; var xc []<-chan string&nbsp; &nbsp; for i := 0; i < n; i++ {&nbsp; &nbsp; &nbsp; &nbsp; xc = append(xc, parseStrings(in))&nbsp; &nbsp; }&nbsp; &nbsp; return xc}func parseStrings(in <-chan string) <-chan string {&nbsp; &nbsp; out := make(chan string)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for n := range in {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out <- parseLine(n)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; close(out)&nbsp; &nbsp; }()&nbsp; &nbsp; return out}func parseLine(line string) string {&nbsp; &nbsp; return line}func merge(cs []<-chan string) <-chan string {&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; wg.Add(len(cs))&nbsp; &nbsp; out := make(chan string)&nbsp; &nbsp; for _, c := range cs {&nbsp; &nbsp; &nbsp; &nbsp; go func(c <-chan string) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for n := range c {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out <- n&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; }(c)&nbsp; &nbsp; }&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; &nbsp; &nbsp; close(out)&nbsp; &nbsp; }()&nbsp; &nbsp; return out}

眼眸繁星

只需使用sync.WaitGrouppackage mainimport(&nbsp; &nbsp; "sync")func stuff(wg *sync.WaitGroup) {&nbsp; &nbsp; defer wg.Done() // tell the WaitGroup it's done&nbsp; &nbsp; /* stuff */}func main() {&nbsp; &nbsp; count := 50&nbsp; &nbsp; wg := new(sync.WaitGroup)&nbsp; &nbsp; wg.Add(count) // add number of gorutines to the WaitGroup&nbsp; &nbsp; for i := 0; i < count; i++ {&nbsp; &nbsp; &nbsp; &nbsp; go stuff(wg)&nbsp; &nbsp; }&nbsp; &nbsp; wg.Wait() // wait for all}

肥皂起泡泡

var wg sync.WaitGroup启动每个 goroutine 时执行以下操作:wg.Add(1)当 goroutine 工作完成时,计数器递减wg.Done()结果,而不是for&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;time.Sleep(1&nbsp;*&nbsp;time.Second)}做&nbsp;wg.Wait()
随时随地看视频慕课网APP

相关分类

Go
我要回答