企业级日志系统架构——ELK(Elasticsearch、Filebeat、Kafka、Logstash、K ...

打印 上一主题 下一主题

主题 515|帖子 515|积分 1555

目录

一、概述

ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。新增了一个FileBeat,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。
大致流程图如下:

1)Elasticsearch 存储

Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能。它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
2)Filebeat 日志数据采集

filebeat是Beats中的一员,Beats在是一个轻量级日志采集器,其实Beats家族有6个成员,早期的ELK架构中使用Logstash收集、解析日志,但是Logstash对内存、cpu、io等资源消耗比较高。相比Logstash,Beats所占系统的CPU和内存几乎可以忽略不计。
Filebeat是用于转发和集中日志数据的轻量级传送工具。Filebeat监视您指定的日志文件或位置,收集日志事件。
目前Beats包含六种工具:

  • Packetbeat:网络数据(收集网络流量数据)
  • Metricbeat:指标(收集系统、进程和文件系统级别的CPU和内存使用情况等数据)
  • Filebeat:日志文件(收集文件数据)
  • Winlogbeat:windows事件日志(收集Windows事件日志数据)
  • Auditbeat:审计数据(收集审计日志)
  • Heartbeat:运行时间监控(收集系统运行时的数据)
工作的流程图如下:

优点

  • Filebeat 只是一个二进制文件没有任何依赖。它占用资源极少。
缺点

  • Filebeat 的应用范围十分有限,因此在某些场景下咱们会碰到问题。在 5.x 版本中,它还具有过滤的能力。
3)Kafka

kafka能帮助我们削峰。ELK可以使用redis作为消息队列,但redis作为消息队列不是强项而且redis集群不如专业的消息发布系统kafka。kafka安装可以参考我之前的文章:Kafka原理介绍+安装+基本操作(kafka on k8s)
4)Logstash 过滤

Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
优点

  • 可伸缩性
节拍应该在一组Logstash节点之间进行负载平衡。
建议至少使用两个Logstash节点以实现高可用性。
每个Logstash节点只部署一个Beats输入是很常见的,但每个Logstash节点也可以部署多个Beats输入,以便为不同的数据源公开独立的端点。


  • 弹性
Logstash持久队列提供跨节点故障的保护。对于Logstash中的磁盘级弹性,确保磁盘冗余非常重要。对于内部部署,建议您配置RAID。在云或容器化环境中运行时,建议您使用具有反映数据SLA的复制策略的永久磁盘。


  • 可过滤
对事件字段执行常规转换。您可以重命名,删除,替换和修改事件中的字段。
缺点

  • Logstash耗资源较大,运行占用CPU和内存高。另外没有消息队列缓存,存在数据丢失隐患。
5)Kibana 展示

Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志。
filebeat和logstash的关系

因为logstash是jvm跑的,资源消耗比较大,所以后来作者又用golang写了一个功能较少但是资源消耗也小的轻量级的logstash-forwarder。不过作者只是一个人,加入http://elastic.co公司以后,因为es公司本身还收购了另一个开源项目packetbeat,而这个项目专门就是用golang的,有整个团队,所以es公司干脆把logstash-forwarder的开发工作也合并到同一个golang团队来搞,于是新的项目就叫filebeat了。
二、helm3安装ELK

详细流程图如下:

1)准备条件

1、添加helm仓库
  1. $ helm repo add elastic   https://helm.elastic.co
复制代码
2)helm3安装elasticsearch

1、自定义values

主要是设置storage Class 持久化和资源限制,本人电脑资源有限,所以这里就把资源调小了很多,小伙伴们可以根据自己配置自定义哈。
  1. $ # 集群名称
  2. clusterName: "elasticsearch"
  3. # ElasticSearch 6.8+ 默认安装了 x-pack 插件,部分功能免费,这里选禁用
  4. esConfig:
  5. elasticsearch.yml: |
  6.     network.host: 0.0.0.0
  7.     cluster.name: "elasticsearch"
  8.     xpack.security.enabled: false
  9. resources:
  10.   requests:
  11.     memory: 1Gi
  12. volumeClaimTemplate:
  13.   storageClassName: "bigdata-nfs-storage"
  14.   accessModes: [ "ReadWriteOnce" ]
  15.   resources:
  16.     requests:
  17.       storage: 3Gi
  18. service:
  19.   type: NodePort
  20.   port: 9000
  21.   nodePort: 31311
