基于kafka、celery的日志收集报警项目

打印 上一主题 下一主题

主题 893|帖子 893|积分 2679

项目情况:centOS7.9 mariadb5.6 celery5.0 kafka3.6.1
项目时间:2025年1月
项目描述:这个项目搭建了一个基于 Nginx 和 Flask 的 Web 集群,使用 Filebeat 将 Nginx 的访问日志发送到 Kafka 集群。通过 Python 消费者程序解析日志并存储到 MySQL 数据库中,最后使用 Celery 监控数据库中的流量数据,当流量凌驾阈值时发送邮件告警。
项目目的

搭建一个日志收集平台,用于监控 Nginx 的 Web 访问日志流量。通过反向代理、日志收集、数据处理和实时监控,实现流量异常告警功能。
体系架构

1.Web 集群:

使用两台 Nginx 服务器搭建反向代理集群
用户哀求通过 Nginx 被代理到后端的 Flask 程序
Nginx 生成访问日志(access.log)
2.日志收集:

使用 Filebeat 监听 Nginx 的 access 日志
Filebeat 将日志实时发送到 Kafka 集群
3.消息队列:

Kafka 集群包罗三个 Broker 节点
创建 Kafka Topic,每个 Topic 包罗三个分区和三个副本,确保高可用性和负载平衡
4.日志处理与存储:

使用 Python-kafka 编写的消费者程序并发消费 Kafka 中的日志数据
提取日志中的关键信息(如 IP、流量、时间、省份、运营商等)
将清洗后的数据存储到 MySQL 数据库中
5.流量监控与告警:

使用 Celery 定时任务监控 MySQL 数据库中的流量数据
如果某一分钟内的流量凌驾设定阈值(过高或过低),触发邮件告警
架构图



关键技术栈

Nginx:提供反向代理服务,生成访问日志
Filebeat:轻量级日志收集器,将日志发送到 Kafka
Kafka:分布式消息队列,存储和转发日志数据
Python-kafka:用于编写 Kafka 消费者程序
MySQL:存储清洗后的日志数据
Celery:分布式任务队列,用于定时监控和告警
SMTP:用于发送邮件告警
实现步调

1.情况预备

依赖软件安装

  1. yum源配置:
  2.     cd /etc/yum.repos.d
  3.     mkdir repo
  4.     mv *.repo repo/
  5.     下载阿里云源:
  6.     curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
  7. 下载依赖软件:
  8. yum install epel-release -y
  9. yum install wget vim java-11-openjdk.x86_64  -y
复制代码
设置静态ip地址,修改/etc/sysconfig/network-scripts/ifcfg-ens33

  1. PROXY_METHOD=none
  2. BROWSER_ONLY=no
  3. BOOTPROTO=none
  4. DEFROUTE=yes
  5. IPV4_FAILURE_FATAL=no
  6. NAME=ens33
  7. UUID=0f3239b9-6ba7-406e-94e8-fa7b680a4d82
  8. DEVICE=ens33
  9. ONBOOT=yes
  10. IPADDR=192.168.20.163                  
  11. NETMASK=255.255.255.0
  12. GATEWAY=192.168.20.2
  13. DNS1=114.114.114.114
复制代码
设置主机名

  1. hostnamectl set-hostname kafka1
复制代码
修改/etc/hosts文件,添加主机名和ip地址映射

  1. 192.168.20.161  kafka1
  2. 192.168.20.162  kafka2
  3. 192.168.20.163  kafka3
复制代码
关闭防火墙与selinux

  1. 关闭防火墙:
  2.   iptables -F                #清空防火墙规则
  3.   systemctl stop firewalld   #关闭防火墙服务
  4.   systemctl disable firewalld  #设置开机不自启
  5. 关闭selinux,编辑/etc/selinux/config 文件
  6.   SELINUX=disabled
  7. 重启系统:
  8.   reboot
复制代码
2.部署kafka集群

