io.Pipe() 导致 WaitGroup 卡住

我正在处理一个巨大的数据文件,大约是。100 GB。这个巨大文件中的每一行都是一段 JSON 数据,我想读取、压缩这些数据并将其存储在内存数据库中。


var wg sync.WaitGroup

for {

    line, err := reader.ReadString('\n')

    if err != nil {

        break

    }

    go func(index int) {

        wg.Add(1)

        pr, pw := io.Pipe()

        zw := lzw.NewWriter(pw, lzw.LSB, 8)

        _, err := io.Copy(zw, strings.NewReader(line))

        pw.Close()

        zw.Close()

        if err != nil {

            fmt.Println(err.Error())

        }

        b, err := io.ReadAll(pr)

        if err != nil {

            fmt.Println(err.Error())

        }

        client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(b), time.Hour*1000)

        pr.Close()

        wg.Done()

    }(index)

    if index%10000 == 0 {

        fmt.Println(index)

        wg.Wait()

    }

    index += 1

}

但是,此代码在处理前 10000 行后停止。当我向下移动时wg.Add(1),zw.Close()它继续处理该行的其余部分(但变得不稳定)。如果没有lzw,io.Pipe()当我尝试以未压缩的方式存储确切的值时,一切都可以正常工作。


我不确定我是否没有WaitGroup正确使用 the 还是有一些io.Pipe()我还不知道的与 the 相关的东西。


九州编程
浏览 109回答 1
1回答

心有法竹

