ElasticSearch Study Notes (3): Logstash Data Analysis


Chapter 3: Logstash Data Analysis

Logstash collects, processes, and outputs logs as a pipeline, much like a *NIX pipeline such as xxx | ccc | ddd, where the output of each command becomes the input of the next.
A Logstash pipeline has three stages:
input --> filter (optional) --> output

Each stage is driven by plugins, such as file, elasticsearch, redis, and so on.
Each stage can also combine several plugins; output, for example, can write to Elasticsearch and to stdout (the console) at the same time.
   Logstash supports multiple inputs and multiple outputs.
  ELFK architecture diagram:

1. Basic Logstash Deployment


  • Install the software
  [root@host3 ~]# yum install logstash --enablerepo=es -y                         # a repo that is only needed occasionally can stay disabled and be enabled on demand
  [root@host3 ~]# ln -sv /usr/share/logstash/bin/logstash /usr/local/bin/         # symlink so the command can be run directly
  "/usr/local/bin/logstash" -> "/usr/share/logstash/bin/logstash"

  • Create the first configuration file
  [root@host3 ~]# vim 01-stdin-stdout.conf
  input {
    stdin {}
  }
  output {
    stdout {}
  }

  • Test the configuration file
  [root@host3 ~]# logstash -tf 01-stdin-stdout.conf

  • Starting by hand. This is usually done in test environments; in production you would normally adjust the configuration and manage the service with systemctl.
  [root@host3 ~]# logstash -tf 01-stdin-stdout.conf
  Using bundled JDK: /usr/share/logstash/jdk
  OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
  WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
  Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
  [INFO ] 2022-09-15 21:49:37.109 [main] runner - Starting Logstash {"logstash.version"=>"7.17.6", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 11.0.16+8 on 11.0.16+8 +indy +jit [linux-x86_64]"}
  [INFO ] 2022-09-15 21:49:37.115 [main] runner - JVM bootstrap flags: [-Xms1g, -Xmx1g, -XX:+UseConcMarkSweepGC, -XX:CMSInitiatingOccupancyFraction=75, -XX:+UseCMSInitiatingOccupancyOnly, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djdk.io.File.enableADS=true, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -Djruby.regexp.interruptible=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true]
  [INFO ] 2022-09-15 21:49:37.160 [main] settings - Creating directory {:setting=>"path.queue", :path=>"/usr/share/logstash/data/queue"}
  [INFO ] 2022-09-15 21:49:37.174 [main] settings - Creating directory {:setting=>"path.dead_letter_queue", :path=>"/usr/share/logstash/data/dead_letter_queue"}
  [WARN ] 2022-09-15 21:49:37.687 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
  [INFO ] 2022-09-15 21:49:38.843 [LogStash::Runner] Reflections - Reflections took 114 ms to scan 1 urls, producing 119 keys and 419 values
  [WARN ] 2022-09-15 21:49:39.658 [LogStash::Runner] line - Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
  [WARN ] 2022-09-15 21:49:39.703 [LogStash::Runner] stdin - Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
  Configuration OK
  [INFO ] 2022-09-15 21:49:39.917 [LogStash::Runner] runner - Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
  [root@host3 ~]# logstash -f 01-stdin-stdout.conf
  Using bundled JDK: /usr/share/logstash/jdk
  OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
  WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
  Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
  [INFO ] 2022-09-15 21:50:25.095 [main] runner - Starting Logstash {"logstash.version"=>"7.17.6", "jruby.version"=>"jruby 9.2.20.1 (2.5.8) 2021-11-30 2a2962fbd1 OpenJDK 64-Bit Server VM 11.0.16+8 on 11.0.16+8 +indy +jit [linux-x86_64]"}
  [INFO ] 2022-09-15 21:50:25.103 [main] runner - JVM bootstrap flags: [-Xms1g, -Xmx1g, -XX:+UseConcMarkSweepGC, -XX:CMSInitiatingOccupancyFraction=75, -XX:+UseCMSInitiatingOccupancyOnly, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djdk.io.File.enableADS=true, -Djruby.compile.invokedynamic=true, -Djruby.jit.threshold=0, -Djruby.regexp.interruptible=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true]
  [WARN ] 2022-09-15 21:50:25.523 [LogStash::Runner] multilocal - Ignoring the 'pipelines.yml' file because modules or command line options are specified
  [INFO ] 2022-09-15 21:50:25.555 [LogStash::Runner] agent - No persistent UUID file found. Generating new UUID {:uuid=>"3fc04af1-7665-466e-839f-1eb42348aeb0", :path=>"/usr/share/logstash/data/uuid"}
  [INFO ] 2022-09-15 21:50:27.119 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
  [INFO ] 2022-09-15 21:50:28.262 [Converge PipelineAction::Create<main>] Reflections - Reflections took 110 ms to scan 1 urls, producing 119 keys and 419 values
  [WARN ] 2022-09-15 21:50:29.084 [Converge PipelineAction::Create<main>] line - Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
  [WARN ] 2022-09-15 21:50:29.119 [Converge PipelineAction::Create<main>] stdin - Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
  [INFO ] 2022-09-15 21:50:29.571 [[main]-pipeline-manager] javapipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>250, "pipeline.sources"=>["/root/01-stdin-stdout.conf"], :thread=>"#<Thread:0x32e464e6 run>"}
  [INFO ] 2022-09-15 21:50:30.906 [[main]-pipeline-manager] javapipeline - Pipeline Java execution initialization time {"seconds"=>1.33}
  WARNING: An illegal reflective access operation has occurred
  WARNING: Illegal reflective access by com.jrubystdinchannel.StdinChannelLibrary$Reader (file:/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/jruby-stdin-channel-0.2.0-java/lib/jruby_stdin_channel/jruby_stdin_channel.jar) to field java.io.FilterInputStream.in
  WARNING: Please consider reporting this to the maintainers of com.jrubystdinchannel.StdinChannelLibrary$Reader
  WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
  WARNING: All illegal access operations will be denied in a future release
  [INFO ] 2022-09-15 21:50:31.128 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
  The stdin plugin is now waiting for input:
  [INFO ] 2022-09-15 21:50:31.270 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
  abc
  {
         "message" => " abc",
        "@version" => "1",
            "host" => "host3.test.com",
      "@timestamp" => 2022-09-15T13:52:02.984Z
  }
  bbb
  {
         "message" => "bbb",
        "@version" => "1",
            "host" => "host3.test.com",
      "@timestamp" => 2022-09-15T13:52:06.177Z
  }
2. Input Types

In the example above the input type was stdin, i.e. manual input. In production, logs are never typed in by hand, so stdin is mostly used to verify that the environment was set up correctly. Several common input types are described below.
2.1 file

  input {
    file {
      path => ["/tmp/test/*.txt"]
      # Read the log file from the beginning (the default is the end). This only applies to files
      # with no read record yet: a new file created while the service was down is read after startup,
      # but files that already have a recorded offset are not re-read.
      start_position => "beginning"
    }
  }
The read offsets are stored in /usr/share/logstash/data/plugins/inputs/file/.sincedb_3cd99a80ca58225ec14dc0ac340abb80:
  [root@host3 ~]# cat /usr/share/logstash/data/plugins/inputs/file/.sincedb_3cd99a80ca58225ec14dc0ac340abb80
  5874000 0 64768 4 1663254379.147252 /tmp/test/1.txt
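By my reading of the file input plugin docs (treat the field order as an assumption), each sincedb line records the file's inode, major and minor device numbers, the byte offset already read, a timestamp, and the path. The inode and offset can be cross-checked against the file itself:

```shell
# Hypothetical file standing in for the tracked log above.
mkdir -p /tmp/test
printf 'abc\n' > /tmp/test/1.txt            # 4 bytes, matching the recorded offset "4"
stat -c 'inode=%i size=%s' /tmp/test/1.txt  # GNU stat; compare with sincedb columns 1 and 4
```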
2.2 tcp

Like Filebeat, Logstash can listen on a TCP port to receive logs, and it can listen on several ports at once.
   This approach is typically used for servers where an agent cannot be installed.
    HTTP can be used as well; its configuration is similar to TCP.
  [root@host3 ~]# vim 03-tcp-stdout.conf
  input {
    tcp {
      port => 9999
    }
  }
  output {
    stdout {}
  }
  [root@host2 ~]# telnet 192.168.19.103 9999
  Trying 192.168.19.103...
  Connected to 192.168.19.103.
  Escape character is '^]'.
  123456
  test
  hello
  {
         "message" => "123456\r",
        "@version" => "1",
      "@timestamp" => 2022-09-15T15:30:23.123Z,
            "host" => "host2",
            "port" => 51958
  }
  {
         "message" => "test\r",
        "@version" => "1",
      "@timestamp" => 2022-09-15T15:30:24.494Z,
            "host" => "host2",
            "port" => 51958
  }
  {
         "message" => "hello\r",
        "@version" => "1",
      "@timestamp" => 2022-09-15T15:30:26.336Z,
            "host" => "host2",
            "port" => 51958
  }
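Since the tcp input simply reads newline-delimited lines, any language's socket API can ship logs to it, not just telnet. A minimal Python sketch, with the host and port taken from the config above:

```python
import socket

def send_lines(host, port, lines):
    """Send each line, newline-terminated, to a Logstash tcp input.

    Each newline-delimited line becomes one Logstash event.
    """
    payload = "".join(line + "\n" for line in lines).encode("utf-8")
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(payload)

# Example (against the listener configured above):
# send_lines("192.168.19.103", 9999, ["123456", "test", "hello"])
```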
2.3 redis

Logstash can read data directly from a Redis database. Three Redis data types are supported:

  • list — corresponds to the Redis command blpop: take the first element from the left of the list, blocking while the list is empty;
  • channel — corresponds to subscribe: receive the latest data published to a Redis channel;
  • pattern_channel — corresponds to psubscribe: subscribe to every channel matching a pattern and receive the latest data from all of them.
Differences between the types:

  • channel vs. pattern_channel: pattern_channel can match several channels with one pattern, while channel names a single channel;
  • list vs. the two channel types: data published to a channel is received in full by every subscribed Logstash, while items in a list are not duplicated across multiple Logstash consumers; they are spread over the instances.
The input configuration:
  input {
    redis {
      data_type => "list"         # data type
      db => 5                     # database number; default 0
      host => "192.168.19.101"    # Redis server IP; default localhost
      port => 6379
      password => "bruce"
      key => "test-list"
    }
  }
Append data in Redis:
  [root@host1 ~]# redis-cli -h host1 -a bruce
  host1:6379> select 5
  OK
  host1:6379[5]> lpush test-list bruce
  (integer) 1
  host1:6379[5]> lrange test-list 0 -1
  (empty list or set)
  host1:6379[5]> lpush test-list hello
  (integer) 1
  host1:6379[5]> lrange test-list 0 -1                # Logstash consumes each item, leaving the list empty
  (empty list or set)
  host1:6379[5]> lpush test-list '{"requestTime":"[12/Sep/2022:23:30:56 +0800]","clientIP":"192.168.19.1","threadID":"http-bio-8080-exec-7","protocol":"HTTP/1.1","requestMethod":"GET / HTTP/1.1","requestStatus":"404","sendBytes":"-","queryString":"","responseTime":"0ms","partner":"-","agentVersion":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"}'
Logstash receives the data:
  {
         "message" => "bruce",
      "@timestamp" => 2022-09-16T08:17:38.213Z,
        "@version" => "1",
            "tags" => [
          [0] "_jsonparsefailure"
      ]
  }
  # Non-JSON data triggers a parse error but is still accepted
  [ERROR] 2022-09-16 16:18:21.688 [[main]<redis] json - JSON parse error, original data now in message field {:message=>"Unrecognized token 'hello': was expecting ('true', 'false' or 'null')\n at [Source: (String)\"hello\"; line: 1, column: 11]", :exception=>LogStash::Json::ParserError, :data=>"hello"}
  {
         "message" => "hello",
      "@timestamp" => 2022-09-16T08:18:21.689Z,
        "@version" => "1",
            "tags" => [
          [0] "_jsonparsefailure"
      ]
  }
  # JSON-formatted data is parsed automatically
  {
           "clientIP" => "192.168.19.1",
        "requestTime" => "[12/Sep/2022:23:30:56 +0800]",
        "queryString" => "",
           "@version" => "1",
       "agentVersion" => "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36",
            "partner" => "-",
         "@timestamp" => 2022-09-16T08:23:10.320Z,
           "protocol" => "HTTP/1.1",
      "requestStatus" => "404",
           "threadID" => "http-bio-8080-exec-7",
      "requestMethod" => "GET / HTTP/1.1",
          "sendBytes" => "-",
       "responseTime" => "0ms"
  }
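From an application, events are pushed the same way the lpush calls above do it. A sketch in Python; the redis-py client and the connection details are assumptions carried over from the config above:

```python
import json

def make_event(**fields):
    """Serialize a log record as the JSON string Logstash's redis input parses automatically."""
    return json.dumps(fields)

# Pushing it requires a running Redis and the redis-py package (both assumed):
# import redis
# r = redis.Redis(host="192.168.19.101", port=6379, db=5, password="bruce")
# r.lpush("test-list", make_event(clientIP="192.168.19.1", requestStatus="404"))
```

Non-JSON strings would land in the message field with a _jsonparsefailure tag, as shown above, so serializing to JSON up front keeps the fields structured.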
2.4 beats

Filebeat has already been configured to ship its logs to Logstash; on the Logstash side all that is needed is to receive them.
Filebeat configuration:
  filebeat.inputs:
  - type: log
    paths: /tmp/1.txt
  output.logstash:
    hosts: ["192.168.19.103:5044"]
Logstash configuration:
  input {
    beats {
      port => 5044
    }
  }
After appending 111 to /tmp/1.txt on host2, Logstash outputs:
  {
         "message" => "111",
            "tags" => [
          [0] "beats_input_codec_plain_applied"
      ],
           "agent" => {
                    "id" => "76b7876b-051a-4df8-8b13-bd013ac5ec59",
               "version" => "7.17.4",
              "hostname" => "host2.test.com",
                  "type" => "filebeat",
                  "name" => "host2.test.com",
          "ephemeral_id" => "437ac89f-7dc3-4898-a457-b2452ac4223b"
      },
           "input" => {
          "type" => "log"
      },
            "host" => {
          "name" => "host2.test.com"
      },
             "log" => {
          "offset" => 0,
            "file" => {
              "path" => "/tmp/1.txt"
          }
      },
        "@version" => "1",
             "ecs" => {
          "version" => "1.12.0"
      },
      "@timestamp" => 2022-09-16T08:53:20.975Z
  }
3. Output Types

3.1 redis

Redis can also be used as an output; the configuration mirrors the input:
  output {
    redis {
      data_type => "list"
      db => 6
      host => "192.168.19.101"
      port => 6379
      password => "bruce"
      key => "test-list"
    }
  }
Check the Redis database:
  [root@host1 ~]# redis-cli -h host1 -a bruce
  host1:6379> select 6
  OK
  host1:6379[6]> lrange test-list 0 -1
  1) "{\"message\":\"1111\",\"@version\":\"1\",\"@timestamp\":\"2022-09-16T09:12:29.890Z\",\"host\":\"host3.test.com\"}"
3.2 file

The file output writes events to local disk.
  output {
    file {
      path => "/tmp/test-file.log"
    }
  }
3.3 elasticsearch

  output {
    elasticsearch {
      hosts => ["192.168.19.101:9200","192.168.19.102:9200","192.168.19.103:9200"]
      index => "centos-logstash-elasticsearh-%{+YYYY.MM.dd}"
    }
  }
4. filter

filter is an optional stage. After logs are received, it can parse and reshape them before they are output.
4.1 grok

grok parses arbitrary text into structured fields. It is a good fit for syslog, Apache, and other web server logs.
① Simple example

  input {
    file {
      path => ["/var/log/nginx/access.log*"]
      start_position => "beginning"
    }
  }
  filter {
    grok {
      match => {
        "message" => "%{COMBINEDAPACHELOG}"
        # "message" => "%{HTTPD_COMMONLOG}"                # newer Logstash releases may use this pattern name
      }
    }
  }
  output {
    stdout {}
    elasticsearch {
      hosts => ["192.168.19.101:9200","192.168.19.102:9200","192.168.19.103:9200"]
      index => "nginx-logs-es-%{+YYYY.MM.dd}"
    }
  }
The parsed result:
  {
          "request" => "/",
            "bytes" => "4833",
         "@version" => "1",
             "auth" => "-",
            "agent" => "\"curl/7.29.0\"",
             "path" => "/var/log/nginx/access.log-20220913",
            "ident" => "-",
             "verb" => "GET",
          "message" => "192.168.19.102 - - [12/Sep/2022:21:48:29 +0800] \"GET / HTTP/1.1\" 200 4833 \"-\" \"curl/7.29.0\" \"-\"",
      "httpversion" => "1.1",
             "host" => "host3.test.com",
       "@timestamp" => 2022-09-16T14:27:43.208Z,
         "response" => "200",
        "timestamp" => "12/Sep/2022:21:48:29 +0800",
         "referrer" => "\"-\"",
         "clientip" => "192.168.19.102"
  }

② Predefined patterns

grok matches with regular expressions, using the syntax %{SYNTAX:SEMANTIC}

  • SYNTAX is the name of the pattern that will match your text; these are built in, and the official set covers about 120 patterns.
  • SEMANTIC is the identifier you give to the matched text, i.e. the field name you want it stored under.
Example:

  • Source log line
  55.3.244.1 GET /index.html 15824 0.043

  • The matching pattern
      %{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

  • Configuration file
  input {
    stdin {}
  }
  filter {
    grok {
      match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
    }
  }
  output {
    stdout {}
  }

  • The matched result
  55.3.244.1 GET /index.html 15824 0.043
  {
         "message" => "55.3.244.1 GET /index.html 15824 0.043",
        "@version" => "1",
      "@timestamp" => 2022-09-16T14:46:46.426Z,
          "method" => "GET",
         "request" => "/index.html",
           "bytes" => "15824",
        "duration" => "0.043",
            "host" => "host3.test.com",
          "client" => "55.3.244.1"
  }
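A %{SYNTAX:SEMANTIC} match behaves like a named capture group. The sketch below reproduces the match above with plain Python regex; the character classes are rough stand-ins for IP, WORD, URIPATHPARAM, and NUMBER, not the real grok definitions:

```python
import re

line = "55.3.244.1 GET /index.html 15824 0.043"

# Each (?P<name>...) group plays the role of one %{SYNTAX:SEMANTIC} pair.
pattern = re.compile(
    r"(?P<client>\d+\.\d+\.\d+\.\d+) (?P<method>\w+) (?P<request>\S+) "
    r"(?P<bytes>\d+) (?P<duration>[\d.]+)"
)
print(pattern.match(line).groupdict())
# {'client': '55.3.244.1', 'method': 'GET', 'request': '/index.html', 'bytes': '15824', 'duration': '0.043'}
```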
Pattern definitions for many different services are listed in the official repository:
https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
③ Custom patterns

When the predefined patterns do not fit, grok also accepts custom regular expressions.

  • First create a directory for custom patterns and write the pattern into it
  [root@host3 ~]# mkdir patterns
  [root@host3 ~]# echo "POSTFIX_QUEUEID [0-9A-F]{10,11}" >> ./patterns/1

  • Edit the configuration file
  input {
    stdin {}
  }
  filter {
    grok {
      patterns_dir => ["/root/patterns"]                                                                # directory holding the custom patterns
      match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }        # mixes predefined and custom patterns; characters outside the braces (such as the colon) are matched literally, one by one
    }
  }
  output {
    stdout {}
  }

  • Run and test
  ...
  The stdin plugin is now waiting for input:
  [INFO ] 2022-09-16 23:22:04.511 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
  Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
  {
             "message" => "Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>",
                "host" => "host3.test.com",
           "timestamp" => "Jan  1 06:25:43",
            "queue_id" => "BEF25A72965",                        # field matched by the custom pattern
           "logsource" => "mailserver14",
          "@timestamp" => 2022-09-16T15:22:19.516Z,
             "program" => "postfix/cleanup",
                 "pid" => "21403",
            "@version" => "1",
      "syslog_message" => "message-id=<20130101142543.5828399CCAF@mailserver14.example.com>"
  }
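The custom pattern is ordinary regex, so it can be sanity-checked outside Logstash before wiring it into a config, for example in Python:

```python
import re

# Same expression as the POSTFIX_QUEUEID pattern defined above: 10-11 uppercase hex characters.
POSTFIX_QUEUEID = re.compile(r"\b[0-9A-F]{10,11}\b")

line = ("Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: "
        "BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>")
print(POSTFIX_QUEUEID.search(line).group(0))  # BEF25A72965
```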
4.2 Common options

As the name implies, these options are available in every filter plugin.


  • remove_field
  filter {
    grok {
      remove_field => ["@version","tag","agent"]
    }
  }


  • add_field
  filter {
    grok {
      add_field => { "new_tag" => "hello world %{+YYYY.MM.dd}" }
    }
  }
4.3 date

An event carries two timestamps, timestamp and @timestamp: the time the log line was produced and the time it was collected, and the two may differ.
The date plugin parses the time string in a log record and uses it to set the @timestamp field. It supports the following time formats:

  • ISO8601
  • UNIX
  • UNIX_MS
  • TAI64N
  • a custom Joda-Time-style pattern, as used below
  input {
    file {
      path => "/var/log/nginx/access.log*"
      start_position => "beginning"
    }
  }
  filter {
    grok {
      match => { "message" => "%{HTTPD_COMMONLOG}" }
      remove_field => ["message","ident","auth","@version","path"]
    }
    date {
      match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss Z" ]
      # timestamp must be an existing field; date only corrects the time in that field, and the pattern
      # must match the field's actual format, otherwise parse errors are reported.
      # The original value here is "17/Sep/2022:18:42:26 +0800", so using ZZZ always fails: ZZZ expects
      # a zone name such as Asia/Shanghai, while Z matches an offset such as +0800.
      timezone => "Asia/Shanghai"
    }
  }
  output {
    stdout {}
  }
The output:
  {
        "timestamp" => "17/Sep/2022:18:42:26 +0800", # 8 hours apart from @timestamp; check in Elasticsearch, and if the offset shows there too, adjust timezone in date
         "response" => "200",
      "httpversion" => "1.1",
         "clientip" => "192.168.19.102",
             "verb" => "GET",
             "host" => "host3.test.com",
          "request" => "/",
       "@timestamp" => 2022-09-17T10:42:26.000Z,
            "bytes" => "4833"
  }
target stores the parsed time in the named field instead of the default @timestamp. This field can then be used when creating the index pattern in Kibana.
  date {
    match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss Z" ]
    timezone => "Asia/Shanghai"
    target => "logtime"
  }
  # The result
  {
        "timestamp" => "17/Sep/2022:21:15:30 +0800",
         "response" => "200",
          "logtime" => 2022-09-17T13:15:30.000Z,                # time the log line was produced
      "httpversion" => "1.1",
         "clientip" => "192.168.19.102",
             "verb" => "GET",
             "host" => "host3.test.com",
          "request" => "/",
       "@timestamp" => 2022-09-17T13:15:31.357Z,                # time the event was recorded; note the slight delay after the log was produced
            "bytes" => "4833"
  }
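For intuition, the same conversion in Python; the strptime directives below are Python's rough equivalents of the Joda-Time pattern dd/MMM/yyyy:HH:mm:ss Z used above:

```python
from datetime import datetime

# %d/%b/%Y:%H:%M:%S %z mirrors dd/MMM/yyyy:HH:mm:ss Z; %z consumes the +0800 offset.
ts = datetime.strptime("17/Sep/2022:21:15:30 +0800", "%d/%b/%Y:%H:%M:%S %z")
print(ts.isoformat())  # 2022-09-17T21:15:30+08:00
```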
4.4 geoip

Resolves the geographic location of client IPs. The plugin relies on the bundled GeoLite2 City database, so the information is not always accurate; you can also download a MaxMind-format database yourself and point the plugin at it. The official site documents how to use a custom database.
  input {
    file {
      path => "/var/log/nginx/access.log*"
      start_position => "beginning"
    }
  }
  filter {
    grok {
      match => { "message" => "%{HTTPD_COMMONLOG}" }
      remove_field => ["message","ident","auth","@version","path"]
    }
    geoip {
      source => "clientip"                         # take the source IP from the clientip field
      # fields => ["country_name" ,"timezone", "city_name"]                # optionally restrict which fields are kept
    }
  }
  output {
    stdout {}
  }
The result; note that private addresses cannot be resolved:
  {
        "timestamp" => "17/Sep/2022:21:15:30 +0800",
         "response" => "200",
            "geoip" => {},
      "httpversion" => "1.1",
         "clientip" => "192.168.19.102",
             "verb" => "GET",
             "host" => "host3.test.com",
             "tags" => [
          [0] "_geoip_lookup_failure"                                # private network address
      ],
          "request" => "/",
       "@timestamp" => 2022-09-17T13:30:05.178Z,
            "bytes" => "4833"
  }
  {
        "timestamp" => "17/Sep/2022:21:15:30 +0800",
         "response" => "200",
            "geoip" => {                                        # the lookup result is stored under geoip
           "country_code2" => "CM",
           "country_code3" => "CM",
            "country_name" => "Cameroon",
                      "ip" => "154.72.162.134",
                "timezone" => "Africa/Douala",
                "location" => {
              "lon" => 12.5,
              "lat" => 6.0
          },
          "continent_code" => "AF",
                "latitude" => 6.0,
               "longitude" => 12.5
      },
      "httpversion" => "1.1",
         "clientip" => "154.72.162.134",
             "verb" => "GET",
             "host" => "host3.test.com",
          "request" => "/",
       "@timestamp" => 2022-09-17T13:30:05.178Z,
            "bytes" => "4833"
  }
4.5 useragent

Parses browser (user-agent) information, provided the events actually contain a user-agent field.
  input {
    file {
      path => "/var/log/nginx/access.log*"
      start_position => "beginning"
    }
  }
  filter {
    grok {
      match => { "message" => "%{HTTPD_COMBINEDLOG}" }                # HTTPD_COMBINEDLOG also captures the user agent
      remove_field => ["message","ident","auth","@version","path"]
    }
    useragent {
      source => "agent"                                                        # the field holding the user-agent string; it must exist
      target => "agent_test"                                                # for easier reading, put all parsed fields under this one field
    }
  }
  output {
    stdout {}
  }
The result:
  {
        "timestamp" => "17/Sep/2022:23:42:31 +0800",
         "response" => "404",
            "geoip" => {},
      "httpversion" => "1.1",
         "clientip" => "192.168.19.103",
             "verb" => "GET",
            "agent" => "\"Mozilla/5.0 (X11; Linux x86_64; rv:60.0) Gecko/20100101 Firefox/60.0\"",
             "host" => "host3.test.com",
          "request" => "/favicon.ico",
         "referrer" => "\"-\"",
       "@timestamp" => 2022-09-17T15:42:31.927Z,
            "bytes" => "3650",
       "agent_test" => {
            "major" => "60",
             "name" => "Firefox",
               "os" => "Linux",
          "os_full" => "Linux",
          "os_name" => "Linux",
          "version" => "60.0",
            "minor" => "0",
           "device" => "Other"
      }
  }
  {
  ...
       "agent_test" => {
            "major" => "60",
             "name" => "Firefox",
               "os" => "Linux",
          "os_full" => "Linux",
          "os_name" => "Linux",
          "version" => "60.0",
            "minor" => "0",
           "device" => "Other"
      }
  }
  {
  ...
       "agent_test" => {
            "os_minor" => "0",
             "os_full" => "iOS 16.0",
             "version" => "16.0",
            "os_major" => "16",
              "device" => "iPhone",
               "major" => "16",
                "name" => "Mobile Safari",
                  "os" => "iOS",
          "os_version" => "16.0",
             "os_name" => "iOS",
               "minor" => "0"
      }
  }
  {
  ...
       "agent_test" => {
               "patch" => "3987",
             "os_full" => "Android 10",
             "version" => "80.0.3987.162",
            "os_major" => "10",
              "device" => "Samsung SM-G981B",
               "major" => "80",
                "name" => "Chrome Mobile",
                  "os" => "Android",
          "os_version" => "10",
             "os_name" => "Android",
               "minor" => "0"
      }
  }
4.6 mutate

  • split: split a given field
  input {
    stdin {}
  }
  filter {
    mutate {
      split => {
        message =>  " "                # split the message field on spaces
      }
      remove_field => ["@version","host"]
      add_field => {
        "tag" => "This a test field from Bruce"
      }
    }
  }
  output {
    stdout {}
  }
  111 222 333
  {
             "tag" => "This a test field from Bruce",
         "message" => [
          [0] "111",
          [1] "222",
          [2] "333"
      ],
      "@timestamp" => 2022-09-18T08:07:36.373Z
  }

  • Extract the split values into fields of their own
  input {
    stdin {}
  }
  filter {
    mutate {
      split => {
        message =>  " "                # split the message field on spaces
      }
      remove_field => ["@version","host"]
      add_field => {
        "tag" => "This a test field from Bruce"
      }
    }
    mutate {
      add_field => {
        "name" => "%{[message][0]}"
        "age" => "%{[message][1]}"
        "sex" => "%{[message][2]}"
      }
    }
  }
  output {
    stdout {}
  }
  bruce 37 male
  {
         "message" => [
          [0] "bruce",
          [1] "37",
          [2] "male"
      ],
             "age" => "37",
      "@timestamp" => 2022-09-18T08:14:31.230Z,
             "sex" => "male",
             "tag" => "This a test field from Bruce",
            "name" => "bruce"
  }

  • convert: cast a field's value to another type, for example string to integer. If the value is an array, every member is converted; if it is a hash, nothing happens.
  filter {
    mutate {
      convert => {
        "age" => "integer"                        # convert age to an integer
      }
    }
  }
  bruce 20 male
  {
         "message" => [
          [0] "bruce",
          [1] "20",
          [2] "male"
      ],
             "sex" => "male",
            "name" => "bruce",
             "age" => 20,                                        # no quotes: the value is now numeric
      "@timestamp" => 2022-09-18T08:51:07.633Z,
             "tag" => "This a test field from Bruce"
  }

  • strip: remove leading and trailing whitespace from field values
  filter {
    mutate {
      strip => ["name","sex"]
    }
  }

  • rename: rename a field
  filter {
    mutate {
      rename => { "sex" => "agenda" }
    }
  }

  • replace: replace a field's content
  filter {
    mutate {
      replace => { "tag" => "This is test message" }                # overwrites the content of the tag field
    }
  }

  • update: same usage as replace, except that an existing field is modified while a missing field is ignored
  • uppercase/lowercase: convert a field's content to upper/lower case; capitalize: capitalize the first letter
  filter {
    mutate {
      uppercase => ["tag"]
      capitalize => ["name"]
    }
  }
5 Advanced Features

5.1 Conditionals

After tagging events in input, conditionals in output and filter can treat them differently:
  input {
    beats {
      port => 8888
      type => "nginx-beats"
    }
    tcp {
      port => 9999
      type => "tomcat-tcp"
    }
  }
  output {
    if [type] == "nginx-beats" {
      elasticsearch {
        hosts => ["192.168.19.101:9200","192.168.19.102:9200","192.168.19.103:9200"]
        index => "nginx-beats-elasticsearh-%{+YYYY.MM.dd}"
      }
    } else {
      elasticsearch {
        hosts => ["192.168.19.101:9200","192.168.19.102:9200","192.168.19.103:9200"]
        index => "tomcat-tcp-elasticsearh-%{+YYYY.MM.dd}"
      }
    }
  }
5.2 Running multiple instances

Logstash supports running multiple instances, but a second instance started directly will fail; it needs its own path.data to start.
  [root@host3 ~]# logstash -f 01-stdin-stdout.conf --path.data /tmp/logstash