构建基于Docker的ELK日志分析服务

通过构建基于Docker的ELK日志分析服务,进一步增强对日志文件的分析、检索能力

abstract.png

容器化部署

所谓ELK指的是ElasticSearch、Logstash、Kibana,其中ElasticSearch用于数据的存储、检索,Logstash用于数据的结构化处理,Kibana则进行可视化展示。这里直接通过Docker Compose搭建ELK服务,值得一提的是需保证ElasticSearch、Logstash、Kibana各组件的版本完全一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# Compose 版本
version: '3.8'

# 定义Docker服务
services:

# ElasticSearch 服务
ElasticSearch-Service:
build:
# 指定dockerfile的路径
context: ./ESDockerFile
# 指定dockerfile的文件名
dockerfile: Dockerfile
# 设置dockerfile所构建镜像的名称、tag
image: es-ik:7.12.0
container_name: ElasticSearch-Service
ports:
- 9200:9200
- 9300:9300
environment:
- "ES_JAVA_OPTS=-Xms2g -Xmx2g"
- "discovery.type=single-node"
- "TZ=Asia/Shanghai"
volumes:
# ES数据目录
- /Users/zgh/Docker/ELK/ES/data:/usr/share/elasticsearch/data
# ES日志目录
- /Users/zgh/Docker/ELK/ES/logs:/usr/share/elasticsearch/logs

# Logstash 服务
Logstash-Service:
image: logstash:7.12.0
container_name: Logstash-Service
ports:
- 5044:5044
environment:
- "xpack.monitoring.enabled=false"
- "TZ=Asia/Shanghai"
volumes:
# Logstash配置文件
- /Users/zgh/Docker/ELK/Logstash/conf/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
# 自定义的索引模板文件
- /Users/zgh/Docker/ELK/Logstash/conf/customTemplate.json:/usr/share/logstash/config/customTemplate.json
# 日志文件所在目录
- /Users/zgh/Docker/ELK/Logstash/LocalLog:/home/Aaron/LocalLog
depends_on:
- ElasticSearch-Service

# Kibana 服务
Kibana-Service:
image: kibana:7.12.0
container_name: Kibana-Service
ports:
- 5601:5601
environment:
- "ELASTICSEARCH_HOSTS=http://ElasticSearch-Service:9200"
- "TZ=Asia/Shanghai"
depends_on:
- ElasticSearch-Service

同时为对中文分词提供更好的支持,这里以ElasticSearch官方镜像为基础重新构建安装了ik分词器的ES镜像,Dockerfile如下所示

1
2
3
4
5
6
7
# 基础镜像环境: ES 7.12.0
FROM elasticsearch:7.12.0
# 切换目录
WORKDIR /usr/share/elasticsearch/plugins
# 安装与ES版本相同的IK分词器
RUN elasticsearch-plugin install -b \
https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.0/elasticsearch-analysis-ik-7.12.0.zip

上述两文件的相对路径关系如下图所示

figure 1.jpeg

Logstash

配置

Logstash作为一款主流的日志处理框架,通过提供形式多样的插件极大地丰富了表现力。具体地,我们通过logstash.conf配置文件进行配置。如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
input {
file {
# 容器下的日志路径, 支持递归匹配路径
path => "/home/Aaron/LocalLog/**/*.*"
# 排除压缩文件类型
exclude => ["*.zip", "*.7z", "*.rar"]
# 使用read模式,一方面每次都会从文件开始处读取,另一方面读取完后会自动删除日志文件
mode => "read"

# 文件块大小, Unit: 字节。4194304字节 = 4MB
file_chunk_size => 4194304

# 检查文件的频率
stat_interval => "200 ms"
# 发现新文件的频率: stat_interval * discover_interval
discover_interval => 1

# 合并多行数据
codec => multiline {
# 正则表达式: 以时间信息开头
pattern => "^%{TIMESTAMP_ISO8601}"
# true: 否定正则表达式, 即如果不匹配的话
negate => true
# 当negate为true, 即正则表达式不匹配时, 当前日志归属于上一条日志
what => "previous"
# 检测多行的等待时间阈值,Unit: s
auto_flush_interval => 1
}
}
}

