千千梦丶琪 发表于 2024-9-2 05:58:57

超详细,elk+kafka+filebeat日志收集,filebeat收集的不同日志生成不同的to

项目主机规划
主机名
外网IP
安装软件
所属集群
elstack
192.168.1.176
elasticsearch8.4

logstash
192.168.1.178
logstash8.4+Kibana8.4

kafka
192.168.1.179
kafka+zookeeper


1、环境预备

1.1 配置hosts文件

   $ sudo vim /etc/hosts
192.168.1.177 elk
192.168.1.178 logstash
192.168.1.179 kafka
推送hosts文件到 yu的其他主机上
   $ sudo scp /etc/hosts 192.168.1.178:/etc/hosts
$ sudo scp /etc/hosts 192.168.1.179:/etc/hosts
1.2、elk软件下载地点

    wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.4.0-linux-x86_64.tar.gz
 wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.4.0-linux-x86_64.tar.gz
 wget https://artifacts.elastic.co/downloads/logstash/logstash-8.4.0-linux-x86_64.tar.gz
 wget https://artifacts.elastic.co/downloads/kibana/kibana-8.4.0-linux-x86_64.tar.gz
1.3、kafka+zookeeper下载地点

   wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.2/apache-zookeeper-3.7.2-bin.tar.g
wget https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgzkafka_2.12-3.6.1https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz

 2、摆设elasticsearch

2.1、上传elasticsearch安装包

下载地点
https://i-blog.csdnimg.cn/blog_migrate/f71b468ab4a858fed74734e72ed75dab.png
2.2、解压软件包

   $  cd /tmp
$ sudo tar -zxvf elasticsearch-8.4.0-linux-x86_64.tar.gz -C /usr/local/
2.3、给文件授权

   $ sudo chown -R yu:yu /usr/local/elasticsearch-8.4.0/
2.4、修改linux系统打开文件最大数

   $ sudo vim /etc/sysctl.conf
vm.max_map_count=655360
$ sudo  sysctl -p /etc/sysctl.conf
https://i-blog.csdnimg.cn/blog_migrate/ffe60630f83eece6ac79dbd6836e2565.png
   $ sudo  vim /etc/security/limits.conf 
    #在文件最后添加以下行
#修改最大打开文件数
* soft nofile 65536
* hard nofile 65536
# 修改最大进程数
* soft nproc 65536
* hard nproc 65536
2.5、安装jdk

由于YU 3个软件都需要JDK只支持,所以只要安装Elasticsearch + Logstash + Kibana的服务器都要装JDK,
Elasticsearch安装包自带jdk,我们利用自带版本
   $  ll /usr/local/elasticsearch-8.4.0/
https://i-blog.csdnimg.cn/blog_migrate/a0f1e2fee9b30d8bc93c6514316d3514.png
推送jdk到logstash服务器上
   $  sudo scp -r  /usr/local/elasticsearch-8.4.0/jdk  logstash:/usr/local/
2.6、配置JDK环境变量
   $ sudo vim /etc/profile  #在文件最后加入一下行
export JAVA_HOME=/usr/local/elasticsearch-8.4.0/jdk
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME  PATH CLASSPATH
    $  source /etc/profile   #使环境变量生效
2.7、修改配置文件

   $ cd /usr/local/elasticsearch-8.4.0/
$ vim config/elasticsearch.yml
cluster.name: my-application
node.name: node-1
bootstrap.memory_lock: false  #防止启动检测报错
network.host: 0.0.0.0   #修改监听IP
http.port: 9200
cluster.initial_master_nodes: ["node-1"]
xpack.security.enabled: false   #关闭安全认证,否则利用http访问不到es,第一次启动没有这个参数,第二次启动再关闭
xpack.security.http.ssl:
  enabled: false

2.8、启动Elasticsearch

   $  ./bin/elasticsearch
https://i-blog.csdnimg.cn/blog_migrate/b5934cf9a15488ee392e5035a5c1c301.png
2.8、修改配置文件后再次启动,后台启动

   $ vim config/elasticsearch.yml
xpack.security.enabled: false   #关闭安全认证,否则利用http访问不到es,第一次启动没有这个参数,第二次启动再关闭
xpack.security.http.ssl:
  enabled: false
    $ ./bin/elasticsearch &
