项目主机规划
主机名
| 外网IP
| 安装软件
| 所属集群
| elstack
| 192.168.1.176
| elasticsearch8.4
|
| logstash
| 192.168.1.178
| logstash8.4+Kibana8.4
|
| kafka
| 192.168.1.179
| kafka+zookeeper
|
|
1、环境预备
1.1 配置hosts文件
[yu@elk~]$ sudo vim /etc/hosts
192.168.1.177 elk
192.168.1.178 logstash
192.168.1.179 kafka
推送hosts文件到 yu的其他主机上
[yu@elk~]$ sudo scp /etc/hosts 192.168.1.178:/etc/hosts
[yu@elk~]$ sudo scp /etc/hosts 192.168.1.179:/etc/hosts
1.2、elk软件下载地点
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.4.0-linux-x86_64.tar.gz
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.4.0-linux-x86_64.tar.gz
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.4.0-linux-x86_64.tar.gz
wget https://artifacts.elastic.co/downloads/kibana/kibana-8.4.0-linux-x86_64.tar.gz
1.3、kafka+zookeeper下载地点
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.2/apache-zookeeper-3.7.2-bin.tar.g
wget https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgzkafka_2.12-3.6.1https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz
2、摆设elasticsearch
2.1、上传elasticsearch安装包
下载地点
2.2、解压软件包
[yu@elk~]$ cd /tmp
[yu@elk tmp]$ sudo tar -zxvf elasticsearch-8.4.0-linux-x86_64.tar.gz -C /usr/local/
2.3、给文件授权
[yu@elk~]$ sudo chown -R yu:yu /usr/local/elasticsearch-8.4.0/
2.4、修改linux系统打开文件最大数
[yu@elk~]$ sudo vim /etc/sysctl.conf
vm.max_map_count=655360
[yu@elk~]$ sudo sysctl -p /etc/sysctl.conf
[yu@elk~]$ sudo vim /etc/security/limits.conf
#在文件最后添加以下行
#修改最大打开文件数
* soft nofile 65536
* hard nofile 65536
# 修改最大进程数
* soft nproc 65536
* hard nproc 65536
2.5、安装jdk
由于YU 3个软件都需要JDK只支持,所以只要安装Elasticsearch + Logstash + Kibana的服务器都要装JDK,
Elasticsearch安装包自带jdk,我们利用自带版本
[yu@elk~]$ ll /usr/local/elasticsearch-8.4.0/
推送jdk到logstash服务器上
[yu@elk~]$ sudo scp -r /usr/local/elasticsearch-8.4.0/jdk logstash:/usr/local/
2.6、配置JDK环境变量
[yu@elk~]$ sudo vim /etc/profile #在文件最后加入一下行
export JAVA_HOME=/usr/local/elasticsearch-8.4.0/jdk
export PATH=$PATH JAVA_HOME/bin
export CLASSPATH=$JAVA_HOME/lib/dt.jar JAVA_HOME/lib/tools.jar
export JAVA_HOME PATH CLASSPATH
[yu@elk~]$ source /etc/profile #使环境变量生效
2.7、修改配置文件
[yu@elk~]$ cd /usr/local/elasticsearch-8.4.0/
[yu@elkelasticsearch-8.4.0]$ vim config/elasticsearch.yml
cluster.name: my-application
node.name: node-1
bootstrap.memory_lock: false #防止启动检测报错
network.host: 0.0.0.0 #修改监听IP
http.port: 9200
cluster.initial_master_nodes: ["node-1"]
xpack.security.enabled: false #关闭安全认证,否则利用http访问不到es,第一次启动没有这个参数,第二次启动再关闭
xpack.security.http.ssl:
enabled: false
2.8、启动Elasticsearch
[yu@elkelasticsearch-8.4.0]$ ./bin/elasticsearch
2.8、修改配置文件后再次启动,后台启动
[yu@elk elasticsearch-8.4.0]$ vim config/elasticsearch.yml
xpack.security.enabled: false #关闭安全认证,否则利用http访问不到es,第一次启动没有这个参数,第二次启动再关闭
xpack.security.http.ssl:
enabled: false
[yu@elk elasticsearch-8.4.0]$ ./bin/elasticsearch &
2.9、测试
网页 192.168.1.177:9200
2.10、加入开机自启动
[yu@elk elasticsearch-8.4.0]$ sudo vim /etc/init.d/elasticsearch
#!/bin/sh
#chkconfig: 2345 80 05
#description: elasticsearch
#author: taft
export JAVA_HOME=/usr/local/elasticsearch-8.4.0/jdk
export PATH=$PATH JAVA_HOME/bin
export CLASSPATH=$JAVA_HOME/lib/dt.jar JAVA_HOME/lib/tools.jar
export JAVA_HOME PATH CLASSPATH
case "$1" in
start)
su yu<<!
cd /usr/local/elasticsearch-8.4.0/
./bin/elasticsearch -d
!
echo "elasticsearch startup"
;;
stop)
es_pid=`jps | grep Elasticsearch | awk '{print $1}'`
kill -9 $es_pid
echo "elasticsearch stopped"
;;
restart)
es_pid=`jps | grep Elasticsearch | awk '{print $1}'`
kill -9 $es_pid
echo "elasticsearch stopped"
su yu<<!
cd /usr/local/elasticsearch-8.4.0/
./bin/elasticsearch -d
!
echo "elasticsearch startup"
;;
*)
echo "start|stop|restart"
;;
esac
exit $?
su yu<<! 切换为yu用户实行下面的下令,<<! 相当于<<EOF
注意:
以上脚本的用户为yu,如果你的用户不是,则需要替换
以上脚本的JAVA_HOME以及elasticsearch_home如果不同请替换
[yu@elk~]$ sudo chmod +x /etc/init.d/elasticsearch #给脚本添加权限
[yu@elk~]$ sudo chkconfig --add /etc/init.d/elasticsearch #添加开机自启动
[yu@elk~]$ sudo chkconfig elasticsearch on
2.11、测试脚本
[yu@elk~]$ sudo systemctl restart elasticsearch
[yu@elk~]$ ps -aux | grep elasticsearch
3、安装Kibana
3.1、上传Kibana安装包
[yu@logstash ~]$ cd /tmp
[yu@logstash ~]$ ls
3.2、解压软件包
[yu@logstash tmp]$ sudo tar -zxvf kibana-8.4.0-linux-x86_64.tar.gz -C /usr/local/
3.3、给文件授权
[yu@logstash ~]$ sudo chown -R yu:yu /usr/local/kibana-8.4.0
3.4、修改配置文件
[yu@logstash tmp]$ cd
[yu@logstash ~]$ sudo vim /usr/local/kibana-8.4.0/config/kibana.yml
#开启以下选项并修改
server.port: 5601
server.host: "192.168.1.178"
elasticsearch.hosts: ["http://192.168.1.176:9200"] #修改elasticsearch地点,多个服务器请用逗号隔开。
i18n.locale: "zh-CN" #修改语言为中文
注意: host:与IP地点中间有个空格不能删除,否则报错。
3.5、启动服务
[yu@logstash ~]$ /usr/local/kibana-8.4.0/bin/kibana
3.6、kibana 设置后台启动
[yu@logstash ~]$ nohup /usr/local/kibana-8.4.0/bin/kibana &
[yu@logstash ~]$ ps -ef | grep kibana
3.7、网页测试
192.168.1.177:5601
4、搭建kafka服务 在kafka服务器上
4.1、环境搭建
4.11、上传kafka、zookeeper、jdk安装包 到/tmp目录下
[yu@kafka ~]$ ll /tmp
4.1.2、解压jdk
[yu@kafka ~]$ cd /tmp
[yu@kafka tmp]$ sudo tar -zxvf jdk-8u171-linux-x64.tar.gz -C /usr/local/
4.1.3配置JDK环境变量
[yu@kafka tmp]$ sudo vim /etc/profile
#在文件最后加入一下行
JAVA_HOME=/usr/local/jdk1.8.0_171
PATH=$JAVA_HOME/bin PATH
CLASSPATH=$JAVA_HOME/jre/lib/ext JAVA_HOME/lib/tools.jar
export PATH JAVA_HOME CLASSPATH
[yu@kafka tmp]$ source /etc/profile
4.1.4、检察java环境
[yu@kafka tmp]$ java –version
4.2、安装zookeepe
4.2.1解压zookeeper安装包
[yu@kafka tmp]$ sudo tar -zxf apache-zookeeper-3.7.2-bin.tar.gz -C /usr/local/
4.2.2、创建快照日志存放目录并赋予权限:
[yu@kafka tmp]$ sudo mkdir -p /data/zk/data
[yu@kafka tmp]$ sudo chmod -R 777 /data/zk/data
4.2.3、 创建变乱日志存放目录:
[yu@kafka tmp]$ sudo mkdir -p /data/zk/datalog
[yu@kafka tmp]$ sudo chmod -R 777 /data/zk/datalog
4.2.4、生成配置文件
[yu@kafka tmp]$ cd /usr/local/apache-zookeeper-3.7.2-bin/conf/
[yu@kafka conf]$ sudo cp zoo_sample.cfg zoo.cfg
4.2.5、修改主配置文件zoo.cfg
dataDir=/data/zk/data #修改这一行为我们创建的目录
dataLogDir=/data/zk/datalog #添加这一行
4.2.6、添加path环境变量
[yu@kafka conf]$ sudo vim /etc/profile
export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.7.2-bin
export PATH=$ZOOKEEPER_HOME/bin PATH
[yu@kafka conf]$ source /etc/profile
4.2.7、给文件授权
[yu@kafka conf]$ sudo chown -R yu:yu /usr/local/apache-zookeeper-3.7.2-bin
4.2.8、启动Zookeeper
[yu@kafka conf]$ zkServer.sh start
4.2.9、验证
[yu@kafka conf]$ zkServer.sh status
4.2.10、添加开机启动
在/lib/systemd/system/文件夹下创建一个启动脚本zookeeper.service
[yu@kafka conf]$ sudo vim /lib/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper service
After=network.target
[Service]
Type=forking
Environment="JAVA_HOME=/usr/local/jdk1.8.0_171"
ExecStart=/usr/local/apache-zookeeper-3.7.2-bin/bin/zkServer.sh start
ExecStop=/usr/local/apache-zookeeper-3.7.2-bin/bin/zkServer.sh stop
[Install]
WantedBy=multi-user.target
[yu@kafka conf]$ sudo systemctl daemon-reload
[yu@kafka conf]$ sudo systemctl enable zookeeper
测试:
[yu@kafka conf]$ jps
67608 Jps
67439 QuorumPeerMain
[yu@kafka conf]$ kill 67439
[yu@kafka conf]$ jps
67620 Jps
[yu@kafka conf]$ sudo systemctl start zookeeper
[yu@kafka conf]$ ps -ef |grep zookeeper
4.3.Kafka 单节点单Broker摆设
这里摆设的是kafka单节点,生产环境应该摆设kafka多节点集群。
4.3.1解压软件到指定目录
[yu@kafka conf]$ cd /tmp
[yu@kafka tmp]$ sudo tar -zxvf kafka_2.12-3.6.1.tgz -C /usr/local/
4.3.2 修改配置文件
[yu@kafka tmp]$ sudo vim /usr/local/kafka_2.12-3.6.1/config/server.properties
# broker的全局唯一编号,不能重复
broker.id=0
# 监听
listeners=PLAINTEXT://:9092 #开启此项
# 日志目录
log.dirs=/data/kafka/log #修改日志目录
# 配置zookeeper的毗连(如果不是本机,需要该为ip或主机名)
zookeeper.connect=localhost:2181
auto.create.topics.enable=true 自动创建topic 添加这一条
4.3.3、创建日志目录并改权限
[yu@kafka tmp]$ sudo mkdir -p /data/kafka/log
[yu@kafka tmp]$ sudo chmod -R 777 /data/kafka/log
4.3.4添加path环境变量
[yu@kafka tmp]$ sudo vim /etc/profile
export KAFKA_HOME=/usr/local/kafka_2.12-3.6.1
export PATH=$KAFKA_HOME/bin PATH
[yu@kafka tmp]$ source /etc/profile
4.3.5、给文件授权
[yu@kafka tmp]$ sudo chown -R yu:yu /usr/local/kafka_2.12-3.6.1
4.3.6、启动kafka
后台启动kafka
[yu@kafka tmp]$ kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.6.1/config/server.properties
4.3.7添加开机自启
将kafka添加到开机自启服务
在/lib/systemd/system/文件夹下创建一个启动脚本kafka.service
[yu@kafka tmp]$ sudo vim /lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
[Service]
Type=simple
Environment=" ATH=/usr/local/jdk1.8.0_171/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin"
ExecStart=/usr/local/kafka_2.12-3.6.1/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.6.1/config/server.properties
ExecStop=/usr/local/kafka_2.12-3.6.1/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
[yu@kafka tmp]$ sudo systemctl daemon-reload
[yu@kafka tmp]$ sudo systemctl enable kafka
测试
[yu@kafka tmp]$ jps
68931 Jps
68799 Kafka
[yu@kafka tmp]$ kill 68799
[yu@kafka tmp]$ jps
68954 Jps
[yu@kafka tmp]$ sudo systemctl start kafka
[yu@kafka tmp]$ sudo systemctl status kafka
5、 设置filebeat 在kafka服务器上,收集kafka服务器的日志
5.1、上传软件包
5.2、解压缩软件
[yu@kafka tmp]$ sudo tar -zxvf filebeat-8.4.0-linux-x86_64.tar.gz -C /usr/local/
5.3、给文件授权
[yu@kafka tmp]$ sudo chown -R yu:yu /usr/local/filebeat-8.4.0-linux-x86_64
[yu@ kafka tmp]$ sudo chmod o+r /var/log/*
5.4、修改配置文件
Filebeat 要求配置文件只能由其所有者写入,不允许组(group)和其他用户(others)也有写权限。
[yu@kafka tmp]$ sudo vim /usr/local/filebeat-8.4.0-linux-x86_64/filebeat.yml
filebeat.idle_timeout: 2s
filebeat.name: filebeat-shiper
filebeat.spool_size: 50000
filebeat.inputs:
- type: log
paths:
- /usr/local/kafka_2.12-3.6.1/logs/controller.log
fields:
topic: topic-kafka-controller-log
enabled: true
backoff: 1s
backoff_factor: 2
close_inactive: 1h
encoding: plain
harvester_buffer_size: 262144
max_backoff: 10s
max_bytes: 10485760
scan_frequency: 10s
tail_lines: true
- type: log
paths:
- /usr/local/kafka_2.12-3.6.1/logs/state-change.log
fields:
topic: topic-kafka-state-change-log
enabled: true
backoff: 1s
backoff_factor: 2
close_inactive: 1h
encoding: plain
harvester_buffer_size: 262144
max_backoff: 10s
max_bytes: 10485760
scan_frequency: 10s
tail_lines: true
- type: log
paths:
- /usr/local/kafka_2.12-3.6.1/logs/server.log
fields:
topic: topic-kafka-run-log
enabled: true
backoff: 1s
backoff_factor: 2
close_inactive: 1h
encoding: plain
harvester_buffer_size: 262144
max_backoff: 10s
max_bytes: 10485760
scan_frequency: 10s
tail_lines: true
- type: log
paths:
- /usr/local/kafka_2.12-3.6.1/logs/kafka-request.log
fields:
topic: topic-kafka-request-log
enabled: true
backoff: 1s
backoff_factor: 2
close_inactive: 1h
encoding: plain
harvester_buffer_size: 262144
max_backoff: 10s
max_bytes: 10485760
scan_frequency: 10s
tail_lines: true
- type: log
paths:
- /var/log/messages
fields:
topic: topic-kafka-messages-log
enabled: true
backoff: 1s
backoff_factor: 2
close_inactive: 1h
encoding: plain
harvester_buffer_size: 262144
max_backoff: 10s
max_bytes: 10485760
scan_frequency: 10s
tail_lines: true
output.kafka:
bulk_flush_frequency: 0
bulk_max_size: 2048
codec.format:
string: '%{[message]}'
compression: gzip
compression_level: 4
hosts:
- 192.168.1.179:9092
max_message_bytes: 10485760
partition.round_robin:
reachable_only: true
required_acks: 1
topic: '%{[fields.topic]}'
setup.ilm.enabled: false
配置文件详解
全局配置
filebeat.idle_timeout: 2s:
#Filebeat在没有数据发送时保持打开状态的超时时间。
filebeat.name: filebeat-apache:
#Filebeat实例的名称,这有助于在日志和监控中识别它。每个单独的filebeat实例的名称必须不同
filebeat.spool_size: 50000:
#Filebeat内部利用的队列大小,它决定了可以缓冲多少个变乱。
输入配置
type: log:
#指定输入范例为日志。
paths:
#界说Filebeat需要监控的日志文件的路径。这里指定的是/usr/local/apache/logs/access_log。
fields:
#允许你添加自界说字段到输出的变乱中。这里添加了一个topic字段,值为topic-apache-access-log。
enabled: true:
#启用此输入配置。
backoff: 1s和 close_inactive: 1h:
#在读取文件遇到错误时,Filebeat会等待backoff指定的时间,然后每次重试等待的时间会乘以backoff_factor。
close_inactive:
#如果文件在给定的时间段内没有新变乱,Filebeat将关闭文件句柄。
#句柄(Handle)在计算机科学中是一个用来标识对象或资源的引用或者指针。在操纵系统和很多软件应用程序中,句柄被用作一个内部标识符,它允许程序访问和操纵系统或应用程序中的对象或资源,而无需直接知道对象的内存地点或其他详细信息。
encoding: plain
#指定日志文件的编码方式。
harvester_buffer_size: 262144
#读取文件时利用的缓冲区大小。
max_backoff: 10s
#最大重试等待时间。
max_bytes: 10485760
#单个变乱的最大字节数。
scan_frequency: 10s
#Filebeat检查指定路径下的新文件的频率。
tail_lines: true
#如果设置为true,Filebeat将从文件末尾开始读取,这通常用于实时日志流。
输出配置
output.kafka:
#指定输出到Kafka的配置。
bulk_flush_frequency: 0
#批量发送变乱到Kafka的频率,这里设置为0,意味着立即发送。
bulk_max_size:
#批量发送变乱的最大数量。
codec.format:
string: '%{[message]}'
#这表现Filebeat将利用字符串格式来格式化输出到Kafka的消息,而且该字符串的内容将是日志变乱中的message字段。%{[message]} 是一个字段引用,它告诉Filebeat从每个日志变乱中提取message字段的值,并将其作为Kafka消息的内容。
compression: gzip 和 compression_level: 4
#启用压缩,并设置压缩级别。
hosts:
#Kafka服务器的地点和端口。
max_message_bytes: 10485760
#Kafka消息的最大字节数。
partition.round_robin:
reachable_only: true
#partition.round_robin 设置启用了轮询(round-robin)策略,即Filebeat将按顺序将数据分发到Kafka的可用分区。这种策略确保在没有特定分区选择逻辑的情况下,数据可以或许匀称地在所有分区之间分布。reachable_only: true 是partition.round_robin下的一个子选项,它指定Filebeat只选择可达(reachable)的分区来发送数据。
required_acks: 1
#Kafka要求的确认数量,确保消息已经被写入。
topic: '%{[fields.topic]}'
#利用前面在fields中界说的topic字段的值作为Kafka的topic。
setup.ilm.enabled: false
#禁用索引生命周期管理(ILM)。
5.5、启动filebeat
[yu@kafka tmp]$ cd /usr/local/filebeat-8.4.0-linux-x86_64/ && ./filebeat -e -c filebeat.yml &
-e: 这个选项告诉Filebeat以“输出到标准错误”(Error output)的模式运行。这意味着所有的日志和错误信息会直接打印到控制台,而不是写入日志文件。这有助于在调试过程中快速检察问题。 测试时利用测试成功后取消这一个选项
-c filebeat.yml: 这个选项指定了Filebeat的配置文件路径。在这个例子中,配置文件是filebeat.yml,它应该包罗Filebeat如何收集、处理和转发日志的指令。
5.6、再开一个终端检察kafka主题是否创建
[yu@kafka logs]$ kafka-topics.sh --bootstrap-server kafka:9092 --list
topic-kafka-controller-log
topic-kafka-run-log
topic-kafka-state-change-log
表现kafka吸收到filebeat收集到的日志
5.7设置开机自启动
[yu@kafka logs]$ sudo echo "nohup /usr/local/filebeat-8.4.0-linux-x86_64/filebeat -e -c /usr/local/filebeat-8.4.0-linux-x86_64/filebeat.yml &" >> /etc/rc.d/rc.local
6、搭建logstash服务器
6.1、上传软件包
6.2、解压缩软件
[yu@logstash tmp]$ sudo tar -zxvf logstash-8.4.0-linux-x86_64.tar.gz -C /usr/local/
6.3、给文件授权
[yu@logstash tmp]$ sudo chown -R yu:yu /usr/local/logstash-8.4.0/
6.4、做软毗连
[yu@logstash tmp]$ sudo ln -s /usr/local/logstash-8.4.0/bin/* /usr/local/bin/
6.5、生成配置文件
input {
kafka {
bootstrap_servers => "192.168.1.179:9092"
topics_pattern => "topic.*"
consumer_threads => 5
decorate_events => true
codec => plain { charset => "UTF-8" }
auto_offset_reset => "latest"
group_id => "logstash1"
}
}
filter {
# 如果需要转换时间戳为本地时间,请取消注释并正确编写Ruby代码
# 注意:下面的代码可能需要调解,确保它符合您的详细需求
#ruby {
# code => "event.set('@timestamp', Time.parse(event.get('time')).localtime)"
#}
mutate {
remove_field => ["beat"]
}
grok {
match => {
"message" => "\[(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+) ?<msg>.+)"
}
overwrite => ["message"]
tag_on_failure => ["_grokparsefailure"]
}
if "[@metadata][kafka][topic]" {
mutate {
add_field => { "topic_name" => "%{[@metadata][kafka][topic]}" }
}
}
}
output {
# 确保topic_name字段存在才发送到Elasticsearch
if [topic_name] {
elasticsearch {
hosts => ["192.168.1.177:9200"]
# 利用topic_name字段来构建索引名
index => "%{topic_name}-%{+YYYY.MM.dd}"
}
}
stdout {
codec => rubydebug
}
}
配置文件详解
Input部分
input {
kafka {
bootstrap_servers => "192.168.1.179:9092"
#Kafka集群的地点和端口。
topics_pattern => "topic.*"
#订阅所有以"topic."开头的Kafka主题。
consumer_threads => 5
#消耗者线程数,用于并行读取Kafka消息。
decorate_events => true
#的配置选项用于控制是否在Logstash变乱中添加与Kafka相干的元数据字段。当设置为true时,Logstash会向每个变乱添加一些额外的字段,这些字段包罗了关于Kafka消息的一些信息。
codec => plain { charset => "UTF-8" }
#告诉Logstash以UTF-8编码的方式从Kafka消息中读取纯文本数据,并将其转换为Logstash变乱以供后续处理。如许,无论Kafka中的消息包罗何种字符,Logstash都可以或许正确解析并处理它们。codec => json 用于指定输入插件利用JSON格式来解码数据。这通常用于处理存储在Kafka等消息队列中的JSON格式的消息。
auto_offset_reset => "latest"
#它决定了当Logstash开始消耗Kafka主题时,从哪里开始读取消息。
auto_offset_reset 参数有以下几个可能的值:
"earliest": 从最早的记载开始读取,即从头开始消耗。
"latest": 从最新的记载开始读取,即只消耗在Logstash开始消耗之后新产生的消息。
"none": 如果找不到初始偏移量,就抛出非常。
group_id => "logstash1"
}
}
Filter部分
filter {
# ... Ruby代码块(已注释)
mutate {
remove_field => ["beat"]
#mutate:移除beat字段
}
grok {
match => {
"message" => "\[(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+) ?<msg>.+)"
#利用正则表达式解析message字段,并提取出时间、日志级别、线程、类名、行号和消息内容等字段。
}
overwrite => ["message"]
#将原始message字段替换为grok解析后的内容。
tag_on_failure => ["_grokparsefailure"]
#如果grok解析失败,则给变乱添加一个标签_grokparsefailure
}
if "[@metadata][kafka][topic]" {
mutate {
add_field => { "topic_name" => "%{[@metadata][kafka][topic]}" }
#if条件判定:如果变乱中存在[@metadata][kafka][topic]字段,则添加一个名为topic_name的新字段,其值为Kafka主题名。
}
}
}
Output部分
output {
if [topic_name] {
#只有当变乱中存在topic_name字段时,才实行下面的输出操纵。
elasticsearch {
hosts => ["192.168.1.177:9200"]
index => "%{topic_name}-%{+YYYY.MM.dd}"
#利用topic_name和日期来构建索引名。
}
}
stdout {
#输出到标准输出(通常用于调试)。
codec => rubydebug
#利用rubydebug编码,以易读的格式显示变乱内容
}
}
6.6、启动logstash
[yu@logstash tmp]$ logstash -f /usr/local/logstash-8.4.0/config/logstash.conf
6.7后台启动
[yu@logstash tmp]$ sudo nohup logstash -f /usr/local/logstash-8.4.0/config/logstash.conf &
6.8、开机自启动
[yu@logstash tmp]$ sudo echo "source /etc/profile" >> /etc/rc.local #让开机加载java环境
[yu@logstash tmp]$ sudo echo "nohup logstash -f /usr/local/logstash-8.4.0/config/logstash.conf &" >> /etc/rc.local
[yu@logstash tmp]$ sudo chmod +x /etc/rc.local
7、登录kibana网页检察索引生成情况生成数据视图
8、Zabbix与ELK整合实现对安整日志数据的实时监控告警
有些时候,我们希望在收集日志的时候,可以或许将日志中的非常信息(警告、错误、失败等信息)实时的提取出来,因为日志中的非常信息意味着操纵系统、应用程序可能存在故障,如果能将日志中的故障信息实时的告知运维人员,那么运维就可以第一时间去进行故障排查和处理,进而也就可以制止很多故障的发生。
ELK(更确切的说应该是logstash)可以实时的读取日志的内容,而且还可以过滤日志信息,通过ELK的读取和过滤功能,就可以将日志中的一些非常关键字(error、failed、OutOff、Warning)过滤出来,然后通过logstash的zabbix插件将这个错误日志信息发送给zabbix,那么zabbix在吸收到这个数据后,结合自身的机制,然后发起告警动作,如许就实现了日志非常zabbix实时告警的功能了。
8.1、Logstash与zabbix插件的利用
Logstash支持多种输出介质,比如syslog、HTTP、TCP、elasticsearch、kafka等,而有时候我们想将收集到的日志中一些错误信息输出,并告警时,就用到了logstash-output-zabbix这个插件,此插件可以将Logstash与zabbix进行整合,也就是将Logstash收集到的数据进行过滤,将有错误标识的日志输出到zabbix中,最后通过zabbix的告警机制进行触发、告警。
logstash-output-zabbix是一个社区维护的插件,它默认没有在Logstash中安装,但是安装起来也很轻易,
8.1.1下载阿里源
[yu@logstash ~]$ sudo wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
8.1.2直接在logstash中运行如下下令即可:
[yu@logstash ~]$ logstash-plugin install logstash-output-zabbix
Using bundled JDK: /usr/local/logstash-8.4.0/jdk
Validating logstash-output-zabbix
Resolving mixin dependencies
Installing logstash-output-zabbix
Installation successful
列出现在已经安装的插件
[yu@logstash ~]$ logstash-plugin list
列出已安装的插件及版本信息
[yu@logstash ~]$ logstash-plugin list --verbose
8.2、logstash-output-zabbix插件的利用
logstash-output-zabbix安装好之后,就可以在logstash配置文件中利用了,下面是一个logstash-output-zabbix利用的例子:
zabbix {
zabbix_host =>"[@metadata][zabbix_host]"
zabbix_key =>"[@metadata][zabbix_key]"
zabbix_server_host =>"x.x.x.x"
zabbix_server_port =>"xxxx"
zabbix_value ="xxxx"
}
其中:
zabbix_host:表现Zabbix主机名字段名称, 可以是单独的一个字段, 也可以是 @metadata 字段的子字段, 是必须的设置,没有默认值。
zabbix_key:表现Zabbix项目键的值,也就是zabbix中的item,此字段可以是单独的一个字段, 也可以是 @metadata 字段的子字段,没有默认值。
zabbix_server_host:表现Zabbix服务器的IP或可解析主机名,默认值是 “localhost”,需要设置为zabbix server服务器所在的地点。
zabbix_server_port:表现Zabbix服务器开启的监听端口,默认值是10051。
zabbix_value:表现要发送给zabbix item监控项的值对应的字段名称,默认值是 “message”,也就是将”message”字段的内容发送给上面zabbix_key界说的zabbix item监控项,固然也可以指定一个详细的字段内容发送给zabbix item监控项。
8.3、将logstash与zabbix进行整合
这里我们以logstash收集日志,然后对日志进行读取,最后选择关键字进行过滤并调用zabbix告警的流程,来看看如何配置logstash实现zabbix告警。
先阐明一下我们的应用需求:通过对系统日志文件的监控,然后去过滤日志信息中的一些关键字,比方ERROR、Failed、WARNING等,将日志中这些信息过滤出来,然后发送到zabbix上,最后借助zabbix的报警功能实现对系统日志中有上述关键字的告警。
对于过滤关键字,进行告警,不同的业务系统,可能关键字不尽相同,比方对http系统,可能需要过滤500、403、503等这些错误码,对于java相干的系统,可能需要过滤OutOfMemoryError、PermGen、Java heap等关键字。在某些业务系统的日志输出中,可能另有一些自界说的错误信息,那么这些也需要作为过滤关键字来利用。
8.3.1配置logstash变乱配置文件
input {
kafka {
bootstrap_servers => "192.168.1.179:9092"
topics_pattern => "topic.*"
consumer_threads => 5
decorate_events => true
codec => plain { charset => "UTF-8" }
auto_offset_reset => "latest"
group_id => "logstash1"
}
}
filter {
# 移除不需要的字段
mutate {
remove_field => ["beat"]
}
# 利用grok来解析日志
grok {
match => {
"message" => "\[(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+) ?<msg>.+)"
}
overwrite => ["message"]
tag_on_failure => ["_grokparsefailure"]
}
# 添加topic_name字段
if "[@metadata][kafka][topic]" {
mutate {
add_field => { "topic_name" => "%{[@metadata][kafka][topic]}" }
}
}
# 为Zabbix预备特定的字段
if [topic_name] {
mutate {
add_field => { "zabbix_topic" => "%{topic_name}" }
add_field => { "zabbix_level" => "%{level}" }
add_field => { "zabbix_message" => "%{msg}" }
# 如果需要,继续添加其他Zabbix需要的字段
}
}
}
output {
# 发送给Elasticsearch
if [topic_name] {
elasticsearch {
hosts => ["192.168.1.177:9200"]
index => "%{topic_name}-%{+YYYY.MM.dd}"
}
}
# 发送给Zabbix
if [zabbix_topic] and [zabbix_level] and [zabbix_message] {
zabbix {
zabbix_host => "your_zabbix_server_host"
zabbix_port => 10051
zabbix_sender_host => "logstash_host_name"
# 界说item key和对应的字段
item_key => {
"topic" => "%{zabbix_topic}"
"level" => "%{zabbix_level}"
"message" => "%{zabbix_message}"
# ... 其他键值对 ...
}
# 其他可选参数,比如zabbix_server等
}
}
}
}
可以在一个服务器上同时启动多个logstash进程。但是,当同时启动多个logstash进程时,需要指定不同的path.data,否则会报错。比方,利用以下下令启动两个logstash进程:
./logstash -f /etc/logstash/config.d/xxx1.conf --path.data=/etc/logstash/data1/ &
./logstash -f /etc/logstash/config.d/xxx2.conf --path.data=/etc/logstash/data2/ &
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |