// Command main tails an nginx access log, parses every new line into a
// Message and hands the parsed records to a writer.
package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"os"
	"regexp"
	"strconv"
	"strings"
	"time"
)

const (
	// accessLogTimeLayout is the layout of the bracketed timestamp in the
	// access log, for example "03/Nov/2016:16:56:47 +0800". The numeric zone
	// (-0700) must be part of the layout, otherwise parsing fails on "+0800".
	accessLogTimeLayout = "02/Jan/2006:15:04:05 -0700"

	// tailPollInterval is how long Read waits at end of file before it polls
	// the file for new data again.
	tailPollInterval = 500 * time.Millisecond
)

// accessLogPattern splits one nginx access-log line into eight capture groups:
// ip, dropped fields, timestamp, request, status code, body length, referer
// and client. Example line:
//
//	192.168.252.210 - - [03/Nov/2016:16:56:47 +0800] "POST /jsrpc.php?output=json-rpc HTTP/1.1" 200 149 "http://ip:port/zabbix.php?action=dashboard.view" "Mozilla/5.0 (Windows NT 6.1; WOW64) ..."
//
// It is compiled once at package scope rather than on every Process call.
var accessLogPattern = regexp.MustCompile(`([\d+.]+)\s+([^\[]+)\s+\[([^\]]+)\]\s+\"([^"]+)\"\s+(\d+)\s+(\d+)\s+\"([^"]+)\"\s+\"([^"]+)`)

// Reader produces raw log lines on rc.
type Reader interface {
	Read(rc chan string)
}

// Writer consumes parsed records from wc.
type Writer interface {
	Write(wc chan interface{})
}

// LogProcess connects a Reader, the parsing stage (Process) and a Writer
// through two channels.
type LogProcess struct {
	rc    chan string      // raw lines: reader -> Process
	wc    chan interface{} // parsed records: Process -> writer
	read  Reader
	write Writer
}

// Message is one parsed access-log record.
type Message struct {
	IP      string
	Logtime time.Time
	Url     string
	Code    int
	Length  float64
	Refer   string
	Client  string
}

// Process receives raw lines from l.rc until that channel is closed, parses
// each line and sends the resulting *Message on l.wc. Lines that do not match
// the access-log pattern, or whose timestamp or numeric fields cannot be
// converted, are logged and skipped. Process does not close l.wc.
func (l *LogProcess) Process() {
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// The zone database may be unavailable; fall back to a fixed +08:00
		// zone instead of silently continuing with a nil location.
		log.Println("LoadLocation fail:", err.Error())
		loc = time.FixedZone("CST", 8*60*60)
	}

	for line := range l.rc {
		msg, ok := parseLine(line, loc)
		if !ok {
			continue
		}
		l.wc <- msg
	}
}

// parseLine extracts the fields of one access-log line into a Message. It
// reports false, after logging the reason, when the line has to be skipped.
func parseLine(line string, loc *time.Location) (*Message, bool) {
	// A successful match yields the whole match plus eight groups.
	fields := accessLogPattern.FindStringSubmatch(line)
	if len(fields) != 9 {
		log.Println("match is error", line)
		return nil, false
	}

	logtime, err := time.ParseInLocation(accessLogTimeLayout, fields[3], loc)
	if err != nil {
		log.Println("ParseInLocation fail:", err.Error(), fields[3])
		return nil, false
	}

	// The pattern only admits digits here, so these conversions can fail
	// only on out-of-range values.
	code, err := strconv.Atoi(fields[5])
	if err != nil {
		log.Println("Atoi fail:", err.Error(), fields[5])
		return nil, false
	}
	length, err := strconv.ParseFloat(fields[6], 64)
	if err != nil {
		log.Println("ParseFloat fail:", err.Error(), fields[6])
		return nil, false
	}

	return &Message{
		IP:      fields[1],
		Logtime: logtime,
		Url:     fields[4],
		Code:    code,
		Length:  length,
		Refer:   fields[7],
		Client:  fields[8],
	}, true
}

// ReadFromFile reads log lines from a file on disk.
type ReadFromFile struct {
	path string // location of the log file
}

// Read opens r.path, seeks to the end of the file and then sends every line
// appended afterwards on rc, with surrounding whitespace trimmed. It polls
// for new data every tailPollInterval and never returns. It panics if the
// file cannot be opened or positioned, or on any read error other than
// io.EOF.
func (r *ReadFromFile) Read(rc chan string) {
	file, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file error: %s", err.Error()))
	}
	defer file.Close()

	// Start at the end of the file so that only new lines are reported.
	if _, err := file.Seek(0, io.SeekEnd); err != nil {
		panic(fmt.Sprintf("seek file error: %s", err.Error()))
	}

	rd := bufio.NewReader(file)
	// pending holds the beginning of a line whose newline has not been
	// written yet. ReadString returns that fragment together with io.EOF, so
	// it has to be kept or the line would be split in two.
	var pending strings.Builder
	for {
		chunk, err := rd.ReadString('\n')
		pending.WriteString(chunk)
		if err != nil {
			if err == io.EOF {
				time.Sleep(tailPollInterval)
				continue
			}
			panic(fmt.Sprintf("ReadString error: %s", err.Error()))
		}
		rc <- strings.TrimSpace(pending.String())
		pending.Reset()
	}
}

// WriterToinfluxDB is the sink for parsed records.
type WriterToinfluxDB struct {
	influxDBDsn string // InfluxDB DSN; not read by any code in this file
}

// Write prints every record received on wc to standard output and returns
// when wc is closed.
func (w *WriterToinfluxDB) Write(wc chan interface{}) {
	for v := range wc {
		fmt.Println(v)
	}
}

// main starts the reader, parser and writer goroutines and lets them run for
// 30 seconds before the program exits.
func main() {
	r := &ReadFromFile{
		path: "/tmp/access.log",
	}
	w := &WriterToinfluxDB{
		influxDBDsn: "username$password",
	}
	lp := &LogProcess{
		rc:    make(chan string),
		wc:    make(chan interface{}),
		read:  r,
		write: w,
	}

	go lp.read.Read(lp.rc)
	go lp.Process()
	go lp.write.Write(lp.wc)

	time.Sleep(30 * time.Second)
}
这么多的代码，你是要干嘛呢？