1-删除pr, pw := io.Pipe()使代码更简单,因为它是多余的,试试这个:line, err := reader.ReadString('\n')if err == io.EOF {&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; break}if err != nil {&nbsp; &nbsp; log.Fatal(err)}wg.Add(1)go func(index int) {&nbsp; &nbsp; var buf bytes.Buffer&nbsp; &nbsp; { // lexical scoping (static scoping)&nbsp; &nbsp; &nbsp; &nbsp; zw := lzw.NewWriter(&buf, lzw.LSB, 8)&nbsp; &nbsp; &nbsp; &nbsp; n, err := zw.Write([]byte(line)) // n, err := io.Copy(zw, strings.NewReader(line))&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; if int(n) != len(line) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(n, len(line))&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; // It is the caller's responsibility to call Close on the WriteCloser when finished writing.&nbsp; &nbsp; &nbsp; &nbsp; if err = zw.Close(); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; ctx, cancelFunc := context.WithTimeout(context.Background(), 100*time.Millisecond)&nbsp; &nbsp; client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(buf.Bytes()), 1000*time.Hour)&nbsp; &nbsp; cancelFunc()&nbsp; &nbsp; wg.Done()}(index)if index%tenThousand == 0 {&nbsp; &nbsp; wg.Wait()}2-你需要把wg.Add(1)之前go func(index int) {:&nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; go func(index int) {3-wg.Wait()逻辑:if index%10000 == 0 {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(index)&nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; }如果 . 最后一次迭代会发生什么index%10000 != 0。所以在这里当err == io.EOF你需要wg.Wait()让所有的 goroutines 加入时:if err == io.EOF {&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; fmt.Println("\n**** All done **** index =", index)&nbsp; &nbsp; break}4-您可以使用词法范围(静态范围)来限制一些变量范围并使代码更易于管理 - 并知道何时Close:lzw.NewWriter{ // lexical scoping (static scoping)&nbsp; &nbsp; zw := lzw.NewWriter(bufio.NewWriter(&buf), lzw.LSB, 8)&nbsp; &nbsp; n, err := io.Copy(zw, strings.NewReader(line))&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; }&nbsp; &nbsp; if int(n) != len(line) {&nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(n, len(line))&nbsp; &nbsp; }&nbsp; &nbsp; // It is the caller's responsibility to call Close on the WriteCloser when finished writing.&nbsp; &nbsp; if err = zw.Close(); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; }}5- 始终检查错误,例如:&nbsp;if err = zw.Close(); err != nil {&nbsp; &nbsp; log.Fatal(err)}这是接近你的代码的工作版本 - 试试这个只是为了试验并发逻辑看看会发生什么(不推荐因为它有多余的 goroutines 并且io.Pipe- 只是工作:package mainimport (&nbsp; &nbsp; "bufio"&nbsp; &nbsp; "compress/lzw"&nbsp; &nbsp; "context"&nbsp; &nbsp; "encoding/base64"&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "io"&nbsp; &nbsp; "log"&nbsp; &nbsp; "strings"&nbsp; &nbsp; "sync"&nbsp; &nbsp; "time")func main() {&nbsp; &nbsp; index := 0&nbsp; &nbsp; client := &myClient{}&nbsp; &nbsp; reader := bufio.NewReader(file)&nbsp; &nbsp; // your code:&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; index++&nbsp; &nbsp; &nbsp; &nbsp; line, err := reader.ReadString('\n')&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; msg <- fmt.Sprint(index, " Done not waiting with err: ", err, time.Now())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Wait() // break waiting // if index%tenThousand != 0&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; &nbsp; &nbsp; go func(i int) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; msg <- fmt.Sprint(i, " Enter running ... ", time.Now())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; asyncReader, asyncWriter := io.Pipe() // make it async to read and write&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; zipWriter := lzw.NewWriter(asyncWriter, lzw.LSB, 8)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go func() { // async&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _, err := io.Copy(zipWriter, strings.NewReader(line))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _ = zipWriter.Close()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; _ = asyncWriter.Close() // for io.ReadAll&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; b, err := io.ReadAll(asyncReader)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Fatal(err)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; client.Set(context.Background(), fmt.Sprintf("%d", i), base64.StdEncoding.EncodeToString(b), time.Hour*1000)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; asyncReader.Close()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.Sleep(1 * time.Second)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; msg <- fmt.Sprint(i, " Exit running ... ", time.Now())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; }(index)&nbsp; &nbsp; &nbsp; &nbsp; msg <- fmt.Sprint(index, " ", index%tenThousand == 0, " after go call")&nbsp; &nbsp; &nbsp; &nbsp; if index%tenThousand == 0 {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; msg <- fmt.Sprint("..", index, " Done waiting after go call. ", time.Now())&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; msg <- "Bye forever."&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; close(msg)&nbsp; &nbsp; wgMsg.Wait()}// just for the Go Playground:const tenThousand = 2type myClient struct {}func (p *myClient) Set(ctx context.Context, a, b string, t time.Duration) {&nbsp; &nbsp; // fmt.Println("a =", a, ", b =", b, ", t =", t)&nbsp; &nbsp; if ctx.Err() != nil {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(ctx.Err())&nbsp; &nbsp; }}var file, myw = io.Pipe()func init() {&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; for i := 1; i <= tenThousand+1; i++ {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Fprintf(myw, "%d text to compress aaaaaaaaaaaaaa\n", i)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; myw.Close()&nbsp; &nbsp; }()&nbsp; &nbsp; wgMsg.Add(1)&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; defer wgMsg.Done()&nbsp; &nbsp; &nbsp; &nbsp; for s := range msg {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(s)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()}var msg = make(chan string, 100)var wgMsg sync.WaitGroup输出:1 false after go call2 true after go call1 Enter running ... 2009-11-10 23:00:00 +0000 UTC m=+0.0000000012 Enter running ... 2009-11-10 23:00:00 +0000 UTC m=+0.0000000011 Exit running ... 2009-11-10 23:00:01 +0000 UTC m=+1.0000000012 Exit running ... 2009-11-10 23:00:01 +0000 UTC m=+1.000000001..2 Done waiting after go call. 2009-11-10 23:00:01 +0000 UTC m=+1.0000000013 false after go call3 Enter running ... 2009-11-10 23:00:01 +0000 UTC m=+1.0000000014 Done not waiting with err: EOF 2009-11-10 23:00:01 +0000 UTC m=+1.0000000013 Exit running ... 2009-11-10 23:00:02 +0000 UTC m=+2.000000001Bye forever.
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go