2.9、测试

网页 192.168.1.177:9200
https://i-blog.csdnimg.cn/blog_migrate/06fd18fd250327d43a0114b35a1ce867.png
2.10、加入开机自启动

   $ sudo  vim /etc/init.d/elasticsearch
#!/bin/sh
#chkconfig: 2345 80 05
#description: elasticsearch
#author: taft

export JAVA_HOME=/usr/local/elasticsearch-8.4.0/jdk
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME  PATH CLASSPATH
case "$1" in
start)
    su yu<<!
    cd /usr/local/elasticsearch-8.4.0/
    ./bin/elasticsearch -d
!
    echo "elasticsearch startup"
    ;;
stop)
    es_pid=`jps | grep Elasticsearch | awk '{print $1}'`
    kill -9 $es_pid
    echo "elasticsearch stopped"
    ;;
restart)
    es_pid=`jps | grep Elasticsearch | awk '{print $1}'`
    kill -9 $es_pid
    echo "elasticsearch stopped"
    su yu<<!
    cd /usr/local/elasticsearch-8.4.0/
    ./bin/elasticsearch -d
!
    echo "elasticsearch startup"
    ;;
*)
    echo "start|stop|restart"
    ;;
esac

exit $?
su yu<<! 切换为yu用户实行下面的下令,<<! 相当于<<EOF
注意:
以上脚本的用户为yu,如果你的用户不是,则需要替换
以上脚本的JAVA_HOME以及elasticsearch_home如果不同请替换
   $ sudo chmod +x /etc/init.d/elasticsearch #给脚本添加权限
$ sudo  chkconfig --add /etc/init.d/elasticsearch  #添加开机自启动
$ sudo  chkconfig elasticsearch on
2.11、测试脚本

   $  sudo systemctl restart elasticsearch
$  ps -aux | grep elasticsearch
https://i-blog.csdnimg.cn/blog_migrate/c3c3faa172eb26e87f06732e344135ec.png
3、安装Kibana

3.1、上传Kibana安装包

   $   cd /tmp
$ ls
https://i-blog.csdnimg.cn/blog_migrate/333337bea97bfc395e7fa63100caf868.png
3.2、解压软件包

   $  sudo  tar -zxvf kibana-8.4.0-linux-x86_64.tar.gz -C /usr/local/
3.3、给文件授权

   $ sudo chown -R yu:yu /usr/local/kibana-8.4.0
3.4、修改配置文件

   $ cd
$  sudo  vim /usr/local/kibana-8.4.0/config/kibana.yml
#开启以下选项并修改
server.port: 5601
server.host: "192.168.1.178"
elasticsearch.hosts: ["http://192.168.1.176:9200"]  #修改elasticsearch地点,多个服务器请用逗号隔开。
i18n.locale: "zh-CN"                          #修改语言为中文
注意: host:与IP地点中间有个空格不能删除,否则报错。
3.5、启动服务

   $  /usr/local/kibana-8.4.0/bin/kibana
https://i-blog.csdnimg.cn/blog_migrate/92e7f0c142c7c92b0a949fc9089fa115.png
3.6、kibana 设置后台启动

   $ nohup /usr/local/kibana-8.4.0/bin/kibana &
$ ps -ef | grep kibana
https://i-blog.csdnimg.cn/blog_migrate/380f7f7093e6a7d31afd171ac2789c03.png
3.7、网页测试

   192.168.1.177:5601
https://i-blog.csdnimg.cn/blog_migrate/b91c3f9a49a233ee26805dfd1aef1159.png
4、搭建kafka服务 在kafka服务器上

4.1、环境搭建

4.11、上传kafka、zookeeper、jdk安装包 到/tmp目录下

   $ ll  /tmp
4.1.2、解压jdk

   $ cd /tmp
$ sudo  tar -zxvf jdk-8u171-linux-x64.tar.gz -C /usr/local/
4.1.3配置JDK环境变量

   $ sudo  vim /etc/profile 
    #在文件最后加入一下行
