将 csv.Reader() 用于“chan 字符串”的有效方法

我有一个“chan string”,其中每个条目都是一个 CSV 日志行,我想将其转换为列“[]string”,目前我正在(效率低下)创建一个 csv.NewReader(strings.NewReader(i) ) 对于每个项目,这看起来比实际需要的工作要多得多:


for i := range feederChan {

    r := csv.NewReader(strings.NewReader(i))

    a, err := r.Read()

    if err != nil {

         // log error...

         continue

    }

    // then do stuff with 'a'

    // ...

}

所以,如果有更有效的方法来做到这一点,我真的很感激分享,比如创建 csv.Reader 一次,然后以某种方式向它提供 chan 内容(将“chan”内容流式传输到实现“io.Reader”接口的东西?)。



胡子哥哥
浏览 92回答 2
2回答

梵蒂冈之花

使用以下内容将字符串通道转换为读取器:type chanReader struct {&nbsp; &nbsp; c&nbsp; &nbsp;chan string&nbsp; &nbsp; buf string}func (r *chanReader) Read(p []byte) (int, error) {&nbsp; &nbsp; // Fill the buffer when we have no data to return to the caller&nbsp; &nbsp; if len(r.buf) == 0 {&nbsp; &nbsp; &nbsp; &nbsp; var ok bool&nbsp; &nbsp; &nbsp; &nbsp; r.buf, ok = <-r.c&nbsp; &nbsp; &nbsp; &nbsp; if !ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Return eof on channel closed&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return 0, io.EOF&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; n := copy(p, r.buf)&nbsp; &nbsp; r.buf = r.buf[n:]&nbsp; &nbsp; return n, nil}像这样使用它:r := csv.NewReader(&chanReader{c: feederChan})for {&nbsp; &nbsp; a, err := r.Read()&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; // handle error, break out of loop&nbsp; &nbsp; }&nbsp; &nbsp; // do something with a}如果应用程序假定换行符分隔从通道接收的值,则将换行符附加到每个接收到的值:&nbsp; &nbsp; &nbsp; &nbsp; ...&nbsp; &nbsp; &nbsp; &nbsp; var ok bool&nbsp; &nbsp; &nbsp; &nbsp; r.buf, ok = <-r.c&nbsp; &nbsp; &nbsp; &nbsp; if !ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Return eof on channel closed&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return 0, io.EOF&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; r.buf += "\n"&nbsp; &nbsp; &nbsp; &nbsp; ...复制+= "\n"字符串。如果这不能满足应用程序的效率要求,则引入一个新字段来管理行分隔符。type chanReader struct {&nbsp; &nbsp; c chan string&nbsp; // source of lines&nbsp; &nbsp; buf string&nbsp; &nbsp; &nbsp;// the current line&nbsp; &nbsp; nl bool&nbsp; &nbsp; &nbsp; &nbsp; // true if line separator is pending}func (r *chanReader) Read(p []byte) (int, error) {&nbsp; &nbsp; // Fill the buffer when we have no data to return to the caller&nbsp; &nbsp; if len(r.buf) == 0 && !r.nl {&nbsp; &nbsp; &nbsp; &nbsp; var ok bool&nbsp; &nbsp; &nbsp; &nbsp; r.buf, ok = <-r.c&nbsp; &nbsp; &nbsp; &nbsp; if !ok {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Return eof on channel closed&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return 0, io.EOF&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; r.nl = true&nbsp; &nbsp; }&nbsp; &nbsp; // Return data if we have it&nbsp; &nbsp; if len(r.buf) > 0 {&nbsp; &nbsp; &nbsp; &nbsp; n := copy(p, r.buf)&nbsp; &nbsp; &nbsp; &nbsp; r.buf = r.buf[n:]&nbsp; &nbsp; &nbsp; &nbsp; return n, nil&nbsp; &nbsp; }&nbsp; &nbsp; // No data, return the line separator&nbsp; &nbsp; n := copy(p, "\n")&nbsp; &nbsp; r.nl = n == 0&nbsp; &nbsp; return n, nil}另一种方法是按照问题评论中的建议,使用 io.Pipe 和 goroutine 将通道转换为 io.Reader。这种方法的第一步是:var nl = []byte("\n")func createChanReader(c chan string) io.Reader {&nbsp; &nbsp; r, w := io.Pipe()&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; defer w.Close()&nbsp; &nbsp; &nbsp; &nbsp; for s := range c {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; io.WriteString(w, s)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; w.Write(nl)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; return r}像这样使用它:r := csv.NewReader(createChanReader(feederChan))for {&nbsp; &nbsp; a, err := r.Read()&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; // handle error, break out of loop&nbsp; &nbsp; }&nbsp; &nbsp; // do something with a}当应用程序在将管道读取到 EOF 之前退出循环时, io.Pipe 解决方案的第一遍会泄漏 goroutine。应用程序可能会提前中断,因为 CSV 阅读器检测到语法错误,应用程序由于程序员错误或任何其他原因而崩溃。要修复 goroutine 泄漏,请在写入错误时退出写入 goroutine,并在完成读取后关闭管道读取器。var nl = []byte("\n")func createChanReader(c chan string) *io.PipeReader {&nbsp; &nbsp; r, w := io.Pipe()&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; defer w.Close()&nbsp; &nbsp; &nbsp; &nbsp; for s := range c {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if _, err := io.WriteString(w, s); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if _, err := w.Write(nl); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; return r}像这样使用它:cr := createChanReader(feederChan)defer cr.Close() // Required for goroutine cleanupr := csv.NewReader(cr)for {&nbsp; &nbsp; a, err := r.Read()&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; // handle error, break out of loop&nbsp; &nbsp; }&nbsp; &nbsp; // do something with a}

凤凰求蛊

我最终还是使用了 io.Pipe() “正如 mh-cbon 提到的那样”,它更简单并且看起来更有效(如下所述):rp, wp := io.Pipe()go func() {    defer wp.Close()    for i := range feederChan {        fmt.Fprintln(wp, i)    }}()r := csv.NewReader(rp)for { // keep reading    a, err := r.Read()    if err == io.EOF {        break    }    // do stuff with 'a'    // ...}io.Pipe() 是同步的,并且应该相当高效:它将数据从写入器通过管道传输到读取器;我将 csv.NewReader() 提供给读者部分,并创建了一个 goroutine,将 chan 写入到作者部分。多谢。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go