下载kafka

  1. cd /opt
  2. wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
复制代码
解压缩

  1. tar xf kafka_2.13-3.6.1.tgz
  2. cd kafka_2.13-3.6.1
复制代码
修改设置文件,位于kafka目次下config/kraft/server.properties

  1. #修改节点id,每个节点唯一
  2. node.id=1
  3. #修改控制器投票列表
  4. controller.quorum.voters=1@192.168.223.161:9093,2@192.168.223.162:9093,3@192.168.223.163:9093
  5. #修改监听器和控制器,绑定ip。其中kafka1为主机名,可用本机ip地址代替
  6. listeners=PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093
  7. # 侦听器名称、主机名和代理将向客户端公布的端口.(broker 对外暴露的地址)
  8. # 如果未设置,则使用"listeners"的值.
  9. advertised.listeners=PLAINTEXT://kafka3:9092
复制代码
设置文件详解

  1. ############################# Server Basics #############################
  2. # 此服务器的角色。设置此项将进入KRaft模式(controller 相当于主机、broker 节点相当于从机,主机类似 zk 功能)
  3. process.roles=broker,controller
  4. # 节点 ID
  5. node.id=2
  6. # 全 Controller 列表
  7. controller.quorum.voters=2@192.168.58.130:9093,3@192.168.58.131:9093,4@192.168.58.132:9093
  8. ############################# Socket Server Settings #############################
  9. # 套接字服务器侦听的地址.
  10. # 组合节点(即具有`process.roles=broker,controller`的节点)必须至少在此处列出控制器侦听器
  11. # 如果没有定义代理侦听器,那么默认侦听器将使用一个等于java.net.InetAddress.getCanonicalHostName()值的主机名,
  12. # 带有PLAINTEXT侦听器名称和端口9092
  13. #   FORMAT:
  14. #     listeners = listener_name://host_name:port
  15. #   EXAMPLE:
  16. #     listeners = PLAINTEXT://your.host.name:9092
  17. #不同服务器绑定的端口
  18. listeners=PLAINTEXT://192.168.58.130:9092,CONTROLLER://192.168.58.130:9093
  19. # 用于代理之间通信的侦听器的名称(broker 服务协议别名)
  20. inter.broker.listener.name=PLAINTEXT
  21. # 侦听器名称、主机名和代理将向客户端公布的端口.(broker 对外暴露的地址)
  22. # 如果未设置,则使用"listeners"的值.
  23. advertised.listeners=PLAINTEXT://192.168.58.130:9092
  24. # controller 服务协议别名
  25. # 控制器使用的侦听器名称的逗号分隔列表
  26. # 如果`listener.security.protocol.map`中未设置显式映射,则默认使用PLAINTEXT协议
  27. # 如果在KRaft模式下运行,这是必需的。
  28. controller.listener.names=CONTROLLER
  29. # 将侦听器名称映射到安全协议,默认情况下它们是相同的。(协议别名到安全协议的映射)有关更多详细信息,请参阅配置文档.
  30. listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
  31. # 服务器用于从网络接收请求并向网络发送响应的线程数
  32. num.network.threads=3
  33. # 服务器用于处理请求的线程数,其中可能包括磁盘I/O
  34. num.io.threads=8
  35. # 套接字服务器使用的发送缓冲区(SO_SNDBUF)
  36. socket.send.buffer.bytes=102400
  37. # 套接字服务器使用的接收缓冲区(SO_RCVBUF)
  38. socket.receive.buffer.bytes=102400
  39. # 套接字服务器将接受的请求的最大大小(防止OOM)
  40. socket.request.max.bytes=104857600
  41. ############################# Log Basics #############################
  42. # 存储日志文件的目录的逗号分隔列表(kafka 数据存储目录)
  43. #log.dirs=/usr/kafka/kafka_2.13-3.6.1/datas
  44. log.dirs=/tmp/kraft-combined-logs
  45. # 每个主题的默认日志分区数。更多的分区允许更大的并行性以供使用,但这也会导致代理之间有更多的文件。
  46. num.partitions=1
  47. # 启动时用于日志恢复和关闭时用于刷新的每个数据目录的线程数。
  48. # 对于数据目录位于RAID阵列中的安装,建议增加此值。
  49. num.recovery.threads.per.data.dir=1
  50. ############################# Internal Topic Settings  #############################
  51. # 组元数据内部主题"__consumer_offsets"和"__transaction_state"的复制因子
  52. # 对于除开发测试以外的任何测试,建议使用大于1的值来确保可用性,例如3.
  53. offsets.topic.replication.factor=1
  54. transaction.state.log.replication.factor=1
  55. transaction.state.log.min.isr=1
  56. ############################# Log Flush Policy #############################
  57. # 消息会立即写入文件系统,但默认情况下,我们只使用fsync()进行同步
  58. # 操作系统缓存延迟。以下配置控制将数据刷新到磁盘.
  59. # 这里有一些重要的权衡:
  60. #    1. Durability(持久性): 如果不使用复制,未清理的数据可能会丢失
  61. #    2. Latency(延迟): 当刷新发生时,非常大的刷新间隔可能会导致延迟峰值,因为将有大量数据要刷新.
  62. #    3. Throughput(吞吐量): 刷新通常是最昂贵的操作,较小的刷新间隔可能导致过多的寻道.
  63. # 下面的设置允许配置刷新策略,以便在一段时间后或每N条消息(或两者兼有)刷新数据。这可以全局完成,并在每个主题的基础上覆盖
  64. # 强制将数据刷新到磁盘之前要接受的消息数
  65. #log.flush.interval.messages=10000
  66. # 在我们强制刷新之前,消息可以在日志中停留的最长时间
  67. #log.flush.interval.ms=1000
  68. ############################# Log Retention Policy #############################
  69. # 以下配置控制日志段的处理。可以将该策略设置为在一段时间后删除分段,或者在累积了给定大小之后删除分段。
  70. # 只要满足这些条件中的任意一个,segment就会被删除。删除总是从日志的末尾开始
  71. # 日志文件因使用年限而有资格删除的最短使用年限
  72. log.retention.hours=168
  73. # 基于大小的日志保留策略。除非剩余的段低于log.retention.bytes,否则将从日志中删除段。独立于log.retention.hours的函数。
  74. #log.retention.bytes=1073741824
  75. # 日志segment文件的最大大小。当达到此大小时,将创建一个新的日志segment
  76. log.segment.bytes=1073741824
  77. # 检查日志segments以查看是否可以根据保留策略删除它们的间隔
  78. log.retention.check.interval.ms=300000
