为什么我照着敲的,去掉sleep后,它监听哪个9193端口,但好像不是阻塞式的,执行go run log_process.go立刻就退出了

来源:2-8 运行状况监控

坐着看太阳敲代码

2018-05-09 21:48

https://img.mukewang.com/5af2fb980001a2f225341520.jpg


为什么我照着敲的,去掉sleep后,它监听哪个9193端口,但好像不是阻塞式的,执行go run log_process.go

写回答 关注

2回答

  • 坐着看太阳敲代码
    2018-05-09 21:52:10

    大神,上面那个是log_process.go的代码

    坐着看太阳敲... 回复麦可同学

    这么奇怪,是在mac配置的go环境。那我自己写个测试代码,看看行不行。

    2018-05-13 11:50:21

    共 2 条回复 >

  • 坐着看太阳敲代码
    2018-05-09 21:51:06
    package main
    
    import (
       "strings"
       "fmt"
       "time"
       "os"
       "bufio"
       "io"
       "regexp"
       "log"
       "strconv"
       "net/url"
       "flag"
       "net/http"
       //"influxdb-master/client/v2"
       "github.com/influxdata/influxdb/client/v2"
    
       "encoding/json"
    )
    
    // 系统状态监控
    type SystemInfo struct {
       HandleLine   int     `json:"handleLine"`   // 总处理日志行数
       Tps          float64 `json:"tps"`          // 系统吞出量
       ReadChanLen  int     `json:"readChanLen"`  // read channel 长度
       WriteChanLen int     `json:"writeChanLen"` // write channel 长度
       RunTime      string  `json:"runTime"`      // 运行总时间
       ErrNum       int     `json:"errNum"`       // 错误数
    }
    
    const (
       TypeHandleLine = 0
       TypeErrNum     = 1
    )
    
    var TypeMonitorChan = make(chan int, 200)
    
    type Monitor struct {
       startTime time.Time
       data      SystemInfo
       tpsSli    []int
    }
    
    func (m *Monitor) start(lp *LogProcess) {
    
       go func() {
          for n := range TypeMonitorChan {
             switch n {
             case TypeErrNum:
                m.data.ErrNum += 1
             case TypeHandleLine:
                m.data.HandleLine += 1
             }
          }
       }()
    
       ticker := time.NewTicker(time.Second * 5)
       go func() { //协程
          for {
             <-ticker.C
             m.tpsSli = append(m.tpsSli, m.data.HandleLine)
             if len(m.tpsSli) > 2 {
                m.tpsSli = m.tpsSli[1:]
             }
          }
       }()
    
       http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
          m.data.RunTime = time.Now().Sub(m.startTime).String()
          m.data.ReadChanLen = len(lp.rc)
          m.data.WriteChanLen = len(lp.wc)
    
          if len(m.tpsSli) >= 2 {
             m.data.Tps = float64(m.tpsSli[1]-m.tpsSli[0]) / 5
          }
    
          ret, _ := json.MarshalIndent(m.data, "", "\t")
          io.WriteString(writer, string(ret))
       })
    
       http.ListenAndServe(":9193", nil)
    }
    
    type Reader interface {
       Read(rc chan []byte)
    }
    type Writer interface {
       Write(wc chan *Message)
    }
    type LogProcess struct {
       rc chan []byte
       wc chan *Message
       //path        string //读取文件的路径
       //influxDBDsn string //influx data source
       read  Reader
       write Writer
    }
    
    type ReadFromToInFile struct {
       path string //获取文件路径
    }
    type WriteFromTofluxDB struct {
       influxDBDsn string //influx data source
    }
    
    type Message struct {
       TimeLocal                    time.Time
       BytesSent                    int
       Path, Method, Scheme, Status string
       UpstreamTime, RequestTime    float64
    }
    
    func (r *ReadFromToInFile) Read(rc chan []byte) {
       //读取模块
       //打开文件
    
       f, err := os.Open(r.path) //返回file类型的结构体指针,因为这个结构体只能逐个读取这个字符,所以下面会换成newreader
    
       if err != nil {
          panic(fmt.Sprint("open file error:%s", err.Error()))
       }
    
       //从文件末尾开始逐行读取文件内容
       f.Seek(0, 2)             //把字符指针移动到末尾,参数2就是移动到末尾的意思
       rd := bufio.NewReader(f) //这样它会返回新的NewReader的类型指针,以至于可以使用更多的方法,逐行读取
    
       for {
          line, err := rd.ReadBytes('\n') //逐行读取;又或者是读取直到遇见换行符
          if err == io.EOF { //EOf代表是文件末尾的错误
             time.Sleep(500 * time.Millisecond)
             continue
          } else if err != nil {
             panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
          }
          //print(line)
          TypeMonitorChan <- TypeHandleLine
          rc <- line[:len(line)-1]
       }
    
       //rc<-line
       //
       //line := "message"
       //rc <- line
    }
    func (w *WriteFromTofluxDB) Write(wc chan *Message) {
       //写入模块
    
       infSli := strings.Split(w.influxDBDsn, "@")
       c, err := client.NewHTTPClient(client.HTTPConfig{
          Addr:     infSli[0],
          Username: infSli[1],
          Password: infSli[2],
       })
       if err != nil {
          log.Fatal(err)
       }
       defer c.Close()
    
       // Create a new point batch
       bp, err := client.NewBatchPoints(client.BatchPointsConfig{
          Database:  infSli[3],
          Precision: infSli[4],
       })
       if err != nil {
          log.Fatal(err)
       }
    
       //fmt.Println(<-wc)
       for v := range wc {
          // Create a point and add to batch
          tags := map[string]string{"Path": v.Path, "Method": v.Method, "Scheme": v.Scheme, "Status": v.Status}
          fields := map[string]interface{}{
             "UpstreamTime": v.UpstreamTime,
             "RequestTime":  v.RequestTime,
             "BytesSent":    v.BytesSent,
          }
    
          pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
          if err != nil {
             log.Fatal(err)
          }
          bp.AddPoint(pt)
    
          // Write the batch
          if err := c.Write(bp); err != nil {
             log.Fatal(err)
          }
          log.Println("write success")
          // Close client resources
          //if err := c.Close(); err != nil {
          // log.Fatal(err)
          //}
          //fmt.Println(v)
       }
    
    }
    
    //func (l *LogProcess) ReadFromFile() {
    // //读取模块
    // line := "message"
    // l.rc <- line
    //}
    
    func (l *LogProcess) Proccess() {
       // 解析模块
    
       /**
       172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854
       */
    
       r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)
    
       loc, _ := time.LoadLocation("Asia/Shanghai")
       for v := range l.rc {
          ret := r.FindStringSubmatch(string(v))
    
          //log.Println(ret, len(ret))
          if len(ret) != 14 {
             TypeMonitorChan <- TypeErrNum
             log.Println("FindStringSubmatch fail:", string(v))
             continue
          }
    
          message := &Message{}
          t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
          if err != nil {
             TypeMonitorChan <- TypeErrNum
             log.Println("ParseInLocation fail:", err.Error(), ret[4])
             continue
          }
          message.TimeLocal = t
    
          byteSent, _ := strconv.Atoi(ret[8])
          message.BytesSent = byteSent
    
          // GET /foo?query=t HTTP/1.0
          reqSli := strings.Split(ret[6], " ")
          if len(reqSli) != 3 {
             TypeMonitorChan <- TypeErrNum
             log.Println("strings.Split fail", ret[6])
             continue
          }
          message.Method = reqSli[0]
    
          u, err := url.Parse(reqSli[1])
          if err != nil {
             log.Println("url parse fail:", err)
             TypeMonitorChan <- TypeErrNum
             continue
          }
          message.Path = u.Path
    
          message.Scheme = ret[5]
          message.Status = ret[7]
    
          upstreamTime, _ := strconv.ParseFloat(ret[12], 64)
          requestTime, _ := strconv.ParseFloat(ret[13], 64)
          message.UpstreamTime = upstreamTime
          message.RequestTime = requestTime
    
          l.wc <- message
       }
    }
    
    //func (l *LogProcess) WriteToInfluxDB() {
    // //写入模块
    // fmt.Println(<-l.wc)
    // fmt.Println(<-l.wc)
    //}
    
    func main() {
       var path, influxDsn string
       flag.StringVar(&path, "path", "./access.log", "read file path")
    
       flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@root@101@imooc@s", "influx data source")
       flag.Parse()
    
       r := &ReadFromToInFile{
          path: path,
       }
       w := &WriteFromTofluxDB{
          influxDBDsn: influxDsn,
       }
       lp := &LogProcess{
          rc:    make(chan []byte, 200),
          wc:    make(chan *Message, 200),
          read:  r,
          write: w,
       }
       //go lp.ReadFromFile() //lp是引用类型,出于性能考虑这样做。本来这里应该这样写 go (*lp).ReadFromFile(),
       //// 但goland编辑器对此优化,写成go lp.ReadFromFile(),也能达到上面一样的效果。知道他是个结构体。更利于阅读
       //go lp.Proccess()
       //go lp.WriteToInfluxDB()
       //time.Sleep(1 * time.Second)
    
       go lp.read.Read(lp.rc) //lp是引用类型,出于性能考虑这样做。本来这里应该这样写 go (*lp).ReadFromFile(),
       // 但goland编辑器对此优化,写成go lp.ReadFromFile(),也能达到上面一样的效果。知道他是个结构体。更利于阅读
       for i := 0; i < 2; i++ {
          go lp.Proccess()
       }
       for i := 0; i < 4; i++ {
          go lp.write.Write(lp.wc)
       }
    
       //
       //m := &Monitor{
       // startTime: time.Now(),
       // data:      SystemInfo{},
       //}
       //m.start(lp)
    
       m := &Monitor{
          startTime: time.Now(),
          data:      SystemInfo{},
       }
       m.start(lp)
       //time.Sleep(30 * time.Second)
    }

Go并发编程案例解析

课程带你通过一个真实的线上日志监控系统学习Golang以及并发的编程思想。

15253 学习 · 56 问题

查看课程

相似问题