go+influxdb+grafana 制作日志监控系统

最后更新于:2022-04-02 03:01:03

[TOC]

## 概述

- 安装 influxdb 数据库
- 安装 grafana 图形分析工具

## code
main.go ``` package main import ( "bufio" "fmt" "github.com/influxdata/influxdb/client/v2" "io" "log" "os" "regexp" "strconv" "time" "encoding/json" "net/http" "strings" ) type Reader interface { Read(rc chan string) } type Writer interface { Write(rc chan *LogStruct) } type LogStruct struct { Ip string Time time.Time Scheme string //get ,post Path string Protocal string //http,htpps Code int //200,304 Bytes int //请求字节数 } //系统监控参数 type SystemInfo struct { HandleLine int `json:"handle_line"` Tps float64 `json:"tps"` ReadChanLen int `json:"read_chan_len"` WriteChanLen int `json:"write_chan_len"` RunTime string `json:"run_time"` ErrNum int `json:"err_num"` } const ( HandleLine = 0 ErrNum = 1 ) var MonitorChan = make(chan int, 200) type LogProcess struct { rc chan string //从读取模块到解析模块传递数据 wc chan *LogStruct //从解析模块到写入模块传递数据 read Reader write Writer RunTime string `json:"run_time"` ErrNum int `json:"err_num"` } type Monitor struct { startTime time.Time data SystemInfo tpsSli []int } func (m *Monitor) start(lp *LogProcess) { go func() { for n := range MonitorChan { switch n { case HandleLine: m.data.HandleLine += 1 case ErrNum: m.data.ErrNum += 1 } } }() ticker := time.NewTicker(time.Second * 5) go func() { for { <-ticker.C m.tpsSli = append(m.tpsSli, m.data.HandleLine) if len(m.tpsSli) > 2 { m.tpsSli = m.tpsSli[1:] } } }() http.HandleFunc("/monitor", func(w http.ResponseWriter, r *http.Request) { m.data.RunTime = time.Now().Sub(m.startTime).String() m.data.ReadChanLen = len(lp.rc) m.data.WriteChanLen = len(lp.wc) m.data.Tps = float64((m.tpsSli[1] - m.tpsSli[0]) / 5) ret, _ := json.MarshalIndent(m.data, "", "\t") io.WriteString(w, string(ret)) }) http.ListenAndServe(":9193", nil) } type ReadFromFile struct { path string } func (r *ReadFromFile) Read(rc chan string) { //读取模块 file, e := os.Open(r.path) if e != nil { panic(fmt.Sprintf("open file error: %s", e.Error())) } //将文件指针移动末尾 file.Seek(0, 2) reader := bufio.NewReader(file) for { b, err := reader.ReadBytes('\n') if err 
== io.EOF { time.Sleep(500 * time.Millisecond) continue } else if err != nil { panic(fmt.Sprintf("readbytes error %s", err.Error())) } MonitorChan <- HandleLine rc <- string(b[:len(b)-1]) //去掉最后一个换行符 } } type WriteToInfluxDb struct { influxDBDsn string //influx source data } func (w *WriteToInfluxDb) Write(wc chan *LogStruct) { splitStr := strings.Split(w.influxDBDsn, "@") //写入模块 c, err := client.NewHTTPClient(client.HTTPConfig{ Addr: splitStr[0], Username: splitStr[1], Password: splitStr[2], }) if err != nil { log.Fatal(err) } defer c.Close() for v := range wc { fmt.Println(v) // Create a new point batch bp, err := client.NewBatchPoints(client.BatchPointsConfig{ Database: splitStr[3], Precision: splitStr[4], }) if err != nil { log.Fatal(err) } // Create a point and add to batch tags := map[string]string{"ip": v.Ip, "scheme": v.Scheme, "path": v.Path, "protocol": v.Protocal} fields := map[string]interface{}{ "request_time": v.Time, "code": v.Code, "bytes": v.Bytes, } // 表名, tags,fields, 时间戳 pt, err := client.NewPoint("apache_log", tags, fields, v.Time) if err != nil { log.Fatal(err) } bp.AddPoint(pt) // Write the batch if err := c.Write(bp); err != nil { log.Fatal(err) } // Close client resources if err := c.Close(); err != nil { log.Fatal(err) } } } func (l *LogProcess) Process() { //127.0.0.1 - - [24/Jun/2018:23:35:18 +0800] "POST /index.php?g=Admin&m=public&a=dologin HTTP/1.1" 200 71 reg := regexp.MustCompile(`([\d+\.]+) - - \[(\d+\/\w+\/\w+:\d+:\d+:\d+ \+\d+)\] \"(\w+) (\/.*?) 
(\w+\/\d\.\d)\" (\d+) (\d+)`) locatTime, _ := time.LoadLocation("Asia/Shanghai") //解析模块 for v := range l.rc { ret := reg.FindStringSubmatch(v) if len(ret) != 8 { MonitorChan <- ErrNum log.Println(" len !=8 len:", len(ret), ret) continue } //格式化时间 location, e := time.ParseInLocation("02/Jan/2006:15:04:05 +0800", ret[2], locatTime) if e != nil { MonitorChan <- ErrNum log.Println(" format time error :", e.Error()) continue } logStruct := &LogStruct{} logStruct.Ip = ret[1] logStruct.Time = location logStruct.Scheme = ret[3] logStruct.Path = ret[4] logStruct.Protocal = ret[5] logStruct.Code, _ = strconv.Atoi(ret[6]) logStruct.Bytes, _ = strconv.Atoi(ret[7]) l.wc <- logStruct } //此方法只能读取一次 //data := <-l.rc //l.wc <- strings.ToUpper(data) } func main() { r := &ReadFromFile{ path: "/usr/local/var/log/httpd/access_log", } w := &WriteToInfluxDb{ influxDBDsn: "http://127.0.0.1:8086@@@imooc@s", } log := &LogProcess{ rc: make(chan string,200), //添加200 个缓存 wc: make(chan *LogStruct,200), read: r, write: w, } go log.read.Read(log.rc) // 由于Read 比较快 而 process 有正则比较慢 可以开多个 Process ,且 rc 拥有缓存 for i := 0; i < 2; i++ { go log.Process() } // write 有 http 请求 较慢 可以开4个 for i := 0; i < 4; i++ { go log.write.Write(log.wc) } m := &Monitor{ startTime: time.Now(), data: SystemInfo{}, } m.start(log) } ```

## 启动

`docker run -d -p 8086:8086 influxdb`

`docker run -d -p 3000:3000 grafana/grafana`

`go run main.go`

## 测试

访问 `http://127.0.0.1:9193/monitor` 接口,返回当前运行状态:

```
{
	"handle_line": 3098,          // 已处理的总行数
	"tps": 73,                    // 吞吐量(每秒处理的行数)
	"read_chan_len": 0,           // read chan 中积压的条数,0 表示没有堆积
	"write_chan_len": 0,          // write chan 中积压的条数,0 表示没有堆积
	"run_time": "54.315217004s",  // 运行时间
	"err_num": 2                  // 错误总数
}
```
';