复制代码


  • 创建集群
  1. cd   /opt/kafka_2.13-3.6.1
  2. # 在其中一台执行,生成集群UUID命令,拿到集群UUID保存在当前tmp_random文件中
  3. bin/kafka-storage.sh random-uuid >tmp_random
  4. # 查看uuid
  5. [root@chainmaker1 kafka_2.13-3.6.1]# cat tmp_random
  6. z3oq9M4IQguOBm2rt1ovmQ
  7. # 在所有机器上执行,它会初始化存储区域,为 Kafka 集群的元数据存储和后续操作做好准备。z3oq9M4IQguOBm2rt1ovmQ为自己生成的集群uuid
  8. bin/kafka-storage.sh format -t z3oq9M4IQguOBm2rt1ovmQ -c /opt/kafka_2.13-3.6.1/config/kraft/server.properties
复制代码


  • 启动
  1. 启动:
  2. bin/kafka-server-start.sh -daemon /opt/kafka_2.13-3.6.1/config/kraft/server.properties
  3. 关闭:
  4. bin/kafka-server-stop.sh
复制代码




    • 命令行启动



  • 使用systemctl管理服务 -- systemd
  1. ## 编辑文件  /usr/lib/systemd/system/kafka.service
  2.   [Unit]
  3.   Description=Apache Kafka server (KRaft mode)
  4.   Documentation=http://kafka.apache.org/documentation.html
  5.   After=network.target
  6.   [Service]
  7.   Type=forking
  8.   User=root
  9.   Group=root
  10.   Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/usr/lib/jvm/java-11-openjdk-11.0.23.0.9-2.el7_9.x86_64/bin/"
  11.   ExecStart=/opt/kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.13-3.6.1/config/kraft/server.properties
  12.   ExecStop=/opt/kafka_2.13-3.6.1/bin/kafka-server-stop.sh
  13.   Restart=on-failure
  14.   [Install]
  15.   WantedBy=multi-user.target
  16.   
  17.   #重新加载systemd配置
  18.   systemctl daemon-reload
  19.   
  20.   #启动kafka服务
  21.   systemctl  start  kafka
  22.   
  23.   #关闭kafka服务
  24.   systemctl  stop  kafka
  25.   
  26.   #设置开机自启
  27.   systemctl enable kafka
