5. Log Processing and Forwarding

Last updated: 2022-04-02 07:47:29

> ##### [Thanks to TTA0168](https://zhidao.baidu.com/question/1174224044228167419.html "Reference site")
> ##### [Thanks to Mr. Hu](http://www.cnblogs.com/huhangfei/p/6904994.html "Thanks to Mr. Hu")
> ##### [Thanks to irow10](http://irow10.blog.51cto.com/ "Thanks to irow10")
> ##### [Thanks to 石瞳禅](http://www.cnblogs.com/stozen/p/5638369.html "Thanks to 石瞳禅"), Chinese annotations for each grok pattern
> ##### [Thanks to 飞走不可](http://www.cnblogs.com/hanyifeng/p/5871150.html "Thanks to 飞走不可"), changing field units in Kibana

A note up front: sometimes we need to analyze the request URL or the query string (the part after `?`). I do that analysis on the Tomcat side, because Tomcat has a dedicated query string field and the Nginx log does not have to be taken apart separately. For Nginx I therefore keep only the HTTP version of the request.

As for the different filtering methods such as json and grok: in my view grok can handle anything, but if the raw data can be emitted as JSON, prefer json.

### Logstash configuration (process and forward directly to ES)

```yaml
input {
  redis {
    host => "192.168.0.106"
    port => "6400"
    db => 0
    key => "filebeat"
    password => "ding"
    data_type => "list"
  }
}

filter {
  if [type] == "proxy-nginx-accesslog" {
    json {
      source => "message"
      remove_field => [ "message" ]
    }
    # Split "request" on spaces; element [2] is kept as the HTTP version
    mutate {
      split => { "request" => " " }
    }
    mutate {
      add_field => {
        "httpversion" => "%{[request][2]}"
      }
    }
    geoip {
      source => "xff"
      database => "/etc/logstash/GeoLite2-City.mmdb"
      fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone", "location"]
      remove_field => [ "[geoip][latitude]", "[geoip][longitude]" ]
      target => "geoip"
    }
  }

  if [type] == "nginx-accesslog" {
    json {
      source => "message"
      remove_field => [ "message" ]
    }
    mutate {
      split => { "request" => " " }
    }
    mutate {
      add_field => {
        "httpversion" => "%{[request][2]}"
      }
    }
    # xff may hold several comma-separated IPs; keep the first one as realip
    mutate {
      split => { "xff" => "," }
    }
    mutate {
      add_field => {
        "realip" => "%{[xff][0]}"
      }
    }
    geoip {
      source => "realip"
      database => "/etc/logstash/GeoLite2-City.mmdb"
      fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone", "location"]
      remove_field => [ "[geoip][latitude]", "[geoip][longitude]" ]
      target => "geoip"
    }
  }

  if [type] == "tomcat-accesslog" {
    json {
      source => "message"
      remove_field => [ "message" ]
    }
    mutate {
      split => { "method" => " " }
    }
    mutate {
      add_field => {
        "request_method" => "%{[method][0]}"
        "request_url" => "%{[method][1]}"
        "httpversion" => "%{[method][2]}"
      }
    }
    # Remove the original field in its own mutate block, after the split values are copied
    mutate {
      remove_field => [ "method" ]
    }
  }

  # Convert field types for all log types in one place
  mutate {
    convert => {
      "status" => "integer"
      "body_bytes_sent" => "integer"
      "request_time" => "float"
      "send bytes" => "integer"
    }
  }
}

output {
  if [type] == "proxy-nginx-accesslog" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "logstash-proxy-nginx-accesslog-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "proxy-nginx-errorlog" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "logstash-proxy-nginx-errorlog-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "nginx-accesslog" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "logstash-nginx-accesslog-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "nginx-errorlog" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "logstash-nginx-errorlog-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "systemlog" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "systemlog-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "tomcat-catalina" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "tomcat-cataline-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "tomcat-ding-info" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "tomcat-ding-info-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "tomcat-dinge-error" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "tomcat-ding-error-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "tomcat-accesslog" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "tomcat-accesslog-%{+YYYY.MM.dd}"
    }
  }
}
```

Once a field has been split, the original field can be deleted. Note that the deletion has to be written in its own separate block, because the order in which the operations run matters here. Try it yourself to see the details.

If there is a proxy in front of Nginx, the xff field contains several IPs. I split the field and keep the first IP as `realip`, while leaving the original field in place.

### Logstash configuration (consuming data from Redis)

```yaml
input {
  redis {
    host => "192.168.0.106"
    port => "6400"
    db => 0
    key => "filebeat"
    password => "ding"
    data_type => "list"
  }
}
```

The rest of the configuration is the same as above. For the configuration that writes into Redis, see the "Log Collection Configuration" (《日志收集配置》) chapter.

Logstash processes events according to the `type` set in the Filebeat data, so nothing needs to change.

### Logstash configuration (analyzing IIS logs)

```yaml
input {
  beats {
    port => 5045
  }
}

filter {
  if [type] == "iislog" {
    grok {
      match => {"message" => "%{TIMESTAMP_ISO8601:log_timestamp} (%{NOTSPACE:s_sitename}|-) (%{NOTSPACE:s_computername}|-) (%{IPORHOST:s_ip}|-) (%{WORD:cs_method}|-) %{NOTSPACE:cs_uri_stem} %{NOTSPACE:cs_uri_query} (%{NUMBER:s_port}|-) (%{NOTSPACE:cs_username}|-) (%{IPORHOST:c_ip}|-) (?:HTTP/%{NUMBER:http_version}) %{NOTSPACE:cs_useragent} (%{GREEDYDATA:cs_cookie}| -) (%{NOTSPACE:cs_referer}|-) %{NOTSPACE:cs_host} (%{NUMBER:sc_status}|-) (%{NUMBER:sc_substatus}|-) (%{NUMBER:sc_win32_status}|-) (%{NUMBER:sc_bytes}|-) (%{NUMBER:cs_bytes}|-) (%{NUMBER:time_taken}|-)"}
      add_tag => "iis"
      remove_field => ["message", "@version"]
    }
    # IIS W3C logs are written in GMT; parse the timestamp accordingly
    date {
      match => [ "log_timestamp", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "Etc/GMT"
    }
    useragent {
      source => "cs_useragent"
      target => "ua"
      remove_field => ["cs_useragent"]
    }
    geoip {
      source => "c_ip"
      database => "/etc/logstash/GeoLite2-City.mmdb"
      fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone", "location"]
      remove_field => [ "[geoip][latitude]", "[geoip][longitude]" ]
      target => "geoip"
    }
    mutate {
      convert => {
        "sc_bytes" => "integer"
        "cs_bytes" => "integer"
        "time_taken" => "float"
        "sc_status" => "integer"
        "s_port" => "integer"
      }
    }
  }
}

output {
  if [type] == "iislog" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "logstash-iislog-%{+YYYY.MM.dd}"
    }
  }
}
```

#### Experience 1: Trim the configuration to keep it readable

Once the grok pattern has been settled, put it into the patterns directory and reference it by name from the configuration.

```shell
/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.2/patterns/iis
```

File contents

```shell
IIS_LOG %{TIMESTAMP_ISO8601:log_timestamp} ...omitted... (%{NUMBER:time_taken}|-)
```

Referencing it in Logstash

```shell
match => { "message" => "%{IIS_LOG}" }
```

#### Experience 2: Log filtering

The first 4 lines of an IIS log start with "#". To spare Logstash the work, drop them in the Filebeat configuration (for example with its `exclude_lines` option).

#### Experience 3: Debugging grok

```shell
Debugging site: http://grokdebug.herokuapp.com
```

##### grok troubleshooting checklist:

- ##### Watch the spaces between fields when matching
- ##### Account for each field's default value
- ##### Understand what the built-in patterns mean; see the links at the top of this document
- ##### Always test against many samples taken from production data

```shell
For example, for the cookie field, (%{NOTSPACE:cookie}|-) did not work in production, while (%{GREEDYDATA:cookie}|-) did
```

#### Experience 4: Miscellaneous

Logstash is sometimes slow to shut down because it is still processing data; wait a moment and it will finish.

Remove fields you do not need, but mind the order: split first, then delete.

Time handling requires Logstash's plugins-filters-date plugin.

#### Experience 5: IIS log time zone

> Why are the times in IIS logs eight hours behind?
> This goes back to the W3C standard: W3C logs are recorded in GMT, and the default IIS log format is the W3C standard log file. Beijing time is in the UTC+8 zone, that is GMT+8, so the time IIS writes is the system time minus 8 hours. The timestamps in your server's log files therefore appear eight hours behind, although the entries are still written in real time. The suggested fix is to open Internet Information Services (IIS) --- Internet Information Services --- local computer --- Web Sites, right-click the site in question (to apply it to all sites, right-click Web Sites itself), choose Properties, find the active log format on the Web Site tab, open the IIS log properties, select General, and finally tick "Use local time for file naming and rollover".

In the date filter, set `timezone => "Etc/GMT"` so the logged time is parsed as GMT; Kibana then converts it to the current time zone automatically.

> #### [Common time zones](http://php.net/manual/zh/timezones.others.php "Common time zones")

#### Experience 6: Pay attention to field conversion

When building charts in Kibana later, range queries and calculations require some fields to be converted to integer.

You can convert inside each filter, or convert everything together at the end.

For network traffic calculations you also need to set the field's unit in Kibana; see the sites linked at the top.

#### Experience 7: Batch-deleting indices

```shell
curl -XDELETE 'http://192.168.0.230:9200/abcddd'
```

[https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html)

#### Experience 8: Restart order for Logstash and Filebeat

It is recommended to stop Filebeat first, then restart Logstash.
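
The restart order recommended in Experience 8 can be sketched as a small script. This is only an illustration under assumptions that are not in the original: both services are assumed to be managed by systemd under the unit names `filebeat` and `logstash`, and starting Filebeat again at the end is my reading of the implied last step. By default the script only prints the commands; the hypothetical `APPLY=1` switch makes it run them.

```shell
#!/usr/bin/env bash
# Sketch of the Experience 8 restart order (unit names are assumptions).

# Print the command by default; run it only when APPLY=1 is set.
run() {
  if [ "${APPLY:-0}" = "1" ]; then
    "$@"
  else
    echo "$*"
  fi
}

restart_pipeline() {
  run systemctl stop filebeat      # stop the shipper first
  run systemctl restart logstash   # Logstash may be slow to stop while it drains events
  run systemctl start filebeat     # assumed final step: resume shipping
}

restart_pipeline
```

Run it once without `APPLY` to check the sequence, then with `APPLY=1` (and sufficient privileges) to execute it.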
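
Experience 7 shows the deletion of a single index. Because the indices in this chapter are named per day (`...-%{+YYYY.MM.dd}`), a whole month can probably be removed with one wildcard request. The sketch below makes several assumptions: the index family `logstash-iislog` and the month `2017.05` are placeholders, the host is the one from the Experience 7 example, and the cluster must allow wildcard deletes (the `action.destructive_requires_name` setting can forbid them). `DRY_RUN` and the function names are hypothetical helpers, and by default nothing is deleted.

```shell
#!/usr/bin/env bash
# Sketch: delete all daily indices of one family for one month via a wildcard.
ES_HOST="${ES_HOST:-http://192.168.0.230:9200}"   # host from the Experience 7 example

# Build the URL matching <prefix>-<YYYY.MM>.* (all days of that month).
index_url() {
  local prefix="$1" month="$2"
  printf '%s/%s-%s.*\n' "$ES_HOST" "$prefix" "$month"
}

# With DRY_RUN=1 (the default) only print the curl command.
delete_month() {
  local url
  url="$(index_url "$1" "$2")"
  if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "curl -XDELETE '$url'"
  else
    curl -XDELETE "$url"
  fi
}

# Placeholder family and month; replace with real values before use.
delete_month logstash-iislog 2017.05
```

Keeping the dry run as the default is deliberate: a wildcard that is too broad deletes more than intended, so it is worth reading the printed URL before setting `DRY_RUN=0`.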