日记收集条记(Filebeat 日记收集、Logstash 日记过滤)

张春  金牌会员 | 2024-7-22 12:27:59 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 844|帖子 844|积分 2532

1 FileBeat

Filebeat 是使用 Golang 实现的轻量型日记采集器,也是 Elasticsearch stack 内里的一员。本质上是一个 agent ,可以安装在各个节点上,根据设置读取对应位置的日记,并上报到相应的地方去。
1.1 FileBeat 安装与使用

从 官网 下载对应的版本,我这里的 ElasticSearch 版本号是 6.4.3,所以下载 FileBeat 的版本也是 6.4.3。
下载后解压:
  1. cd /home/software
  2. tar -zxvf filebeat-6.4.3-linux-x86_64.tar.gz -C /usr/local/
  3. cd /usr/local
  4. mv filebeat-6.4.3-linux-x86_64/ filebeat-6.4.3
复制代码
设置 Filebeat,可以参考 filebeat.full.yml 中的设置
  1. vim /usr/local/filebeat-6.4.3/filebeat.yml
复制代码
内容如下:
  1. ###################### Filebeat Configuration Example #########################
  2. filebeat.prospectors:
  3. - input_type: log
  4.   paths:
  5.     ## 定义了日志文件路径,可以采用模糊匹配模式,如*.log
  6.     - /workspaces/logs/logCollector/app-collector.log
  7.   #定义写入 ES 时的 _type 值
  8.   document_type: "app-log"
  9.   multiline:
  10.     #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
  11.     pattern: '^\['                              # 指定匹配的表达式(匹配以 [ 开头的字符串)
  12.     negate: true                                # 是否需要匹配到
  13.     match: after                                # 不匹配的行,合并到上一行的末尾
  14.     max_lines: 2000                             # 最大的行数
  15.     timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志
  16.   fields: ## topic 对应的消息字段或自定义增加的字段
  17.     logbiz: collector
  18.     logtopic: app-log-collector   ## 按服务划分用作kafka topic,会在logstash filter 过滤数据时候作为 判断参数 [fields][logtopic]
  19.     evn: dev
  20. - input_type: log
  21.   paths:
  22.     ## 定义了日志文件路径,可以采用模糊匹配模式,如*.log
  23.     - /workspaces/logs/logCollector/error-collector.log
  24.   #定义写入 ES 时的 _type 值
  25.   document_type: "error-log"
  26.   multiline:
  27.     #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)
  28.     pattern: '^\['                              # 指定匹配的表达式(匹配以 [ 开头的字符串)
  29.     negate: true                                # 是否匹配到
  30.     match: after                                # 不匹配的行,合并到上一行的末尾
  31.     max_lines: 2000                             # 最大的行数
  32.     timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志,直接进行推送操作
  33.   fields: ## topic 对应的消息字段或自定义增加的字段
  34.     logbiz: collector
  35.     logtopic: error-log-collector   ## 按服务划分用作kafka topic
  36.     evn: dev
  37. output.kafka: ## filebeat 支持多种输出,支持向 kafka,logstash,elasticsearch 输出数据,此处设置数据输出到 kafka。
  38.   enabled: true ## 启动这个模块
  39.   hosts: ["192.168.212.128:9092"] ## 地址
  40.   topic: '%{[fields.logtopic]}'  ## 主题(使用动态变量)
  41.   partition.hash:  ## kafka 分区 hash 规则
  42.     reachable_only: true
  43.   compression: gzip  ## 数据压缩
  44.   max_message_bytes: 1000000  ## 最大容量
  45.   required_acks: 1  ## 是否需要 ack
  46. logging.to_files: true
复制代码
检查设置是否正确:
  1. ./filebeat -c filebeat.yml -configtest
复制代码
启动filebeat:
  1. /usr/local/filebeat-6.4.3/filebeat &
复制代码
  注:需要启动 kafka
  2 Logstash 日记过滤

