并发的基础知识
并行和并发的区别
Golang 面向对象
并发编程思路
模块化编程
日志监控系统简易模型
优化内容：
1. 读取比处理和写入速度快，所以三个步骤可以分别启用不同数量的协程来处理。
2. 基于优化 1，需要把 channel 改为带缓冲（buffer）的 channel，用于应对消息堆积。
package main
import (
"strings"
"fmt"
"time"
"os"
"bufio"
"io"
"regexp"
"log"
"strconv"
"net/url"
"github.com/influxdata/influxdb/client/v2"
"flag"
"net/http"
"encoding/json"
)
// Reader is the input stage of the log pipeline. The implementation in this
// file (ReadFromFile) sends raw log lines into rc.
type Reader interface {
Read(rc chan []byte)
}
// Writer is the output stage of the log pipeline. The implementation in this
// file (WriteToInfluxDB) consumes parsed messages from wc.
type Writer interface {
Write(wc chan *Message)
}
// ReadFromFile is a Reader that tails a log file on disk.
type ReadFromFile struct {
path string // path of the log file to tail
}
// WriteToInfluxDB is a Writer that stores parsed messages in InfluxDB.
type WriteToInfluxDB struct {
// influxDBDsn is split on '@' by Write into
// addr@username@password@database@precision.
influxDBDsn string
}
// Message is one parsed access-log entry, produced by LogProcess.Process and
// stored by WriteToInfluxDB.Write (Path, Method, Scheme and Status become
// InfluxDB tags; the numeric fields become InfluxDB fields).
type Message struct {
TimeLocal time.Time // request timestamp taken from the bracketed time field
BytesSent int
Path, Method, Scheme, Status string
UpstreamTime, RequestTime float64 // 0 when the log value is not a number
}
// LogProcess wires the pipeline stages together: read -> rc -> Process -> wc -> write.
type LogProcess struct {
rc chan []byte //read channel: raw lines from the Reader
wc chan *Message //write channel: parsed messages for the Writer
read Reader
write Writer
}
// SystemInfo is the statistics snapshot served as indented JSON on the
// /monitor endpoint.
type SystemInfo struct {
HandleLine int `json:"handleLine"` // total number of log lines read
Tps float64 `json:"tps"` // throughput: lines read per second over the last sampling window
ReadChanlen int `json:"readChanlen"` // number of items currently buffered in the read channel
WriteChanlen int `json:"writeChanlen"` // number of items currently buffered in the write channel
RunTime string `json:"runTime"` // total running time since the monitor started
ErrNum int `json:"errNum"` // number of errors reported by the pipeline
}
// Monitor collects pipeline statistics and exposes them over HTTP.
// Its fields are updated by the goroutines launched from start.
type Monitor struct {
startTime time.Time // used to compute SystemInfo.RunTime
data SystemInfo // statistics served on /monitor
tpSli []int // most recent HandleLine samples (at most two), used to compute Tps
}
// Event codes sent on TypeMonitorChan to update the Monitor counters.
const (
TypeHandleLine = 0 // a log line was read
TypeErrNum = 1 // an error occurred in one of the pipeline stages
)
// TypeMonitorChan carries event codes from the pipeline stages to the
// Monitor. It is buffered (200) so senders do not block on every event.
var TypeMonitorChan = make(chan int, 200)
// start launches the statistics goroutine and then serves the /monitor
// endpoint on :9193. It blocks for the lifetime of the HTTP server and exits
// the process via log.Fatal when the server stops (e.g. the port is in use).
//
// All of m's mutable state (m.data, m.tpSli) is owned by a single goroutine:
// event counting, throughput sampling and snapshot requests are serialized
// through one select loop, and HTTP handlers only receive copies. Previously
// the counter goroutine, the ticker goroutine and concurrent HTTP handlers
// all read and wrote these fields without synchronization (a data race).
func (m *Monitor) start(lp *LogProcess) {
	// tpInterval is the sampling period for the throughput (Tps) calculation.
	const tpInterval = 5 * time.Second

	// Each HTTP request sends a reply channel here and receives a copy of the
	// current statistics on it.
	snapshots := make(chan chan SystemInfo)

	go func() {
		ticker := time.NewTicker(tpInterval)
		defer ticker.Stop()

		events := TypeMonitorChan
		for {
			select {
			case n, ok := <-events:
				if !ok {
					// Channel closed: stop counting (a nil channel is never
					// selected) but keep serving snapshots.
					events = nil
					continue
				}
				switch n {
				case TypeErrNum:
					m.data.ErrNum++
				case TypeHandleLine:
					m.data.HandleLine++
				}
			case <-ticker.C:
				// Keep only the two most recent samples of HandleLine.
				m.tpSli = append(m.tpSli, m.data.HandleLine)
				if len(m.tpSli) > 2 {
					m.tpSli = m.tpSli[1:]
				}
			case reply := <-snapshots:
				m.data.RunTime = time.Since(m.startTime).String()
				m.data.ReadChanlen = len(lp.rc)
				m.data.WriteChanlen = len(lp.wc)
				if len(m.tpSli) >= 2 {
					m.data.Tps = float64(m.tpSli[1]-m.tpSli[0]) / tpInterval.Seconds()
				}
				// reply is buffered, so this send never blocks the owner.
				reply <- m.data
			}
		}
	}()

	http.HandleFunc("/monitor", func(writer http.ResponseWriter, _ *http.Request) {
		reply := make(chan SystemInfo, 1)
		snapshots <- reply
		info := <-reply

		ret, err := json.MarshalIndent(info, "", "\t")
		if err != nil {
			http.Error(writer, err.Error(), http.StatusInternalServerError)
			return
		}
		writer.Write(ret)
	})
	log.Fatal(http.ListenAndServe(":9193", nil))
}
// Read tails the file at r.path and sends every newly appended line, without
// its trailing "\n" (or "\r\n"), to rc. It starts at the current end of the
// file, so only lines written after Read starts are delivered, and it polls
// every 500ms once it has caught up. Read never returns normally; it panics
// if the file cannot be opened, positioned or read.
//
// Each delivered line is reported on TypeMonitorChan as TypeHandleLine, and
// each fatal error as TypeErrNum before panicking.
func (r *ReadFromFile) Read(rc chan []byte) {
	f, err := os.Open(r.path)
	if err != nil {
		TypeMonitorChan <- TypeErrNum
		panic(fmt.Sprintf("open file error:%s", err.Error()))
	}
	defer f.Close()

	// Start from the end of the file: only new log entries are processed.
	if _, err := f.Seek(0, io.SeekEnd); err != nil {
		TypeMonitorChan <- TypeErrNum
		panic(fmt.Sprintf("seek file error:%s", err.Error()))
	}

	rd := bufio.NewReader(f)
	// partial holds the bytes of a line whose '\n' has not been written yet.
	// ReadBytes returns such bytes together with io.EOF; discarding them (as
	// the previous version did) lost the head of any line caught mid-write.
	var partial []byte
	for {
		chunk, err := rd.ReadBytes('\n')
		if len(partial) > 0 {
			chunk = append(partial, chunk...)
			partial = nil
		}
		if err == io.EOF {
			// Nothing complete yet: remember what we have and wait for more.
			partial = chunk
			time.Sleep(500 * time.Millisecond)
			continue
		}
		if err != nil {
			TypeMonitorChan <- TypeErrNum
			panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
		}

		TypeMonitorChan <- TypeHandleLine
		// err == nil guarantees chunk ends with '\n'.
		line := chunk[:len(chunk)-1]
		if n := len(line); n > 0 && line[n-1] == '\r' {
			line = line[:n-1] // CRLF-terminated line
		}
		rc <- line
	}
}
// Write consumes parsed messages from wc and stores each one as an
// "nginx_log" point in InfluxDB, tagged with Path, Method, Scheme and Status.
// It returns only when wc is closed.
//
// w.influxDBDsn must contain exactly five '@'-separated fields:
// "addr@username@password@database@precision",
// e.g. "http://127.0.0.1:8086@user@pass@mydb@s".
//
// Setup failures (malformed DSN, client creation, invalid batch config) are
// fatal. Per-message failures are counted on TypeMonitorChan as TypeErrNum,
// logged and skipped, so one bad point or a transient write error does not
// stop the pipeline. Previously the process exited right after counting the
// error, so ErrNum could never be observed.
func (w *WriteToInfluxDB) Write(wc chan *Message) {
	infSli := strings.Split(w.influxDBDsn, "@")
	if len(infSli) != 5 {
		TypeMonitorChan <- TypeErrNum
		// Deliberately not echoing the DSN: it contains the password.
		log.Fatalf("invalid influx DSN: want 5 '@'-separated fields (addr@username@password@database@precision), got %d", len(infSli))
	}
	addr, username, password := infSli[0], infSli[1], infSli[2]
	bpConfig := client.BatchPointsConfig{
		Database:  infSli[3],
		Precision: infSli[4],
	}

	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr:     addr,
		Username: username,
		Password: password,
	})
	if err != nil {
		TypeMonitorChan <- TypeErrNum
		log.Fatal(err)
	}
	// Close once when Write returns. Closing after every write (as before) is
	// unnecessary and presumably discards the client's idle keep-alive
	// connections each time.
	defer c.Close()

	// Validate the batch configuration once, before consuming any message.
	if _, err := client.NewBatchPoints(bpConfig); err != nil {
		TypeMonitorChan <- TypeErrNum
		log.Fatal(err)
	}

	for v := range wc {
		// Use a fresh batch per message. Reusing a single batch made every
		// Write resend all points accumulated so far, so request size and
		// memory grew without bound.
		bp, err := client.NewBatchPoints(bpConfig)
		if err != nil {
			TypeMonitorChan <- TypeErrNum
			log.Println("NewBatchPoints fail:", err)
			continue
		}

		tags := map[string]string{"Path": v.Path, "Method": v.Method, "Scheme": v.Scheme, "Status": v.Status}
		fields := map[string]interface{}{
			"UpstreamTime": v.UpstreamTime,
			"RequestTime":  v.RequestTime,
			"BytesSent":    v.BytesSent,
		}
		pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
		if err != nil {
			TypeMonitorChan <- TypeErrNum
			log.Println("NewPoint fail:", err)
			continue
		}
		bp.AddPoint(pt)

		if err := c.Write(bp); err != nil {
			TypeMonitorChan <- TypeErrNum
			log.Println("Write fail:", err)
			continue
		}
		log.Println("Write success")
	}
}
// Process parses raw nginx access-log lines from l.rc and sends the resulting
// *Message values to l.wc. Lines that cannot be parsed are logged, counted on
// TypeMonitorChan as TypeErrNum, and skipped. It returns when l.rc is closed.
// Several Process goroutines may safely share the same channels.
//
// Expected line format, for example:
//
//	127.0.0.1 - - [14/May/2018:14:52:45 +0800] "GET /foo?query=t HTTP/1.1" 404 27 "-" "KeepAliveClient" "-" 1.005 1.854
//
// Submatches used: 4 time, 5 request line, 6 status, 7 bytes sent,
// 11 upstream time, 12 request time.
func (l *LogProcess) Process() {
	r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)
	for v := range l.rc {
		line := string(v)
		ret := r.FindStringSubmatch(line)
		if len(ret) != 13 {
			TypeMonitorChan <- TypeErrNum
			log.Println("FindStringSubMatch fail:", line)
			continue
		}

		message := &Message{}
		// "-0700" parses the numeric offset from the line itself, so logs in
		// any timezone are accepted and no tz-database lookup is needed. The
		// previous layout matched "+0800" as literal text and relied on
		// time.LoadLocation, whose ignored error left a nil *Location that
		// makes ParseInLocation panic.
		t, err := time.Parse("02/Jan/2006:15:04:05 -0700", ret[4])
		if err != nil {
			TypeMonitorChan <- TypeErrNum
			log.Println("time.Parse fail", err.Error(), ret[4])
			continue
		}
		message.TimeLocal = t

		// The regex guarantees \d+ here; Atoi can only fail on overflow.
		byteSent, _ := strconv.Atoi(ret[7])
		message.BytesSent = byteSent

		// Request line: "METHOD URI PROTOCOL".
		reqSli := strings.Split(ret[5], " ")
		if len(reqSli) != 3 {
			TypeMonitorChan <- TypeErrNum
			log.Println("strings.Split fail", ret[5])
			continue
		}
		message.Method = reqSli[0]
		u, err := url.Parse(reqSli[1])
		if err != nil {
			TypeMonitorChan <- TypeErrNum
			log.Println("url parse fail", err)
			continue
		}
		message.Path = u.Path
		// Protocol part of the request line (e.g. "HTTP/1.1"); previously the
		// Scheme tag was never populated.
		message.Scheme = reqSli[2]
		message.Status = ret[6]

		// The regex also admits "-" (presumably nginx's placeholder for a
		// missing value); ParseFloat rejects it and the field stays 0.
		upstreamTime, _ := strconv.ParseFloat(ret[11], 64)
		requestTime, _ := strconv.ParseFloat(ret[12], 64)
		message.UpstreamTime = upstreamTime
		message.RequestTime = requestTime

		l.wc <- message
	}
}
// main wires up the pipeline and then blocks serving the monitor endpoint:
//
//	one file reader -> rc -> parsers -> wc -> InfluxDB writers
func main() {
	var path, influxDsn string
	flag.StringVar(&path, "path", "/usr/local/var/log/nginx/sd-mac-access-8081.log", "read file path")
	// NOTE(review): the default DSN embeds credentials; consider always
	// supplying them via the flag instead.
	flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@wangxu@wangxu26@imooc@s", "influx data source")
	flag.Parse()

	r := &ReadFromFile{path: path}
	w := &WriteToInfluxDB{influxDBDsn: influxDsn}
	lp := &LogProcess{
		// Buffered channels absorb bursts so a slower stage does not
		// immediately block the stage before it.
		rc:    make(chan []byte, 200),
		wc:    make(chan *Message, 200),
		read:  r,
		write: w,
	}

	// Exactly one reader: every ReadFromFile.Read call opens its own file
	// handle and tails the file independently, so running two of them (as
	// before) delivered, processed, stored and counted every line twice.
	go lp.read.Read(lp.rc)

	// Reading is faster than parsing and writing, so fan those stages out.
	for i := 0; i < 2; i++ {
		go lp.Process()
	}
	for i := 0; i < 4; i++ {
		go lp.write.Write(lp.wc)
	}

	m := Monitor{
		startTime: time.Now(),
		data:      SystemInfo{},
	}
	m.start(lp)
}