 
		xyang4
2019-01-14 00:43
package main
import (
   "strings"
   "fmt"
   "time"
   "os"
   "bufio"
   "io"
)
type Reader3 interface {
   read(rc chan []byte)
}
type Writer3 interface {
   write(wc chan string)
}
type ReadFromFile3 struct {
   path string
}
type WriteToInfluxDB3 struct {
   influxDBDsn string
}
type LogProcess3 struct {
   rc     chan []byte
   wc     chan string
   reader Reader3
   writer Writer3
}
// 1 读取模块
func (r *ReadFromFile3) read(rc chan []byte) {
   fmt.Println("Begin read File")
   file, e := os.OpenFile(r.path, os.O_WRONLY|os.O_APPEND, os.ModePerm)
   if e != nil {
      panic(fmt.Sprintf("open file error:%s", e.Error()))
   }
   // 从文件末尾开始逐行读取文件内容
   file.Seek(0, 2)
   rd := bufio.NewReader(file)
   for {
      line, err := rd.ReadBytes('\n')
      if err != io.EOF { // 到结尾
         fmt.Println(line)
         time.Sleep(500 * time.Millisecond)
         continue
      } else if err != nil {
         panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
      }
      fmt.Println("> ",line)
      // rc <- line
      rc <- line[:len(line)-1]
   }
}
func (l *LogProcess3) process() {
   // 解析模块
   
   for v := range l.rc {
      l.wc <- strings.ToUpper(string(v))
   }
}
// 3 写入模块
func (w *WriteToInfluxDB3) write(wc chan string) {
   fmt.Println(">> ",*&wc) // >>  0xc4200760c0 这里为什么是地址呢?
   for v := range wc {
      fmt.Printf(v)
   }
}
func main() {
   read := &ReadFromFile3{
      path: "/Users/xyang/go_code/src/xyang.com/logcollect/data/access.log",
      //path: "data/access.log",
   }
   writer := &WriteToInfluxDB3{
      influxDBDsn: "username=?&password=?",
   }
   lp := &LogProcess3{
      rc:     make(chan []byte),
      wc:     make(chan string),
      reader: read,
      writer: writer,
   }
   go lp.reader.read(lp.rc)
   go lp.process()
   go lp.writer.write(lp.wc)
   time.Sleep(30 * time.Second)
} 
				file, e := os.OpenFile(r.path, os.O_WRONLY|os.O_APPEND, os.ModePerm)
改为:
file, e := os.Open(r.path)
if err != io.EOF 改为: if err == io.EOF
Go并发编程案例解析
15279 学习 · 56 问题
相似问题