package main import ( "fmt" "strings" "time" "os" "bufio" "io" ) // interface type Reader interface { Read(rc chan string) } type Writer interface { Write(wc chan string) } type LogProcess struct { rc chan string // in file get message wc chan string // out message in writer read Reader write Writer } func (l *LogProcess) Process() { for v := range l.rc { l.wc <- strings.ToUpper(string(v)) } } type ReadFromFile struct { path string // file storage path } /* 1. 读取模块 a. 打开文件 b. 从文件 末尾 开始逐行读取 */ func (r *ReadFromFile) Read(rc chan string){ // line := "string in message" file, err := os.Open(r.path) if err != nil { panic(fmt.Sprintf("open file error: %s", err.Error())) } defer file.Close() // 从文件末尾开始逐行读取 file.Seek(0, 2) rd := bufio.NewReader(file) for { // why is not use method rd.ReadString() line, err := rd.ReadString('\n') if err != nil { if err == io.EOF { time.Sleep(500 * time.Microsecond) continue } panic(fmt.Sprintf("ReadString error: %s", err.Error())) } //rc <- line[:len(line)-1] rc <- strings.TrimSpace(line) } } type WriterToinfluxDB struct { influxDBDsn string // influxDB dsn } func (w *WriterToinfluxDB) Write(wc chan string) { for v := range wc { fmt.Println(v) } } func main() { r := &ReadFromFile{ path: "/tmp/access.log", } w := &WriterToinfluxDB{ influxDBDsn: "username$password", } lp := &LogProcess{ rc: make(chan string), wc: make(chan string), read: r, write: w, } go lp.read.Read(lp.rc) go lp.Process() go lp.write.Write(lp.wc) time.Sleep( 30 * time.Second) }
嘎哈哈哈哈哈哈哈哈