前言
在之前的文章中已经完成了ELK的搭建(Elasticsearch , Logstash, Kibana); Nginx 的日志已经成功搜集到了 Elasticsearch 中;通过 Kibana 读取 ElasticSearch 中的数据,进行展示;但是对于整端采集回来的 Nginx 日志,只能作为一个 text 格式的字段储存,并没有对日志进行解析;
需求分析
Nginx日志格式
根据官网 Module nginx_http_log_module 的解析,在默认配置下,Nginx日志使用 combined 格式:
1
2
3
4
5
6
|
log_format main '$remote_addr - $remote_user [$time_local] '
'"$request" $status $body_bytes_sent '
'"$http_referer" "$http_user_agent"'
'"$request_time" "$upstream_response_time"'
'"$upstream_connect_time" "$upstream_header_time"'
'"$http_x_forwarded_for" ';
|
Nginx 的 log_format 配置在 http 域中,然后对应的 access_log 使用这个日志模版:
1
|
127.0.0.1 - - [14/Jun/2023:11:47:03 +0800] "GET / HTTP/1.1" 200 438 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36" "0.035" "0.032" "0.000" "0.032"
|
查看日志说明:
配置项 |
解析 |
参考日志 |
remote_addr |
Client Ip |
127.0.0.1 |
remote_user |
用户端 |
|
|
|
|
time_local |
用具记录访问时间和时区,日期格式: |
[14/Jun/2023:11:47:03 +0800] |
request |
用于记录请求的url以及请求方法 |
GET / HTTP/1.1 |
status |
HTTP状态码 |
200 |
body_bytes_sent |
发送给客户端的文件主体内容的大小,不包括响应头的大小(可以将日志每条记录中的这个值累加起来以粗略估计服务器吞吐量) |
438 |
http_referer |
记录从哪个页面链接访问过来的(请求头 Referer 的内容 ) |
- |
http_user_agent |
客户端浏览器信息(请求头User-Agent的内容 ) |
Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 |
request_time |
整个请求的总时间,以秒为单位(包括接收客户端请求数据的时间、后端程序响应的时间、发送响应数据给客户端的时间(不包含写日志的时间)) |
0.035 |
upstream_response_time |
请求过程中,upstream 的响应时间,以秒为单位(向后端建立连接开始到接受完数据然后关闭连接为止的时间) |
0.032 |
upstream_connect_time |
与上游服务器建立连接所花费的时间 |
0.000 |
upstream_header_time |
从建立连接成功到接收第一个字节之间的时间 |
0.032 |
http_x_forwarded_for |
客户端的真实ip,通常web服务器放在反向代理的后面,这样就不能获取到客户的IP地址了,通过 $remote_add拿到的IP地址是反向代理服务器的iP地址。反向代理服务器在转发请求的 http 头信息中,可以增加 x_forwarded_for** 信息,用以记录原有客户端的IP地址和原来客户端的请求的服务器地址 |
|
|
|
|
时间模型
Elasticsearch 中的索引内容
**Kibana **的页面中可以发现,**ElasticSearch **对 **Nginx **日志识别成 text 字段
问题分析
不难看出,Nginx 的日志存储到 ElasticSearch 并没有对日志内容进行划分和内容的识别。所以我们也无法在 **Kibana **上对日志内容进行检索和统计。
为了解决这个问题。我们可以通过修改 **Logstash **的配置,让日志内容在入库 ElasticSearch 之前,先完成日志内容的字段划分和内容识别。
Logstash配置思路
Logstash 分为 Input、Output、Filter、Codec 等多种plugins。
- Input:数据的输入源也支持多种插件,如elk官网的beats、file、graphite、http、kafka、redis、exec等等等、、、
- Output:数据的输出目的也支持多种插件,如本文的elasticsearch,当然这可能也是最常用的一种输出。以及exec、stdout终端、graphite、http、zabbix、nagios、redmine等等、、、
- Filter:使用过滤器根据日志事件的特征,对数据事件进行处理过滤后,在输出。支持grok、date、geoip、mutate、ruby、json、kv、csv、checksum、dns、drop、xml等等、、
- Codec:编码插件,改变事件数据的表示方式,它可以作为对输入或输出运行该过滤。和其它产品结合,如rubydebug、graphite、fluent、nmap等等。
怎么修改采集到日志内容
在 **Note1 **搭建的ELK中,采集到 **Nginx **日志后并没有对日志进行处理;下面我们可以通过 Logstash 中的 Filter进行日志加工
如何对日志内容进行分段和识别
**grok **是目前在 **Logstash **中使用最广泛的插件之一,可以很轻易地识别出文本地结构内容,使其便于检索。
文档介绍:Debugging grok expressions
如何测试配置的运行结果
我们可以在 Logstash 的 output 中引用 codecs 插件,当在 Logstash 中配置了输出到 codecs 时,通过屏幕打印我们可以知道最终 Logstash 的输出结果。
引入不同的格式化插件: rubydebug
Logstash pipeline 配置
pipeline 配置草稿
1
2
3
4
5
6
7
8
9
|
input {
stdin {} # 通过键盘输入
}
output {
stdout {
codec => rubydebug
}
}
|
Logstash 配置修改
使用 grok 识别日志内容
在 Kibana 中通过 Dev Tools 中的 Grok Debugger 工具,可以对日志内容进行调试
先输入 Nginx 日志进行匹配:( Sample Data )
1
|
127.0.0.1 - - [14/Jun/2023:12:58:06 +0800] "POST //third_party/store/getAllList HTTP/1.1" 200 81 "-" "Go-http-client/1.1" "0.042" "0.036" "0.000" "0.036" "148.70.84.110, 127.0.0.1"
|
根据日志内容写 Grok 的匹配规则:( **Grok Pattern **)
1
|
%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" "%{DATA:request_time}" "%{DATA:upstream_response_time}" "%{DATA:upstream_connect_time}" "%{DATA:upstream_header_time}" "%{DATA:http_x_forwarded_for}"
|
显示识别的结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
{
"remote_addr": "127.0.0.1",
"response_code": "200",
"upstream_header_time": "0.036",
"time_local": "14/Jun/2023:12:58:06 +0800",
"http_version": "1.1",
"request_method": "POST",
"uri": "//third_party/store/getAllList",
"http_user_agent": "Go-http-client/1.1",
"remote_user": "-",
"request_time": "0.042",
"upstream_connect_time": "0.000",
"body_sent_bytes": "81",
"http_x_forwarded_for": "148.70.84.110, 127.0.0.1",
"http_referrer": "-",
"upstream_response_time": "0.036"
}
|
📢📢📢 注意:
补充一个:nginx的 error 日志;gork 规则;
先输入 Nginx 日志进行匹配:( Sample Data )
1
|
2024/05/09 17:42:51 [error] 32607#32607: *872048532 FastCGI sent in stderr: "PHP message: PHP Fatal error: Allowed memory size of 134217728 bytes exhausted (tried to allocate 4096 bytes) in /home/ubuntu/www/kp/thinkphp/library/think/db/Connection.php on line 376" while reading response header from upstream, client: 219.137.50.203, server: kapeng.xjr2018.com, request: "POST /store/food_order/orderAddress HTTP/1.1", upstream: "fastcgi://unix:/run/php/php7.1-fpm.sock:", host: "kapeng.xjr2018.com", referrer: "http://kapeng.xjr2018.com/manage_store/"
|
根据日志内容写 Grok 的匹配规则:( Grok Pattern )
1
|
(?<timestamp>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) \[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER}: %{GREEDYDATA:errormessage}(?:, client: (?<clientip>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:server}?)(?:, request: %{QS:request})?(?:, upstream: (?<upstream>\"%{URI}\"|%{QS}))?(?:, host: %{QS:request_host})?(?:, referrer: \"%{URI:referrer}\")?
|
显示识别的结果:
1
2
3
4
5
6
7
8
9
10
11
12
|
{
"severity": "error",
"errormessage": "*872048532 FastCGI sent in stderr: \"PHP message: PHP Fatal error: Allowed memory size of 134217728 bytes exhausted (tried to allocate 4096 bytes) in /home/ubuntu/www/kp/thinkphp/library/think/db/Connection.php on line 376\" while reading response header from upstream",
"request_host": "\"kapeng.xjr2018.com\"",
"server": "kapeng.xjr2018.com",
"request": "\"POST /store/food_order/orderAddress HTTP/1.1\"",
"referrer": "http://kapeng.xjr2018.com/manage_store/",
"upstream": "\"fastcgi://unix:/run/php/php7.1-fpm.sock:\"",
"clientip": "219.137.50.203",
"pid": "32607",
"timestamp": "2024/05/09 17:42:51"
}
|
测试 Logstash Pipline配置
创建一个测试配置文件:logstash-test.conf, 通过键盘输入的方式把日志内容输入到** Logstash**,最后查看结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
input {
stdin {} # 通过键盘输入
}
filter {
grok {
match => {
"message" => '%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" "%{DATA:request_time}" "%{DATA:upstream_response_time}" "%{DATA:upstream_connect_time}" "%{DATA:upstream_header_time}" "%{DATA:http_x_forwarded_for}"'
}
}
# 通过date插件,把nginx日志中的时间戳用作 Logstash 的 event 时间戳
date {
match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
stdout {
codec => rubydebug
}
}
|
Logstash 配置语法检测
通过 Logstash 添加参数 **–config.test_and_exit ,**检查配置文件是否正确
1
2
3
4
|
logstash --config.test_and_exit -f ./logstash-test.conf
## 执行后看到结果:(验证通过)
[LogStash::Runner] runner - Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
|
可以看到,在执行命令后生成:**Config Validation Result: OK, ** 说明结果成功;
验证Logstash对日志处理结果
通过键盘输入查看 Logstash 处理结果
1
|
logstash -f /etc/logstash/conf.d/logtash-test.conf
|
当logstash完成初始化后,会有屏显 The stdin plugin is now waiting for input:,通过键盘输入把日志内容输入进去,会得到输出结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# 输入
127.0.0.1 - - [14/Jun/2023:12:58:06 +0800] "POST //third_party/store/getAllList HTTP/1.1" 200 81 "-" "Go-http-client/1.1" "0.042" "0.036" "0.000" "0.036" "148.70.84.110, 127.0.0.1"
# 输出
{
"http_referrer" => "-",
"upstream_response_time" => "0.036",
"http_version" => "1.1",
"event" => {
"original" => "127.0.0.1 - - [14/Jun/2023:12:58:06 +0800] \"POST //third_party/store/getAllList HTTP/1.1\" 200 81 \"-\" \"Go-http-client/1.1\" \"0.042\" \"0.036\" \"0.000\" \"0.036\" \"148.70.84.110, 127.0.0.1\""
},
"upstream_header_time" => "0.036",
"request_method" => "POST",
"@timestamp" => 2023-06-14T04:58:06.000Z,
"http_x_forwarded_for" => "148.70.84.110, 127.0.0.1",
"message" => "127.0.0.1 - - [14/Jun/2023:12:58:06 +0800] \"POST //third_party/store/getAllList HTTP/1.1\" 200 81 \"-\" \"Go-http-client/1.1\" \"0.042\" \"0.036\" \"0.000\" \"0.036\" \"148.70.84.110, 127.0.0.1\"",
"time_local" => "14/Jun/2023:12:58:06 +0800",
"upstream_connect_time" => "0.000",
"@version" => "1",
"remote_addr" => "127.0.0.1",
"uri" => "//third_party/store/getAllList",
"request_time" => "0.042",
"response_code" => "200",
"host" => {
"hostname" => "xjr-pc"
},
"http_user_agent" => "Go-http-client/1.1",
"remote_user" => "-",
"body_sent_bytes" => "81"
}
|
Logstash 配置修改(输入,输出)
经过上面的测试,我们可以确认 **logstash **已成功完成了对 nginx 日志内容的分片和识别,我们最后再调整一下配置文件,把经过处理的日志内容最终输出到ES中。
编辑配置文件:/etc/logstash/conf.d/nginx-access-food.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
input {
beats {
host => "0.0.0.0"
port => 5400 # 对应在filebeat的配置中,output到logstash的5400端口
}
}
filter {
grok {
match => {
"message" => '%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" "%{DATA:request_time}" "%{DATA:upstream_response_time}" "%{DATA:upstream_connect_time}" "%{DATA:upstream_header_time}" "%{DATA:http_x_forwarded_for}"'
}
}
# 通过date插件,把nginx日志中的时间戳用作 Logstash 的 event 时间戳
date {
match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "rc_index_pattern-%{+YYYY.MM.dd}"
}
}
|
启动 **Logstash **服务;
1
2
3
4
5
|
# 启动服务
systemctl start logstash.service
# 重启服务
systemctl restart logstash.service
|
效果查看
总结
本文通过nginx日志分析这个场景,简单介绍了如何通过修改logstash的配置文件,实现对日志内容的字段划分和内容识别,使采集到的日志具备了被检索和统计的可能性。
也通过这个过程简单描述了我们在分析日志内容时候所用到的插件、工具以及思路。
通过这次logstash配置调整,我们便可以在kibana页面中轻易地完成日志内容的筛选。在实际的业务场景中,这一工具可以帮助我们更好地掌握后台的运行情况。