【ELK】2-Logstash配置grok插件格式化Nginx日志

前言

在之前的文章中已经完成了ELK的搭建(Elasticsearch , Logstash, Kibana); Nginx 的日志已经成功搜集到了 Elasticsearch 中;通过 Kibana 读取 ElasticSearch 中的数据,进行展示;但是对于整端采集回来的 Nginx 日志,只能作为一个 text 格式的字段储存,并没有对日志进行解析; image.png

需求分析


Nginx日志格式

根据官网 Module nginx_http_log_module 的解析,在默认配置下,Nginx日志使用 combined 格式:

1
2
3
4
5
6
log_format main     '$remote_addr - $remote_user [$time_local] '
										'"$request" $status $body_bytes_sent '
										'"$http_referer" "$http_user_agent"'
										'"$request_time" "$upstream_response_time"'
										'"$upstream_connect_time" "$upstream_header_time"'
										'"$http_x_forwarded_for" ';

Nginxlog_format 配置在 http 域中,然后对应的 access_log 使用这个日志模版:

1
127.0.0.1 - - [14/Jun/2023:11:47:03 +0800] "GET / HTTP/1.1" 200 438 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36" "0.035" "0.032" "0.000" "0.032"

查看日志说明:

配置项 解析 参考日志
remote_addr Client Ip 127.0.0.1
remote_user 用户端
time_local 用具记录访问时间和时区,日期格式: [14/Jun/2023:11:47:03 +0800]
request 用于记录请求的url以及请求方法 GET / HTTP/1.1
status HTTP状态码 200
body_bytes_sent 发送给客户端的文件主体内容的大小,不包括响应头的大小(可以将日志每条记录中的这个值累加起来以粗略估计服务器吞吐量) 438
http_referer 记录从哪个页面链接访问过来的(请求头 Referer 的内容 ) -
http_user_agent 客户端浏览器信息(请求头User-Agent的内容 ) Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36
request_time 整个请求的总时间,以秒为单位(包括接收客户端请求数据的时间、后端程序响应的时间、发送响应数据给客户端的时间(不包含写日志的时间)) 0.035
upstream_response_time 请求过程中,upstream 的响应时间,以秒为单位(向后端建立连接开始到接受完数据然后关闭连接为止的时间) 0.032
upstream_connect_time 与上游服务器建立连接所花费的时间 0.000
upstream_header_time 从建立连接成功到接收第一个字节之间的时间 0.032
http_x_forwarded_for 客户端的真实ip,通常web服务器放在反向代理的后面,这样就不能获取到客户的IP地址了,通过 $remote_add拿到的IP地址是反向代理服务器的iP地址。反向代理服务器在转发请求的 http 头信息中,可以增加 x_forwarded_for** 信息,用以记录原有客户端的IP地址和原来客户端的请求的服务器地址

时间模型 image.png

Elasticsearch 中的索引内容

**Kibana **的页面中可以发现,**ElasticSearch **对 **Nginx **日志识别成 text 字段 image.png

问题分析

不难看出,Nginx 的日志存储到 ElasticSearch 并没有对日志内容进行划分和内容的识别。所以我们也无法在 **Kibana **上对日志内容进行检索和统计。 为了解决这个问题。我们可以通过修改 **Logstash **的配置,让日志内容在入库 ElasticSearch 之前,先完成日志内容的字段划分和内容识别。

Logstash配置思路


Logstash 分为 Input、Output、Filter、Codec 等多种plugins。

  • Input:数据的输入源也支持多种插件,如elk官网的beats、file、graphite、http、kafka、redis、exec等等等、、、
  • Output:数据的输出目的也支持多种插件,如本文的elasticsearch,当然这可能也是最常用的一种输出。以及exec、stdout终端、graphite、http、zabbix、nagios、redmine等等、、、
  • Filter:使用过滤器根据日志事件的特征,对数据事件进行处理过滤后,在输出。支持grok、date、geoip、mutate、ruby、json、kv、csv、checksum、dns、drop、xml等等、、
  • Codec:编码插件,改变事件数据的表示方式,它可以作为对输入或输出运行该过滤。和其它产品结合,如rubydebug、graphite、fluent、nmap等等。

怎么修改采集到日志内容

在 **Note1 **搭建的ELK中,采集到 **Nginx **日志后并没有对日志进行处理;下面我们可以通过 Logstash 中的 Filter进行日志加工

如何对日志内容进行分段和识别

**grok **是目前在 **Logstash **中使用最广泛的插件之一,可以很轻易地识别出文本地结构内容,使其便于检索。 文档介绍:Debugging grok expressions