JAVA_HOME=/usr/local/jdk1.8.0_171
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=$JAVA_HOME/jre/lib/ext:$JAVA_HOME/lib/tools.jar
export PATH JAVA_HOME CLASSPATH
$ source /etc/profile
4.1.4、检察java环境

   $ java –version
4.2、安装zookeepe

4.2.1解压zookeeper安装包

   $ sudo tar -zxf apache-zookeeper-3.7.2-bin.tar.gz  -C /usr/local/
4.2.2、创建快照日志存放目录并赋予权限:

   $ sudo mkdir -p /data/zk/data
$  sudo chmod -R 777 /data/zk/data
4.2.3、 创建变乱日志存放目录:

   $ sudo mkdir -p /data/zk/datalog
$  sudo chmod -R 777  /data/zk/datalog
4.2.4、生成配置文件

   $  cd /usr/local/apache-zookeeper-3.7.2-bin/conf/
$ sudo cp zoo_sample.cfg  zoo.cfg
4.2.5、修改主配置文件zoo.cfg

   dataDir=/data/zk/data        #修改这一行为我们创建的目录
dataLogDir=/data/zk/datalog   #添加这一行
4.2.6、添加path环境变量

   $ sudo vim /etc/profile
export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.7.2-bin
export PATH=$ZOOKEEPER_HOME/bin:$PATH

$  source /etc/profile
4.2.7、给文件授权

   $  sudo chown -R yu:yu /usr/local/apache-zookeeper-3.7.2-bin

4.2.8、启动Zookeeper

   $ zkServer.sh start
https://i-blog.csdnimg.cn/blog_migrate/2da17753c4ea0a130de79b838f7b1f64.png
4.2.9、验证

   $ zkServer.sh status
https://i-blog.csdnimg.cn/blog_migrate/dded6bbda122b7c93be547b4cec445cc.png
4.2.10、添加开机启动

在/lib/systemd/system/文件夹下创建一个启动脚本zookeeper.service
   $ sudo vim /lib/systemd/system/zookeeper.service

Description=Zookeeper service
After=network.target


Type=forking
Environment="JAVA_HOME=/usr/local/jdk1.8.0_171"
ExecStart=/usr/local/apache-zookeeper-3.7.2-bin/bin/zkServer.sh start
ExecStop=/usr/local/apache-zookeeper-3.7.2-bin/bin/zkServer.sh stop


WantedBy=multi-user.target
$  sudo systemctl daemon-reload
$ sudo  systemctl enable zookeeper
测试:
   $ jps
67608 Jps
67439 QuorumPeerMain
$ kill 67439
$ jps
67620 Jps
$ sudo systemctl start zookeeper
$ ps -ef |grep zookeeper
https://i-blog.csdnimg.cn/blog_migrate/4f9eadf42776a5a9414b36ae9180ba13.png
4.3.Kafka 单节点单Broker摆设

这里摆设的是kafka单节点,生产环境应该摆设kafka多节点集群。
4.3.1解压软件到指定目录

   $ cd /tmp
$ sudo  tar -zxvf kafka_2.12-3.6.1.tgz  -C /usr/local/
4.3.2 修改配置文件

   $ sudo  vim /usr/local/kafka_2.12-3.6.1/config/server.properties
    # broker的全局唯一编号,不能重复
broker.id=0
# 监听
listeners=PLAINTEXT://:9092  #开启此项
# 日志目录
log.dirs=/data/kafka/log      #修改日志目录
# 配置zookeeper的毗连(如果不是本机,需要该为ip或主机名)
zookeeper.connect=localhost:2181

auto.create.topics.enable=true  自动创建topic  添加这一条
4.3.3、创建日志目录并改权限

   $ sudo mkdir -p /data/kafka/log
$  sudo chmod -R 777 /data/kafka/log
4.3.4添加path环境变量

   $ sudo vim /etc/profile
export KAFKA_HOME=/usr/local/kafka_2.12-3.6.1
export PATH=$KAFKA_HOME/bin:$PATH
    $  source /etc/profile
4.3.5、给文件授权

   $ sudo chown -R yu:yu /usr/local/kafka_2.12-3.6.1
4.3.6、启动kafka

后台启动kafka
   $ kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.6.1/config/server.properties
4.3.7添加开机自启

