解析模块时的代码

来源:2-4 解析模块的实现

慕勒3157497

2018-07-19 21:09

package main

import (
   "fmt"
   "strings"
   "time"
   "os"
   "bufio"
   "io"
   "regexp"
   "log"
   "strconv"
)
// interface
type Reader interface {
   Read(rc chan string)
}

type Writer interface {
   Write(wc chan interface{})
}

type LogProcess struct {
   rc  chan string   // in file get message
   wc  chan interface{}   // out message in writer
    read Reader
    write Writer
}

type Message struct {
   IP string
   Logtime time.Time
   Url  string
   Code int
   Length float64
   Refer  string
   Client string
}
/*
   1. 从 channel 中读取每行日志数据
   2. 正则提取所需的数据
   3. 写入到 writer channel
 */
func (l *LogProcess) Process()  {
   /*
     nginx log format
     192.168.252.210 - - [03/Nov/2016:16:56:47 +0800] "POST /jsrpc.php?output=json-rpc HTTP/1.1" 200 149 "http://ip:port/zabbix.php?action=dashboard.view" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36(KHTML, like Gecko) Maxthon/4.9.3.1000 Chrome/39.0.2146.0 Safari/537.36"

     grok format
     (?m)(?<ip>[\d+.]+)\s+(?<drop>[^\[]+)\s+\[(?<logtime>[^\]]+)\]\s+\"(?<url>[^"]+)\"\s+(?<code>\d+)\s+(?<length>\d+)\s+\"(?<refer>[^"]+)\"\s+\"(?<client>[^"]+)

   */
   // 预格式化
   rex := regexp.MustCompile(`([\d+.]+)\s+([^\[]+)\s+\[([^\]]+)\]\s+\"([^"]+)\"\s+(\d+)\s+(\d+)\s+\"([^"]+)\"\s+\"([^"]+)`)
   // setting time zone
   loc, _ := time.LoadLocation("Asia/Shanghai")
   for v := range l.rc {
      // 利用正则分组的原则,将提取的字段分组
      ret := rex.FindStringSubmatch(v)

      if len(ret) != 9 {
         log.Println("match is error", v)
         continue
      }
      //init message struct
      message := &Message{}
      // log fromat is  03/Nov/2016:16:56:47 +0800
      // RFC1123Z    = "Mon, 02 Jan 2006 15:04:05 -0700" // RFC1123 with numeric zone
      // 我在这里必须要使用这个时间戳,RFC1123Z 格式 不然出错信息如下
      // 2018/07/19 21:00:09 ParseInLocation fail: parsing time "03/Nov/2016:16:56:47 +0800" as "02/Jan/2006:15:04:05 +0000": cannot parse "800" as " +0000" 03/Nov/2016:16:56:47 +0800
      //
      t, err := time.ParseInLocation("02/Jan/2006:15:04:05 -0700", ret[3], loc)
      if err != nil {
         log.Println("ParseInLocation fail:", err.Error(), ret[3])
      }
      message.Logtime = t
      message.IP = ret[1]
      // 这里偷懒了
      message.Code, _ = strconv.Atoi(ret[5])
      message.Length, _ = strconv.ParseFloat(ret[6], 64)
      message.Client = ret[8]
      message.Refer = ret[7]
      message.Url = ret[4]

      l.wc <- message
   }
}
type ReadFromFile struct {
   path          string    // file storage path
}
/*
   1. 读取模块
      a. 打开文件
      b. 从文件 末尾 开始逐行读取
 */
func (r *ReadFromFile) Read(rc chan string){
   // line := "string in message"

   file, err := os.Open(r.path)
   if err != nil {
      panic(fmt.Sprintf("open file error: %s", err.Error()))
   }
   defer file.Close()

   // 从文件末尾开始逐行读取
   file.Seek(0, 2)
   rd := bufio.NewReader(file)

   for {
      // why is not use method rd.ReadString()
      line, err := rd.ReadString('\n')
      if err != nil {
         if err == io.EOF {
            time.Sleep(500 * time.Microsecond)
            continue
         }
         panic(fmt.Sprintf("ReadString error: %s", err.Error()))
      }
      //rc <- line[:len(line)-1]
      rc <- strings.TrimSpace(line)
   }

}


type WriterToinfluxDB struct {
   influxDBDsn   string    // influxDB dsn
}

func (w *WriterToinfluxDB) Write(wc chan interface{}) {

   for v := range wc {
      fmt.Println(v)
   }
}



func main() {
   r := &ReadFromFile{
      path: "/tmp/access.log",
   }

   w := &WriterToinfluxDB{
      influxDBDsn: "username$password",
   }

   lp := &LogProcess{
      rc: make(chan string),
      wc: make(chan interface{}),
      read: r,
      write: w,
   }

   go lp.read.Read(lp.rc)
   go lp.Process()
   go lp.write.Write(lp.wc)


   time.Sleep( 30 * time.Second)

}


写回答 关注

1回答

  • 慕梦前来
    2021-09-19 10:32:51

    这么多的代码你是要干嘛呢

Go并发编程案例解析

课程带你通过一个真实的线上日志监控系统学习Golang以及并发的编程思想。

15207 学习 · 53 问题

查看课程

相似问题