心有法竹
1-删除 pr, pw := io.Pipe() 可以让代码更简单,因为它是多余的。试试这个:line, err := reader.ReadString('\n')if err == io.EOF { wg.Wait() break}if err != nil { log.Fatal(err)}wg.Add(1)go func(index int) { var buf bytes.Buffer { // lexical scoping (static scoping) zw := lzw.NewWriter(&buf, lzw.LSB, 8) n, err := zw.Write([]byte(line)) // n, err := io.Copy(zw, strings.NewReader(line)) if err != nil { log.Fatal(err) } if int(n) != len(line) { log.Fatal(n, len(line)) } // It is the caller's responsibility to call Close on the WriteCloser when finished writing. if err = zw.Close(); err != nil { log.Fatal(err) } } ctx, cancelFunc := context.WithTimeout(context.Background(), 100*time.Millisecond) client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(buf.Bytes()), 1000*time.Hour) cancelFunc() wg.Done()}(index)if index%tenThousand == 0 { wg.Wait()}2-你需要把 wg.Add(1) 放在 go func(index int) { 之前: wg.Add(1) go func(index int) {3-wg.Wait() 的逻辑:if index%10000 == 0 { fmt.Println(index) wg.Wait() }如果最后一次迭代时 index%10000 != 0,会发生什么?所以当 err == io.EOF 时,你需要调用 wg.Wait() 等待所有 goroutine 结束:if err == io.EOF { wg.Wait() fmt.Println("\n**** All done **** index =", index) break}4-您可以使用词法作用域(静态作用域)来限制某些变量的作用域,使代码更易于管理,并明确何时对 lzw.NewWriter 调用 Close:{ // lexical scoping (static scoping) zw := lzw.NewWriter(bufio.NewWriter(&buf), lzw.LSB, 8) n, err := io.Copy(zw, strings.NewReader(line)) if err != nil { log.Fatal(err) } if int(n) != len(line) { log.Fatal(n, len(line)) } // It is the caller's responsibility to call Close on the WriteCloser when finished writing. 
if err = zw.Close(); err != nil { log.Fatal(err) }}5- 始终检查错误,例如: if err = zw.Close(); err != nil { log.Fatal(err)}下面是一个接近你的代码的可运行版本——仅用于试验并发逻辑、观察会发生什么(不推荐使用,因为它包含多余的 goroutine 和 io.Pipe,只是能运行而已):package mainimport ( "bufio" "compress/lzw" "context" "encoding/base64" "fmt" "io" "log" "strings" "sync" "time")func main() { index := 0 client := &myClient{} reader := bufio.NewReader(file) // your code: var wg sync.WaitGroup for { index++ line, err := reader.ReadString('\n') if err != nil { msg <- fmt.Sprint(index, " Done not waiting with err: ", err, time.Now()) wg.Wait() // break waiting // if index%tenThousand != 0 break } wg.Add(1) go func(i int) { msg <- fmt.Sprint(i, " Enter running ... ", time.Now()) asyncReader, asyncWriter := io.Pipe() // make it async to read and write zipWriter := lzw.NewWriter(asyncWriter, lzw.LSB, 8) go func() { // async _, err := io.Copy(zipWriter, strings.NewReader(line)) if err != nil { log.Fatal(err) } _ = zipWriter.Close() _ = asyncWriter.Close() // for io.ReadAll }() b, err := io.ReadAll(asyncReader) if err != nil { log.Fatal(err) } client.Set(context.Background(), fmt.Sprintf("%d", i), base64.StdEncoding.EncodeToString(b), time.Hour*1000) asyncReader.Close() time.Sleep(1 * time.Second) msg <- fmt.Sprint(i, " Exit running ... ", time.Now()) wg.Done() }(index) msg <- fmt.Sprint(index, " ", index%tenThousand == 0, " after go call") if index%tenThousand == 0 { wg.Wait() msg <- fmt.Sprint("..", index, " Done waiting after go call. ", time.Now()) } } msg <- "Bye forever." 
wg.Wait() close(msg) wgMsg.Wait()}// just for the Go Playground:const tenThousand = 2type myClient struct {}func (p *myClient) Set(ctx context.Context, a, b string, t time.Duration) { // fmt.Println("a =", a, ", b =", b, ", t =", t) if ctx.Err() != nil { fmt.Println(ctx.Err()) }}var file, myw = io.Pipe()func init() { go func() { for i := 1; i <= tenThousand+1; i++ { fmt.Fprintf(myw, "%d text to compress aaaaaaaaaaaaaa\n", i) } myw.Close() }() wgMsg.Add(1) go func() { defer wgMsg.Done() for s := range msg { fmt.Println(s) } }()}var msg = make(chan string, 100)var wgMsg sync.WaitGroup输出:1 false after go call2 true after go call1 Enter running ... 2009-11-10 23:00:00 +0000 UTC m=+0.0000000012 Enter running ... 2009-11-10 23:00:00 +0000 UTC m=+0.0000000011 Exit running ... 2009-11-10 23:00:01 +0000 UTC m=+1.0000000012 Exit running ... 2009-11-10 23:00:01 +0000 UTC m=+1.000000001..2 Done waiting after go call. 2009-11-10 23:00:01 +0000 UTC m=+1.0000000013 false after go call3 Enter running ... 2009-11-10 23:00:01 +0000 UTC m=+1.0000000014 Done not waiting with err: EOF 2009-11-10 23:00:01 +0000 UTC m=+1.0000000013 Exit running ... 2009-11-10 23:00:02 +0000 UTC m=+2.000000001Bye forever.