将kafka添加到开机自启服务
在/lib/systemd/system/文件夹下创建一个启动脚本kafka.service
   $ sudo  vim /lib/systemd/system/kafka.service

Description=Apache Kafka server (broker)
After=network.target  zookeeper.service


Type=simple
Environment="PATH=/usr/local/jdk1.8.0_171/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin"
ExecStart=/usr/local/kafka_2.12-3.6.1/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.6.1/config/server.properties
ExecStop=/usr/local/kafka_2.12-3.6.1/bin/kafka-server-stop.sh
Restart=on-failure


WantedBy=multi-user.target
    $ sudo  systemctl daemon-reload
$ sudo systemctl enable kafka
测试
   $ jps
68931 Jps
68799 Kafka
$ kill 68799
$ jps
68954 Jps
$ sudo  systemctl start kafka
$ sudo  systemctl status  kafka
https://i-blog.csdnimg.cn/blog_migrate/30a5a945161d7cf5583a2864f5921ce0.png
5、 设置filebeat 在kafka服务器上,收集kafka服务器的日志

5.1、上传软件包

5.2、解压缩软件

   $ sudo  tar -zxvf filebeat-8.4.0-linux-x86_64.tar.gz -C /usr/local/
5.3、给文件授权

   $  sudo chown -R yu:yu /usr/local/filebeat-8.4.0-linux-x86_64
