坐着看太阳敲代码
2018-05-09 21:48
为什么我照着敲的,去掉sleep后,它监听哪个9193端口,但好像不是阻塞式的,执行go run log_process.go
大神,上面那个是log_process.go的代码
package main import ( "strings" "fmt" "time" "os" "bufio" "io" "regexp" "log" "strconv" "net/url" "flag" "net/http" //"influxdb-master/client/v2" "github.com/influxdata/influxdb/client/v2" "encoding/json" ) // 系统状态监控 type SystemInfo struct { HandleLine int `json:"handleLine"` // 总处理日志行数 Tps float64 `json:"tps"` // 系统吞出量 ReadChanLen int `json:"readChanLen"` // read channel 长度 WriteChanLen int `json:"writeChanLen"` // write channel 长度 RunTime string `json:"runTime"` // 运行总时间 ErrNum int `json:"errNum"` // 错误数 } const ( TypeHandleLine = 0 TypeErrNum = 1 ) var TypeMonitorChan = make(chan int, 200) type Monitor struct { startTime time.Time data SystemInfo tpsSli []int } func (m *Monitor) start(lp *LogProcess) { go func() { for n := range TypeMonitorChan { switch n { case TypeErrNum: m.data.ErrNum += 1 case TypeHandleLine: m.data.HandleLine += 1 } } }() ticker := time.NewTicker(time.Second * 5) go func() { //协程 for { <-ticker.C m.tpsSli = append(m.tpsSli, m.data.HandleLine) if len(m.tpsSli) > 2 { m.tpsSli = m.tpsSli[1:] } } }() http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) { m.data.RunTime = time.Now().Sub(m.startTime).String() m.data.ReadChanLen = len(lp.rc) m.data.WriteChanLen = len(lp.wc) if len(m.tpsSli) >= 2 { m.data.Tps = float64(m.tpsSli[1]-m.tpsSli[0]) / 5 } ret, _ := json.MarshalIndent(m.data, "", "\t") io.WriteString(writer, string(ret)) }) http.ListenAndServe(":9193", nil) } type Reader interface { Read(rc chan []byte) } type Writer interface { Write(wc chan *Message) } type LogProcess struct { rc chan []byte wc chan *Message //path string //读取文件的路径 //influxDBDsn string //influx data source read Reader write Writer } type ReadFromToInFile struct { path string //获取文件路径 } type WriteFromTofluxDB struct { influxDBDsn string //influx data source } type Message struct { TimeLocal time.Time BytesSent int Path, Method, Scheme, Status string UpstreamTime, RequestTime float64 } func (r *ReadFromToInFile) Read(rc chan []byte) { //读取模块 //打开文件 f, err := os.Open(r.path) //返回file类型的结构体指针,因为这个结构体只能逐个读取这个字符,所以下面会换成newreader if err != nil { panic(fmt.Sprint("open file error:%s", err.Error())) } //从文件末尾开始逐行读取文件内容 f.Seek(0, 2) //把字符指针移动到末尾,参数2就是移动到末尾的意思 rd := bufio.NewReader(f) //这样它会返回新的NewReader的类型指针,以至于可以使用更多的方法,逐行读取 for { line, err := rd.ReadBytes('\n') //逐行读取;又或者是读取直到遇见换行符 if err == io.EOF { //EOf代表是文件末尾的错误 time.Sleep(500 * time.Millisecond) continue } else if err != nil { panic(fmt.Sprintf("ReadBytes error:%s", err.Error())) } //print(line) TypeMonitorChan <- TypeHandleLine rc <- line[:len(line)-1] } //rc<-line // //line := "message" //rc <- line } func (w *WriteFromTofluxDB) Write(wc chan *Message) { //写入模块 infSli := strings.Split(w.influxDBDsn, "@") c, err := client.NewHTTPClient(client.HTTPConfig{ Addr: infSli[0], Username: infSli[1], Password: infSli[2], }) if err != nil { log.Fatal(err) } defer c.Close() // Create a new point batch bp, err := client.NewBatchPoints(client.BatchPointsConfig{ Database: infSli[3], Precision: infSli[4], }) if err != nil { log.Fatal(err) } //fmt.Println(<-wc) for v := range wc { // Create a point and add to batch tags := map[string]string{"Path": v.Path, "Method": v.Method, "Scheme": v.Scheme, "Status": v.Status} fields := map[string]interface{}{ "UpstreamTime": v.UpstreamTime, "RequestTime": v.RequestTime, "BytesSent": v.BytesSent, } pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal) if err != nil { log.Fatal(err) } bp.AddPoint(pt) // Write the batch if err := c.Write(bp); err != nil { log.Fatal(err) } log.Println("write success") // Close client resources //if err := c.Close(); err != nil { // log.Fatal(err) //} //fmt.Println(v) } } //func (l *LogProcess) ReadFromFile() { // //读取模块 // line := "message" // l.rc <- line //} func (l *LogProcess) Proccess() { // 解析模块 /** 172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854 */ r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`) loc, _ := time.LoadLocation("Asia/Shanghai") for v := range l.rc { ret := r.FindStringSubmatch(string(v)) //log.Println(ret, len(ret)) if len(ret) != 14 { TypeMonitorChan <- TypeErrNum log.Println("FindStringSubmatch fail:", string(v)) continue } message := &Message{} t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc) if err != nil { TypeMonitorChan <- TypeErrNum log.Println("ParseInLocation fail:", err.Error(), ret[4]) continue } message.TimeLocal = t byteSent, _ := strconv.Atoi(ret[8]) message.BytesSent = byteSent // GET /foo?query=t HTTP/1.0 reqSli := strings.Split(ret[6], " ") if len(reqSli) != 3 { TypeMonitorChan <- TypeErrNum log.Println("strings.Split fail", ret[6]) continue } message.Method = reqSli[0] u, err := url.Parse(reqSli[1]) if err != nil { log.Println("url parse fail:", err) TypeMonitorChan <- TypeErrNum continue } message.Path = u.Path message.Scheme = ret[5] message.Status = ret[7] upstreamTime, _ := strconv.ParseFloat(ret[12], 64) requestTime, _ := strconv.ParseFloat(ret[13], 64) message.UpstreamTime = upstreamTime message.RequestTime = requestTime l.wc <- message } } //func (l *LogProcess) WriteToInfluxDB() { // //写入模块 // fmt.Println(<-l.wc) // fmt.Println(<-l.wc) //} func main() { var path, influxDsn string flag.StringVar(&path, "path", "./access.log", "read file path") flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@root@101@imooc@s", "influx data source") flag.Parse() r := &ReadFromToInFile{ path: path, } w := &WriteFromTofluxDB{ influxDBDsn: influxDsn, } lp := &LogProcess{ rc: make(chan []byte, 200), wc: make(chan *Message, 200), read: r, write: w, } //go lp.ReadFromFile() //lp是引用类型,出于性能考虑这样做。本来这里应该这样写 go (*lp).ReadFromFile(), //// 但goland编辑器对此优化,写成go lp.ReadFromFile(),也能达到上面一样的效果。知道他是个结构体。更利于阅读 //go lp.Proccess() //go lp.WriteToInfluxDB() //time.Sleep(1 * time.Second) go lp.read.Read(lp.rc) //lp是引用类型,出于性能考虑这样做。本来这里应该这样写 go (*lp).ReadFromFile(), // 但goland编辑器对此优化,写成go lp.ReadFromFile(),也能达到上面一样的效果。知道他是个结构体。更利于阅读 for i := 0; i < 2; i++ { go lp.Proccess() } for i := 0; i < 4; i++ { go lp.write.Write(lp.wc) } // //m := &Monitor{ // startTime: time.Now(), // data: SystemInfo{}, //} //m.start(lp) m := &Monitor{ startTime: time.Now(), data: SystemInfo{}, } m.start(lp) //time.Sleep(30 * time.Second) }
Go并发编程案例解析
15253 学习 · 56 问题
相似问题