猿问

Golang:同时处理 5 个大文件

我目前在 Perl 中处理了 5 个巨大的(每个 400 万行)日志文件,我想我可以尝试在 Go 及其并发功能中实现相同的功能。因此,由于对 Go 非常缺乏经验,我正在考虑按以下方式进行操作。对这种方法的任何评论将不胜感激。一些粗略的伪代码:


var wg1 sync.WaitGroup

var wg2 sync.WaitGroup


func processRow (r Row) {

    wg2.Add(1)

    defer wg2.Done()

    res = <process r>

    return res

}


func processFile(f File) {

    wg1.Add(1)

    open(newfile File)

    defer wg1.Done()

    line = <row from f>

    result = go processRow(line)

    newFile.Println(result) // Write new processed line to newFile

    wg2.Wait()

    newFile.Close()


}


func main() {


    for each f logfile {

        go processFile(f)

    }

    wg1.Wait()

}

所以,想法是我同时处理这 5 个文件,然后每个文件的所有行也将依次同时处理。


那行得通吗?


烙印99
浏览 211回答 1
1回答

江户川乱折腾

您绝对应该使用渠道来管理您处理过的行。或者,您也可以编写另一个 goroutine 来处理您的输出。var numGoWriters = 10func processRow(r Row, ch chan<- string) {&nbsp; &nbsp; res := process(r)&nbsp; &nbsp; ch <- res}func writeRow(f File, ch <-chan string) {&nbsp; &nbsp; w := bufio.NewWriter(f)&nbsp; &nbsp; for s := range ch {&nbsp; &nbsp; &nbsp; &nbsp; _, err := w.WriteString(s + "\n")&nbsp; &nbsp; }func processFile(f File) {&nbsp; &nbsp; outFile, err := os.Create("/path/to/file.out")&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; // handle it&nbsp; &nbsp; }&nbsp; &nbsp; defer outFile.Close()&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; ch := make(chan string, 10)&nbsp; // play with this number for performance&nbsp; &nbsp; defer close(ch) // once we're done processing rows, we close the channel&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // so our worker threads exit&nbsp; &nbsp; fScanner := bufio.NewScanner(f)&nbsp; &nbsp; for fScanner.Scan() {&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; processRow(fScanner.Text(), ch)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; }()&nbsp; &nbsp; }&nbsp; &nbsp; for i := 0; i < numGoWriters; i++ {&nbsp; &nbsp; &nbsp; &nbsp; go writeRow(outFile, ch)&nbsp; &nbsp; }&nbsp; &nbsp; wg.Wait()&nbsp;&nbsp;}在这里,我们已经完成processRow了所有的处理(我假设string),完成writeRow了所有的 I/O,processFile并将每个文件绑定在一起。然后所有main要做的就是移交文件,产生 goroutines,等等。func main() {&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; filenames := [...]string{"here", "are", "some", "log", "paths"}&nbsp; &nbsp; for fname := range filenames {&nbsp; &nbsp; &nbsp; &nbsp; inFile, err := os.Open(fname)&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // handle it&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; defer inFile.Close()&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go processFile(inFile)&nbsp; &nbsp; }&nbsp; &nbsp; wg.Wait()
随时随地看视频慕课网APP

相关分类

Go
我要回答