超详细,elk+kafka+filebeat日志收集,filebeat收集的不同日志生成不同的to ...

千千梦丶琪  金牌会员 | 2024-9-2 05:58:57 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 883|帖子 883|积分 2649

项目主机规划
主机名
外网IP
安装软件
所属集群
elstack
192.168.1.176
elasticsearch8.4

logstash
192.168.1.178
logstash8.4+Kibana8.4

kafka
192.168.1.179
kafka+zookeeper


1、环境预备

1.1 配置hosts文件

   [yu@elk~]$ sudo vim /etc/hosts
  192.168.1.177 elk
  192.168.1.178 logstash
  192.168.1.179 kafka
  推送hosts文件到 yu的其他主机上
   [yu@elk~]$ sudo scp /etc/hosts 192.168.1.178:/etc/hosts
  [yu@elk~]$ sudo scp /etc/hosts 192.168.1.179:/etc/hosts
  1.2、elk软件下载地点

    wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.4.0-linux-x86_64.tar.gz
   wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.4.0-linux-x86_64.tar.gz
   wget https://artifacts.elastic.co/downloads/logstash/logstash-8.4.0-linux-x86_64.tar.gz
   wget https://artifacts.elastic.co/downloads/kibana/kibana-8.4.0-linux-x86_64.tar.gz
  1.3、kafka+zookeeper下载地点

   wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.2/apache-zookeeper-3.7.2-bin.tar.g
  wget https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgzkafka_2.12-3.6.1https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz
  
 2、摆设elasticsearch

2.1、上传elasticsearch安装包

下载地点

2.2、解压软件包

   [yu@elk~]$  cd /tmp
  [yu@elk tmp]$ sudo tar -zxvf elasticsearch-8.4.0-linux-x86_64.tar.gz -C /usr/local/
  2.3、给文件授权

   [yu@elk~]$ sudo chown -R yu:yu /usr/local/elasticsearch-8.4.0/
  2.4、修改linux系统打开文件最大数

   [yu@elk~]$ sudo vim /etc/sysctl.conf
  vm.max_map_count=655360
  [yu@elk~]$ sudo  sysctl -p /etc/sysctl.conf
  

   [yu@elk~]$ sudo  vim /etc/security/limits.conf 
    #在文件最后添加以下行
  #修改最大打开文件数
  * soft nofile 65536
  * hard nofile 65536
  # 修改最大进程数
  * soft nproc 65536
  * hard nproc 65536
  2.5、安装jdk

由于YU 3个软件都需要JDK只支持,所以只要安装Elasticsearch + Logstash + Kibana的服务器都要装JDK,
Elasticsearch安装包自带jdk,我们利用自带版本
   [yu@elk~]$  ll /usr/local/elasticsearch-8.4.0/
  

推送jdk到logstash服务器上
   [yu@elk~]$  sudo scp -r  /usr/local/elasticsearch-8.4.0/jdk  logstash:/usr/local/
  2.6、配置JDK环境变量
   [yu@elk~]$ sudo vim /etc/profile  #在文件最后加入一下行
  export JAVA_HOME=/usr/local/elasticsearch-8.4.0/jdk
  export PATH=$PATHJAVA_HOME/bin
  export CLASSPATH=$JAVA_HOME/lib/dt.jarJAVA_HOME/lib/tools.jar
  export JAVA_HOME  PATH CLASSPATH
    [yu@elk~]$  source /etc/profile   #使环境变量生效
  2.7、修改配置文件

   [yu@elk~]$ cd /usr/local/elasticsearch-8.4.0/
  [yu@elkelasticsearch-8.4.0]$ vim config/elasticsearch.yml
  cluster.name: my-application
  node.name: node-1
  bootstrap.memory_lock: false  #防止启动检测报错
  network.host: 0.0.0.0   #修改监听IP
  http.port: 9200
  cluster.initial_master_nodes: ["node-1"]
  xpack.security.enabled: false   #关闭安全认证,否则利用http访问不到es,第一次启动没有这个参数,第二次启动再关闭
  xpack.security.http.ssl:
    enabled: false
  
2.8、启动Elasticsearch

   [yu@elkelasticsearch-8.4.0]$  ./bin/elasticsearch
  

2.8、修改配置文件后再次启动,后台启动

   [yu@elk elasticsearch-8.4.0]$ vim config/elasticsearch.yml
  xpack.security.enabled: false   #关闭安全认证,否则利用http访问不到es,第一次启动没有这个参数,第二次启动再关闭
  xpack.security.http.ssl:
    enabled: false
    [yu@elk elasticsearch-8.4.0]$ ./bin/elasticsearch &
  2.9、测试