复制代码


  • 测试集群Kraft模式下Kafka脚本的使用-阿里云开发者社区 (aliyun.com)
  1. # 创建topic
  2. bin/kafka-topics.sh --create --bootstrap-server kafka3:9092 --replication-factor 3 --partitions 3 --topic my_topic
  3. ** --replication-factor指定副本因子,--partitions指定分区数,--topic指定主题名称。
  4. # 查看topic
  5. bin/kafka-topics.sh --list --bootstrap-server kafka3:9092
  6. #创建生产者,发送消息,测试用
  7. bin/kafka-console-producer.sh --broker-list kafka3:9092 --topic my_topic
  8. #创建消费者,获取数据,测试用
  9. bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic my_topic --from-beginning
复制代码

3.部署filebeat



  • 一篇文章搞懂filebeat(ELK) - 一寸HUI - 博客园
安装

  1. 1、rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
  2. 2、编辑 vim /etc/yum.repos.d/fb.repo
  3. [elastic-7.x]
  4. name=Elastic repository for 7.x packages
  5. baseurl=https://artifacts.elastic.co/packages/7.x/yum
  6. gpgcheck=1
  7. gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
  8. enabled=1
  9. autorefresh=1
  10. type=rpm-md
  11. 3、yum安装
  12. yum  install  filebeat -y
  13. rpm -qa  |grep filebeat  #可以查看filebeat有没有安装  rpm -qa 是查看机器上安装的所有软件包
  14. rpm -ql  filebeat  查看filebeat安装到哪里去了,牵扯的文件有哪些
复制代码
设置,修改设置文件/etc/filebeat/filebeat.yml

  1. filebeat.inputs:
  2. - type: log
  3.   # Change to true to enable this input configuration.
  4.   enabled: true
  5.   # Paths that should be crawled and fetched. Glob based paths.
  6.   paths:
  7.     - /var/log/nginx/access.log
  8.     - /var/log/nginx/error.log
  9. #==========------------------------------kafka-----------------------------------
  10. output.kafka:
  11.   hosts: ["192.168.20.161:9092","192.168.20.162:9092","192.168.20.163:9092"]
  12.   topic: nginxlog
  13.   keep_alive: 10s
复制代码
创建主题

  1. cd /opt/kafka_2.13-3.6.1
  2. bin/kafka-topics.sh --create --bootstrap-server  kafka3:9092 --replication-factor 3 --partitions 3 --topic nginxlog
复制代码
启动服务

  1. systemctl start  filebeat
  2. systemctl enable filebeat  #设置开机自启
复制代码
4.nginx反向代理集群搭建

安装nginx

  1. yum install epel-release -y
  2. yum install nginx -y
复制代码
编辑设置文件 /etc/nginx/conf.d/sc.conf

  1. upstream flask {
  2.    server 192.168.20.162:5000;
  3.    server 192.168.20.163:5000;
  4. }
  5. server {
  6.     server_name www.sc.com;
  7.     location / {
  8.        proxy_pass http://flask;
  9.     }
  10. }
复制代码
启动nginx

  1. systemctl start nginx
复制代码
4.后端flask程序

安装flask情况

  1. yum install python3 -y
  2. pip3 install flask -i https://pypi.tuna.tsinghua.edu.cn/simple
复制代码
编辑/opt/python-flask/app.py文件

  1. from flask import Flask
  2. app = Flask(__name__)
  3. @app.route("/")
  4. def index():
  5.     return "this is flask web kafka2"
  6. app.run(host = "0.0.0.0")
复制代码
启动flask

  1. python3 app.py
复制代码
4.消费nginx日志

使用kafka-python并发消费nginx日志,举行清洗

查看解析IP网址http://whois.pconline.com.cn/ipJson.jsp?ip=123.123.123.123&json=true
  1. # -*- coding: utf-8 -*-
  2. from kafka import KafkaConsumer
  3. from multiprocessing import Process, current_process
  4. import time
  5. import json
  6. import pymysql
  7. import requests
  8. from datetime import datetime
  9. # 配置信息
  10. IP_URL = "https://whois.pconline.com.cn/ipJson.jsp?json=true&ip="
  11. DB_HOST = "192.168.140.159"
  12. DB_PORT = 3306
  13. DB_USER = "sc"
  14. DB_PASSWD = "Sctl@123456"
  15. DB = "test2"
  16. # 将传入的 JSON 字符串转换成字典格式
  17. def json_to_dict(message: str) -> dict:
  18.     d1 = {}
  19.     try:
  20.         d1 = json.loads(message)
  21.     except:
  22.         print("输入信息非 JSON 格式")
  23.     return d1
  24. # 解析给定的 IP 地址
  25. def resolve_ip(ip):
  26.     #if ip.startswith("192.168.") or ip.startswith("10.") or ip.startswith("172.16."):
  27.         # return "局域网", "局域网"
  28.     url = IP_URL + ip
  29.     response = requests.get(url)
  30.     data = response.json()
  31.     prov = data.get("pro")
  32.     isp = data.get("addr").split()[1]
  33.     return prov, isp
  34. # 时间格式转换
  35. def time_deformat(time_str):
  36.     format_str = "%d/%b/%Y:%H:%M:%S"
  37.     struct_time = time.strptime(time_str, format_str)
  38.     result_time = time.strftime("%Y-%m-%d %H:%M:%S", struct_time)
  39.     return result_time
  40. # 处理日志字符串
  41. def handler_log(log_str):
  42.     log_str = log_str.split()
  43.     ip = log_str[0]
  44.     time_str = log_str[3][1:]
  45.     flow = log_str[9]
  46.     prov, isp = resolve_ip(ip)
  47.     time = time_deformat(time_str)
  48.     return time, ip, prov, isp, flow
  49. def consume_kafka_partition(topic, group_id, partition):
  50.     """
  51.     进程执行的函数,用于消费指定分区的 Kafka 消息
  52.     """
  53.     consumer = KafkaConsumer(
  54.         group_id=group_id,
  55.         bootstrap_servers=['192.168.140.158:9092', '192.168.140.159:9092', '192.168.140.160:9092'],
  56.         auto_offset_reset='earliest',
  57.         enable_auto_commit=True,
  58.         auto_commit_interval_ms=5000,
  59.         value_deserializer=lambda x: x.decode('utf-8')
  60.     )
  61.     consumer.subscribe([topic])
  62.     conn = pymysql.connect(
  63.         host=DB_HOST,
  64.         user=DB_USER,
  65.         password=DB_PASSWD,
  66.         port=DB_PORT,
  67.         database=DB
  68.     )
  69.     cur = conn.cursor()
  70.     for message in consumer:
  71.         # print(f"message: {message}")
  72.         # print(f"进程 {current_process().name} 消费到来自分区 {partition} 的消息:{message.value}")
  73.         result_dict = json_to_dict(message.value)
  74.         log_str = result_dict.get("message")
  75.         print(f"message:{log_str}")
  76.         result = handler_log(log_str)
  77.         sql = "INSERT INTO nginx_log (date_time, ip, province, ISP, flow) VALUES (%s, %s, %s, %s, %s)"
  78.         cur.execute(sql, result)
  79.         conn.commit()
  80. if __name__ == "__main__":
  81.     topic = "nginxlog"
  82.     group_id = "message_group40"
  83.     partitions = [0, 1, 2]
  84.     def start_process(target, args):
  85.         p = Process(target=target, args=args)
  86.         p.name = f"Consumer-Partition-{args[2]}"
  87.         p.start()
  88.         return p
  89.     processes = []
  90.     for partition in partitions:
  91.         p = start_process(consume_kafka_partition, (topic, group_id, partition))
  92.         processes.append(p)
  93.     for p in processes:
  94.         p.join()
  95.     print("所有进程已结束,指定分区消费完成")
