【ELK】3-通过Geoip插件识别IP地址的地理位置

前言

在实际运维过程中,还需要进行一些必要的补充。

场景及解决方案


识别IP地址的地理位置

在常见的业务分析场景里面,我们往往需要对访问来源进行统计,比如说查出网站内访问量最高的文章,或者找出访问量最密集的时间点,甚至只是简单地统计页面打开的速度,我们都可以通过前面我们已经做好的日志内容检索去入手。但如果我们还想知道访问来源,在访问来源中统计出哪个省份或者城市的用户最多,就需要对IP地址进行识别了。 Logstash提供了插件 geoip,通过 **GeoLite2 **自动识别IP地址所在的区域,并自动添加需要的字段。示例配置如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
input {
	stdin {} # 通过键盘输入
}

filter {
    grok {

    	match => {
      	"message" => '%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" "%{DATA:request_time}" "%{DATA:upstream_response_time}" "%{DATA:upstream_connect_time}" "%{DATA:upstream_header_time}" "%{DATA:http_x_forwarded_for}"'
  		}
  	}

  	# 通过date插件,把nginx日志中的时间戳用作 Logstash 的 event 时间戳
  	date {
    	match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }

    geoip {
      source => "remote_addr"
      target => "geoip"
    }
}

output {
	stdout {
  	codec => rubydebug
  }
}

需要注意的是,必须要先完成日志内容的识别后,向geoip插件提供ip地址或者域名信息,提供到geoipsource字段,地址才可以被正确识别。同时通过target配置项指定geoip的识别结果会被组织到一个命名为geoip的字段中。 初次使用geoip的时候,可能需要等待几分钟的时候,待GeoLite2的数据库完成初始化,才可以正常工作。 经过地理位置识别后,返回的结果示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
{
     "upstream_connect_time" => "0.000",
      "http_x_forwarded_for" => "148.70.84.110, 127.0.0.1",
             "response_code" => "200",
            "request_method" => "POST",
                     "geoip" => {
         "ip" => "148.70.84.110",
        "geo" => {
            "country_iso_code" => "CN",
                    "timezone" => "Asia/Shanghai",
              "continent_code" => "AS",
                    "location" => {
                "lat" => 34.7732,
                "lon" => 113.722
            },
                "country_name" => "China"
        }
    },
                "time_local" => "15/Jun/2023:15:05:52 +0800",
                   "message" => "148.70.84.110 - - [15/Jun/2023:15:05:52 +0800] \"POST //third_party/store/getAllList HTTP/1.1\" 200 81 \"-\" \"Go-http-client/1.1\" \"0.038\" \"0.036\" \"0.000\" \"0.036\" \"148.70.84.110, 127.0.0.1\"",
           "body_sent_bytes" => "81",
      "upstream_header_time" => "0.036",
                     "event" => {
        "original" => "148.70.84.110 - - [15/Jun/2023:15:05:52 +0800] \"POST //third_party/store/getAllList HTTP/1.1\" 200 81 \"-\" \"Go-http-client/1.1\" \"0.038\" \"0.036\" \"0.000\" \"0.036\" \"148.70.84.110, 127.0.0.1\""
    },
             "http_referrer" => "-",
               "remote_addr" => "148.70.84.110",
    "upstream_response_time" => "0.036",
               "remote_user" => "-",
              "request_time" => "0.038",
                      "host" => {
        "hostname" => "xjr-pc"
    },
                  "@version" => "1",
                "@timestamp" => 2023-06-15T07:05:52.000Z,
                       "uri" => "//third_party/store/getAllList",
              "http_version" => "1.1",
           "http_user_agent" => "Go-http-client/1.1"
}

参考文档:https://www.elastic.co/guide/en/logstash/8.8/plugins-filters-geoip.html

Pattern 集中管理

在 **pipeline **配置文件中使用 **grok **插件直接配置 **pattern **完成了对日志内容的识别

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
input {}

filter {
    grok {
    	match => {
      	"message" => '%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" "%{DATA:request_time}" "%{DATA:upstream_response_time}" "%{DATA:upstream_connect_time}" "%{DATA:upstream_header_time}" "%{DATA:http_x_forwarded_for}"'
  		}
  	}
  	...
}

output {}

这样的配置方式存在着不易维护的问题,当我们对同一种日志格式有多个 **pipeline 配置文件的时候,我们每次改动日志格式,都需要修改多个pipeline **配置文件,而且这种配置方式也使配置文件显得过于凌乱。 可以通过统一的一个地方去维护各个 Pattern:

  1. 创建文件 : /etc/logstash/pattern.d/mypattern
  2. 在文件 mypattern 中存放统一变量:
