5. Log Processing and Forwarding
Last updated: 2022-04-02 07:47:29
> ##### [Thanks to TTA0168](https://zhidao.baidu.com/question/1174224044228167419.html "Reference site")
> ##### [Thanks to Mr. Hu](http://www.cnblogs.com/huhangfei/p/6904994.html "Thanks to Mr. Hu")
> ##### [Thanks to irow10](http://irow10.blog.51cto.com/ "Thanks to irow10")
> ##### [Thanks to 石瞳禅](http://www.cnblogs.com/stozen/p/5638369.html "Thanks to 石瞳禅"), for the Chinese annotations of each grok pattern
> ##### [Thanks to 飞走不可](http://www.cnblogs.com/hanyifeng/p/5871150.html "Thanks to 飞走不可"), for how to change field units in Kibana
Preface: sometimes we need to analyze the requested URL or the query string (the part after the ?). I do that analysis on the Tomcat side, because Tomcat has a dedicated query string field, so there is no need to pick the Nginx logs apart separately; in the Nginx logs I only keep the HTTP request version.
As for the different filtering approaches (json filter, grok filter, and so on), my view is this: grok can handle anything, but if the raw data can be produced as JSON, prefer the json filter.
### Logstash configuration (processing and forwarding directly to ES)
```yaml
input {
  # pull events that Filebeat pushed into the Redis list
  redis {
    host => "192.168.0.106"
    port => "6400"
    db => 0
    key => "filebeat"
    password => "ding"
    data_type => "list"
  }
}
filter {
  if [type] == "proxy-nginx-accesslog" {
    # access logs are shipped as JSON, so the json filter is enough (no grok needed)
    json {
      source => "message"
      remove_field => [ "message" ]
    }
    # "request" is "METHOD URI HTTP/x.x"; keep only the HTTP version here
    mutate {
      split => { "request" => " " }
    }
    mutate {
      add_field => {
        "httpversion" => "%{[request][2]}"
      }
    }
    geoip {
      source => "xff"
      database => "/etc/logstash/GeoLite2-City.mmdb"
      fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone", "location"]
      remove_field => [ "[geoip][latitude]", "[geoip][longitude]" ]
      target => "geoip"
    }
  }
  if [type] == "nginx-accesslog" {
    json {
      source => "message"
      remove_field => [ "message" ]
    }
    mutate {
      split => { "request" => " " }
    }
    mutate {
      add_field => {
        "httpversion" => "%{[request][2]}"
      }
    }
    # xff can hold several comma-separated IPs when there are upstream proxies;
    # keep the first one as realip but leave the original field in place
    mutate {
      split => { "xff" => "," }
    }
    mutate {
      add_field => {
        "realip" => "%{[xff][0]}"
      }
    }
    geoip {
      source => "realip"
      database => "/etc/logstash/GeoLite2-City.mmdb"
      fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone", "location"]
      remove_field => [ "[geoip][latitude]", "[geoip][longitude]" ]
      target => "geoip"
    }
  }
  if [type] == "tomcat-accesslog" {
    json {
      source => "message"
      remove_field => [ "message" ]
    }
    # "method" carries the whole request line; split it into method, URL and HTTP version
    mutate {
      split => { "method" => " " }
    }
    mutate {
      add_field => {
        "request_method" => "%{[method][0]}"
        "request_url" => "%{[method][1]}"
        "httpversion" => "%{[method][2]}"
      }
    }
    mutate {
      remove_field => [ "method" ]
    }
  }
  # convert numeric fields so Kibana can run range queries and calculations on them
  mutate {
    convert => [ "status", "integer" ]
    convert => [ "body_bytes_sent", "integer" ]
    convert => [ "request_time", "float" ]
    convert => [ "send bytes", "integer" ]  # verify this field name; a name containing a space is unusual
  }
}
output {
  # route each type to its own daily index
  if [type] == "proxy-nginx-accesslog" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "logstash-proxy-nginx-accesslog-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "proxy-nginx-errorlog" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "logstash-proxy-nginx-errorlog-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "nginx-accesslog" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "logstash-nginx-accesslog-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "nginx-errorlog" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "logstash-nginx-errorlog-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "systemlog" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "systemlog-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "tomcat-catalina" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "tomcat-catalina-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "tomcat-ding-info" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "tomcat-ding-info-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "tomcat-ding-error" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "tomcat-ding-error-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "tomcat-accesslog" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "tomcat-accesslog-%{+YYYY.MM.dd}"
    }
  }
}
```
Because the fields have been split out, the original field can be removed. Note that the removal needs to go into its own mutate block; this is a matter of execution order (operations inside a single mutate do not necessarily run in the order they are written), and you can verify the behavior yourself.
If there is a proxy above Nginx, the xff field will contain several IPs. I split the field and keep the first IP as realip, while still preserving the original field.
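As an illustration of that ordering rule, here is a minimal sketch (field names follow the nginx section above; the field removed at the end is only an example):
```yaml
filter {
  # 1) split the comma-separated XFF list into an array
  mutate { split => { "xff" => "," } }
  # 2) copy the first element into its own field
  mutate { add_field => { "realip" => "%{[xff][0]}" } }
  # 3) only now, in a separate mutate block, drop fields you no longer need;
  #    operations inside one mutate run in a fixed internal order, not the order written
  mutate { remove_field => [ "request" ] }
}
```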
### Logstash configuration (consuming data from Redis)
```yaml
input {
  redis {
    host => "192.168.0.106"
    port => "6400"
    db => 0
    key => "filebeat"
    password => "ding"
    data_type => "list"
  }
}
```
The other settings are the same as above; for the shipper-side (write) configuration see the "Log Collection Configuration" chapter.
Logstash parses and routes events according to the type field carried in the Filebeat data, so nothing needs to be changed here.
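For reference, a minimal sketch of how that type field is set on the Filebeat side (Filebeat 5.x syntax; the path and type value are examples, not the exact shipper configuration from this deployment):
```yaml
filebeat.prospectors:
  - input_type: log
    paths:
      - /var/log/nginx/access.log
    # document_type becomes the "type" field that the Logstash conditionals match on
    document_type: nginx-accesslog
```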
### Logstash configuration (parsing IIS logs)
```yaml
input {
  beats {
    port => 5045
  }
}
filter {
  if [type] == "iislog" {
    # match the W3C extended (IIS) log format; see Tip 1 below for moving this pattern into a pattern file
    grok {
      match => {"message" => "%{TIMESTAMP_ISO8601:log_timestamp} (%{NOTSPACE:s_sitename}|-) (%{NOTSPACE:s_computername}|-) (%{IPORHOST:s_ip}|-) (%{WORD:cs_method}|-) %{NOTSPACE:cs_uri_stem} %{NOTSPACE:cs_uri_query} (%{NUMBER:s_port}|-) (%{NOTSPACE:cs_username}|-) (%{IPORHOST:c_ip}|-) (?:HTTP/%{NUMBER:http_version}) %{NOTSPACE:cs_useragent} (%{GREEDYDATA:cs_cookie}| -) (%{NOTSPACE:cs_referer}|-) %{NOTSPACE:cs_host} (%{NUMBER:sc_status}|-) (%{NUMBER:sc_substatus}|-) (%{NUMBER:sc_win32_status}|-) (%{NUMBER:sc_bytes}|-) (%{NUMBER:cs_bytes}|-) (%{NUMBER:time_taken}|-)"}
      add_tag => "iis"
      remove_field => ["message", "@version"]
    }
    # use the log's own timestamp (recorded in GMT) as @timestamp; see Tip 5 below
    date {
      match => [ "log_timestamp", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "Etc/GMT"
    }
    useragent {
      source => "cs_useragent"
      target => "ua"
      remove_field => ["cs_useragent"]
    }
    geoip {
      source => "c_ip"
      database => "/etc/logstash/GeoLite2-City.mmdb"
      fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone", "location"]
      remove_field => [ "[geoip][latitude]", "[geoip][longitude]" ]
      target => "geoip"
    }
    # numeric conversions so Kibana can aggregate on these fields
    mutate {
      convert => [ "sc_bytes", "integer" ]
      convert => [ "cs_bytes", "integer" ]
      convert => [ "time_taken", "float" ]
      convert => [ "sc_status", "integer" ]
      convert => [ "s_port", "integer" ]
    }
  }
}
output {
  if [type] == "iislog" {
    elasticsearch {
      hosts => ["192.168.0.231:9200", "192.168.0.232:9200"]
      index => "logstash-iislog-%{+YYYY.MM.dd}"
    }
  }
}
```
#### Tip 1: Less configuration, easier to read
Once a grok match pattern has been finalized, put it into a pattern file in the patterns directory and reference it by name from the configuration:
```shell
/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.2/patterns/iis
```
The actual contents of the pattern file:
```shell
IIS_LOG %{TIMESTAMP_ISO8601:log_timestamp} ...omitted... (%{NUMBER:time_taken}|-)
```
Referenced from the Logstash configuration:
```shell
match => { "message" => "%{IIS_LOG}" }
```
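If you would rather not edit the bundled patterns directory (it can be replaced when the plugin is upgraded), grok also accepts a custom patterns_dir; a minimal sketch, with /etc/logstash/patterns as an assumed location:
```yaml
filter {
  grok {
    # load custom pattern files (such as the IIS_LOG definition above) from this directory
    patterns_dir => ["/etc/logstash/patterns"]
    match => { "message" => "%{IIS_LOG}" }
  }
}
```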
#### Tip 2: Log filtering
The first four lines of an IIS log file start with "#"; to save Logstash the work, filter them out in Filebeat, as sketched below.
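A minimal sketch of that Filebeat prospector (Filebeat 5.x syntax; the log path is an example):
```yaml
filebeat.prospectors:
  - input_type: log
    paths:
      - 'C:\inetpub\logs\LogFiles\*\*.log'
    document_type: iislog
    # drop the "#Software", "#Version", "#Date" and "#Fields" header lines at the source
    exclude_lines: ['^#']
```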
#### Tip 3: Debugging grok
```shell
Debug site: http://grokdebug.herokuapp.com
```
##### Grok troubleshooting checklist:
- ##### Watch the whitespace between fields when matching
- ##### Take each field's default value into account
- ##### Understand what the built-in patterns mean; see the links at the top of this document
- ##### Always test against multiple samples, including production data
```shell
For example, for the cookie field, (%{NOTSPACE:cookie}|-) did not hold up against production data, while (%{GREEDYDATA:cookie}|-) did.
```
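Besides the online debugger, you can iterate locally by running a throwaway pipeline from the command line and pasting sample log lines into stdin (a sketch; it assumes the IIS_LOG pattern file from Tip 1 is already in place):
```shell
/usr/share/logstash/bin/logstash -e '
  input  { stdin {} }
  filter { grok { match => { "message" => "%{IIS_LOG}" } } }
  output { stdout { codec => rubydebug } }
'
```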
#### Tip 4: Miscellaneous
Logstash can be slow to shut down because it is still processing in-flight data; just give it a moment.
Drop the fields you no longer need, but mind the order: split first, then remove.
Timestamp handling is done with Logstash's date filter plugin (plugins-filters-date).
#### Tip 5: IIS log timezone issues
> Why do IIS log timestamps appear to be eight hours off?

This comes down to the W3C standard: the W3C extended log format records times in GMT, and IIS's default log format is the W3C standard log file. Beijing time is in the east-eight zone (GMT+8), so the timestamps IIS writes are GMT times, eight hours earlier than the local system time; the entries are still written in real time, they just look eight hours behind. To change this in IIS, open Internet Information Services (IIS) → the local computer → Web Sites, right-click the site in question (or click Web Sites itself to apply the change to all sites), choose Properties, find the active log format on the Web Site tab, open the IIS log properties, switch to General, and tick "Use local time for file naming and rollover".
Setting timezone => "Etc/GMT" in the date filter handles this on the Logstash side; Kibana then converts timestamps to the browser's time zone automatically.
> #### [Common time zones](http://php.net/manual/zh/timezones.others.php "Common time zones")
#### Tip 6: Mind field type conversion
When you later build charts in Kibana, fields used in ranges or calculations need to be converted to integer (or another numeric type).
You can convert them inside the relevant filter, or convert them all in one place at the end.
If you are calculating network traffic, you also need to set the field's unit in Kibana; see the link at the beginning of this document.
#### Tip 7: Deleting indices in bulk
```shell
curl -XDELETE 'http://192.168.0.230:9200/abcddd'
```
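The index name also accepts wildcards and comma-separated lists, which makes cleaning up a whole range of daily indices a single call (a sketch; the index patterns are examples, and on newer Elasticsearch versions wildcard deletion requires action.destructive_requires_name to be false):
```shell
# delete a whole month of nginx access indices in one request
curl -XDELETE 'http://192.168.0.230:9200/logstash-nginx-accesslog-2017.05.*'
# delete several specific indices at once
curl -XDELETE 'http://192.168.0.230:9200/logstash-iislog-2017.05.01,logstash-iislog-2017.05.02'
```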
[https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html)
#### Tip 8: Restart order for Logstash and Filebeat
It is recommended to stop Filebeat first and then restart Logstash, so that nothing is being shipped into the pipeline while it is down.
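On systemd-based hosts that might look like the following (a sketch; service names assume the standard package installs):
```shell
systemctl stop filebeat       # stop the shipper first so nothing new is sent
systemctl restart logstash    # restart the pipeline
systemctl start filebeat      # resume shipping once Logstash is up again
```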