复制代码

5.celery部署

redis安装

  1. yum install redis -y
复制代码
redis 设置文件修改 /etc/redis.conf

  1. bind 0.0.0.0   #监听本机任意ip
复制代码
启动服务

  1. systemctl start redis
复制代码
redis详解

  1. redis:key-value 存储系统,是跨平台的非关系型数据库。
  2. redis支持的存储类型:
  3. String: 字符串
  4. Hash: 散列
  5. List: 列表
  6. Set: 集合
  7. Sorted Set: 有序集合
  8. 可以做消息中间件,可以做消息队列,可以做缓存  -- memcache
  9. redis持久化:
  10. RDB 持久化是通过对 Redis 中的数据进行快照(snapshot)来实现的。在指定的时间间隔内,Redis 会将内存中的数据集快照写入磁盘上的一个临时文件,成功后再将这个临时文件替换为之前的 RDB 文件。
  11. AOF 持久化是以日志的形式记录 Redis 服务器所执行的每一个写操作(如 SET、LPUSH 等命令)。这些写操作命令会按照执行的先后顺序追加到 AOF 文件的末尾。
复制代码
python库安装

  1. pip3 install celery -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
  2. pip3 install redis
复制代码
celery设置

  1. cd /opt
  2. mkdir monitor/celery_app -p
  3. 在celery_app中
  4. 1、编辑配置文件  config.py
  5. from celery.schedules import crontab
  6. BROKER_URL = 'redis://192.168.20.161:6379/0' # Broker配置,使用Redis作为消息中间件
  7. CELERY_RESULT_BACKEND = 'redis://192.168.20.161:6379/1' # BACKEND配置,这里使用redis
  8. CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
  9. CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
  10. CELERY_TIMEZONE='Asia/Shanghai'   # 时区配置
  11. CELERY_IMPORTS = (     # 指定导入的任务模块,可以指定多个
  12.     'celery_app.task',
  13. )
  14. CELERYBEAT_SCHEDULE = {
  15.       'celery_app.task.test': {
  16.           'task': 'celery_app.task.test',
  17.           'schedule': crontab(minute='*/1'),
  18.           'args': (-3, 10)
  19.       }
  20. }
  21. 2、编辑__init__.py  (双下划线开头,双下划线结尾)
  22. from celery import Celery
  23. app = Celery('task')
  24. app.config_from_object('celery_app.config')
  25. 3、编辑task.py
  26. from . import app
  27. @app.task
  28. def test(a,b):
  29.     print("task test start ...")
  30.     result = abs(a) + abs(b)
  31.     print("task test end....")
  32.     return result
复制代码
在celery_app中