filter {
# 解析日志进行结构化
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:logtime} %{LOGLEVEL:level} %{WORD:componentId}\.%{WORD:segmentId} \[%{DATA:threaad}\] \[%{DATA:method}\] %{GREEDYDATA:message}"
}
# 重写message字段
overwrite => ["message"]
}

# 通过指定字段计算Hash值
fingerprint {
method => "MURMUR3"
source => ["logtime", "threaad", "message"]
concatenate_sources => "true"
target => "[@metadata][fingerprint]"
}

# 给SQL日志打标签
if [message] =~ "==> .*" or [message] =~ "<== .*" {
mutate {
# 对tags字段添加值
add_tag => ["SQL"]
}
}

# 解析时间并转存到 @timestamp 字段
date {
# logtime字段按ISO8601格式进行解析
match => ["logtime", "ISO8601"]
# 将解析后的时间存储到给定的目标字段
target => "@timestamp"
# 删除logtime字段
remove_field => ["logtime"]
}

# 解析日志名称并转存到logFile字段
mutate {
# 日志文件路径按/切分
split => { "path" => "/" }
# 添加logFile日志文件名字段
add_field => { "logFile" => "%{[path][-1]}" }
# 删除无用字段
remove_field => ["host", "@version", "path"]
}
}

output {
elasticsearch {
# ES地址
hosts => ["http://ElasticSearch-Service:9200"]
# 索引名称
index => "logstash-%{componentId}"
# 将计算出的Hash值作为文档ID
document_id => "%{[@metadata][fingerprint]}"

# 使能模板管理模板
manage_template => true
# 模板名称
template_name => "custom_template"
# 容器下的模板文件路径
template => "/usr/share/logstash/config/customTemplate.json"
}

# Only For Debug
# stdout {
# codec => rubydebug {
# # 是否输出元数据字段
# metadata => true
# }
# }
}

Plugin

这里对上述配置文件所使用的各种插件进行介绍

File

File作为Input Plugin输入插件的一种,用于实现通过日志文件读取日志信息。具体地

  • path:指定日志文件所在路径,支持递归
  • mode:在read模式下,一方面每次都会从文件开始处读取,另一方面读取完后会自动删除日志文件。这里由于我们处理的是静态的日志文件,故非常适合使用read模式
  • exclude:根据文件名(非路径)排除文件,这里选择排除压缩类型的文件
  • stat_interval:检查文件更新的频率
  • discover_interval:发现新文件的频率,该值是stat_interval配置项的倍数

Multiline

通常日志都是单行的,但由于异常错误的堆栈信息的存在,如下所示。日志中存在多行类型日志的可能性,故这里通过Codec Plugin编解码插件中的Multiline进行处理,将多行合并为一行

1
2
3
4
5
2021-08-12T17:42:31.850+08:00 ERROR hapddg.hapddgweb [http-nio-8080-exec-124] [c.a.t.common.e.GlobalExceptionHandler:49] unexpected exception
org.apache.catalina.connector.ClientAbortException: java.io.IOException: Broken pipe
at org.apache.catalina.connector.OutputBuffer.append(OutputBuffer.java:746)
at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:360)
... 109 common frames omitted

对于Multiline而言,其常见选项如下所示

  • pattern:正则表达式。上文配置中我们使用的模式为以时间信息开头
  • negate:是否对正则表达式的结果进行否定。上文配置为true,即不匹配pattern配置项指定的正则表达式视为多行并进行处理
  • what:其可选值有:previous、next。用于多行日志合并时,当前行视为归属于上一行还是下一行
  • auto_flush_interval:在处理当前行时检测多行的等待时间阈值(Unit: s)。该配置项无默认值,故如果不显式设置,则会发现处理日志文件时,输出结果会丢失最后一行日志

Grok

