猿问

读取超时。

我想做一些类似 unix 的东西tail -f,但是在通过 Go 的Cmd设施运行的进程产生的输出上。


显然,我的 google-fu 没有达到标准,但我确实找到了这篇文章,它引导我编写以下代码,几乎可以正常工作,但我希望能得到帮助。


如果重要的话,我会在 Mac 上运行它。


首先,这是编译为slowroll可执行文件的最小程序:


package main


import (

    "fmt"

    "time"

)


func main() {

    line := 1

    for {

        fmt.Println("This is line", line)

        line += 1

        time.Sleep(2 * time.Second)

    }

}

运行时,它会产生以下输出,每 2 秒一行:


    > ./slowroll

    This is line 1

    This is line 2

    This is line 3

    This is line 4

等等。


这是尝试读取此内容的包代码,但允许超时以便可以完成其他操作:


package timeout_io


import (

    "bufio"

    "bytes"

    "context"

    "errors"

    "time"

)


const BufferSize = 4096


var ErrTimeout = errors.New("timeout")


type TimeoutReader struct {

    b *bufio.Reader

    t time.Duration

}


func NewTimeoutReader(stdOut *bytes.Buffer) *TimeoutReader {

    return &TimeoutReader{b: bufio.NewReaderSize(stdOut, BufferSize), t: 0}

}


func (r *TimeoutReader) SetTimeout(t time.Duration) time.Duration {

    prev := r.t

    r.t = t

    return prev

}


type CallResponse struct {

    Resp string

    Err  error

}


func helper(r *bufio.Reader) <-chan *CallResponse {

    respChan := make(chan *CallResponse, 1)


    go func() {

        resp, err := r.ReadString('\n')


        if err != nil {

            respChan <- &CallResponse{resp, err}

        } else {

            respChan <- &CallResponse{resp, nil}

        }


        return

    }()


    return respChan

}


func (r *TimeoutReader) ReadLineCtx(ctx context.Context) (string, error) {

    select {

    case <-ctx.Done():

        return "", ErrTimeout

    case respChan := <-helper(r.b):

        return respChan.Resp, respChan.Err

    }

}


交互式爱情
浏览 121回答 3
3回答

慕姐4208626

通过创建管道并从该管道读取来简化代码:cmd := exec.Command("./slowroll")stdout, _ := cmd.StdoutPipe()if err := cmd.Start(); err != nil {&nbsp; &nbsp; log.Fatal(err)}s := bufio.NewScanner(stdout)for s.Scan() {&nbsp; &nbsp; fmt.Printf("%s\n", s.Bytes())}如果您的目标是监视 stderr 和 stdin 的组合输出,则对两者使用相同的管道:cmd := exec.Command("./slowroll")combined, _ := cmd.StdoutPipe()cmd.Stderr = cmd.Stdout // <-- use stdout pipe for stderrif err := cmd.Start(); err != nil {&nbsp; &nbsp; log.Fatal(err)}s := bufio.NewScanner(combined)for s.Scan() {&nbsp; &nbsp; fmt.Printf("%s\n", s.Bytes())}问题中的代码在 stdOut bytes.Buffer 上有数据竞争。

一只斗牛犬

if err != timeout_io.ErrTimeout && err != io.EOF { ...; break; }在这样的条件下, anErrTimeout将被默默地忽略并且不会中断您的阅读循环。另请注意,到达io.EOF会在无限循环中发送您的程序(尝试使用echo "Hello"而不是./slowroll作为命令)。您可能希望将break指令放在if 块之后:if err != timeout_io.ErrTimeout && err != io.EOF {&nbsp; &nbsp; fmt.Println("ReadLine got error", err)}break

缥缈止盈

