Bufio Scanner Goroutine - 截断/无序输出

我正在编写一个程序,该程序从 CLI 命令读取 stderr 并通过 gRPC 流将 stderr 日志流式传输到客户端。


cmd 实例化如下(CLI 命令需要配置,我将其作为 stdin 传递):


ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Duration(t)*time.Second)


defer cancel()


cmd := exec.CommandContext(ctxTimeout, "java",

    "-jar", "/opt/myapp/myapp.jar", "scan", "-config", "-",

)


cmd.Stdin = config

我使用两个单独的缓冲区:一个将 stderr“实时”流式传输到客户端,另一个将日志保存在数据库中。为此,我使用 io.MultiWriter 并将其映射到 cmd 标准输入:


bufStream := bytes.NewBuffer(make([]byte, 0, 4096))


bufPersist := new(bytes.Buffer)



stderr := io.MultiWriter(bufStream, bufPersist)


// Map the command Standard Error Output to the multiwriter

cmd.Stderr = stderr

最后,在启动命令之前,我有一个 goroutine,它使用 bufio.Scanner 来读取 stderr 缓冲区并通过 gRPC 逐行流式传输:


// Go Routine to stream the scan job logs

go func() {

    for {

        select {

        case <-done:

            return

        default:

            scanner := bufio.NewScanner(bufStream)

            for scanner.Scan() {

                time.Sleep(3 * time.Second)

                logging.MyAppLog("warning", "%v", scanner.Text())

                _ = stream.Send(&agentpb.ScanResultsResponse{

                    ScanLogsWebsocket: &agentpb.ScanLogFileResponseWB{ScanLogs: scanner.Bytes()},

                },

                )

            }


        }

    }


}()

err := cmd.Run()


done <- true

我的问题是我必须time.sleep(time.Seconds * 3)在 goroutine 中使用才能获得正确的输出。如果没有,我得到的输出顺序不正确并被截断。


我相信这是由于 io.multiwriter 和 bufio.scanner 不“同步”,但我想要一些有关最佳方法的指导。


提前致谢。


料青山看我应如是
浏览 188回答 1
1回答

慕雪6442864

来自扫描仪文档:Bytes 返回调用 Scan 生成的最新令牌。底层数组可能指向将被后续调用 Scan 覆盖的数据。它不进行分配。gRPC 有自己的缓冲。这意味着当 Send 返回时,字节不一定已写入线路,并且下一个 Scan 调用会修改尚未写入的字节。复制 Scan 返回的字节,应该没问题:for scanner.Scan() {    b := append([]byte(nil), scanner.Bytes()...)    stream.Send(&agentpb.ScanResultsResponse{        ScanLogsWebsocket: &agentpb.ScanLogFileResponseWB{            ScanLogs: b,        },    })}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go