Preface
In actual operations, the log pipeline we have built so far still needs a few necessary additions. This section walks through them scenario by scenario.
Scenarios and Solutions
Identifying the Geographic Location of an IP Address
In common business-analysis scenarios we often need to analyse where traffic comes from: for example, finding the most-visited articles on a site, spotting the busiest time windows, or simply measuring how fast pages load. All of that can be done with the log content we have already parsed and made searchable. But if we also want to know the origin of each request, and work out which province or city contributes the most users, we need to resolve the IP addresses.
Logstash provides the geoip filter plugin, which uses the **GeoLite2** database to resolve the region an IP address belongs to and automatically adds the corresponding fields. An example configuration:
```
input {
  stdin {} # read test input from the keyboard
}
filter {
  grok {
    match => {
      "message" => '%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" \"%{DATA:request_time}\" \"%{DATA:upstream_response_time}\" \"%{DATA:upstream_connect_time}\" \"%{DATA:upstream_header_time}\" \"%{DATA:http_x_forwarded_for}\"'
    }
  }
  # Use the timestamp from the nginx log as the Logstash event timestamp
  date {
    match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
  geoip {
    source => "remote_addr"
    target => "geoip"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
```
Note that the log content must be parsed first, so that the grok stage can hand an IP address (or hostname) to the geoip plugin through its source option; only then can the address be resolved correctly. The target option tells geoip to place the lookup result under a field named geoip.
The first time geoip is used, it may take a few minutes for the GeoLite2 database to finish initializing before lookups work properly.
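If the automatic database download is slow or blocked in your network, the geoip filter also accepts a database option pointing to a GeoLite2 City database that you have downloaded manually. A minimal sketch, assuming the .mmdb file has been placed at the path shown (the path is only an example, adjust it to your environment):
```
geoip {
  source => "remote_addr"
  target => "geoip"
  # assumed location of a manually downloaded GeoLite2-City.mmdb
  database => "/etc/logstash/GeoLite2-City.mmdb"
}
```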
After the geographic lookup, the returned event looks like the following example:
```
{
  "upstream_connect_time" => "0.000",
  "http_x_forwarded_for" => "148.70.84.110, 127.0.0.1",
  "response_code" => "200",
  "request_method" => "POST",
  "geoip" => {
    "ip" => "148.70.84.110",
    "geo" => {
      "country_iso_code" => "CN",
      "timezone" => "Asia/Shanghai",
      "continent_code" => "AS",
      "location" => {
        "lat" => 34.7732,
        "lon" => 113.722
      },
      "country_name" => "China"
    }
  },
  "time_local" => "15/Jun/2023:15:05:52 +0800",
  "message" => "148.70.84.110 - - [15/Jun/2023:15:05:52 +0800] \"POST //third_party/store/getAllList HTTP/1.1\" 200 81 \"-\" \"Go-http-client/1.1\" \"0.038\" \"0.036\" \"0.000\" \"0.036\" \"148.70.84.110, 127.0.0.1\"",
  "body_sent_bytes" => "81",
  "upstream_header_time" => "0.036",
  "event" => {
    "original" => "148.70.84.110 - - [15/Jun/2023:15:05:52 +0800] \"POST //third_party/store/getAllList HTTP/1.1\" 200 81 \"-\" \"Go-http-client/1.1\" \"0.038\" \"0.036\" \"0.000\" \"0.036\" \"148.70.84.110, 127.0.0.1\""
  },
  "http_referrer" => "-",
  "remote_addr" => "148.70.84.110",
  "upstream_response_time" => "0.036",
  "remote_user" => "-",
  "request_time" => "0.038",
  "host" => {
    "hostname" => "xjr-pc"
  },
  "@version" => "1",
  "@timestamp" => 2023-06-15T07:05:52.000Z,
  "uri" => "//third_party/store/getAllList",
  "http_version" => "1.1",
  "http_user_agent" => "Go-http-client/1.1"
}
```
Reference: https://www.elastic.co/guide/en/logstash/8.8/plugins-filters-geoip.html
Centralized Pattern Management
So far we have parsed the log content by configuring the **pattern** directly in the **grok** plugin inside the **pipeline** configuration file:
```
input {}
filter {
  grok {
    match => {
      "message" => '%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" \"%{DATA:request_time}\" \"%{DATA:upstream_response_time}\" \"%{DATA:upstream_connect_time}\" \"%{DATA:upstream_header_time}\" \"%{DATA:http_x_forwarded_for}\"'
    }
  }
  ...
}
output {}
```
This way of configuring things is hard to maintain: when several **pipeline** configuration files handle the same log format, every change to that format forces us to edit each of those files, and the long pattern also makes the configuration files cluttered.
Instead, the patterns can be maintained in one central place:
- Create a file: /etc/logstash/pattern.d/mypattern
- Put the shared pattern definition in the mypattern file:
```
NGINXCOMBINEDLOG %{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" \"%{DATA:request_time}\" \"%{DATA:upstream_response_time}\" \"%{DATA:upstream_connect_time}\" \"%{DATA:upstream_header_time}\" \"%{DATA:http_x_forwarded_for}\"
```
- Reference it from the **pipeline** configuration file:
```
input {}
filter {
  grok {
    patterns_dir => [ "/etc/logstash/pattern.d" ]
    match => { "message" => "%{NGINXCOMBINEDLOG}" }
  }
  ...
}
output {}
```
Now, when the pattern needs to change, we only edit this one file and the change takes effect in every pipeline that references it.
In addition, the Logstash developers ship ready-made patterns for many common log formats, which we can download or reference directly:
https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
patterns_dir reference: https://www.elastic.co/guide/en/logstash/7.14/plugins-filters-grok.html#plugins-filters-grok-patterns_dir
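For example, the standard Apache/Nginx combined access-log format can be matched with the bundled COMBINEDAPACHELOG pattern without defining anything ourselves. A minimal sketch (note that this pattern assigns its own field names rather than the custom names used elsewhere in this article):
```
filter {
  grok {
    # COMBINEDAPACHELOG ships with Logstash (logstash-patterns-core),
    # so no patterns_dir entry is needed for it
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
```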
Adding or Removing Fields
In day-to-day work, a single business system usually runs in several environments, typically at least development (**dev**), testing (**test**), and production (**prod**). The **agent.hostname** field that is added automatically (by the Beats agent) lets us tell log sources apart, but other needs and scenarios, such as distinguishing different log types on the same server or handling certain lines within one log file differently, require us to add extra fields to the events.
Tagging Logs as FileBeat Collects Them
In the **FileBeat** configuration file, the **fields** option adds custom fields; the values under **fields** can be scalars (strings), arrays, or dictionaries:
```
...
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  fields:
    env: production
    nginx_log_type: access
...
```
Reference: https://www.elastic.co/guide/en/beats/filebeat/current/configuration-general-options.html#libbeat-configuration-fields
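By default these custom values are nested under a top-level fields object, which is why the Logstash conditionals in the next section reference them as [fields][nginx_log_type]. If you would rather have them at the root of the event, Filebeat provides the fields_under_root option; a minimal sketch:
```
filebeat.inputs:
- type: log
  paths:
    - /var/log/nginx/access.log
  fields:
    env: production
    nginx_log_type: access
  # place env / nginx_log_type at the event root instead of under "fields"
  fields_under_root: true
```
With this enabled, the conditionals below would reference [nginx_log_type] directly instead of [fields][nginx_log_type].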
Handling the Custom Fields in Logstash
Once FileBeat has added the identifying fields, the Logstash configuration can branch on them with conditionals:
```
...
filter {
  if [fields][nginx_log_type] == "access" {
    grok { ... }
  }
}
output {
  # Example: also send events from the production environment to PagerDuty
  if [fields][env] == "production" {
    pagerduty {
      ...
    }
  }
}
...
```
Reference: https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#conditionals
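Conditions can also be combined. For instance, to run the nginx grok parsing only for access logs coming from the production environment (using both custom fields from the FileBeat example above), a minimal sketch:
```
filter {
  if [fields][env] == "production" and [fields][nginx_log_type] == "access" {
    grok {
      patterns_dir => [ "/etc/logstash/pattern.d" ]
      match => { "message" => "%{NGINXCOMBINEDLOG}" }
    }
  }
}
```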
Adding, Removing, and Transforming Fields
```
...
filter {
  geoip {
    source => "http_x_forwarded_for"
    target => "geoip"
    # local GeoLite2 City database (current Logstash versions require the .mmdb format)
    database => "/etc/logstash/GeoLite2-City.mmdb"
    # Add fields: appending longitude and then latitude builds a [lon, lat] array.
    # Note: with the ECS-style layout shown earlier, the values live under
    # [geoip][geo][location][lon] / [lat] rather than [geoip][longitude] / [latitude].
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
  }
  mutate {
    # Type conversions
    convert => [ "[geoip][coordinates]", "float" ]
    convert => [ "response", "integer" ]
    convert => [ "bytes", "integer" ]
    replace => { "type" => "nginx_access" }
    # Remove a field
    remove_field => "message"
  }
}
...
```
Note: grok captures every field as a string, so any field you want to aggregate on or run range queries against should be converted: response-time fields are best converted to float, and body-size fields to integer.
Reference: https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html#plugins-filters-mutate-add_field
Putting the Configuration Together
FileBeat configuration file:
```
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  fields:
    env: production
    nginx_log_type: access

setup.template.settings:
  index.number_of_shards: 1

output.logstash:
  hosts: ["IP:PORT"]
```
Logstash configuration file (**pipeline**):
```
input {
  beats {
    host => "0.0.0.0"
    port => 5400 # must match the port configured in FileBeat's output.logstash hosts
  }
}
filter {
  # Parse the raw log line into structured fields
  grok {
    patterns_dir => [ "/etc/logstash/pattern.d" ]
    match => { "message" => "%{NGINXCOMBINEDLOG}" }
  }
  # Resolve the geographic location of the client IP
  geoip {
    source => "remote_addr"
    target => "geoip"
    # Note: with the ECS-style layout shown earlier, the values live under
    # [geoip][geo][location][lon] / [lat] rather than [geoip][longitude] / [latitude].
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
  }
  # Modify, remove, and type-convert fields
  mutate {
    # Convert the coordinates to float
    convert => [ "[geoip][coordinates]", "float" ]
    convert => [ "body_sent_bytes", "integer" ]
    convert => [ "request_time", "float" ]
    convert => [ "upstream_response_time", "float" ]
    convert => [ "upstream_connect_time", "float" ]
    convert => [ "upstream_header_time", "float" ]
    # Replace a field
    replace => { "type" => "nginx_access" }
    # Remove a field
    remove_field => "message"
  }
  # Use the nginx timestamp (captured as time_local) as the event timestamp
  date {
    match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}
# Outputs
output {
  # Write to Elasticsearch
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logstash-nginx-access-moda-%{+YYYY.MM}"
  }
  # stdout {codec => rubydebug}
}
```
**Logstash** configuration file (pattern):
```
NGINXCOMBINEDLOG %{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" \"%{DATA:request_time}\" \"%{DATA:upstream_response_time}\" \"%{DATA:upstream_connect_time}\" \"%{DATA:upstream_header_time}\" \"%{DATA:http_x_forwarded_for}\"
```
Note:
Remember to restart (or reload) the corresponding services after making these changes.
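Before restarting Logstash, the pipeline configuration can be checked with the `--config.test_and_exit` command-line flag, and `--config.reload.automatic` lets a running Logstash pick up pipeline changes without a full restart.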