Filter Plugin过滤器插件中最重要的就属Grok了,我们就是通过它进行正则捕获实现对日志数据的结构化处理。Grok内置定义了多种常见的正则匹配模式,以大大方便我们日常的使用。对于内置模式的使用语法如下,其中PATTERN_NAME为Grok内置的匹配模式的名称,并通过capture_name对捕获的文本进行命名。由于默认情况下,捕获结果都是保存为字符串类型,所以可以通过可选地data_type进行类型转换。但仅支持int、float两种类型

1
2
3
4
5
6
7
8
# 语法格式
%{PATTERN_NAME:capture_name:data_type}

# 使用.*正则进行捕获,并以message命名
%{GREEDYDATA:message}

# 使用.*正则进行捕获,并以age命名,最后将数据类型转换为int
%{GREEDYDATA:age:int}

对于内置模式无法满足的场景,Grok也支持用户自定义正则进行匹配。语法格式及示例如下。其中reg_exp为正则表达式

1
2
3
4
5
# 语法格式
(?<capture_name>reg_exp)

# 使用.*正则进行捕获,并以message命名
(?<message>.*)

这里就Grok中内置的常见模式进行介绍

  • TIMESTAMP_ISO8601:匹配ISO8601格式的时间。例如:2021-08-12T17:33:47.498+08:00
  • LOGLEVEL:匹配日志级别
  • WORD: 匹配字符串,包括数字、大小写字母、下划线
  • DATA、GREEDYDATA:从这两个模式的正则表达式.*?和.*就可以看出,后者是前者的贪婪模式版本

现在我们就可以利用Grok处理日志了,假设一条日志的格式如下所示

1
2021-08-12T17:33:47.498+08:00 INFO hapddg.hapddgweb [ActiveMQ Session Task-483] [c.a.s.h.EventHandler:187] add event message to queue

直接利用Grok内置的模式对上述日志格式进行结构化处理,如下所示。其中match配置项用于定义待匹配的字段、匹配方式。与此同时,由于已经将message中所有信息均捕获到各个字段了。为避免重复存储,我们可以通过overwrite重写默认的message字段

1
2
3
4
5
6
7
8
# 解析日志进行结构化
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:logtime} %{LOGLEVEL:level} %{WORD:componentId}\.%{WORD:segmentId} \[%{DATA:threaad}\] \[%{DATA:method}\] %{GREEDYDATA:message}"
}
# 重写message字段
overwrite => ["message"]
}

Fingerprint

Fingerprint同样是一种Filter Plugin,其通过指定字段计算出一个指纹值。后续我们会将计算出的指纹值作为文档ID,来实现重复文档的去重

  • method:指定计算指纹信息的算法,支持SHA1、SHA256、SHA384、SHA512、MD5、MURMUR3等。这里我们选用MURMUR3,其作为一种非加密型哈希算法,由于计算快、碰撞低等特点被广泛应用于哈希检索等场景
  • source:利用哪些字段计算指纹信息
  • concatenate_sources:计算指纹前是否将source配置项指定的所有字段的名称、值拼接为一个字符串,默认值为false。如果为false,则将只会使用source配置项中指定的最后一个字段进行指纹信息计算。故这里需要显式设为true
  • target:设置存储指纹值的字段名。这里使用了元数据字段

Logstash从1.5版本开始提供了一种特殊字段——@metadata元数据字段。@metadata中的内容不会对外进行输出(特殊条件下可以通过某种方式进行输出以便于调试)。故非常适合在Logstash DSL中作为临时字段使用。否则如果使用普通字段存储中间结果,还需要通过remove_field进行删除以避免对外输出。使用元数据字段也很简单,如果期望使用一个名为tempData1的元数据字段。则字段名即为[@metadata][tempData1]。当然其同样支持字段引用,通过[@metadata][tempData1]直接即可访问,而在字符串中的则可以通过%{[@metadata][tempData1]}方式进行字段引用。关于元字段的使用示例如下所示