在 《Elasticsearch入门条记(Logstash数据同步)》 这边文章中已经先容过任何安装设置 Logstash 了,不外当时输入的数据源是来自于 MySQL,此时输入的数据源是 Kafka。
在 /usr/local/logstash-6.4.3 目次下新建一个 script 用于存放对接 Kafka 的设置文件。
以下是该目次下创建的 logstash-script.conf 文件:
  1. ## multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。
  2. input {
  3.   kafka {
  4.     topics_pattern => "app-log-.*"  ## kafka 主题 topic
  5.     bootstrap_servers => "192.168.212.128:9092"  ## kafka 地址
  6.     codec => json  ## 数据格式
  7.     consumer_threads => 1  ## 增加consumer的并行消费线程数(数值可以设置为 kafka 的分片数)
  8.     decorate_events => true
  9.     group_id => "app-log-group" ## kafka 组别
  10.   }
  11.   kafka {
  12.     topics_pattern => "error-log-.*"  ## kafka 主题 topic
  13.     bootstrap_servers => "192.168.212.128:9092" ## kafka 地址
  14.     codec => json  ## 数据格式
  15.     consumer_threads => 1  ## 增加consumer的并行消费线程数(数值可以设置为 kafka 的分片数)
  16.     decorate_events => true
  17.     group_id => "error-log-group" ## kafka 组别
  18.   }
  19. }
  20. filter {
  21.   ## 时区转换,这里使用 ruby 语言,因为 logstash 本身是东八区的,这个时区比北京时间慢8小时,所以这里采用 ruby 语言设置为北京时区
  22.   ruby {
  23.     code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
  24.   }
  25.   ## [fields][logtopic] 这个是从 FileBeat 定义传入 Kafka 的
  26.   if "app-log" in [fields][logtopic]{
  27.     grok {
  28.       ## 表达式,这里对应的是Springboot输出的日志格式
  29.       match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
  30.     }
  31.   }
  32.   ## [fields][logtopic] 这个是从 FileBeat 定义传入 Kafka 的
  33.   if "error-log" in [fields][logtopic]{
  34.     grok {
  35.       ## 表达式
  36.       match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
  37.     }
  38.   }
  39. }
  40. ## 测试输出到控制台:
  41. ## 命令行输入 ./logstash -f /usr/local/logstash-6.4.3/script/logstash-script.conf  --verbose --debug
  42. output {
  43.   stdout { codec => rubydebug }
  44. }
  45. ## elasticsearch:
  46. output {
  47.   if "app-log" in [fields][logtopic]{
  48.     ## es插件
  49.     elasticsearch {
  50.       # es服务地址
  51.       hosts => ["192.168.212.128:9200"]
  52.       ## 索引名,%{index_time} 是由上面配置的 ruby 脚本定义的日期时间,即每天生成一个索引
  53.       index => "app-log-%{[fields][logbiz]}-%{index_time}"
  54.       # 是否嗅探集群ip:一般设置true
  55.       # 只需要知道一台 elasticsearch 的地址,就可以访问这一台对应的整个 elasticsearch 集群
  56.       sniffing => true
  57.       # logstash默认自带一个mapping模板,进行模板覆盖
  58.       template_overwrite => true
  59.     }
  60.   }
  61.   
  62.   if "error-log" in [fields][logtopic]{
  63.     elasticsearch {
  64.       hosts => ["192.168.212.128:9200"]
  65.       index => "error-log-%{[fields][logbiz]}-%{index_time}"
  66.       sniffing => true
  67.       template_overwrite => true
  68.     }
  69.   }
  70. }
复制代码
检察一下此中的过滤规则,这需要联合 《日记收集条记(架构计划、Log4j2项目初始化、Lombok)》 文章中的定义日记输出格式一起看:
  1. ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
复制代码


  • "message":logstash 固定的格式,统一叫传入的数据为 message
  • \[%{NOTSPACE:currentDateTime}\]:匹配 [ 为开头,] 为结尾,NOTSPACE 表示不能有空格,赋值变量名为 currentDateTime
  • \[%{NOTSPACE:level}\]:匹配 [ 为开头,] 为结尾,NOTSPACE 表示不能有空格,赋值变量名为 level,日记级别
  • \[%{NOTSPACE:thread-id}\]:匹配 [ 为开头,] 为结尾,NOTSPACE 表示不能有空格,赋值变量名为 thread-id,线程ID
  • \[%{NOTSPACE:class}\]:匹配 [ 为开头,] 为结尾,NOTSPACE 表示不能有空格,赋值变量名为 class,创建对应 logger 实例传入的 class
  • \[%{DATA:hostName}\]:匹配 [ 为开头,] 为结尾,DATA 表示数据,可为空,赋值变量名为 level,当前应用主机名称
  • \[%{DATA:ip}\]:匹配 [ 为开头,] 为结尾,DATA 表示数据,可为空,赋值变量名为 level,当前应用的 IP
  • \[%{DATA:applicationName}\]:匹配 [ 为开头,] 为结尾,DATA 表示数据,可为空,赋值变量名为 level,当前应用的 applicationName
  • \[%{DATA:location}\]:匹配 [ 为开头,] 为结尾,DATA 表示数据,可为空,赋值变量名为 location
  • \[%{DATA:messageInfo}\]:匹配 [ 为开头,] 为结尾,DATA 表示数据,可为空,赋值变量名为 messageInfo,日记输出的自定义内容
  • (\'\'|%{QUOTEDSTRING:throwable}):两个 ' 单引号之间的 | 表示,之间可为空,不为空就是 throwable 非常信息
启动 logstash:
  1. /usr/local/logstash-6.4.3/bin/logstash -f /usr/local/logstash-6.4.3/script/logstash-script.conf
  2. ## 如果测试时,想要控制台输出日志,输入以下命令
  3. /usr/local/logstash-6.4.3/bin/logstash -f /usr/local/logstash-6.4.3/script/logstash-script.conf --verbose --debug
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

张春

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表