1
NGINXCOMBINEDLOG %{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" "%{DATA:request_time}" "%{DATA:upstream_response_time}" "%{DATA:upstream_connect_time}" "%{DATA:upstream_header_time}" "%{DATA:http_x_forwarded_for}"
  1. 修改 **pipeline **配置文件:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
input {}

filter {
  grok {
    patterns_dir => [ "/etc/logstash/pattern.d" ]
    match => { "message" => "%{NGINXCOMBINEDLOG}" }
  }
...
}

output {}
   这样,当我们需要修改pattern的时候,只需要修改一个文件,就可以在多个pipeline中生效了。

除此之外,logstash的开发者为我们提供了很多常见的日志格式的pattern,我们可以直接下载引用: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns patterns_dir参考文档https://www.elastic.co/guide/en/logstash/7.14/plugins-filters-grok.html#plugins-filters-grok-patterns_dir

添加or移除字段

在日常的工作中,我们一套业务都会有多个环境,一般最少会分为开发环境( **dev **)、测试环境( **test **)、生产环境( **products **)。虽然我们可以通过 **logstash 中自动生成的agent.hostname **字段去区分日志来源,但再考虑到其他的需求或场景,如:区分同一台服务器中的日志类型、对同一个日志文件里面的个别内容做差异化处理等,我们需要给日志增加字段。

FileBeat收集日志打上标识

在 **FileBeat **的配置文件中使用 **fields **配置项增加字段,**fields **的内容可以是字符串,数组,字典:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
...

filebeat.inputs:
- type: log
  enabled: true
  paths:
   - /var/log/nginx/access.log
  fields:
    env: production
    nginx_log_type: access

...

参考文档:https://www.elastic.co/guide/en/beats/filebeat/current/configuration-general-options.html#libbeat-configuration-fields

Logstash对自定义字段的处理

FileBeat 添加标识字段后,Logstash 配置中可以通过判断语句做不同的操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
...

filter {
  if [fields][nginx_log_type] == "access" {
    grok { ... }
  }
}

output {
  # Send production errors to pagerduty
  if [fields][nginx_log_type] == "access" {
    pagerduty {
    ...
    }
  }
}
...

参考文档:https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#conditionals

添加,移除,处理字段

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
...

filter{
  geoip {
          source => "http_x_forwarded_for"
          target => "geoip"
          database => "/etc/logstash/GeoLiteCity.dat"
        	
					# 添加字段
          add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
          add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
      }

  mutate {
        	# 字段类型转换
          convert => [ "[geoip][coordinates]", "float" ]
          convert => [ "response","integer" ]
          convert => [ "bytes","integer" ]
          replace => { "type" => "nginx_access" }

        	# 字段移除
          remove_field => "message"
      }
}

...

注意: 响应时间相关的最好可以转换成 float 类型,还有 body 体大小的数据最好也转换成对应的 integer 类型 参考文档:https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html#plugins-filters-mutate-add_field

配置整合

FileBeat 配置文件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7

filebeat.inputs:
- type: log
  enabled: true
  paths:
   - /var/log/nginx/access.log
  fields:
    env: production
    nginx_log_type: access

setup.template.settings:
  index.number_of_shards: 1

output.logstash:
  hosts: ["IP:端口"]

Logstash 配置文件:( **Pipelines **)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
input {
  beats {
    host => "0.0.0.0"
    port => 5400    # 对应在filebeat的配置中,output到logstash的5400端口
  }
}

filter {
  #  数据结构化转换工具
  grok { 
    patterns_dir => [ "/etc/logstash/pattern.d" ]
    match => { "message" => "%{NGINXCOMBINEDLOG}" }
  }

  # 该过滤器从geoip中匹配ip字段,显示该ip的地理位置
  geoip {
    source => "remote_addr"
    target => "geoip"
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
  }

  # 数据的修改、删除、类型转换
  mutate {
    # 将坐标转为float类型
    convert => [ "[geoip][coordinates]", "float" ]
    convert => [ "body_sent_bytes","integer" ]
    convert => [ "request_time","float" ]
    convert => [ "upstream_response_time","float" ]
    convert => [ "upstream_connect_time","float" ]
    convert => [ "upstream_header_time","float" ]
    # 替换一个字段
    replace => { "type" => "nginx_access" }
    # 移除一个字段
    remove_field => "message"
  }

  date {
    match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
  }

}

# 输出数据
output {
  # 输出到 ES 中
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logstash-nginx-access-moda-%{+YYYY.MM}"
  }
  # stdout {codec => rubydebug}
}

** Logstash ** 配置文件:(Pattern

1
NGINXCOMBINEDLOG %{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" "%{DATA:request_time}" "%{DATA:upstream_response_time}" "%{DATA:upstream_connect_time}" "%{DATA:upstream_header_time}" "%{DATA:http_x_forwarded_for}"

注意: 修改完毕后注意要重启服务;

0%