$  sudo chmod o+r  /var/log/*
5.4、修改配置文件

Filebeat 要求配置文件只能由其所有者写入,不允许组(group)和其他用户(others)也有写权限。
   $ sudo vim /usr/local/filebeat-8.4.0-linux-x86_64/filebeat.yml

   filebeat.idle_timeout: 2s
filebeat.name: filebeat-shiper
filebeat.spool_size: 50000
filebeat.inputs:

- type: log
  paths:
    - /usr/local/kafka_2.12-3.6.1/logs/controller.log
  fields:
    topic: topic-kafka-controller-log
  enabled: true
  backoff: 1s
  backoff_factor: 2
  close_inactive: 1h
  encoding: plain
  harvester_buffer_size: 262144
  max_backoff: 10s
  max_bytes: 10485760
  scan_frequency: 10s
  tail_lines: true

- type: log
  paths:
    - /usr/local/kafka_2.12-3.6.1/logs/state-change.log
  fields:
    topic: topic-kafka-state-change-log
  enabled: true
  backoff: 1s
  backoff_factor: 2
  close_inactive: 1h
  encoding: plain
  harvester_buffer_size: 262144
  max_backoff: 10s
  max_bytes: 10485760
  scan_frequency: 10s
  tail_lines: true

- type: log
  paths:
    -  /usr/local/kafka_2.12-3.6.1/logs/server.log
  fields:
    topic: topic-kafka-run-log
  enabled: true
  backoff: 1s
  backoff_factor: 2
  close_inactive: 1h
  encoding: plain
  harvester_buffer_size: 262144
  max_backoff: 10s
  max_bytes: 10485760
  scan_frequency: 10s
  tail_lines: true

- type:  log
  paths:
    -  /usr/local/kafka_2.12-3.6.1/logs/kafka-request.log
  fields:
    topic: topic-kafka-request-log
  enabled: true
  backoff: 1s
  backoff_factor: 2
  close_inactive: 1h
  encoding: plain
  harvester_buffer_size: 262144
  max_backoff: 10s
  max_bytes: 10485760
  scan_frequency: 10s
  tail_lines: true

- type:  log
  paths:
    -  /var/log/messages
  fields:
    topic: topic-kafka-messages-log
  enabled: true
  backoff: 1s
  backoff_factor: 2
  close_inactive: 1h
  encoding: plain
  harvester_buffer_size: 262144
  max_backoff: 10s
  max_bytes: 10485760
  scan_frequency: 10s
  tail_lines: true

output.kafka:          
  bulk_flush_frequency: 0
  bulk_max_size: 2048
  codec.format:
    string: '%{}'
  compression: gzip
  compression_level: 4
  hosts:
  - 192.168.1.179:9092
  max_message_bytes: 10485760
  partition.round_robin:
    reachable_only: true
  required_acks: 1
  topic: '%{}'
setup.ilm.enabled: false
配置文件详解
   全局配置
filebeat.idle_timeout: 2s:
#Filebeat在没有数据发送时保持打开状态的超时时间。
filebeat.name: filebeat-apache:
#Filebeat实例的名称,这有助于在日志和监控中识别它。每个单独的filebeat实例的名称必须不同
filebeat.spool_size: 50000:
#Filebeat内部利用的队列大小,它决定了可以缓冲多少个变乱。
输入配置
type: log:
#指定输入范例为日志。
paths:
#界说Filebeat需要监控的日志文件的路径。这里指定的是/usr/local/apache/logs/access_log。
fields:
#允许你添加自界说字段到输出的变乱中。这里添加了一个topic字段,值为topic-apache-access-log。
enabled: true:
#启用此输入配置。
backoff: 1s和 close_inactive: 1h:
#在读取文件遇到错误时,Filebeat会等待backoff指定的时间,然后每次重试等待的时间会乘以backoff_factor。
close_inactive:
#如果文件在给定的时间段内没有新变乱,Filebeat将关闭文件句柄。
#句柄(Handle)在计算机科学中是一个用来标识对象或资源的引用或者指针。在操纵系统和很多软件应用程序中,句柄被用作一个内部标识符,它允许程序访问和操纵系统或应用程序中的对象或资源,而无需直接知道对象的内存地点或其他详细信息。
encoding: plain
#指定日志文件的编码方式。
harvester_buffer_size: 262144
#读取文件时利用的缓冲区大小。
max_backoff: 10s
#最大重试等待时间。
max_bytes: 10485760
#单个变乱的最大字节数。
scan_frequency: 10s
#Filebeat检查指定路径下的新文件的频率。
tail_lines: true
#如果设置为true,Filebeat将从文件末尾开始读取,这通常用于实时日志流。
输出配置
output.kafka:
#指定输出到Kafka的配置。
  bulk_flush_frequency: 0
#批量发送变乱到Kafka的频率,这里设置为0,意味着立即发送。
bulk_max_size:
#批量发送变乱的最大数量。
  codec.format:
    string: '%{}'
#这表现Filebeat将利用字符串格式来格式化输出到Kafka的消息,而且该字符串的内容将是日志变乱中的message字段。%{} 是一个字段引用,它告诉Filebeat从每个日志变乱中提取message字段的值,并将其作为Kafka消息的内容。
  compression: gzip 和  compression_level: 4
#启用压缩,并设置压缩级别。
hosts:
#Kafka服务器的地点和端口。
max_message_bytes: 10485760
#Kafka消息的最大字节数。
  partition.round_robin:
    reachable_only: true
#partition.round_robin 设置启用了轮询(round-robin)策略,即Filebeat将按顺序将数据分发到Kafka的可用分区。这种策略确保在没有特定分区选择逻辑的情况下,数据可以或许匀称地在所有分区之间分布。reachable_only: true 是partition.round_robin下的一个子选项,它指定Filebeat只选择可达(reachable)的分区来发送数据。
  required_acks: 1
#Kafka要求的确认数量,确保消息已经被写入。
  topic: '%{}'
#利用前面在fields中界说的topic字段的值作为Kafka的topic。
setup.ilm.enabled: false
#禁用索引生命周期管理(ILM)。

5.5、启动filebeat

   $  cd /usr/local/filebeat-8.4.0-linux-x86_64/ && ./filebeat -e -c filebeat.yml &
-e: 这个选项告诉Filebeat以“输出到标准错误”(Error output)的模式运行。这意味着所有的日志和错误信息会直接打印到控制台,而不是写入日志文件。这有助于在调试过程中快速检察问题。 测试时利用测试成功后取消这一个选项
-c filebeat.yml: 这个选项指定了Filebeat的配置文件路径。在这个例子中,配置文件是filebeat.yml,它应该包罗Filebeat如何收集、处理和转发日志的指令。
5.6、再开一个终端检察kafka主题是否创建

   $ kafka-topics.sh --bootstrap-server kafka:9092 --list
topic-kafka-controller-log
topic-kafka-run-log
topic-kafka-state-change-log
表现kafka吸收到filebeat收集到的日志
5.7设置开机自启动

   $ sudo  echo "nohup /usr/local/filebeat-8.4.0-linux-x86_64/filebeat -e -c /usr/local/filebeat-8.4.0-linux-x86_64/filebeat.yml &" >> /etc/rc.d/rc.local
6、搭建logstash服务器

6.1、上传软件包


6.2、解压缩软件

   $ sudo  tar -zxvf logstash-8.4.0-linux-x86_64.tar.gz -C /usr/local/
6.3、给文件授权

   $ sudo chown -R yu:yu /usr/local/logstash-8.4.0/
6.4、做软毗连

   $ sudo  ln -s /usr/local/logstash-8.4.0/bin/* /usr/local/bin/
6.5、生成配置文件

   input {
  kafka {
    bootstrap_servers => "192.168.1.179:9092"
    topics_pattern => "topic.*"
    consumer_threads => 5
    decorate_events => true
    codec => plain { charset => "UTF-8" }
    auto_offset_reset => "latest"
    group_id => "logstash1"
  }
}

filter {
  # 如果需要转换时间戳为本地时间,请取消注释并正确编写Ruby代码
  # 注意:下面的代码可能需要调解,确保它符合您的详细需求
  #ruby {
  #  code => "event.set('@timestamp', Time.parse(event.get('time')).localtime)"
  #}

  mutate {
    remove_field => ["beat"]
  }

  grok {
    match => {
      "message" => "\[(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+):(?<msg>.+)"
    }
 overwrite => ["message"]
    tag_on_failure => ["_grokparsefailure"]
  }
  if "[@metadata]" {
    mutate {
      add_field => { "topic_name" => "%{[@metadata]}" }
    }
  }
}

output {
  # 确保topic_name字段存在才发送到Elasticsearch
  if {
    elasticsearch {
      hosts => ["192.168.1.177:9200"]
      # 利用topic_name字段来构建索引名
      index => "%{topic_name}-%{+YYYY.MM.dd}"
    }
  }

  stdout {
    codec => rubydebug
  }
}
配置文件详解
   Input部分
input { 
  kafka { 
    bootstrap_servers => "192.168.1.179:9092" 
#Kafka集群的地点和端口。
    topics_pattern => "topic.*" 
#订阅所有以"topic."开头的Kafka主题。
    consumer_threads => 5 
#消耗者线程数,用于并行读取Kafka消息。
    decorate_events => true 
#的配置选项用于控制是否在Logstash变乱中添加与Kafka相干的元数据字段。当设置为true时,Logstash会向每个变乱添加一些额外的字段,这些字段包罗了关于Kafka消息的一些信息。
    codec => plain { charset => "UTF-8" } 
#告诉Logstash以UTF-8编码的方式从Kafka消息中读取纯文本数据,并将其转换为Logstash变乱以供后续处理。如许,无论Kafka中的消息包罗何种字符,Logstash都可以或许正确解析并处理它们。codec => json 用于指定输入插件利用JSON格式来解码数据。这通常用于处理存储在Kafka等消息队列中的JSON格式的消息。
    auto_offset_reset => "latest" 
#它决定了当Logstash开始消耗Kafka主题时,从哪里开始读取消息。
auto_offset_reset 参数有以下几个可能的值:
"earliest": 从最早的记载开始读取,即从头开始消耗。
"latest": 从最新的记载开始读取,即只消耗在Logstash开始消耗之后新产生的消息。
"none": 如果找不到初始偏移量,就抛出非常。

    group_id => "logstash1" 
  } 
}
Filter部分
filter { 
  # ... Ruby代码块(已注释) 
  mutate { 
    remove_field => ["beat"] 
#mutate:移除beat字段
  } 
  grok { 
    match => { 
      "message" => "\[(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+):(?<msg>.+)" 
#利用正则表达式解析message字段,并提取出时间、日志级别、线程、类名、行号和消息内容等字段。
    } 
    overwrite => ["message"] 
#将原始message字段替换为grok解析后的内容。
    tag_on_failure => ["_grokparsefailure"] 
#如果grok解析失败,则给变乱添加一个标签_grokparsefailure
  } 
  if "[@metadata]" { 
    mutate { 
      add_field => { "topic_name" => "%{[@metadata]}" } 
#if条件判定:如果变乱中存在[@metadata]字段,则添加一个名为topic_name的新字段,其值为Kafka主题名。
    } 
  } 
}
Output部分
output { 
  if { 
#只有当变乱中存在topic_name字段时,才实行下面的输出操纵。
    elasticsearch { 
      hosts => ["192.168.1.177:9200"] 
      index => "%{topic_name}-%{+YYYY.MM.dd}" 
#利用topic_name和日期来构建索引名。
    } 
  } 

  stdout { 
#输出到标准输出(通常用于调试)。
    codec => rubydebug 
#利用rubydebug编码,以易读的格式显示变乱内容
  } 
}

6.6、启动logstash

   $ logstash -f /usr/local/logstash-8.4.0/config/logstash.conf
https://i-blog.csdnimg.cn/blog_migrate/277855d6cd51bfdf12a998c1ae2c1396.png
6.7后台启动

   $ sudo nohup logstash -f /usr/local/logstash-8.4.0/config/logstash.conf &
6.8、开机自启动

   $ sudo echo "source /etc/profile" >> /etc/rc.local #让开机加载java环境
$ sudo echo "nohup logstash -f /usr/local/logstash-8.4.0/config/logstash.conf &" >> /etc/rc.local
$ sudo chmod +x /etc/rc.local
7、登录kibana网页检察索引生成情况生成数据视图

https://i-blog.csdnimg.cn/blog_migrate/0ca7628b5f1944f12cdbc9d5847e2d45.png

8、Zabbix与ELK整合实现对安整日志数据的实时监控告警

     有些时候,我们希望在收集日志的时候,可以或许将日志中的非常信息(警告、错误、失败等信息)实时的提取出来,因为日志中的非常信息意味着操纵系统、应用程序可能存在故障,如果能将日志中的故障信息实时的告知运维人员,那么运维就可以第一时间去进行故障排查和处理,进而也就可以制止很多故障的发生。
     ELK(更确切的说应该是logstash)可以实时的读取日志的内容,而且还可以过滤日志信息,通过ELK的读取和过滤功能,就可以将日志中的一些非常关键字(error、failed、OutOff、Warning)过滤出来,然后通过logstash的zabbix插件将这个错误日志信息发送给zabbix,那么zabbix在吸收到这个数据后,结合自身的机制,然后发起告警动作,如许就实现了日志非常zabbix实时告警的功能了。
8.1、Logstash与zabbix插件的利用

    Logstash支持多种输出介质,比如syslog、HTTP、TCP、elasticsearch、kafka等,而有时候我们想将收集到的日志中一些错误信息输出,并告警时,就用到了logstash-output-zabbix这个插件,此插件可以将Logstash与zabbix进行整合,也就是将Logstash收集到的数据进行过滤,将有错误标识的日志输出到zabbix中,最后通过zabbix的告警机制进行触发、告警。

   logstash-output-zabbix是一个社区维护的插件,它默认没有在Logstash中安装,但是安装起来也很轻易,
8.1.1下载阿里源

   $ sudo wget   -O   /etc/yum.repos.d/CentOS-Base.repo    http://mirrors.aliyun.com/repo/Centos-7.repo

8.1.2直接在logstash中运行如下下令即可:

   $ logstash-plugin install logstash-output-zabbix
    Using bundled JDK: /usr/local/logstash-8.4.0/jdk
Validating logstash-output-zabbix
Resolving mixin dependencies
Installing logstash-output-zabbix
Installation successful

   列出现在已经安装的插件
$ logstash-plugin list
    列出已安装的插件及版本信息
$ logstash-plugin list --verbose
8.2、logstash-output-zabbix插件的利用

logstash-output-zabbix安装好之后,就可以在logstash配置文件中利用了,下面是一个logstash-output-zabbix利用的例子:
   zabbix {
        zabbix_host =>"[@metadata]"
        zabbix_key =>"[@metadata]"
        zabbix_server_host =>"x.x.x.x"
        zabbix_server_port =>"xxxx"
        zabbix_value ="xxxx"
        }
其中:
zabbix_host:表现Zabbix主机名字段名称, 可以是单独的一个字段, 也可以是 @metadata 字段的子字段, 是必须的设置,没有默认值。

zabbix_key:表现Zabbix项目键的值,也就是zabbix中的item,此字段可以是单独的一个字段, 也可以是 @metadata 字段的子字段,没有默认值。

zabbix_server_host:表现Zabbix服务器的IP或可解析主机名,默认值是 “localhost”,需要设置为zabbix server服务器所在的地点。

zabbix_server_port:表现Zabbix服务器开启的监听端口,默认值是10051。

zabbix_value:表现要发送给zabbix item监控项的值对应的字段名称,默认值是 “message”,也就是将”message”字段的内容发送给上面zabbix_key界说的zabbix item监控项,固然也可以指定一个详细的字段内容发送给zabbix item监控项。


8.3、将logstash与zabbix进行整合


这里我们以logstash收集日志,然后对日志进行读取,最后选择关键字进行过滤并调用zabbix告警的流程,来看看如何配置logstash实现zabbix告警。

先阐明一下我们的应用需求:通过对系统日志文件的监控,然后去过滤日志信息中的一些关键字,比方ERROR、Failed、WARNING等,将日志中这些信息过滤出来,然后发送到zabbix上,最后借助zabbix的报警功能实现对系统日志中有上述关键字的告警。

对于过滤关键字,进行告警,不同的业务系统,可能关键字不尽相同,比方对http系统,可能需要过滤500、403、503等这些错误码,对于java相干的系统,可能需要过滤OutOfMemoryError、PermGen、Java heap等关键字。在某些业务系统的日志输出中,可能另有一些自界说的错误信息,那么这些也需要作为过滤关键字来利用。

8.3.1配置logstash变乱配置文件

   input { 
  kafka { 
    bootstrap_servers => "192.168.1.179:9092" 
    topics_pattern => "topic.*" 
    consumer_threads => 5 
    decorate_events => true 
    codec => plain { charset => "UTF-8" } 
    auto_offset_reset => "latest" 
    group_id => "logstash1" 
  } 


filter { 
  # 移除不需要的字段 
  mutate { 
    remove_field => ["beat"] 
  } 

  # 利用grok来解析日志 
  grok { 
    match => { 
      "message" => "\[(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+):(?<msg>.+)" 
    } 
    overwrite => ["message"] 
    tag_on_failure => ["_grokparsefailure"] 
  } 

  # 添加topic_name字段 
  if "[@metadata]" { 
    mutate { 
      add_field => { "topic_name" => "%{[@metadata]}" } 
    } 
  } 

  # 为Zabbix预备特定的字段 
  if { 
    mutate { 
      add_field => { "zabbix_topic" => "%{topic_name}" } 
      add_field => { "zabbix_level" => "%{level}" } 
      add_field => { "zabbix_message" => "%{msg}" } 
      # 如果需要,继续添加其他Zabbix需要的字段 
    } 
  } 


output { 
  # 发送给Elasticsearch 
  if { 
    elasticsearch { 
      hosts => ["192.168.1.177:9200"] 
      index => "%{topic_name}-%{+YYYY.MM.dd}" 
    } 
  } 

  # 发送给Zabbix 
  if and and { 
    zabbix { 
      zabbix_host => "your_zabbix_server_host" 
      zabbix_port => 10051 
      zabbix_sender_host => "logstash_host_name" 

      # 界说item key和对应的字段 
      item_key => { 
        "topic" => "%{zabbix_topic}" 
        "level" => "%{zabbix_level}" 
        "message" => "%{zabbix_message}" 
        # ... 其他键值对 ... 
      } 

      # 其他可选参数,比如zabbix_server等 
    } 
  } 
}


                        }
可以在一个服务器上同时启动多个logstash进程。但是,当同时启动多个logstash进程时,需要指定不同的path.data,否则会报错。比方,利用以下下令启动两个logstash进程:
   ./logstash -f /etc/logstash/config.d/xxx1.conf --path.data=/etc/logstash/data1/ & 
./logstash -f /etc/logstash/config.d/xxx2.conf --path.data=/etc/logstash/data2/ &




免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 超详细,elk+kafka+filebeat日志收集,filebeat收集的不同日志生成不同的to