昨晚深夜意识到我有点像围棋的标准行为。应该解释说目标是能够同时观看标准输出和标准错误。接受上面@Zombo 的建议,我切换到cmd.StdoutPipeand cmd.StderrPipe。主要思想是只有 goroutines 读取管道并将找到的内容放入通道,然后select在通道之间。所以slowroll.go不会产生无限输出,以表明 EOF 不会导致无限循环:package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "os"&nbsp; &nbsp; "time")func main() {&nbsp; &nbsp; line := 1&nbsp; &nbsp; for {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("This is line", line)&nbsp; &nbsp; &nbsp; &nbsp; line += 1&nbsp; &nbsp; &nbsp; &nbsp; time.Sleep(2 * time.Second)&nbsp; &nbsp; &nbsp; &nbsp; if line%3 == 0 {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Fprintf(os.Stderr, "This is error %d\n", line)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; if line > 10 {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}现在更简单的工作watcher.go是:package mainimport (&nbsp; &nbsp; "bufio"&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "os"&nbsp; &nbsp; "os/exec"&nbsp; &nbsp; "sync")func main() {&nbsp; &nbsp; runCommand := &exec.Cmd{&nbsp; &nbsp; &nbsp; &nbsp; Path: "./slowroll",&nbsp; &nbsp; }&nbsp; &nbsp; stdOut, err := runCommand.StdoutPipe()&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("Can't create StdoutPipe:", err)&nbsp; &nbsp; &nbsp; &nbsp; os.Exit(1)&nbsp; &nbsp; }&nbsp; &nbsp; stdErr, err := runCommand.StderrPipe()&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("Can't create StderrPipe:", err)&nbsp; &nbsp; &nbsp; &nbsp; os.Exit(1)&nbsp; &nbsp; }&nbsp; &nbsp; var wg sync.WaitGroup&nbsp; &nbsp; go func(wg *sync.WaitGroup) {&nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; err := runCommand.Run()&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("failed due to error:", err)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; os.Exit(1)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }(&wg)&nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; stdOutChan := make(chan string, 1)&nbsp; &nbsp; go func(wg *sync.WaitGroup) {&nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; scanner := bufio.NewScanner(stdOut)&nbsp; &nbsp; &nbsp; &nbsp; for scanner.Scan() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; stdOutChan <- string(scanner.Bytes())&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("Ran out of stdout input, read thread bailing.")&nbsp; &nbsp; &nbsp; &nbsp; close(stdOutChan)&nbsp; &nbsp; }(&wg)&nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; stdErrChan := make(chan string, 1)&nbsp; &nbsp; go func(wg *sync.WaitGroup) {&nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; scanner := bufio.NewScanner(stdErr)&nbsp; &nbsp; &nbsp; &nbsp; for scanner.Scan() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; stdErrChan <- string(scanner.Bytes())&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("Ran out of stderr input, read thread bailing.")&nbsp; &nbsp; &nbsp; &nbsp; close(stdErrChan)&nbsp; &nbsp; }(&wg)&nbsp; &nbsp; wg.Add(1)&nbsp; &nbsp; index := 1&nbsp; &nbsp; keepGoing := true&nbsp; &nbsp; for keepGoing {&nbsp; &nbsp; &nbsp; &nbsp; select {&nbsp; &nbsp; &nbsp; &nbsp; case res, isOpen := <-stdOutChan:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if !isOpen {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("stdOutChan is no longer open, main bailing.")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; keepGoing = false&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(index, "s:", res)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; index += 1&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; case res, isOpen := <-stdErrChan:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if !isOpen {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("stdErrChan is no longer open, main bailing.")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; keepGoing = false&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } else {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(index, "error s:", res)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; index += 1&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; wg.Wait()&nbsp; &nbsp; fmt.Println("Done!")}输出:&nbsp; &nbsp; > go run watcher.go&nbsp; &nbsp; 1 s: This is line 1&nbsp; &nbsp; 2 s: This is line 2&nbsp; &nbsp; 3 error s: This is error 3&nbsp; &nbsp; 4 s: This is line 3&nbsp; &nbsp; 5 s: This is line 4&nbsp; &nbsp; 6 s: This is line 5&nbsp; &nbsp; 7 s: This is line 6&nbsp; &nbsp; 8 error s: This is error 6&nbsp; &nbsp; 9 s: This is line 7&nbsp; &nbsp; 10 s: This is line 8&nbsp; &nbsp; 11 s: This is line 9&nbsp; &nbsp; 12 error s: This is error 9&nbsp; &nbsp; 13 s: This is line 10&nbsp; &nbsp; Ran out of stdout input, read thread bailing.&nbsp; &nbsp; stdOutChan is no longer open, main bailing.&nbsp; &nbsp; Ran out of stderr input, read thread bailing.&nbsp; &nbsp; Done!显然,可以进行一些重构,但它可以工作,这就是目标。
随时随地看视频慕课网APP

相关分类

Go
我要回答