使用通道转到管道

我正在探索 Go 并尝试使用通道建立一种管道。我只想在 main() 中读取一些内容并将它们发送到 process() 进行处理,在这种情况下只需将值打印到屏幕上。


不幸的是,在下面的代码中, process() 似乎从不从通道读取,或者至少它不打印任何内容;我究竟做错了什么?


package main


import ( "fmt" ; "database/sql" ; _ "github.com/lib/pq" ; "time" ; "gopkg.in/redis.v3" )//; "strconv" )


type Record struct {

    userId, myDate int

    prodUrl string

}



func main(){


    //connect to db

    db, err := sql.Open(...)

    defer db.Close()


    //error check here...


    //exec query

    rows, err := db.Query("select userID,url,date from mytable limit 10")

    defer rows.Close()


    //error check here...   


    //create channel to buffer rows read

    bufferChan := make(chan *Record,1000)

    go process(bufferChan)


    //iterate through results and send them to process()

    row := new(Record)

    for rows.Next(){

        err := rows.Scan(&row.userId, &row.prodUrl, &row.myDate)        

        bufferChan <- row

        fmt.Printf("row sent %v",row.userId)                    

    }   

}


//prints Record values

func process (buffer chan *Record) {

    row := <- buffer

    fmt.Printf("row received: %d %v %d ", row.userId,row.prodUrl,row.myDate)

}


侃侃无极
浏览 128回答 3
3回答

手掌心

func 进程不打印任何内容的原因是您 func main 在行的 for 循环后退出。Next 完成从而退出程序。你需要做几件事。在 for 循环之后添加对 close 的调用以指示结束向缓冲通道添加消息,否则可能导致死锁。所以调用 close(bufferChan)使用 range 在您的 func 进程中迭代通道。将额外的通道传递给进程以了解它何时完成,以便 main 可以等到进程完成。看看下面的代码片段,例如:package mainimport "fmt"func main() {&nbsp; &nbsp; bufferChan := make(chan int, 1000)&nbsp; &nbsp; done := make(chan bool)&nbsp; &nbsp; go process(bufferChan, done)&nbsp; &nbsp; for i := 0; i < 100; i++ {&nbsp; &nbsp; &nbsp; &nbsp; bufferChan <- i&nbsp; &nbsp; }&nbsp; &nbsp; close(bufferChan)&nbsp; &nbsp; select {&nbsp; &nbsp; case <-done:&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println("Done")&nbsp; &nbsp; }}func process(c chan int, done chan bool) {&nbsp; &nbsp; for s := range c {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(s)&nbsp; &nbsp; }&nbsp; &nbsp;&nbsp; &nbsp; done <- true}

qq_遁去的一_1

您的 main 函数退出,因此整个程序结束。它应该等待处理结束。此外,进程函数应该使用 range 关键字在通道上循环。工作解决方案的脚手架如下所示:package mainimport "fmt"func process(input chan int, done chan struct{}) {&nbsp; &nbsp; for i := range input {&nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(i)&nbsp; &nbsp; }&nbsp; &nbsp; done <- struct{}{}}func main() {&nbsp; &nbsp; input := make(chan int)&nbsp; &nbsp; done := make(chan struct{})&nbsp; &nbsp; go process(input, done)&nbsp; &nbsp; for i := 1; i < 10; i++ {&nbsp; &nbsp; &nbsp; &nbsp; input <- i&nbsp; &nbsp; }&nbsp; &nbsp; close(input)&nbsp; &nbsp; <-done}

汪汪一只猫

我相信您正在寻找io.pipe()go API,它在写入器和读取器之间创建同步内存管道。这里没有缓冲。它可用于将需要 的代码io.Reader与需要 的代码连接起来io.Writer。在您的情况下,io.PipeWriter代码是“从数据库中读取值”,而“io.PipeReader”是“将值写入屏幕”的代码。这里是一个没有任何缓冲区的流数据示例,即bytes.Buffer.// Set up the pipe to write data directly into the Reader.pr, pw := io.Pipe()// Write JSON-encoded data to the Writer end of the pipe.// Write in a separate concurrent goroutine, and remember// to Close the PipeWriter, to signal to the paired PipeReader// that we’re done writing.go func() {&nbsp; err := json.NewEncoder(pw).Encode(&v)&nbsp; pw.Close()}()// Send the HTTP request. Whatever is read from the Reader// will be sent in the request body.// As data is written to the Writer, it will be available// to read from the Reader.resp, err := http.Post(“example.com”, “application/json”, pr)参考:https://medium.com/stupid-gopher-tricks/streaming-data-in-go-without-buffering-3285ddd2a1e5
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go