网页 192.168.1.177:9200

2.10、加入开机自启动

   [yu@elk elasticsearch-8.4.0]$ sudo  vim /etc/init.d/elasticsearch
  #!/bin/sh
  #chkconfig: 2345 80 05
  #description: elasticsearch
  #author: taft
  
  export JAVA_HOME=/usr/local/elasticsearch-8.4.0/jdk
  export PATH=$PATHJAVA_HOME/bin
  export CLASSPATH=$JAVA_HOME/lib/dt.jarJAVA_HOME/lib/tools.jar
  export JAVA_HOME  PATH CLASSPATH
  case "$1" in
  start)
      su yu<<!
      cd /usr/local/elasticsearch-8.4.0/
      ./bin/elasticsearch -d
  !
      echo "elasticsearch startup"
      ;;
  stop)
      es_pid=`jps | grep Elasticsearch | awk '{print $1}'`
      kill -9 $es_pid
      echo "elasticsearch stopped"
      ;;
  restart)
      es_pid=`jps | grep Elasticsearch | awk '{print $1}'`
      kill -9 $es_pid
      echo "elasticsearch stopped"
      su yu<<!
      cd /usr/local/elasticsearch-8.4.0/
      ./bin/elasticsearch -d
  !
      echo "elasticsearch startup"
      ;;
  *)
      echo "start|stop|restart"
      ;;
  esac
  
  exit $?
  su yu<<! 切换为yu用户实行下面的下令,<<! 相当于<<EOF
注意:
以上脚本的用户为yu,如果你的用户不是,则需要替换
以上脚本的JAVA_HOME以及elasticsearch_home如果不同请替换
   [yu@elk~]$ sudo chmod +x /etc/init.d/elasticsearch #给脚本添加权限
  [yu@elk~]$ sudo  chkconfig --add /etc/init.d/elasticsearch  #添加开机自启动
  [yu@elk~]$ sudo  chkconfig elasticsearch on
  2.11、测试脚本

   [yu@elk~]$  sudo systemctl restart elasticsearch
  [yu@elk~]$  ps -aux | grep elasticsearch
  

3、安装Kibana

3.1、上传Kibana安装包

   [yu@logstash ~]$   cd /tmp
  [yu@logstash ~]$ ls
  

3.2、解压软件包

   [yu@logstash tmp]$  sudo  tar -zxvf kibana-8.4.0-linux-x86_64.tar.gz -C /usr/local/
  3.3、给文件授权

   [yu@logstash ~]$ sudo chown -R yu:yu /usr/local/kibana-8.4.0
  3.4、修改配置文件

   [yu@logstash tmp]$ cd
  [yu@logstash ~]$  sudo  vim /usr/local/kibana-8.4.0/config/kibana.yml
  #开启以下选项并修改
  server.port: 5601
  server.host: "192.168.1.178"
  elasticsearch.hosts: ["http://192.168.1.176:9200"]  #修改elasticsearch地点,多个服务器请用逗号隔开。
  i18n.locale: "zh-CN"                          #修改语言为中文
  注意: host:与IP地点中间有个空格不能删除,否则报错。
  3.5、启动服务

   [yu@logstash ~]$  /usr/local/kibana-8.4.0/bin/kibana
  

3.6、kibana 设置后台启动

   [yu@logstash ~]$ nohup /usr/local/kibana-8.4.0/bin/kibana &
  [yu@logstash ~]$ ps -ef | grep kibana
  

3.7、网页测试

   192.168.1.177:5601
  

4、搭建kafka服务 在kafka服务器上

4.1、环境搭建

4.11、上传kafka、zookeeper、jdk安装包 到/tmp目录下

   [yu@kafka ~]$ ll  /tmp
  4.1.2、解压jdk

   [yu@kafka ~]$ cd /tmp
  [yu@kafka tmp]$ sudo  tar -zxvf jdk-8u171-linux-x64.tar.gz -C /usr/local/
  4.1.3配置JDK环境变量

   [yu@kafka tmp]$ sudo  vim /etc/profile 
    #在文件最后加入一下行
  JAVA_HOME=/usr/local/jdk1.8.0_171
  PATH=$JAVA_HOME/binPATH
  CLASSPATH=$JAVA_HOME/jre/lib/extJAVA_HOME/lib/tools.jar
  export PATH JAVA_HOME CLASSPATH
  [yu@kafka tmp]$ source /etc/profile
  4.1.4、检察java环境

   [yu@kafka tmp]$ java –version
  4.2、安装zookeepe