如何测试配置的运行结果

我们可以在 Logstashoutput 中引用 codecs 插件,当在 Logstash 中配置了输出到 codecs 时,通过屏幕打印我们可以知道最终 Logstash 的输出结果。 引入不同的格式化插件: rubydebug

Logstash pipeline 配置

pipeline 配置草稿

1
2
3
4
5
6
7
8
9
input {
	stdin {} # 通过键盘输入
}

output {
	stdout {
  	codec => rubydebug
  }
}

Logstash 配置修改

使用 grok 识别日志内容

Kibana 中通过 Dev Tools 中的 Grok Debugger 工具,可以对日志内容进行调试 image.png 先输入 Nginx 日志进行匹配:( Sample Data

1
127.0.0.1 - - [14/Jun/2023:12:58:06 +0800] "POST //third_party/store/getAllList HTTP/1.1" 200 81 "-" "Go-http-client/1.1" "0.042" "0.036" "0.000" "0.036" "148.70.84.110, 127.0.0.1"

根据日志内容写 Grok 的匹配规则:( **Grok Pattern **)

1
%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" "%{DATA:request_time}" "%{DATA:upstream_response_time}" "%{DATA:upstream_connect_time}" "%{DATA:upstream_header_time}" "%{DATA:http_x_forwarded_for}"

显示识别的结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
{
  "remote_addr": "127.0.0.1",
  "response_code": "200",
  "upstream_header_time": "0.036",
  "time_local": "14/Jun/2023:12:58:06 +0800",
  "http_version": "1.1",
  "request_method": "POST",
  "uri": "//third_party/store/getAllList",
  "http_user_agent": "Go-http-client/1.1",
  "remote_user": "-",
  "request_time": "0.042",
  "upstream_connect_time": "0.000",
  "body_sent_bytes": "81",
  "http_x_forwarded_for": "148.70.84.110, 127.0.0.1",
  "http_referrer": "-",
  "upstream_response_time": "0.036"
}

📢📢📢 注意:

补充一个:nginx的 error 日志;gork 规则;

先输入 Nginx 日志进行匹配:( Sample Data

1
2024/05/09 17:42:51 [error] 32607#32607: *872048532 FastCGI sent in stderr: "PHP message: PHP Fatal error:  Allowed memory size of 134217728 bytes exhausted (tried to allocate 4096 bytes) in /home/ubuntu/www/kp/thinkphp/library/think/db/Connection.php on line 376" while reading response header from upstream, client: 219.137.50.203, server: kapeng.xjr2018.com, request: "POST /store/food_order/orderAddress HTTP/1.1", upstream: "fastcgi://unix:/run/php/php7.1-fpm.sock:", host: "kapeng.xjr2018.com", referrer: "http://kapeng.xjr2018.com/manage_store/"

根据日志内容写 Grok 的匹配规则:( Grok Pattern )

1
(?<timestamp>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) \[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER}: %{GREEDYDATA:errormessage}(?:, client: (?<clientip>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:server}?)(?:, request: %{QS:request})?(?:, upstream: (?<upstream>\"%{URI}\"|%{QS}))?(?:, host: %{QS:request_host})?(?:, referrer: \"%{URI:referrer}\")?

显示识别的结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
{
  "severity": "error",
  "errormessage": "*872048532 FastCGI sent in stderr: \"PHP message: PHP Fatal error:  Allowed memory size of 134217728 bytes exhausted (tried to allocate 4096 bytes) in /home/ubuntu/www/kp/thinkphp/library/think/db/Connection.php on line 376\" while reading response header from upstream",
  "request_host": "\"kapeng.xjr2018.com\"",
  "server": "kapeng.xjr2018.com",
  "request": "\"POST /store/food_order/orderAddress HTTP/1.1\"",
  "referrer": "http://kapeng.xjr2018.com/manage_store/",
  "upstream": "\"fastcgi://unix:/run/php/php7.1-fpm.sock:\"",
  "clientip": "219.137.50.203",
  "pid": "32607",
  "timestamp": "2024/05/09 17:42:51"
}

测试 Logstash Pipline配置

创建一个测试配置文件:logstash-test.conf, 通过键盘输入的方式把日志内容输入到** Logstash**,最后查看结果。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
input {
	stdin {} # 通过键盘输入
}

filter {
	grok {
  	match => {
    	"message" => '%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" "%{DATA:request_time}" "%{DATA:upstream_response_time}" "%{DATA:upstream_connect_time}" "%{DATA:upstream_header_time}" "%{DATA:http_x_forwarded_for}"'
		}
  }
  # 通过date插件,把nginx日志中的时间戳用作 Logstash 的 event 时间戳
	date {
  	match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
	stdout {
  	codec => rubydebug
  }
}

Logstash 配置语法检测

通过 Logstash 添加参数 **–config.test_and_exit ,**检查配置文件是否正确

1
2
3
4
logstash --config.test_and_exit -f ./logstash-test.conf

## 执行后看到结果:(验证通过)
[LogStash::Runner] runner - Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

可以看到,在执行命令后生成:**Config Validation Result: OK, ** 说明结果成功;

验证Logstash对日志处理结果

通过键盘输入查看 Logstash 处理结果

1
logstash -f /etc/logstash/conf.d/logtash-test.conf

当logstash完成初始化后,会有屏显 The stdin plugin is now waiting for input:,通过键盘输入把日志内容输入进去,会得到输出结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 输入
127.0.0.1 - - [14/Jun/2023:12:58:06 +0800] "POST //third_party/store/getAllList HTTP/1.1" 200 81 "-" "Go-http-client/1.1" "0.042" "0.036" "0.000" "0.036" "148.70.84.110, 127.0.0.1"

# 输出
{
             "http_referrer" => "-",
    "upstream_response_time" => "0.036",
              "http_version" => "1.1",
                     "event" => {
        "original" => "127.0.0.1 - - [14/Jun/2023:12:58:06 +0800] \"POST //third_party/store/getAllList HTTP/1.1\" 200 81 \"-\" \"Go-http-client/1.1\" \"0.042\" \"0.036\" \"0.000\" \"0.036\" \"148.70.84.110, 127.0.0.1\""
    },
      "upstream_header_time" => "0.036",
            "request_method" => "POST",
                "@timestamp" => 2023-06-14T04:58:06.000Z,
      "http_x_forwarded_for" => "148.70.84.110, 127.0.0.1",
                   "message" => "127.0.0.1 - - [14/Jun/2023:12:58:06 +0800] \"POST //third_party/store/getAllList HTTP/1.1\" 200 81 \"-\" \"Go-http-client/1.1\" \"0.042\" \"0.036\" \"0.000\" \"0.036\" \"148.70.84.110, 127.0.0.1\"",
                "time_local" => "14/Jun/2023:12:58:06 +0800",
     "upstream_connect_time" => "0.000",
                  "@version" => "1",
               "remote_addr" => "127.0.0.1",
                       "uri" => "//third_party/store/getAllList",
              "request_time" => "0.042",
             "response_code" => "200",
                      "host" => {
        "hostname" => "xjr-pc"
    },
           "http_user_agent" => "Go-http-client/1.1",
               "remote_user" => "-",
           "body_sent_bytes" => "81"
}

Logstash 配置修改(输入,输出)

经过上面的测试,我们可以确认 **logstash **已成功完成了对 nginx 日志内容的分片和识别,我们最后再调整一下配置文件,把经过处理的日志内容最终输出到ES中。 编辑配置文件:/etc/logstash/conf.d/nginx-access-food.conf

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
input {
  beats {
          host => "0.0.0.0"
          port => 5400	# 对应在filebeat的配置中,output到logstash的5400端口
  }
}

filter {
	grok {
  	match => {
    	"message" => '%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" "%{DATA:request_time}" "%{DATA:upstream_response_time}" "%{DATA:upstream_connect_time}" "%{DATA:upstream_header_time}" "%{DATA:http_x_forwarded_for}"'
		}
  }
  # 通过date插件,把nginx日志中的时间戳用作 Logstash 的 event 时间戳
	date {
  	match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch {
          hosts => ["127.0.0.1:9200"]
          index => "rc_index_pattern-%{+YYYY.MM.dd}"
  }
}

启动 **Logstash **服务;

1
2
3
4
5
# 启动服务
systemctl start logstash.service

# 重启服务
systemctl restart logstash.service

效果查看


image.png

总结


本文通过nginx日志分析这个场景,简单介绍了如何通过修改logstash的配置文件,实现对日志内容的字段划分和内容识别,使采集到的日志具备了被检索和统计的可能性。 也通过这个过程简单描述了我们在分析日志内容时候所用到的插件、工具以及思路。 通过这次logstash配置调整,我们便可以在kibana页面中轻易地完成日志内容的筛选。在实际的业务场景中,这一工具可以帮助我们更好地掌握后台的运行情况。

0%