复制代码
禁用Kibana安全提示(Elasticsearch built-in security features are not enabled)
xpack.security.enabled: false
2、开始安装Elasitcsearch

安装过程比较慢,因为官方镜像下载比较慢
  1. $ helm install es elastic/elasticsearch -f my-values.yaml  --namespace bigdata
复制代码
  1. W1207 23:10:57.980283   21465 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
  2. W1207 23:10:58.015416   21465 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
  3. NAME: es
  4. LAST DEPLOYED: Tue Dec  7 23:10:57 2021
  5. NAMESPACE: bigdata
  6. STATUS: deployed
  7. REVISION: 1
  8. NOTES:
  9. 1. Watch all cluster members come up.
  10.   $ kubectl get pods --namespace=bigdata -l app=elasticsearch-master -w2. Test cluster health using Helm test.
  11.   $ helm --namespace=bigdata test es
复制代码
查看,需要所有pod都正常运行才正常,下载镜像有点慢,需要稍等一段时间再查看
  1. $ kubectl get pod -n bigdata -l app=elasticsearch-master
  2. $ kubectl get pvc -n bigdata
  3. $ watch kubectl get pod -n bigdata -l app=elasticsearch-master
复制代码

3、验证
  1. $ helm --namespace=bigdata test es
  2. $ kubectl get pod,svc -n bigdata -l app=elasticsearch-master -o wide
  3. $ curl 192.168.0.113:31311/_cat/health
  4. $ curl 192.168.0.113:31311/_cat/nodes
复制代码

4、清理
  1. $ helm uninstall es -n bigdata
  2. $ kubectl delete pvc elasticsearch-master-elasticsearch-master-0 -n bigdata
  3. $ kubectl delete pvc elasticsearch-master-elasticsearch-master-1 -n bigdata
  4. $ kubectl delete pvc elasticsearch-master-elasticsearch-master-2 -n bigdata
复制代码
3)helm3安装Kibana

1、自定义values

域名(elasticsearch-master-headless.bigdata.svc.cluster.local)的由来不清楚的,可以参考我之前的文章:Kubernetes(k8s)DNS(CoreDNS)介绍
  1. $ cat <<EOF> my-values.yaml
  2. #此处修改了kibana的配置文件,默认位置/usr/share/kibana/kibana.yaml
  3. kibanaConfig:
  4.    kibana.yml: |
  5.      server.port: 5601
  6.      server.host: "0.0.0.0"
  7.      elasticsearch.hosts: [ "elasticsearch-master-headless.bigdata.svc.cluster.local:9200" ]
  8. resources:
  9.   requests:
  10.     cpu: "1000m"
  11.     memory: "256Mi"
  12.   limits:
  13.     cpu: "1000m"
  14.     memory: "1Gi"
  15. service:
  16.   #type: ClusterIP
  17.   type: NodePort
  18.   loadBalancerIP: ""
  19.   port: 5601
  20.   nodePort: "30026"
  21. EOF
复制代码
output plugin  输出插件,将事件发送到特定目标:
stdout { codec => rubydebug }            // 开启debug模式,可在控制台输出


  • stdout :标准输出。将事件输出到屏幕上
  1. $ helm install kibana elastic/kibana -f my-values.yaml  --namespace bigdata
复制代码

  • file  :将事件写入文件
  1. $ kubectl get pod,svc -n bigdata -l app=kibana
复制代码

  • kafka :将事件发送到kafka
  1. $ helm uninstall kibana -n bigdata
复制代码

  • elasticseach :在es中存储日志
  1. - name: varlibdockercontainers
  2.   hostPath:
  3.     path: /var/lib/docker/containers   #改为docker安装路径
