【星海随笔】云解决方案学习日志篇(二) kafka、Zookeeper、Fielbeat ...

农民  金牌会员 | 2024-8-12 00:57:26 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 906|帖子 906|积分 2718

Elastic 中国社区官方博客
https://blog.csdn.net/ubuntutouch/category_9209092.html
Kafka

kafka的源代码是基于Scala语言编写的,运行在Java虚拟机(即:JVM)上。因此,在安装kafka之前需要先安装JDK
Kafka 为什么依赖 Zookeeper


  • 1.协调分布式体系:Kafka是一个分布式体系,各个节点之间需要进行协调和同步,而Zookeeper正是为分布式体系提供协调和同步的服务的。
  • 2.元数据管理:Kafka的元数据包括了集群的配置、broker的状态等信息,而这些信息需要被全部的Kafka节点共享和维护。Zookeeper提供了一个分布式的文件体系,可以方便地存储和管理这些元数据信息。
  • 3.向导推举:Kafka的一个分区只会分配给一个broker进行读写,而这个broker就是该分区的leader。当leader宕机后,需要从剩余的broker中推举一个新的leader。而Zookeeper可以提供分布式锁和推举的功能,因此Kafka可以利用Zookeeper来实现leader推举。
综上所述,Kafka依赖Zookeeper主要是为了协调分布式体系、元数据管理和向导推举。

ZK安装
来源于apache
   1.下载
下载地点:https://zookeeper.apache.org/releases.html
  2.解压安装包
  1. tar -zxf apache-zookeeper-3.7.1-bin.tar.gz -C /usr/local/
  2. /usr/local/apache-zookeeper-3.7.1-bin/ /usr/local/zookeeper-3.7.1/
复制代码
3.拷贝配置文件,
  1. cp /usr/local/zookeeper-3.7.1/conf/zoo_sample.cfg /usr/local/zookeeper-3.7.1/conf/zoo.cfg
复制代码
4.修改配置文件
  1. #在配置文件中加一行监听本机 IP 即可
  2. clientPortAddress=10.0.5.163
复制代码
  zookeeper默认会占用8080端口,如果你本机已有服务在利用8080,可以把下面参数添加到zoo.cfg 文件里,自定义端口
admin.serverPort=8001
  5.启动zk
  1. /usr/local/zookeeper-3.7.1/bin/zkServer.sh start
复制代码
6.查察端口是否监听
  1. netstat -lntp |grep 2181
复制代码
  如果服务未监听,请查察日志排查问题
more zookeeper-root-server-VM-5-163-centos.out
  
kafka 部署

1.下载
   下载地点:https://kafka.apache.org/downloads
  2.解压安装包
  1. tar -zxf kafka_2.12-3.4.0.tgz -C /usr/local/
复制代码
3.修改kafka配置
  1. vim /usr/local/kafka_2.12-3.4.0/config/server.properties
  2. #修改 zk 的IP
  3. zookeeper.connect=10.0.5.163:2181
  4. #修改监听地址
  5. listeners=PLAINTEXT://10.0.5.163:9092
复制代码
4.启动kafka
  1. nohup /usr/local/kafka_2.12-3.4.0/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.4.0/config/server.properties >/tmp/kafka.log 2>&1 &
复制代码
5.查察端口是否监听
  1. netstat -lntp |grep 9092
复制代码

Flebeat部署
   原理流程如下:
首先是input输入,可以指定多个数据输入源,然后通过通配符进行日志文件的匹配
匹配到日志后,就会利用Harvester(收割机),将日志源源不断的读取到来
然后收割机收割到的日志,就传递到Spooler(卷轴),然后卷轴就在将他们传到对应的地方
  1.下载
  1. wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.16.1-linux-x86_64.tar.gz
复制代码
2.解压二进制包
  1. tar zxf filebeat-7.16.1-linux-x86_64.tar.gz -C /usr/local/
  2. mv /usr/local/filebeat-7.16.1-linux-x86_64/ /usr/local/filebeat-7.16.1
复制代码
3.创建 Filebeat 配置文件
  1. #备份模板文件
  2. mv /usr/local/filebeat-7.16.1/filebeat.yml /usr/local/filebeat-7.16.1/filebeat.yml.bak
  3. #创建配置文件
  4. cat > /usr/local/filebeat-7.16.1/filebeat.yml << "EOF"
  5. filebeat.inputs:
  6. - type: log
  7.   tail_files: true
  8.   backoff: "1s"
  9.   paths:
  10.       - /var/log/nginx/access.json.log
  11.   fields:
  12.     type: access
  13.   fields_under_root: true
  14. - type: log
  15.   tail_files: true
  16.   backoff: "1s"
  17.   paths:
  18.       - /var/log/messages
  19.   fields:
  20.     type: messages
  21.   fields_under_root: true
  22. output:
  23.   kafka:
  24.     hosts: ["10.0.5.163:9092"]
  25.     topic: hosts_10-0-5-163
  26. EOF
复制代码
4.启动Fielbeat
  1. #查看是否已存在进程,将其停止
  2. ps -ef |grep filebeat |grep -v grep |awk '{print $2}' |xargs kill -9
  3. #启动Filebeat
  4. nohup /usr/local/filebeat-7.16.1/filebeat  -e -c /usr/local/filebeat-7.16.1/filebeat.yml >/tmp/filebeat.log 2>&1 &
  5. #查看进程
  6. ps -ef |grep filebeat
  7. #查看是否与ZK建立连接
  8. netstat -ntp |egrep -w '9092|filebeat'
复制代码

Fielbeat利用

启动
  1. ./filebeat -e -c shengxia.yml
复制代码
yaml文件介绍
  1. filebeat.inputs: # filebeat input输入
  2. - type: stdin    # 标准输入
  3.   enabled: true  # 启用标准输入
  4. setup.template.settings:
  5.   index.number_of_shards: 3 # 指定下载数
  6. output.console:  # 控制台输出
  7.   pretty: true   # 启用美化功能
  8.   enable: true
复制代码
运送至ElasticSearch或者Logstash,在Kibana中实现可视化
然后我们在控制台输入hello,就能看到我们会有一个json的输出,是通过读取到我们控制台的内容后输出的,内容如下
  1. {
  2.   "@timestamp": "2023-05-31T22:57:58.700Z",
  3.   "@metadata": {#元数据信息
  4.     "beat": "filebeat",
  5.     "type": "_doc",
  6.     "version": "8.8.1"
  7.   },
  8.   "log": {
  9.     "offset": 0,
  10.     "file": {
  11.       "path": ""
  12.     }
  13.   },
  14.   "message": "hello",#元数据信息
  15.   "input": {#控制台标准输入
  16.     "type": "stdin"#元数据信息
  17.   },
  18.   "ecs": {
  19.     "version": "8.0.0"
  20.   },
  21.   "host": {
  22.     "name": "elk-node1"
  23.   },
  24.   "agent": {#版本以及主机信息
  25.     "id": "5d5e4b99-8ee3-42f5-aae3-b0492d723730",
  26.     "name": "elk-node1",
  27.     "type": "filebeat",
  28.     "version": "8.8.1",
  29.     "ephemeral_id": "24b4fd16-5466-4d7e-b4b8-b73d41f77de0"
  30.   }
  31. }
  32. 参考文档:https://blog.csdn.net/qq_52589631/article/details/131216188
复制代码
再次创建一个文件,叫 shengxia-log.yml,然后在文件里添加如下内容
  1. filebeat.inputs:
  2. - type: log
  3.   enabled: true
  4.   paths:
  5.     - /opt/elk/logs/*.log
  6. setup.template.settings:
  7.   index.number_of_shards: 3
  8. output.console:
  9.   pretty: true
  10.   enable: true
复制代码
添加完成后,我们在到下面貌录创建一个日志文件
  1. # 创建文件夹
  2. mkdir -p /opt/elk/logs
  3. # 进入文件夹
  4. cd /opt/elk/logs
  5. # 追加内容
  6. echo "hello world" >> test.log
复制代码
然后再次启动filebeat
  1. ./filebeat -e -c shengxia-log.yml
复制代码
可以大概发现,它已经乐成加载到了我们的日志文件 test.log
同时我们还可以继承往文件中追加内容
追加后,我们再次查察filebeat,也能看到刚刚我们追加的内容
检测到日志文件有更新,立刻就会读取到更新的内容,而且输出到控制台。
自定义字段

  1.    当我们的元数据没办法支撑我们的业务时,我们还可以自定义添加一些字段
复制代码
  1. filebeat.inputs:
  2. - type: log
  3.   enabled: true
  4.   paths:
  5.     - /opt/elk/logs/*.log
  6.   tags: ["web", "test"]  #添加自定义tag,便于后续的处理
  7.   fields:  #添加自定义字段
  8.     from: web-test
  9.   fields_under_root: true #true为添加到根节点,false为添加到子节点中
  10. setup.template.settings:
  11.   index.number_of_shards: 3
  12. output.console:
  13.   pretty: true
  14.   enable: true
复制代码
添加完成后,重启 filebeat
  1. ./filebeat -e -c shengxia-log.yml
复制代码
  1. filebeat.inputs:
  2. - type: log
  3.   enabled: true
  4.   paths:
  5.     - /opt/elk/logs/*.log
  6.   tags: ["web", "test"]
  7.   fields:
  8.     from: web-test
  9.   fields_under_root: false
  10. setup.template.settings:
  11.   index.number_of_shards: 1
  12. output.elasticsearch:
  13.   hosts: ["192.168.40.150:9200","192.168.40.137:9200","192.168.40.138:9200"]
复制代码
Logstash 配置

1.修改Logstash 配置文件(下面 output 将日志打印到本地,观察日志是否收罗到,日志格式是否精确)
  1. cat > /usr/local/logstash-7.16.1/config/logstash.conf << "EOF"
  2. input {
  3.   kafka {
  4.     bootstrap_servers => "10.0.5.163:9092"
  5.     topics => ["hosts_10-0-5-163"]
  6.     group_id => "test"
  7.     codec => "json"
  8.   }
  9. }
  10. filter {
  11.   if [type] == "access" {
  12.     json {
  13.       source => "message"
  14.       remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
  15.     }
  16.   }
  17. }
  18. output {
  19.   stdout {
  20.     codec=>rubydebug
  21.   }
  22. }
  23. EOF
复制代码
2.实行前台启动命令
  1. #查看是否已存在进程,将其停止
  2. ps -ef |grep logstash |grep -v grep |awk '{print $2}' |xargs kill -9
  3. #启动 Logstash
  4. logstash -f /usr/local/logstash-7.16.1/config/logstash.conf
复制代码
3.查察kafka Group 和队列信息
  1. #进入kafka 安装目录
  2. cd /usr/local/kafka_2.12-3.4.0/bin
  3. #查看所有topic
  4. ./kafka-topics.sh  --bootstrap-server 10.0.5.163:9092 --lis
  5. #查看Group
  6. ./kafka-consumer-groups.sh  --bootstrap-server 10.0.5.163:9092 --list
  7. #查看队列
  8. ./kafka-consumer-groups.sh  --bootstrap-server 10.0.5.163:9092 --group test --describe
复制代码

4.修改配置文件,将output 将日志写入elasticsearch
  1. cat > /usr/local/logstash-7.16.1/config/logstash.conf << "EOF"
  2. input {
  3.   kafka {
  4.     bootstrap_servers => "10.0.5.163:9092"
  5.     topics => ["hosts_10-0-5-163"]
  6.     group_id => "test"
  7.     codec => "json"
  8.   }
  9. }
  10. filter {
  11.   if [type] == "access" {
  12.     json {
  13.       source => "message"
  14.       remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
  15.     }
  16.   }
  17. }
  18. output{
  19.   if [type] == "access" {
  20.     elasticsearch {
  21.       hosts => ["http://127.0.0.1:9200"]
  22.       user => "elastic"
  23.       password => "elk@2023"
  24.       index => "access-%{+YYYY.MM.dd}"
  25.     }
  26.   }
  27.   else if [type] == "messages" {
  28.     elasticsearch {
  29.       hosts => ["http://127.0.0.1:9200"]
  30.       user => "elastic"
  31.       password => "elk@2023"
  32.       index => "messages-%{+YYYY.MM.dd}"
  33.     }
  34.   }
  35. }
  36. EOF
复制代码
4.配景启动 Logstash
  1. #查看是否已存在进程,将其停止
  2. ps -ef |grep logstash |grep -v grep |awk '{print $2}' |xargs kill -9
  3. #启动 Logstash
  4. nohup logstash -f /usr/local/logstash-7.16.1/config/logstash.conf  >/tmp/logstash.log 2>&1 &
复制代码
查察服务日志是否正常
  1. 查看日志是否有 ERROR 持续输出
  2. tailf /tmp/logstash.log
  3. #查看logstash 端口是否监听
  4. netstat -lntp |grep 9600
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

农民

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表