1
2
3
4
5
6
7
8
filter{
mutate {
# 将message字段的值 存储到 [@metadata][tempData1] 字段中
add_field => { "[@metadata][tempData1]" => "%{[message]}" }
# 将 [@metadata][tempData1] 字段的值 存储到 outdata2 字段中
add_field => { "outdata2" => "%{[@metadata][tempData1]}" }
}
}

Mutate

Filter Plugin中的Mutate则可以对字段进行修改,包括但不限于split、join、rename、lowercase等操作。但需要注意的是在一个Mutate块内各操作不是按书写的顺序执行的,而是根据Mutate Plugin内部某种固定的预设顺序依次执行的。故如果期望控制各操作的执行顺序,可以定义多个Mutate块。因为各Mutate块的执行顺序是按书写顺序依次进行的。这里我们利用split对日志文件的path字段按路径分隔符进行切分,并通过下标-1访问数组中的最后一个元素取得日志文件名称,最后对原path字段进行删除

而对于各种不同类型的日志,我们期望能够进行分类、打标签。这样便于后续在检索日志时更加高效。具体地,对于SQL类型的日志而言,其结构特征为”==>”、”<==”开头。如下所示

1
2
3
2021-08-17T16:33:11.958+08:00 DEBUG hapddg.hapddgweb [http-nio-8080-exec-1] [c.a.t.m.a.A.deleteData:143] ==>  Preparing: delete from tb_person where id in ( ? , ? , ? ) 
2021-08-17T16:44:11.958+08:00 DEBUG hapddg.hapddgweb [http-nio-8080-exec-1] [c.a.t.m.a.A.deleteData:143] ==> Parameters: 22(Integer), 47(Integer), 60(Integer)
2021-08-17T16:55:11.959+08:00 DEBUG hapddg.hapddgweb [http-nio-8080-exec-1] [c.a.t.m.a.A.deleteData:143] <== Updates: 3

这样即可结合Logstash条件判断语句、Mutate的add_tag操作实现对SQL日志打标签,其中值为”SQL”。此外Logstash还提供了 =~、!~ 运算符分别用于匹配正则、不匹配正则

1
2
3
4
5
6
7
# 给SQL日志打标签
if [message] =~ "==> .*" or [message] =~ "<== .*" {
mutate {
# 对tags字段添加值
add_tag => ["SQL"]
}
}

Date

由于Logstash输出的@timestamp字段值默认为其处理日志文件过程的当前时间,故这里我们需要将日志中记录的时间信息写入@timestamp字段,以便后续的搜索。这里选择Filter Plugin中的Date实现对于时间字符串的解析处理

  • match:指定时间字段及其对应的时间格式。其中,支持的时间格式有:ISO8601(例如:2021-08-17T16:55:11.959+08:00)、UNIX(Unix纪元以来的秒数)、UNIX_MS(Unix纪元以来的毫秒数)等
  • target:存储解析后时间信息的字段

Elasticsearch

当Logstash完成日志的结构化处理后,即可输出到ElasticSearch。这里直接选择Elasticsearch Plugin即可。如下所示,这里我们指定了ES的地址、索引、文档ID等信息。同时还在其中定义了一个名为custom_template的ES索引模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
elasticsearch {
# ES地址
hosts => ["http://ElasticSearch-Service:9200"]
# 索引名称
index => "logstash-%{componentId}"
# 将计算出的Hash值作为文档ID
document_id => "%{[@metadata][fingerprint]}"

# 使能模板管理模板
manage_template => true
# 模板名称
template_name => "custom_template"
# 容器下的模板文件路径
template => "/usr/share/logstash/config/customTemplate.json"
}

Rubydebug

前面提到默认情况下,元数据字段不会进行输出。但实际上在调试过程中可以通过Codec Plugin编解码插件中的Rubydebug使能输出元数据字段。下面即是在一个标准输出Stdout中包含元数据字段输出的示例

1
2
3
4
5
6
stdout {
codec => rubydebug {
# 是否输出元数据字段
metadata => true
}
}

Index Template索引模板

在Logstash的DSL配置文件中我们通过Elasticsearch Plugin定义了一个ES索引模板,并且通过template字段指定在Logstash容器下的索引模板文件路径。具体地,索引模板文件customTemplate.json的内容如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
{
"index_patterns" : "logstash-*",
"order": 20,
"settings":{
"index.refresh_interval": "1s",
"index.number_of_replicas": 0,
"index.lifecycle.name": "autoRemoveOldData"
},
"mappings":{
"properties" : {
"@timestamp" : {
"type" : "date"
},
"level" : {
"type" : "keyword",
"norms" : false
},
"componentId" : {
"type" : "keyword",
"norms" : false
},
"segmentId" : {
"type" : "keyword",
"norms" : false
},
"threaad" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 556
}
},
"norms" : false
},
"method" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 556
}
},
"norms" : false
},
"message" : {
"type" : "text",
"analyzer": "ik_max_word"
},
"logFile" : {
"type": "text",
"index" : false
},
"tags" : {
"type" : "keyword",
"norms" : false
}
}
}
}

部分配置项说明如下:

  • index_patterns:该模板匹配索引名称的通配符表达式
  • order:优先级,值越大优先级越高
  • index.refresh_interval:索引的刷新时间间隔
  • index.number_of_replicas:每个主分片的副本数
  • index.lifecycle.name:ILM索引生命周期中的策略名称

至此,ELK服务中的两个关键性配置文件均已介绍完毕,其在宿主机下的相关层次结构如下所示

figure 2.png

Kibana

本地化

通过Docker Compose创建、启动ELK服务所需的各容器。进入Kibana容器,修改/usr/share/kibana/config路径下的配置文件kibana.yml,增加 i18n.locale: “zh-CN” 配置项

figure 3.jpeg

修改完毕后,重启Kibana容器。然后通过 http://localhost:5601 访问Kibana的Web页面。可以看到,页面语言已经被修改为中文

figure 4.jpeg

个性化配置

点击【Management】下的【Stack Management】,然后进入【高级设置】进行个性化修改、并保存。具体地:

  • 将日期的显示格式修改为YYYY-MM-DD HH:mm:ss.SSS,并且将Monday周一作为一周的开始

figure 5.jpeg

  • 去除不必要的元字段展示,仅保留 _source, _id, _index 字段

figure 6.jpeg

  • 将各种通知信息的保留时间设为5秒

figure 7.jpeg

ILM索引生命周期管理

在上文的customTemplate.json索引模板文件中,我们通过index.lifecycle.name配置项定义、关联了一个名为autoRemoveOldData的ILM策略。可通过【Management】-【Stack Management】-【索引管理】-【索引模板】查看,如下所示

figure 8.jpeg

现在我们来创建该策略,通过【Management】-【开发工具】-【控制台】发送如下的PUT请求即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 创建名为autoRemoveOldData的策略:自动删除超过20天的索引
PUT _ilm/policy/autoRemoveOldData?pretty
{
"policy": {
"phases": {
"hot": {
"min_age": "0",
"actions": {}
},
"delete": {
"min_age": "20d",
"actions": {
"delete": {
"delete_searchable_snapshot": true
}
}
}
}
}
}

效果如下所示

figure 9.jpeg

通过【Management】-【Stack Management】-【索引生命周期策略】查看,可以看到相应的ILM策略已经创建成功

figure 10.jpeg

建立索引模式

现在,将我们用于测试的日志文件aaron.log放入LocalLog目录下,已便让ELK分析日志

figure 11.jpeg

其中测试的日志文件aaron.log的内容如下所示