复制代码
2、开始安装Logstash
  1. $ cat <<EOF> my-values.yaml
  2. daemonset:
  3.   filebeatConfig:
  4.     filebeat.yml: |
  5.       filebeat.inputs:
  6.       - type: container
  7.         paths:
  8.           - /var/log/containers/*.log
  9.       output.elasticsearch:
  10.         enabled: false
  11.         host: '${NODE_NAME}'
  12.         hosts: '${ELASTICSEARCH_HOSTS:elasticsearch-master:9200}'
  13.       output.kafka:
  14.        enabled: true
  15.        hosts: ["kafka-headless.bigdata.svc.cluster.local:9092"]
  16.        topic: test
  17. EOF
复制代码
  1. $ helm install filebeat elastic/filebeat -f my-values.yaml  --namespace bigdata
  2. $ kubectl get pods --namespace=bigdata -l app=filebeat-filebeat -w
复制代码

3、验证

1、登录kibana查看索引是否创建

2、查看logs
  1. # 先登录kafka客户端
  2. $ kubectl exec --tty -i kafka-client --namespace bigdata -- bash
  3. # 再消费数据
  4. $ kafka-console-consumer.sh --bootstrap-server kafka.bigdata.svc.cluster.local:9092 --topic test
复制代码

3、查看kafka消费情况
  1. $ kubectl exec --tty -i kafka-client --namespace bigdata -- bash
  2. $ kafka-consumer-groups.sh --bootstrap-server kafka-0.kafka-headless.bigdata.svc.cluster.local:9092 --describe --group mygroup
复制代码

4、通过kibana查看索引数据(Kibana版本:7.15.0)
创建索引模式
Management-》Stack Management-》Kibana-》Index patterns



通过上面创建的索引模式查询数据(Discover)


4、清理
  1. $ helm uninstall filebeat -n bigdata
复制代码
三、ELK相关的备份组件和备份方式

Elasticsearch备份两种方式:

  • 将数据导出成文本文件,比如通过 elasticdumpesm 等工具将存储在 Elasticsearch 中的数据导出到文件中。适用数据量小的场景
  • 备份 elasticsearch data 目录中文件的形式来做快照,借助 Elasticsearch 中 snapshot 接口实现的功能。适用大数据量的场景
1)Elasticsearch的snapshot快照备份


  • 优点:通过snapshot拍摄快照,然后定义快照备份策略,能够实现快照自动化存储,可以定义各种策略来满足自己不同的备份
  • 缺点:还原不够灵活,拍摄快照进行备份很快,但是还原的时候没办法随意进行还原,类似虚拟机快照
1、配置备份目录
在 elasticsearch.yml 的配置文件中注明可以用作备份路径 path.repo ,如下所示:
  1. $ cat <<EOF> my-values.yaml
  2. logstashConfig:
  3.   logstash.yml: |
  4.     xpack.monitoring.enabled: false
  5. logstashPipeline:
  6.    logstash.yml: |
  7.     input {
  8.       kafka {
  9.             bootstrap_servers => "kafka-headless.bigdata.svc.cluster.local:9092"
  10.             topics => ["test"]
  11.             group_id => "mygroup"
  12.             #如果使用元数据就不能使用下面的byte字节序列化,否则会报错
  13.             #key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  14.             #value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  15.             consumer_threads => 1
  16.             #默认为false,只有为true的时候才会获取到元数据
  17.             decorate_events => true
  18.             auto_offset_reset => "earliest"
  19.          }
  20.     }
  21.     filter {
  22.       mutate {
  23.         #从kafka的key中获取数据并按照逗号切割
  24.         split => ["[@metadata][kafka][key]", ","]
  25.         add_field => {
  26.             #将切割后的第一位数据放入自定义的“index”字段中
  27.             "index" => "%{[@metadata][kafka][key][0]}"
  28.         }
  29.       }
  30.     }
  31.     output {
  32.       elasticsearch {
  33.           pool_max => 1000
  34.           pool_max_per_route => 200
  35.           hosts => ["elasticsearch-master-headless.bigdata.svc.cluster.local:9200"]
  36.           index => "test-%{+YYYY.MM.dd}"
  37.       }
  38.     }
  39. # 资源限制
  40. resources:
  41.   requests:
  42.     cpu: "100m"
  43.     memory: "256Mi"
  44.   limits:
  45.     cpu: "1000m"
  46.     memory: "1Gi"
  47. volumeClaimTemplate:
  48.   accessModes: ["ReadWriteOnce"]
  49.   resources:
  50.     requests:
  51.       storage: 3Gi
  52. EOF
复制代码
配置好后,就可以使用 snapshot api 来创建一个 repository 了,如下我们创建一个名为 my_backup 的 repository。
  1. output{
  2.     stdout{
  3.         codec => "rubydebug"
  4.     }
  5. }
复制代码
2、开始通过API接口备份
有了 repostiroy 后,我们就可以做备份了,也叫快照,也就是记录当下数据的状态。如下所示我们创建一个名为 snapshot_1 的快照。
  1. output{
  2.    file {
  3.        path => "/data/logstash/%{host}/{application}
  4.        codec => line { format => "%{message}"} }
  5.     }
  6. }
复制代码
【温馨提示】wait_for_completion 为 true 是指该 api 在备份执行完毕后再返回结果,否则默认是异步执行的,我们这里为了立刻看到效果,所以设置了该参数,线上执行时不用设置该参数,让其在后台异步执行即可。
3、增量备份
  1. output{
  2.    kafka{
  3.         bootstrap_servers => "localhost:9092"
  4.         topic_id => "test_topic"  #必需的设置。生成消息的主题
  5.     }
  6. }
复制代码
当执行完毕后,你会发现 /mount/backups/my_backup 体积变大了。这说明新数据备份进来了。要说明的一点是,当你在同一个 repository 中做多次 snapshot 时,elasticsearch 会检查要备份的数据 segment 文件是否有变化,如果没有变化则不处理,否则只会把发生变化的 segment file 备份下来。这其实就实现了增量备份。
4、数据恢复
通过调用如下 api 即可快速实现恢复功能:
  1. output{
  2.    elasticsearch {
  3.         #user => elastic
  4.         #password => changeme
  5.         hosts => "localhost:9200"
  6.         index => "nginx-access-log-%{+YYYY.MM.dd}"  
  7.     }
  8. }
复制代码
2)elasticdump备份迁移es数据

索引数据导出为文件(备份)
  1. $ helm install logstash elastic/logstash -f my-values.yaml  --namespace bigdata
复制代码
索引数据文件导入至索引(恢复)
  1. $ kubectl get pods --namespace=bigdata -l app=logstash-logstash
复制代码
可直接将备份数据导入另一个es集群
  1. $ kubectl logs -f  logstash-logstash-0 -n bigdata >logs
  2. $ tail -100 logs
复制代码
type类型
type是ES数据导出导入类型,Elasticdump工具支持以下数据类型:
type类型说明mappingES的索引映射结构数据dataES的数据settingsES的索引库默认配置analyzerES的分词器templateES的模板结构数据aliasES的索引别名3)esm备份迁移es数据

备份es数据
  1. $ kubectl exec --tty -i kafka-client --namespace bigdata -- bash
  2. $ kafka-consumer-groups.sh --bootstrap-server kafka-0.kafka-headless.bigdata.svc.cluster.local:9092 --describe --group mygroup
复制代码
-w 表示线程数
-b 表示一次bulk请求数据大小,单位MB默认 5M
-c 一次scroll请求数量
导入恢复es数据
  1. $ helm uninstall logstash -n bigdata
复制代码
四、彩蛋

还有个日志系统架构跟ELK架构很相似(Elasticsearch、Flume、Kafka、Flink、Kibana),只是把Filebeat换成了Flume,Logstash换成了Flink。后面也会写篇文章分享出来,请耐心等待……


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

来自云龙湖轮廓分明的月亮

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表