项目情况:centOS7.9 mariadb5.6 celery5.0 kafka3.6.1
项目时间:2025年1月
项目描述:这个项目搭建了一个基于 Nginx 和 Flask 的 Web 集群,使用 Filebeat 将 Nginx 的访问日志发送到 Kafka 集群。通过 Python 消费者程序解析日志并存储到 MySQL 数据库中,最后使用 Celery 监控数据库中的流量数据,当流量凌驾阈值时发送邮件告警。
项目目的
搭建一个日志收集平台,用于监控 Nginx 的 Web 访问日志流量。通过反向代理、日志收集、数据处理和实时监控,实现流量异常告警功能。
体系架构
1.Web 集群:
使用两台 Nginx 服务器搭建反向代理集群
用户哀求通过 Nginx 被代理到后端的 Flask 程序
Nginx 生成访问日志(access.log)
2.日志收集:
使用 Filebeat 监听 Nginx 的 access 日志
Filebeat 将日志实时发送到 Kafka 集群
3.消息队列:
Kafka 集群包罗三个 Broker 节点
创建 Kafka Topic,每个 Topic 包罗三个分区和三个副本,确保高可用性和负载平衡
4.日志处理与存储:
使用 Python-kafka 编写的消费者程序并发消费 Kafka 中的日志数据
提取日志中的关键信息(如 IP、流量、时间、省份、运营商等)
将清洗后的数据存储到 MySQL 数据库中
5.流量监控与告警:
使用 Celery 定时任务监控 MySQL 数据库中的流量数据
如果某一分钟内的流量凌驾设定阈值(过高或过低),触发邮件告警
架构图
关键技术栈
Nginx:提供反向代理服务,生成访问日志
Filebeat:轻量级日志收集器,将日志发送到 Kafka
Kafka:分布式消息队列,存储和转发日志数据
Python-kafka:用于编写 Kafka 消费者程序
MySQL:存储清洗后的日志数据
Celery:分布式任务队列,用于定时监控和告警
SMTP:用于发送邮件告警
实现步调
1.情况预备
依赖软件安装
- yum源配置:
- cd /etc/yum.repos.d
- mkdir repo
- mv *.repo repo/
- 下载阿里云源:
- curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
- 下载依赖软件:
- yum install epel-release -y
- yum install wget vim java-11-openjdk.x86_64 -y
复制代码 设置静态ip地址,修改/etc/sysconfig/network-scripts/ifcfg-ens33
- PROXY_METHOD=none
- BROWSER_ONLY=no
- BOOTPROTO=none
- DEFROUTE=yes
- IPV4_FAILURE_FATAL=no
- NAME=ens33
- UUID=0f3239b9-6ba7-406e-94e8-fa7b680a4d82
- DEVICE=ens33
- ONBOOT=yes
- IPADDR=192.168.20.163
- NETMASK=255.255.255.0
- GATEWAY=192.168.20.2
- DNS1=114.114.114.114
复制代码 设置主机名
- hostnamectl set-hostname kafka1
复制代码 修改/etc/hosts文件,添加主机名和ip地址映射
- 192.168.20.161 kafka1
- 192.168.20.162 kafka2
- 192.168.20.163 kafka3
复制代码 关闭防火墙与selinux
- 关闭防火墙:
- iptables -F #清空防火墙规则
- systemctl stop firewalld #关闭防火墙服务
- systemctl disable firewalld #设置开机不自启
- 关闭selinux,编辑/etc/selinux/config 文件
- SELINUX=disabled
- 重启系统:
- reboot
复制代码 2.部署kafka集群
下载kafka
- cd /opt
- wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
复制代码 解压缩
- tar xf kafka_2.13-3.6.1.tgz
- cd kafka_2.13-3.6.1
复制代码 修改设置文件,位于kafka目次下config/kraft/server.properties
- #修改节点id,每个节点唯一
- node.id=1
- #修改控制器投票列表
- controller.quorum.voters=1@192.168.223.161:9093,2@192.168.223.162:9093,3@192.168.223.163:9093
- #修改监听器和控制器,绑定ip。其中kafka1为主机名,可用本机ip地址代替
- listeners=PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093
- # 侦听器名称、主机名和代理将向客户端公布的端口.(broker 对外暴露的地址)
- # 如果未设置,则使用"listeners"的值.
- advertised.listeners=PLAINTEXT://kafka3:9092
复制代码 设置文件详解
- ############################# Server Basics #############################
- # 此服务器的角色。设置此项将进入KRaft模式(controller 相当于主机、broker 节点相当于从机,主机类似 zk 功能)
- process.roles=broker,controller
- # 节点 ID
- node.id=2
- # 全 Controller 列表
- controller.quorum.voters=2@192.168.58.130:9093,3@192.168.58.131:9093,4@192.168.58.132:9093
- ############################# Socket Server Settings #############################
- # 套接字服务器侦听的地址.
- # 组合节点(即具有`process.roles=broker,controller`的节点)必须至少在此处列出控制器侦听器
- # 如果没有定义代理侦听器,那么默认侦听器将使用一个等于java.net.InetAddress.getCanonicalHostName()值的主机名,
- # 带有PLAINTEXT侦听器名称和端口9092
- # FORMAT:
- # listeners = listener_name://host_name:port
- # EXAMPLE:
- # listeners = PLAINTEXT://your.host.name:9092
- #不同服务器绑定的端口
- listeners=PLAINTEXT://192.168.58.130:9092,CONTROLLER://192.168.58.130:9093
- # 用于代理之间通信的侦听器的名称(broker 服务协议别名)
- inter.broker.listener.name=PLAINTEXT
- # 侦听器名称、主机名和代理将向客户端公布的端口.(broker 对外暴露的地址)
- # 如果未设置,则使用"listeners"的值.
- advertised.listeners=PLAINTEXT://192.168.58.130:9092
- # controller 服务协议别名
- # 控制器使用的侦听器名称的逗号分隔列表
- # 如果`listener.security.protocol.map`中未设置显式映射,则默认使用PLAINTEXT协议
- # 如果在KRaft模式下运行,这是必需的。
- controller.listener.names=CONTROLLER
- # 将侦听器名称映射到安全协议,默认情况下它们是相同的。(协议别名到安全协议的映射)有关更多详细信息,请参阅配置文档.
- listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
- # 服务器用于从网络接收请求并向网络发送响应的线程数
- num.network.threads=3
- # 服务器用于处理请求的线程数,其中可能包括磁盘I/O
- num.io.threads=8
- # 套接字服务器使用的发送缓冲区(SO_SNDBUF)
- socket.send.buffer.bytes=102400
- # 套接字服务器使用的接收缓冲区(SO_RCVBUF)
- socket.receive.buffer.bytes=102400
- # 套接字服务器将接受的请求的最大大小(防止OOM)
- socket.request.max.bytes=104857600
- ############################# Log Basics #############################
- # 存储日志文件的目录的逗号分隔列表(kafka 数据存储目录)
- #log.dirs=/usr/kafka/kafka_2.13-3.6.1/datas
- log.dirs=/tmp/kraft-combined-logs
- # 每个主题的默认日志分区数。更多的分区允许更大的并行性以供使用,但这也会导致代理之间有更多的文件。
- num.partitions=1
- # 启动时用于日志恢复和关闭时用于刷新的每个数据目录的线程数。
- # 对于数据目录位于RAID阵列中的安装,建议增加此值。
- num.recovery.threads.per.data.dir=1
- ############################# Internal Topic Settings #############################
- # 组元数据内部主题"__consumer_offsets"和"__transaction_state"的复制因子
- # 对于除开发测试以外的任何测试,建议使用大于1的值来确保可用性,例如3.
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
- ############################# Log Flush Policy #############################
- # 消息会立即写入文件系统,但默认情况下,我们只使用fsync()进行同步
- # 操作系统缓存延迟。以下配置控制将数据刷新到磁盘.
- # 这里有一些重要的权衡:
- # 1. Durability(持久性): 如果不使用复制,未清理的数据可能会丢失
- # 2. Latency(延迟): 当刷新发生时,非常大的刷新间隔可能会导致延迟峰值,因为将有大量数据要刷新.
- # 3. Throughput(吞吐量): 刷新通常是最昂贵的操作,较小的刷新间隔可能导致过多的寻道.
- # 下面的设置允许配置刷新策略,以便在一段时间后或每N条消息(或两者兼有)刷新数据。这可以全局完成,并在每个主题的基础上覆盖
- # 强制将数据刷新到磁盘之前要接受的消息数
- #log.flush.interval.messages=10000
- # 在我们强制刷新之前,消息可以在日志中停留的最长时间
- #log.flush.interval.ms=1000
- ############################# Log Retention Policy #############################
- # 以下配置控制日志段的处理。可以将该策略设置为在一段时间后删除分段,或者在累积了给定大小之后删除分段。
- # 只要满足这些条件中的任意一个,segment就会被删除。删除总是从日志的末尾开始
- # 日志文件因使用年限而有资格删除的最短使用年限
- log.retention.hours=168
- # 基于大小的日志保留策略。除非剩余的段低于log.retention.bytes,否则将从日志中删除段。独立于log.retention.hours的函数。
- #log.retention.bytes=1073741824
- # 日志segment文件的最大大小。当达到此大小时,将创建一个新的日志segment
- log.segment.bytes=1073741824
- # 检查日志segments以查看是否可以根据保留策略删除它们的间隔
- log.retention.check.interval.ms=300000
复制代码
- cd /opt/kafka_2.13-3.6.1
- # 在其中一台执行,生成集群UUID命令,拿到集群UUID保存在当前tmp_random文件中
- bin/kafka-storage.sh random-uuid >tmp_random
- # 查看uuid
- [root@chainmaker1 kafka_2.13-3.6.1]# cat tmp_random
- z3oq9M4IQguOBm2rt1ovmQ
- # 在所有机器上执行,它会初始化存储区域,为 Kafka 集群的元数据存储和后续操作做好准备。z3oq9M4IQguOBm2rt1ovmQ为自己生成的集群uuid
- bin/kafka-storage.sh format -t z3oq9M4IQguOBm2rt1ovmQ -c /opt/kafka_2.13-3.6.1/config/kraft/server.properties
复制代码
- 启动:
- bin/kafka-server-start.sh -daemon /opt/kafka_2.13-3.6.1/config/kraft/server.properties
- 关闭:
- bin/kafka-server-stop.sh
复制代码
- 使用systemctl管理服务 -- systemd
- ## 编辑文件 /usr/lib/systemd/system/kafka.service
- [Unit]
- Description=Apache Kafka server (KRaft mode)
- Documentation=http://kafka.apache.org/documentation.html
- After=network.target
- [Service]
- Type=forking
- User=root
- Group=root
- Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/usr/lib/jvm/java-11-openjdk-11.0.23.0.9-2.el7_9.x86_64/bin/"
- ExecStart=/opt/kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.13-3.6.1/config/kraft/server.properties
- ExecStop=/opt/kafka_2.13-3.6.1/bin/kafka-server-stop.sh
- Restart=on-failure
- [Install]
- WantedBy=multi-user.target
-
- #重新加载systemd配置
- systemctl daemon-reload
-
- #启动kafka服务
- systemctl start kafka
-
- #关闭kafka服务
- systemctl stop kafka
-
- #设置开机自启
- systemctl enable kafka
复制代码
- 测试集群Kraft模式下Kafka脚本的使用-阿里云开发者社区 (aliyun.com)
- # 创建topic
- bin/kafka-topics.sh --create --bootstrap-server kafka3:9092 --replication-factor 3 --partitions 3 --topic my_topic
- ** --replication-factor指定副本因子,--partitions指定分区数,--topic指定主题名称。
- # 查看topic
- bin/kafka-topics.sh --list --bootstrap-server kafka3:9092
- #创建生产者,发送消息,测试用
- bin/kafka-console-producer.sh --broker-list kafka3:9092 --topic my_topic
- #创建消费者,获取数据,测试用
- bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic my_topic --from-beginning
复制代码
3.部署filebeat
- 一篇文章搞懂filebeat(ELK) - 一寸HUI - 博客园
安装
- 1、rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
- 2、编辑 vim /etc/yum.repos.d/fb.repo
- [elastic-7.x]
- name=Elastic repository for 7.x packages
- baseurl=https://artifacts.elastic.co/packages/7.x/yum
- gpgcheck=1
- gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
- enabled=1
- autorefresh=1
- type=rpm-md
- 3、yum安装
- yum install filebeat -y
- rpm -qa |grep filebeat #可以查看filebeat有没有安装 rpm -qa 是查看机器上安装的所有软件包
- rpm -ql filebeat 查看filebeat安装到哪里去了,牵扯的文件有哪些
复制代码 设置,修改设置文件/etc/filebeat/filebeat.yml
- filebeat.inputs:
- - type: log
- # Change to true to enable this input configuration.
- enabled: true
- # Paths that should be crawled and fetched. Glob based paths.
- paths:
- - /var/log/nginx/access.log
- - /var/log/nginx/error.log
- #==========------------------------------kafka-----------------------------------
- output.kafka:
- hosts: ["192.168.20.161:9092","192.168.20.162:9092","192.168.20.163:9092"]
- topic: nginxlog
- keep_alive: 10s
复制代码 创建主题
- cd /opt/kafka_2.13-3.6.1
- bin/kafka-topics.sh --create --bootstrap-server kafka3:9092 --replication-factor 3 --partitions 3 --topic nginxlog
复制代码 启动服务
- systemctl start filebeat
- systemctl enable filebeat #设置开机自启
复制代码 4.nginx反向代理集群搭建
安装nginx
- yum install epel-release -y
- yum install nginx -y
复制代码 编辑设置文件 /etc/nginx/conf.d/sc.conf
- upstream flask {
- server 192.168.20.162:5000;
- server 192.168.20.163:5000;
- }
- server {
- server_name www.sc.com;
- location / {
- proxy_pass http://flask;
- }
- }
复制代码 启动nginx
4.后端flask程序
安装flask情况
- yum install python3 -y
- pip3 install flask -i https://pypi.tuna.tsinghua.edu.cn/simple
复制代码 编辑/opt/python-flask/app.py文件
- from flask import Flask
- app = Flask(__name__)
- @app.route("/")
- def index():
- return "this is flask web kafka2"
- app.run(host = "0.0.0.0")
复制代码 启动flask
4.消费nginx日志
使用kafka-python并发消费nginx日志,举行清洗
查看解析IP网址http://whois.pconline.com.cn/ipJson.jsp?ip=123.123.123.123&json=true
- # -*- coding: utf-8 -*-
- from kafka import KafkaConsumer
- from multiprocessing import Process, current_process
- import time
- import json
- import pymysql
- import requests
- from datetime import datetime
- # 配置信息
- IP_URL = "https://whois.pconline.com.cn/ipJson.jsp?json=true&ip="
- DB_HOST = "192.168.140.159"
- DB_PORT = 3306
- DB_USER = "sc"
- DB_PASSWD = "Sctl@123456"
- DB = "test2"
- # 将传入的 JSON 字符串转换成字典格式
- def json_to_dict(message: str) -> dict:
- d1 = {}
- try:
- d1 = json.loads(message)
- except:
- print("输入信息非 JSON 格式")
- return d1
- # 解析给定的 IP 地址
- def resolve_ip(ip):
- #if ip.startswith("192.168.") or ip.startswith("10.") or ip.startswith("172.16."):
- # return "局域网", "局域网"
- url = IP_URL + ip
- response = requests.get(url)
- data = response.json()
- prov = data.get("pro")
- isp = data.get("addr").split()[1]
- return prov, isp
- # 时间格式转换
- def time_deformat(time_str):
- format_str = "%d/%b/%Y:%H:%M:%S"
- struct_time = time.strptime(time_str, format_str)
- result_time = time.strftime("%Y-%m-%d %H:%M:%S", struct_time)
- return result_time
- # 处理日志字符串
- def handler_log(log_str):
- log_str = log_str.split()
- ip = log_str[0]
- time_str = log_str[3][1:]
- flow = log_str[9]
- prov, isp = resolve_ip(ip)
- time = time_deformat(time_str)
- return time, ip, prov, isp, flow
- def consume_kafka_partition(topic, group_id, partition):
- """
- 进程执行的函数,用于消费指定分区的 Kafka 消息
- """
- consumer = KafkaConsumer(
- group_id=group_id,
- bootstrap_servers=['192.168.140.158:9092', '192.168.140.159:9092', '192.168.140.160:9092'],
- auto_offset_reset='earliest',
- enable_auto_commit=True,
- auto_commit_interval_ms=5000,
- value_deserializer=lambda x: x.decode('utf-8')
- )
- consumer.subscribe([topic])
- conn = pymysql.connect(
- host=DB_HOST,
- user=DB_USER,
- password=DB_PASSWD,
- port=DB_PORT,
- database=DB
- )
- cur = conn.cursor()
- for message in consumer:
- # print(f"message: {message}")
- # print(f"进程 {current_process().name} 消费到来自分区 {partition} 的消息:{message.value}")
- result_dict = json_to_dict(message.value)
- log_str = result_dict.get("message")
- print(f"message:{log_str}")
- result = handler_log(log_str)
- sql = "INSERT INTO nginx_log (date_time, ip, province, ISP, flow) VALUES (%s, %s, %s, %s, %s)"
- cur.execute(sql, result)
- conn.commit()
- if __name__ == "__main__":
- topic = "nginxlog"
- group_id = "message_group40"
- partitions = [0, 1, 2]
- def start_process(target, args):
- p = Process(target=target, args=args)
- p.name = f"Consumer-Partition-{args[2]}"
- p.start()
- return p
- processes = []
- for partition in partitions:
- p = start_process(consume_kafka_partition, (topic, group_id, partition))
- processes.append(p)
- for p in processes:
- p.join()
- print("所有进程已结束,指定分区消费完成")
复制代码
5.celery部署
redis安装
redis 设置文件修改 /etc/redis.conf
启动服务
redis详解
- redis:key-value 存储系统,是跨平台的非关系型数据库。
- redis支持的存储类型:
- String: 字符串
- Hash: 散列
- List: 列表
- Set: 集合
- Sorted Set: 有序集合
- 可以做消息中间件,可以做消息队列,可以做缓存 -- memcache
- redis持久化:
- RDB 持久化是通过对 Redis 中的数据进行快照(snapshot)来实现的。在指定的时间间隔内,Redis 会将内存中的数据集快照写入磁盘上的一个临时文件,成功后再将这个临时文件替换为之前的 RDB 文件。
- AOF 持久化是以日志的形式记录 Redis 服务器所执行的每一个写操作(如 SET、LPUSH 等命令)。这些写操作命令会按照执行的先后顺序追加到 AOF 文件的末尾。
复制代码 python库安装
- pip3 install celery -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
- pip3 install redis
复制代码 celery设置
- cd /opt
- mkdir monitor/celery_app -p
- 在celery_app中
- 1、编辑配置文件 config.py
- from celery.schedules import crontab
- BROKER_URL = 'redis://192.168.20.161:6379/0' # Broker配置,使用Redis作为消息中间件
- CELERY_RESULT_BACKEND = 'redis://192.168.20.161:6379/1' # BACKEND配置,这里使用redis
- CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
- CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
- CELERY_TIMEZONE='Asia/Shanghai' # 时区配置
- CELERY_IMPORTS = ( # 指定导入的任务模块,可以指定多个
- 'celery_app.task',
- )
- CELERYBEAT_SCHEDULE = {
- 'celery_app.task.test': {
- 'task': 'celery_app.task.test',
- 'schedule': crontab(minute='*/1'),
- 'args': (-3, 10)
- }
- }
-
- 2、编辑__init__.py (双下划线开头,双下划线结尾)
- from celery import Celery
- app = Celery('task')
- app.config_from_object('celery_app.config')
- 3、编辑task.py
- from . import app
- @app.task
- def test(a,b):
- print("task test start ...")
- result = abs(a) + abs(b)
- print("task test end....")
- return result
复制代码 在celery_app中
1、编辑设置文件 config.py
- from celery.schedules import crontab
- BROKER_URL = 'redis://192.168.140.160:6379/0'
- CELERY_RESULT_BACKEND = 'redis://192.168.140.160:6379/1'
- CELERY_TIMEZONE = 'Asia/Shanghai'
- CELERY_IMPORTS = ('celery_app.task',) # 确保导入任务模块
- # 定时任务配置(每分钟检查一次)
- CELERYBEAT_SCHEDULE = {
- 'check-traffic-every-minute': {
- 'task': 'celery_app.task.check_traffic',
- 'schedule': crontab(minute='*/1'),
- 'args': (100,) # 传递阈值参数
- }
- }
复制代码 2、编辑__init__.py (双下划线开头,双下划线末端)
- from celery import Celery
- app = Celery('task')
- app.config_from_object('celery_app.config')
复制代码 3、编辑task.py
- from . import app
- import pymysql
- import smtplib
- from email.mime.text import MIMEText
- from email.header import Header
- from email.utils import formataddr
- import logging
- # 邮件发送配置
- sender = '发送人@qq.com'
- password = 'qvcaaeboyoqibfgf' # 确保这是QQ邮箱的授权码
- receiver = '接收人@qq.com'
- @app.task
- def check_traffic(threshold=100):
- # 数据库连接配置
- conn = pymysql.connect(
- host='192.168.140.159',
- user='sc',
- password='Sctl@123456',
- port=3306,
- database='test2',
- )
- cur = conn.cursor()
- # 执行查询(优化查询条件)
- sql = "SELECT * FROM nginx_log WHERE flow > %s"
- cur.execute(sql, (threshold,))
- rows = cur.fetchall()
- # 关闭连接
- cur.close()
- conn.close()
- # 检查流量并发送邮件
- alarms = []
- for row in rows:
- flow = row[5]
- if flow > threshold:
- # 构建邮件内容
- mail_content = f"IP {row[2]} 的流量 {flow} 超过阈值 {threshold}!"
- message = MIMEText(mail_content, 'plain', 'utf-8')
- message['From'] = formataddr((str(Header("流量监控系统", 'utf-8')), sender))
- message['To'] = receiver
- message['Subject'] = Header("流量报警", 'utf-8')
- # 发送邮件
- try:
- with smtplib.SMTP_SSL("smtp.qq.com", 465) as smtp:
- smtp.login(sender, password)
- smtp.sendmail(sender, receiver, message.as_string())
- logging.info(f"邮件发送成功:IP {row[2]}")
- except Exception as e:
- logging.error(f"邮件发送失败:{e}")
- alarms.append(f"报警:IP {row[2]} 的流量 {flow} 超过阈值 {threshold}")
- return "\n".join(alarms) if alarms else "流量正常"
复制代码 celery 启动beat (在monitor目次下执行)
- celery -A celery_app beat
复制代码 celery 启动worker
- celery -A celery_app worker -l info -c 4
复制代码 项目心得:
在这次项目中,我了解到了kafka,nginx,filebeat,celery以及发邮件等一系列知识,了解了整个项目的框架。在项目的搭建过程中,也曾由于设置堕落、逻辑不通等等一系列问题而报错,但最终在我的不懈对峙与积极下,最终将这个项目跑通,我的QQ邮箱也收到了报警,感觉收获满满,对整个日志收集项目有了比力全面的了解
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |