使用logstash收集nginx访问日志,


首先安装logstash,这个非常简单,不赘述。建议把所有插件都安装上,省心。

然后要配置一下logstash for nginx。logstash基本原理:input => filter => output。在我们这里input就是nginx的access日志,output就是ElasticSearch。filter则是用来解析和过滤日志用。一般我们要把message结构化再存储,方便后面的搜索和统计。因此需要对message进行解析。logstash是使用grok过滤器,使用match正则表达式解析。要根据自己的log_format来定制。

比如这里我们的log_format配置是:

log_format  main      '$http_host '
                      '$remote_addr - $remote_user [$time_local] '
                      '"$request" $status $body_bytes_sent "$request_body" '
                      '"$http_referer" "$http_user_agent" "$http_x_forwarded_for" '
                      '$request_time '  
                      '$upstream_response_time';

那么相应的grok就是:

%{IPORHOST:http_host} %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{INT:status} %{INT:body_bytes_sent} %{QS:request_body} %{QS:http_referer} %{QS:http_user_agent} %{QS:http_x_forwarded_for} %{NUMBER:request_time:float} %{NUMBER:upstream_response_time:float}

可以用logstash.study.arganzheng.me 95.159.86.100 - - [18/Apr/2016:11:38:51 +0800] "GET /ad.php?p=com.adslib.mobovee&hp=com.pay.m.KOF95&l=d784ee59431b73536284463336b26c03&c=mobojoy HTTP/1.1" 200 10 "-" "-" "Dalvik/2.1.0 (Linux; U; Android 5.0.2; SM-T810 Build/LRX22G)" "-" 0.002 0.002在Grok Debugger测试一下,或者直接用input { stdin { } }测试。

可以把这个Grok抽取表达式作为一个pattern固定下来:

$ cd logstash
$ mkdir patterns # 可以在grok中通过 patterns_dir 指定,默认是这个位置
$ vim /home/work/logstash/patterns/nginx

添加如下一行:

NGINXACCESS %{IPORHOST:http_host} %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{INT:status} %{INT:body_bytes_sent} %{QS:request_body} %{QS:http_referer} %{QS:http_user_agent} %{QS:http_x_forwarded_for} %{NUMBER:request_time:float} %{NUMBER:upstream_response_time:float}

TIPS

1、logstash默认定义了一些 grok pattern,一般来说对于字符串,有双引号包含的用QS,没有的用DATA类型,如%{DATA:request_body}。

2、grok匹配很容易出错,可以使用Grok Debugger进行在线调试。

3、如果想要让URL参数也解析并且成为索引字段,比如一些通用参数,如uid, country, language, etc. 那么可以使用KV插件:

filter {

    grok {
        ...
    }

    ...

    # 再单独将取得的URL、request字段取出来进行key-value值匹配
    # 需要kv插件。提供字段分隔符"&?",值键分隔符"=",则会自动将字段和值采集出来。
    kv {
        source => "request" # 默认是message,我们这里只需要解析上面grok抽取出来的request字段
        field_split => "&?"
        value_split => "="
        include_keys => [ "network", "country", "language", "deviceId" ]
    }
 
    # 把所有字段进行urldecode(显示中文)
    urldecode {
        all_fields => true
    }

}

4、过滤掉安全扫描。logstash的drop filter可以简单对消息进行过滤,例如:

if [message] !~ ".*ERROR.*|.*error.*|.*FATAL.*|.*fatal.*|.*WARN.*|.*warn.*" {
    drop { }
}

对于安全扫描,只需要过滤http_user_agent中含有inf-ssl-duty-scan的请求就可以了:

if [http_user_agent] =~ "inf-ssl-duty-scan" { # 过滤安全扫描
    drop { }
}

综上,整个logstash-nginx.conf配置文件如下:

## input { stdin { } }
input {
    file {
        path => ["/home/work/nginx/logs/access*.log"]
        # exclude => ["*.gz"]
        # start_position => "beginning"
    }
}

## nginx log format config
# log_format  main     '$http_host '
#                      '$remote_addr - $remote_user [$time_local] '
#                      '"$request" $status $body_bytes_sent "$request_body" '
#                      '"$http_referer" "$http_user_agent" "$http_x_forwarded_for" '
#                      '$request_time $upstream_response_time';

filter {
    grok {
        # patterns_dir => [ "/home/work/logstash/patterns" ]
        match => { "message" => "%{NGINXACCESS}" }
    }

    if [http_user_agent] =~ "inf-ssl-duty-scan" { 
        drop { }
    }

    date {
        match => [ "time_local" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }

    geoip {
        source => "remote_addr"
    }

    kv {
        source => "request"
        field_split => "&?"
        value_split => "="
        include_keys => [ "network", "country", "language", "deviceId" ]
    }

    urldecode {
        all_fields => true
    }
}

output {
    elasticsearch {
        hosts => ["10.242.122.23:8200","10.242.99.47:8200","10.242.103.33:8200"]
        index => "logstash-nginx-%{+YYYY.MM.dd}"
  }
  # stdout { codec => rubydebug }
}

然后我们就可以启动logstash查看一下效果了:

$ bin/logstash -f conf/logstash-nginx.conf

会看到logstash已经在扫描nginx日志发送到ES集群了。可以配置一下Kibana,对日志进行分析和可视化。具体参见参考文章。

TIPS

注意,这里收集了所有的nginx访问日志,索引文件会很大(3G左右的日志量可以产生10G左右的索引文件),虽然logstash默认是按天分割了,但是还是不能存储太久的日志,需要定时清理,可以使用 elasticsearch-curator 定时清理。

参考文章

相关内容

    暂无相关文章