1、编辑设置文件 config.py

  1. from celery.schedules import crontab
  2. BROKER_URL = 'redis://192.168.140.160:6379/0'
  3. CELERY_RESULT_BACKEND = 'redis://192.168.140.160:6379/1'
  4. CELERY_TIMEZONE = 'Asia/Shanghai'
  5. CELERY_IMPORTS = ('celery_app.task',)  # 确保导入任务模块
  6. # 定时任务配置(每分钟检查一次)
  7. CELERYBEAT_SCHEDULE = {
  8.     'check-traffic-every-minute': {
  9.         'task': 'celery_app.task.check_traffic',
  10.         'schedule': crontab(minute='*/1'),
  11.         'args': (100,)  # 传递阈值参数
  12.     }
  13. }
复制代码
2、编辑__init__.py (双下划线开头,双下划线末端)

  1. from celery import Celery
  2. app = Celery('task')
  3. app.config_from_object('celery_app.config')
复制代码
3、编辑task.py

  1. from . import app
  2. import pymysql
  3. import smtplib
  4. from email.mime.text import MIMEText
  5. from email.header import Header
  6. from email.utils import formataddr
  7. import logging
  8. # 邮件发送配置
  9. sender = '发送人@qq.com'
  10. password = 'qvcaaeboyoqibfgf'  # 确保这是QQ邮箱的授权码
  11. receiver = '接收人@qq.com'
  12. @app.task
  13. def check_traffic(threshold=100):
  14.     # 数据库连接配置
  15.     conn = pymysql.connect(
  16.         host='192.168.140.159',
  17.         user='sc',
  18.         password='Sctl@123456',
  19.         port=3306,
  20.         database='test2',
  21.     )
  22.     cur = conn.cursor()
  23.     # 执行查询(优化查询条件)
  24.     sql = "SELECT * FROM nginx_log WHERE flow > %s"
  25.     cur.execute(sql, (threshold,))
  26.     rows = cur.fetchall()
  27.     # 关闭连接
  28.     cur.close()
  29.     conn.close()
  30.     # 检查流量并发送邮件
  31.     alarms = []
  32.     for row in rows:
  33.         flow = row[5]
  34.         if flow > threshold:
  35.             # 构建邮件内容
  36.             mail_content = f"IP {row[2]} 的流量 {flow} 超过阈值 {threshold}!"
  37.             message = MIMEText(mail_content, 'plain', 'utf-8')
  38.             message['From'] = formataddr((str(Header("流量监控系统", 'utf-8')), sender))
  39.             message['To'] = receiver
  40.             message['Subject'] = Header("流量报警", 'utf-8')
  41.             # 发送邮件
  42.             try:
  43.                 with smtplib.SMTP_SSL("smtp.qq.com", 465) as smtp:
  44.                     smtp.login(sender, password)
  45.                     smtp.sendmail(sender, receiver, message.as_string())
  46.                     logging.info(f"邮件发送成功:IP {row[2]}")
  47.             except Exception as e:
  48.                 logging.error(f"邮件发送失败:{e}")
  49.             alarms.append(f"报警:IP {row[2]} 的流量 {flow} 超过阈值 {threshold}")
  50.     return "\n".join(alarms) if alarms else "流量正常"
复制代码
celery 启动beat (在monitor目次下执行)

  1. celery -A celery_app beat
复制代码
celery 启动worker

  1. celery -A celery_app worker -l info -c 4
复制代码
项目心得:

在这次项目中,我了解到了kafka,nginx,filebeat,celery以及发邮件等一系列知识,了解了整个项目的框架。在项目的搭建过程中,也曾由于设置堕落、逻辑不通等等一系列问题而报错,但最终在我的不懈对峙与积极下,最终将这个项目跑通,我的QQ邮箱也收到了报警,感觉收获满满,对整个日志收集项目有了比力全面的了解

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

科技颠覆者

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表