1
2
3
4
5
6
7
8
9
10
11
12
2021-08-12T17:33:47.498+08:00 INFO hapddg.hapddgweb [ActiveMQ Session Task-483] [c.a.s.h.EventHandler:187] add event message to queue
2021-08-12T17:42:31.850+08:00 ERROR hapddg.hapddgweb [http-nio-8080-exec-124] [c.a.t.common.e.GlobalExceptionHandler:49] unexpected exception
org.apache.catalina.connector.ClientAbortException: java.io.IOException: Broken pipe
at org.apache.catalina.connector.OutputBuffer.append(OutputBuffer.java:746)
at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:360)
... 109 common frames omitted
2021-08-12T18:22:01.959+08:00 INFO hapddg.hapddgweb [http-nio-8080-exec-137] [c.a.t.s.CallBackServiceImpl:71] sublist is empty
2021-08-12T18:25:02.146+08:00 DEBUG hapddg.hapddgweb [http-nio-8080-exec-135] [c.a.t.s.CallBackServiceImpl:72] query down result is ok
2021-08-12T18:29:02.356+08:00 INFO hapddg.hapddgweb [http-nio-8080-exec-137] [c.a.t.s.CallBackServiceImpl:73] start task
2021-08-17T16:33:11.958+08:00 DEBUG hapddg.hapddgweb [http-nio-8080-exec-1] [c.a.t.m.a.A.deleteData:143] ==> Preparing: delete from tb_person where id in ( ? , ? , ? )
2021-08-17T16:44:11.958+08:00 DEBUG hapddg.hapddgweb [http-nio-8080-exec-1] [c.a.t.m.a.A.deleteData:143] ==> Parameters: 22(Integer), 47(Integer), 60(Integer)
2021-08-17T16:55:11.959+08:00 DEBUG hapddg.hapddgweb [http-nio-8080-exec-1] [c.a.t.m.a.A.deleteData:143] <== Updates: 3

当ELK读取完日志文件后会自动将其进行删除,同时通过【Management】-【Stack Management】-【索引管理】-【索引】可以查看相应的索引、文档已经建立完毕

figure 12.jpeg

通过【Management】-【Stack Management】-【索引模式】来建立索引模式

figure 13.jpeg

定义索引模式的名称为 logstash-*

figure 14.jpeg

同时将 @timestamp 字段作为全局时间筛选的时间字段

figure 15.jpeg

实践

只需第一次在Kibana中建立好索引模式即可,后续无需再次重复建立。现在通过【Analytics】-【Discover】即可进行日志的检索,效果如下所示

figure 16.jpeg

Note

Docker挂载Volume

挂载Volume前,一方面需要提前在宿主机下创建相应的目录、文件;另一方面需要将其加入Docker的File Sharing配置中,由于File Sharing对所配置目录的子目录同样会生效,故一般只需将父目录加入即可。设置方法如下所示

figure 17.jpeg

特别地,在Windows系统下挂载Volume时,需要注意路径的写法。例如,Windows系统的路径 D:\Docker\ELK 应该写为 /D/Docker/ELK

查看Logstash Plugin版本信息

进入Logstash容器后,可通过执行如下命令查看Logstash中各Plugin的版本信息

1
2
# 查看Logstash Plugin版本信息
/usr/share/logstash/bin/logstash-plugin list --verbose

调试Logstash Filter

对于Logstash DSL中的Filter Plugin,可通过创建一个临时的Logstash容器进行调试。具体地,输入、输出分别使用标准输入、标准输出,过滤器则为待调试的Filter DSL。示例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
docker run --rm -it logstash:7.12.0  \
--path.settings= -e '
# 使用标准输入
input {
stdin {}
}
# 待调试的Filter DSL
filter {
mutate {
# 按/进行切分
split => { "message" => "/" }
# 取message数组最后一个元素值赋给新增字段logFile
add_field => { "logFile" => "%{[message][-1]}" }
}
}
# 使用标准输出
output {
stdout {}
}
'

其中,选项说明如下:

  • rm:容器退出后自动被删除
  • it:为容器分配一个伪输入终端,并保持容器的标准输入打开

调试效果如下所示

figure 18.jpeg

调试Grok

特别地,对于Filter Plugin中的Grok而言,Kibana还专门提供了一个在线调试工具——Grok Debugger。通过【Management】-【开发工具】-【Grok Debugger】进入即可使用,效果如下所示

figure 19.jpeg

参考文献

0%