4.2.1解压zookeeper安装包

   [yu@kafka tmp]$ sudo tar -zxf apache-zookeeper-3.7.2-bin.tar.gz  -C /usr/local/
  4.2.2、创建快照日志存放目录并赋予权限:

   [yu@kafka tmp]$ sudo mkdir -p /data/zk/data
  [yu@kafka tmp]$  sudo chmod -R 777 /data/zk/data
  4.2.3、 创建变乱日志存放目录:

   [yu@kafka tmp]$ sudo mkdir -p /data/zk/datalog
  [yu@kafka tmp]$  sudo chmod -R 777  /data/zk/datalog
  4.2.4、生成配置文件

   [yu@kafka tmp]$  cd /usr/local/apache-zookeeper-3.7.2-bin/conf/
  [yu@kafka conf]$ sudo cp zoo_sample.cfg  zoo.cfg
  4.2.5、修改主配置文件zoo.cfg

   dataDir=/data/zk/data        #修改这一行为我们创建的目录
  dataLogDir=/data/zk/datalog   #添加这一行
  4.2.6、添加path环境变量

   [yu@kafka conf]$ sudo vim /etc/profile
  export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.7.2-bin
  export PATH=$ZOOKEEPER_HOME/binPATH
  
  [yu@kafka conf]$  source /etc/profile
  4.2.7、给文件授权

   [yu@kafka conf]$  sudo chown -R yu:yu /usr/local/apache-zookeeper-3.7.2-bin
  
  4.2.8、启动Zookeeper

   [yu@kafka conf]$ zkServer.sh start
  

4.2.9、验证

   [yu@kafka conf]$ zkServer.sh status
  

4.2.10、添加开机启动

在/lib/systemd/system/文件夹下创建一个启动脚本zookeeper.service
   [yu@kafka conf]$ sudo vim /lib/systemd/system/zookeeper.service
  [Unit]
  Description=Zookeeper service
  After=network.target
  
  [Service]
  Type=forking
  Environment="JAVA_HOME=/usr/local/jdk1.8.0_171"
  ExecStart=/usr/local/apache-zookeeper-3.7.2-bin/bin/zkServer.sh start
  ExecStop=/usr/local/apache-zookeeper-3.7.2-bin/bin/zkServer.sh stop
  
  [Install]
  WantedBy=multi-user.target
  [yu@kafka conf]$  sudo systemctl daemon-reload
  [yu@kafka conf]$ sudo  systemctl enable zookeeper
  测试:
   [yu@kafka conf]$ jps
  67608 Jps
  67439 QuorumPeerMain
  [yu@kafka conf]$ kill 67439
  [yu@kafka conf]$ jps
  67620 Jps
  [yu@kafka conf]$ sudo systemctl start zookeeper
  [yu@kafka conf]$ ps -ef |grep zookeeper
  

4.3.Kafka 单节点单Broker摆设

这里摆设的是kafka单节点,生产环境应该摆设kafka多节点集群。
4.3.1解压软件到指定目录

   [yu@kafka conf]$ cd /tmp
  [yu@kafka tmp]$ sudo  tar -zxvf kafka_2.12-3.6.1.tgz  -C /usr/local/
  4.3.2 修改配置文件

   [yu@kafka tmp]$ sudo  vim /usr/local/kafka_2.12-3.6.1/config/server.properties
    # broker的全局唯一编号,不能重复
  broker.id=0
  # 监听
  listeners=PLAINTEXT://:9092  #开启此项
  # 日志目录
  log.dirs=/data/kafka/log      #修改日志目录
  # 配置zookeeper的毗连(如果不是本机,需要该为ip或主机名)
  zookeeper.connect=localhost:2181
  
  auto.create.topics.enable=true  自动创建topic  添加这一条
  4.3.3、创建日志目录并改权限

   [yu@kafka tmp]$ sudo mkdir -p /data/kafka/log
  [yu@kafka tmp]$  sudo chmod -R 777 /data/kafka/log
  4.3.4添加path环境变量

   [yu@kafka tmp]$ sudo vim /etc/profile
  export KAFKA_HOME=/usr/local/kafka_2.12-3.6.1
  export PATH=$KAFKA_HOME/binPATH
    [yu@kafka tmp]$  source /etc/profile
  4.3.5、给文件授权

   [yu@kafka tmp]$ sudo chown -R yu:yu /usr/local/kafka_2.12-3.6.1
  4.3.6、启动kafka

后台启动kafka
   [yu@kafka tmp]$ kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.6.1/config/server.properties
  4.3.7添加开机自启

将kafka添加到开机自启服务
在/lib/systemd/system/文件夹下创建一个启动脚本kafka.service
   [yu@kafka tmp]$ sudo  vim /lib/systemd/system/kafka.service
  [Unit]
  Description=Apache Kafka server (broker)
  After=network.target  zookeeper.service
  
  [Service]
  Type=simple
  Environment="ATH=/usr/local/jdk1.8.0_171/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin"
  ExecStart=/usr/local/kafka_2.12-3.6.1/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.6.1/config/server.properties
  ExecStop=/usr/local/kafka_2.12-3.6.1/bin/kafka-server-stop.sh
  Restart=on-failure
  
  [Install]
  WantedBy=multi-user.target
    [yu@kafka tmp]$ sudo  systemctl daemon-reload
  [yu@kafka tmp]$ sudo systemctl enable kafka
  测试
   [yu@kafka tmp]$ jps
  68931 Jps
  68799 Kafka
  [yu@kafka tmp]$ kill 68799
  [yu@kafka tmp]$ jps
  68954 Jps
  [yu@kafka tmp]$ sudo  systemctl start kafka
  [yu@kafka tmp]$ sudo  systemctl status  kafka
  

5、 设置filebeat 在kafka服务器上,收集kafka服务器的日志

5.1、上传软件包

5.2、解压缩软件

   [yu@kafka tmp]$ sudo  tar -zxvf filebeat-8.4.0-linux-x86_64.tar.gz -C /usr/local/
  5.3、给文件授权

   [yu@kafka tmp]$  sudo chown -R yu:yu /usr/local/filebeat-8.4.0-linux-x86_64
  [yu@ kafka tmp]$  sudo chmod o+r  /var/log/*
  5.4、修改配置文件

Filebeat 要求配置文件只能由其所有者写入,不允许组(group)和其他用户(others)也有写权限。
   [yu@kafka tmp]$ sudo vim /usr/local/filebeat-8.4.0-linux-x86_64/filebeat.yml
  
   filebeat.idle_timeout: 2s
  filebeat.name: filebeat-shiper
  filebeat.spool_size: 50000
  filebeat.inputs:
  
  - type: log
    paths:
      - /usr/local/kafka_2.12-3.6.1/logs/controller.log
    fields:
      topic: topic-kafka-controller-log
    enabled: true
    backoff: 1s
    backoff_factor: 2
    close_inactive: 1h
    encoding: plain
    harvester_buffer_size: 262144
    max_backoff: 10s
    max_bytes: 10485760
    scan_frequency: 10s
    tail_lines: true
  
  - type: log
    paths:
      - /usr/local/kafka_2.12-3.6.1/logs/state-change.log
    fields:
      topic: topic-kafka-state-change-log
    enabled: true
    backoff: 1s
    backoff_factor: 2
    close_inactive: 1h
    encoding: plain
    harvester_buffer_size: 262144
    max_backoff: 10s
    max_bytes: 10485760
    scan_frequency: 10s
    tail_lines: true
  
  - type: log
    paths:
      -  /usr/local/kafka_2.12-3.6.1/logs/server.log
    fields:
      topic: topic-kafka-run-log
    enabled: true
    backoff: 1s
    backoff_factor: 2
    close_inactive: 1h
    encoding: plain
    harvester_buffer_size: 262144
    max_backoff: 10s
    max_bytes: 10485760
    scan_frequency: 10s
    tail_lines: true
  
  - type:  log
    paths:
      -  /usr/local/kafka_2.12-3.6.1/logs/kafka-request.log
    fields:
      topic: topic-kafka-request-log
    enabled: true
    backoff: 1s
    backoff_factor: 2
    close_inactive: 1h
    encoding: plain
    harvester_buffer_size: 262144
    max_backoff: 10s
    max_bytes: 10485760
    scan_frequency: 10s
    tail_lines: true
  
  - type:  log
    paths:
      -  /var/log/messages
    fields:
      topic: topic-kafka-messages-log
    enabled: true
    backoff: 1s
    backoff_factor: 2
    close_inactive: 1h
    encoding: plain
    harvester_buffer_size: 262144
    max_backoff: 10s
    max_bytes: 10485760
    scan_frequency: 10s
    tail_lines: true
  
  output.kafka:          
    bulk_flush_frequency: 0
    bulk_max_size: 2048
    codec.format:
      string: '%{[message]}'
    compression: gzip
    compression_level: 4
    hosts:
    - 192.168.1.179:9092
    max_message_bytes: 10485760
    partition.round_robin:
      reachable_only: true
    required_acks: 1
    topic: '%{[fields.topic]}'
  setup.ilm.enabled: false
  配置文件详解
   全局配置
  filebeat.idle_timeout: 2s:
  #Filebeat在没有数据发送时保持打开状态的超时时间。
  filebeat.name: filebeat-apache:
  #Filebeat实例的名称,这有助于在日志和监控中识别它。每个单独的filebeat实例的名称必须不同
  filebeat.spool_size: 50000:
  #Filebeat内部利用的队列大小,它决定了可以缓冲多少个变乱。
  输入配置
  type: log:
  #指定输入范例为日志。
  paths:
  #界说Filebeat需要监控的日志文件的路径。这里指定的是/usr/local/apache/logs/access_log。
  fields:
  #允许你添加自界说字段到输出的变乱中。这里添加了一个topic字段,值为topic-apache-access-log。
  enabled: true:
  #启用此输入配置。
  backoff: 1s和 close_inactive: 1h:
  #在读取文件遇到错误时,Filebeat会等待backoff指定的时间,然后每次重试等待的时间会乘以backoff_factor。
  close_inactive:
  #如果文件在给定的时间段内没有新变乱,Filebeat将关闭文件句柄。
  #句柄(Handle)在计算机科学中是一个用来标识对象或资源的引用或者指针。在操纵系统和很多软件应用程序中,句柄被用作一个内部标识符,它允许程序访问和操纵系统或应用程序中的对象或资源,而无需直接知道对象的内存地点或其他详细信息。
  encoding: plain
  #指定日志文件的编码方式。
  harvester_buffer_size: 262144
  #读取文件时利用的缓冲区大小。
  max_backoff: 10s
  #最大重试等待时间。
  max_bytes: 10485760
  #单个变乱的最大字节数。
  scan_frequency: 10s
  #Filebeat检查指定路径下的新文件的频率。
  tail_lines: true
  #如果设置为true,Filebeat将从文件末尾开始读取,这通常用于实时日志流。
  输出配置
  output.kafka:
  #指定输出到Kafka的配置。
    bulk_flush_frequency: 0
  #批量发送变乱到Kafka的频率,这里设置为0,意味着立即发送。
  bulk_max_size:
  #批量发送变乱的最大数量。
    codec.format:
      string: '%{[message]}'
  #这表现Filebeat将利用字符串格式来格式化输出到Kafka的消息,而且该字符串的内容将是日志变乱中的message字段。%{[message]} 是一个字段引用,它告诉Filebeat从每个日志变乱中提取message字段的值,并将其作为Kafka消息的内容。
    compression: gzip 和  compression_level: 4
  #启用压缩,并设置压缩级别。
  hosts:
  #Kafka服务器的地点和端口。
  max_message_bytes: 10485760
  #Kafka消息的最大字节数。
    partition.round_robin:
      reachable_only: true
  #partition.round_robin 设置启用了轮询(round-robin)策略,即Filebeat将按顺序将数据分发到Kafka的可用分区。这种策略确保在没有特定分区选择逻辑的情况下,数据可以或许匀称地在所有分区之间分布。reachable_only: true 是partition.round_robin下的一个子选项,它指定Filebeat只选择可达(reachable)的分区来发送数据。
    required_acks: 1
  #Kafka要求的确认数量,确保消息已经被写入。
    topic: '%{[fields.topic]}'
  #利用前面在fields中界说的topic字段的值作为Kafka的topic。
  setup.ilm.enabled: false
  #禁用索引生命周期管理(ILM)。
  
5.5、启动filebeat

   [yu@kafka tmp]$  cd /usr/local/filebeat-8.4.0-linux-x86_64/ && ./filebeat -e -c filebeat.yml &
  -e: 这个选项告诉Filebeat以“输出到标准错误”(Error output)的模式运行。这意味着所有的日志和错误信息会直接打印到控制台,而不是写入日志文件。这有助于在调试过程中快速检察问题。 测试时利用测试成功后取消这一个选项
-c filebeat.yml: 这个选项指定了Filebeat的配置文件路径。在这个例子中,配置文件是filebeat.yml,它应该包罗Filebeat如何收集、处理和转发日志的指令。
5.6、再开一个终端检察kafka主题是否创建

   [yu@kafka logs]$ kafka-topics.sh --bootstrap-server kafka:9092 --list
  topic-kafka-controller-log
  topic-kafka-run-log
  topic-kafka-state-change-log
  表现kafka吸收到filebeat收集到的日志
5.7设置开机自启动

   [yu@kafka logs]$ sudo  echo "nohup /usr/local/filebeat-8.4.0-linux-x86_64/filebeat -e -c /usr/local/filebeat-8.4.0-linux-x86_64/filebeat.yml &" >> /etc/rc.d/rc.local
  6、搭建logstash服务器

6.1、上传软件包


6.2、解压缩软件

   [yu@logstash tmp]$ sudo  tar -zxvf logstash-8.4.0-linux-x86_64.tar.gz -C /usr/local/
  6.3、给文件授权

   [yu@logstash tmp]$ sudo chown -R yu:yu /usr/local/logstash-8.4.0/
  6.4、做软毗连

   [yu@logstash tmp]$ sudo  ln -s /usr/local/logstash-8.4.0/bin/* /usr/local/bin/
  6.5、生成配置文件

   input {
    kafka {
      bootstrap_servers => "192.168.1.179:9092"
      topics_pattern => "topic.*"
      consumer_threads => 5
      decorate_events => true
      codec => plain { charset => "UTF-8" }
      auto_offset_reset => "latest"
      group_id => "logstash1"
    }
  }
  
  filter {
    # 如果需要转换时间戳为本地时间,请取消注释并正确编写Ruby代码
    # 注意:下面的代码可能需要调解,确保它符合您的详细需求
    #ruby {
    #  code => "event.set('@timestamp', Time.parse(event.get('time')).localtime)"
    #}
  
    mutate {
      remove_field => ["beat"]
    }
  
    grok {
      match => {
        "message" => "\[(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+)?<msg>.+)"
      }
   overwrite => ["message"]
      tag_on_failure => ["_grokparsefailure"]
    }
    if "[@metadata][kafka][topic]" {
      mutate {
        add_field => { "topic_name" => "%{[@metadata][kafka][topic]}" }
      }
    }
  }
  
  output {
    # 确保topic_name字段存在才发送到Elasticsearch
    if [topic_name] {
      elasticsearch {
        hosts => ["192.168.1.177:9200"]
        # 利用topic_name字段来构建索引名
        index => "%{topic_name}-%{+YYYY.MM.dd}"
      }
    }
  
    stdout {
      codec => rubydebug
    }
  }
  配置文件详解
   Input部分
  input { 
    kafka { 
      bootstrap_servers => "192.168.1.179:9092" 
  #Kafka集群的地点和端口。
      topics_pattern => "topic.*" 
  #订阅所有以"topic."开头的Kafka主题。
      consumer_threads => 5 
  #消耗者线程数,用于并行读取Kafka消息。
      decorate_events => true 
  #的配置选项用于控制是否在Logstash变乱中添加与Kafka相干的元数据字段。当设置为true时,Logstash会向每个变乱添加一些额外的字段,这些字段包罗了关于Kafka消息的一些信息。
      codec => plain { charset => "UTF-8" } 
  #告诉Logstash以UTF-8编码的方式从Kafka消息中读取纯文本数据,并将其转换为Logstash变乱以供后续处理。如许,无论Kafka中的消息包罗何种字符,Logstash都可以或许正确解析并处理它们。codec => json 用于指定输入插件利用JSON格式来解码数据。这通常用于处理存储在Kafka等消息队列中的JSON格式的消息。
      auto_offset_reset => "latest" 
  #它决定了当Logstash开始消耗Kafka主题时,从哪里开始读取消息。
  auto_offset_reset 参数有以下几个可能的值:
  "earliest": 从最早的记载开始读取,即从头开始消耗。
  "latest": 从最新的记载开始读取,即只消耗在Logstash开始消耗之后新产生的消息。
  "none": 如果找不到初始偏移量,就抛出非常。
  
      group_id => "logstash1" 
    } 
  }
  Filter部分
  filter { 
    # ... Ruby代码块(已注释) 
    mutate { 
      remove_field => ["beat"] 
  #mutate:移除beat字段
    } 
    grok { 
      match => { 
        "message" => "\[(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+)?<msg>.+)" 
  #利用正则表达式解析message字段,并提取出时间、日志级别、线程、类名、行号和消息内容等字段。
      } 
      overwrite => ["message"] 
  #将原始message字段替换为grok解析后的内容。
      tag_on_failure => ["_grokparsefailure"] 
  #如果grok解析失败,则给变乱添加一个标签_grokparsefailure
    } 
    if "[@metadata][kafka][topic]" { 
      mutate { 
        add_field => { "topic_name" => "%{[@metadata][kafka][topic]}" } 
  #if条件判定:如果变乱中存在[@metadata][kafka][topic]字段,则添加一个名为topic_name的新字段,其值为Kafka主题名。
      } 
    } 
  }
  Output部分
  output { 
    if [topic_name] { 
  #只有当变乱中存在topic_name字段时,才实行下面的输出操纵。
      elasticsearch { 
        hosts => ["192.168.1.177:9200"] 
        index => "%{topic_name}-%{+YYYY.MM.dd}" 
  #利用topic_name和日期来构建索引名。
      } 
    } 
  
    stdout { 
  #输出到标准输出(通常用于调试)。
      codec => rubydebug 
  #利用rubydebug编码,以易读的格式显示变乱内容
    } 
  }
  
6.6、启动logstash

   [yu@logstash tmp]$ logstash -f /usr/local/logstash-8.4.0/config/logstash.conf
  

6.7后台启动

   [yu@logstash tmp]$ sudo nohup logstash -f /usr/local/logstash-8.4.0/config/logstash.conf &
  6.8、开机自启动

   [yu@logstash tmp]$ sudo echo "source /etc/profile" >> /etc/rc.local #让开机加载java环境
  [yu@logstash tmp]$ sudo echo "nohup logstash -f /usr/local/logstash-8.4.0/config/logstash.conf &" >> /etc/rc.local
  [yu@logstash tmp]$ sudo chmod +x /etc/rc.local
  7、登录kibana网页检察索引生成情况生成数据视图



8、Zabbix与ELK整合实现对安整日志数据的实时监控告警

     有些时候,我们希望在收集日志的时候,可以或许将日志中的非常信息(警告、错误、失败等信息)实时的提取出来,因为日志中的非常信息意味着操纵系统、应用程序可能存在故障,如果能将日志中的故障信息实时的告知运维人员,那么运维就可以第一时间去进行故障排查和处理,进而也就可以制止很多故障的发生。
     ELK(更确切的说应该是logstash)可以实时的读取日志的内容,而且还可以过滤日志信息,通过ELK的读取和过滤功能,就可以将日志中的一些非常关键字(error、failed、OutOff、Warning)过滤出来,然后通过logstash的zabbix插件将这个错误日志信息发送给zabbix,那么zabbix在吸收到这个数据后,结合自身的机制,然后发起告警动作,如许就实现了日志非常zabbix实时告警的功能了。
8.1、Logstash与zabbix插件的利用

    Logstash支持多种输出介质,比如syslog、HTTP、TCP、elasticsearch、kafka等,而有时候我们想将收集到的日志中一些错误信息输出,并告警时,就用到了logstash-output-zabbix这个插件,此插件可以将Logstash与zabbix进行整合,也就是将Logstash收集到的数据进行过滤,将有错误标识的日志输出到zabbix中,最后通过zabbix的告警机制进行触发、告警。

   logstash-output-zabbix是一个社区维护的插件,它默认没有在Logstash中安装,但是安装起来也很轻易,
8.1.1下载阿里源

   [yu@logstash ~]$ sudo wget   -O   /etc/yum.repos.d/CentOS-Base.repo    http://mirrors.aliyun.com/repo/Centos-7.repo
  
8.1.2直接在logstash中运行如下下令即可:

   [yu@logstash ~]$ logstash-plugin install logstash-output-zabbix
    Using bundled JDK: /usr/local/logstash-8.4.0/jdk
  Validating logstash-output-zabbix
  Resolving mixin dependencies
  Installing logstash-output-zabbix
  Installation successful
  
   列出现在已经安装的插件
  [yu@logstash ~]$ logstash-plugin list
    列出已安装的插件及版本信息
  [yu@logstash ~]$ logstash-plugin list --verbose
  8.2、logstash-output-zabbix插件的利用

logstash-output-zabbix安装好之后,就可以在logstash配置文件中利用了,下面是一个logstash-output-zabbix利用的例子:
   zabbix {
          zabbix_host =>"[@metadata][zabbix_host]"
          zabbix_key =>"[@metadata][zabbix_key]"
          zabbix_server_host =>"x.x.x.x"
          zabbix_server_port =>"xxxx"
          zabbix_value ="xxxx"
          }
  其中:
zabbix_host:表现Zabbix主机名字段名称, 可以是单独的一个字段, 也可以是 @metadata 字段的子字段, 是必须的设置,没有默认值。

zabbix_key表现Zabbix项目键的值,也就是zabbix中的item,此字段可以是单独的一个字段, 也可以是 @metadata 字段的子字段,没有默认值。

zabbix_server_host:表现Zabbix服务器的IP或可解析主机名,默认值是 “localhost”,需要设置为zabbix server服务器所在的地点。

zabbix_server_port表现Zabbix服务器开启的监听端口,默认值是10051。

zabbix_value表现要发送给zabbix item监控项的值对应的字段名称,默认值是 “message”,也就是将”message”字段的内容发送给上面zabbix_key界说的zabbix item监控项,固然也可以指定一个详细的字段内容发送给zabbix item监控项。


8.3、将logstash与zabbix进行整合


这里我们以logstash收集日志,然后对日志进行读取,最后选择关键字进行过滤并调用zabbix告警的流程,来看看如何配置logstash实现zabbix告警。

先阐明一下我们的应用需求:通过对系统日志文件的监控,然后去过滤日志信息中的一些关键字,比方ERROR、Failed、WARNING等,将日志中这些信息过滤出来,然后发送到zabbix上,最后借助zabbix的报警功能实现对系统日志中有上述关键字的告警。

对于过滤关键字,进行告警,不同的业务系统,可能关键字不尽相同,比方对http系统,可能需要过滤500、403、503等这些错误码,对于java相干的系统,可能需要过滤OutOfMemoryError、PermGen、Java heap等关键字。在某些业务系统的日志输出中,可能另有一些自界说的错误信息,那么这些也需要作为过滤关键字来利用。

8.3.1配置logstash变乱配置文件

   input { 
    kafka { 
      bootstrap_servers => "192.168.1.179:9092" 
      topics_pattern => "topic.*" 
      consumer_threads => 5 
      decorate_events => true 
      codec => plain { charset => "UTF-8" } 
      auto_offset_reset => "latest" 
      group_id => "logstash1" 
    } 
  } 
  
  filter { 
    # 移除不需要的字段 
    mutate { 
      remove_field => ["beat"] 
    } 
  
    # 利用grok来解析日志 
    grok { 
      match => { 
        "message" => "\[(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+)?<msg>.+)" 
      } 
      overwrite => ["message"] 
      tag_on_failure => ["_grokparsefailure"] 
    } 
  
    # 添加topic_name字段 
    if "[@metadata][kafka][topic]" { 
      mutate { 
        add_field => { "topic_name" => "%{[@metadata][kafka][topic]}" } 
      } 
    } 
  
    # 为Zabbix预备特定的字段 
    if [topic_name] { 
      mutate { 
        add_field => { "zabbix_topic" => "%{topic_name}" } 
        add_field => { "zabbix_level" => "%{level}" } 
        add_field => { "zabbix_message" => "%{msg}" } 
        # 如果需要,继续添加其他Zabbix需要的字段 
      } 
    } 
  } 
  
  output { 
    # 发送给Elasticsearch 
    if [topic_name] { 
      elasticsearch { 
        hosts => ["192.168.1.177:9200"] 
        index => "%{topic_name}-%{+YYYY.MM.dd}" 
      } 
    } 
  
    # 发送给Zabbix 
    if [zabbix_topic] and [zabbix_level] and [zabbix_message] { 
      zabbix { 
        zabbix_host => "your_zabbix_server_host" 
        zabbix_port => 10051 
        zabbix_sender_host => "logstash_host_name" 
  
        # 界说item key和对应的字段 
        item_key => { 
          "topic" => "%{zabbix_topic}" 
          "level" => "%{zabbix_level}" 
          "message" => "%{zabbix_message}" 
          # ... 其他键值对 ... 
        } 
  
        # 其他可选参数,比如zabbix_server等 
      } 
    } 
  }
  
  
                          }
  可以在一个服务器上同时启动多个logstash进程。但是,当同时启动多个logstash进程时,需要指定不同的path.data,否则会报错。比方,利用以下下令启动两个logstash进程:
   ./logstash -f /etc/logstash/config.d/xxx1.conf --path.data=/etc/logstash/data1/ & 
  ./logstash -f /etc/logstash/config.d/xxx2.conf --path.data=/etc/logstash/data2/ &
  



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

千千梦丶琪

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表