基于FlinkCDC-3.1.1&Flink-1.18实现MySQL DDL审计告警

[复制链接]
发表于 2026-2-8 21:59:37 | 显示全部楼层 |阅读模式
本文基于FlinkCDC 3.1.1和Flink 1.18版本,对Mysql数据库的binlog增量监听,实现对DDL语句审计告警的本领。
一、项目本领:

基于FlinkCDC-3.1.1和Flink-1.18技能栈,采取Flink on yarn摆设模式,实现增量监听Mysql数据库,对Mysql中的DDL语句做审计告警,将告警消息发送至钉钉告警群。
二、紧张技能架构:

技能栈版本java1.8FlinkCDC3.1.1Flink1.18.0Hadoop (yarn)2.8.3三、摆设流程:


  • 安装Java环境和Hadoop环境,步调略;
  • 安装Flink环境:
  1. -> wget https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
  2. -> tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
  3. -> mkdir ./flink-1.18.0/job
复制代码

  • 下载Flink与Hadoop整合干系依赖jar包:
  1. -> cd flink-1.18.0/lib
  2. -> wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
复制代码

  • 设置flink-conf.yaml文件,紧张设置如下参数
设置项value阐明state.backendfilesystem状态后端存储模式state.checkpoints.dirhdfs://${HOST_NAME}:8020/flink/flink-checkpointscheckpoint地点,基于hdfsstate.savepoints.dirhdfs://${HOST_NAME}:8020/flink/flink-savepointssavepoint地点,同样基于hdfsenv.java.opts“-Dfile.encoding=UTF-8”全局编码(**紧张设置,必须要加引号)

  • 创建钉钉群,并创建告警呆板人
    创建过程略;(注意:当前版本只能处理处罚未加签呆板人,关键字设置为告警,审计);
  • 打包并上传:
  1. -> git clone
  2. -> mvn clean package -Dmaven.test.skip=ture
复制代码
​      将jar包上传到/flink-1.18.0/job目次下。

  • 创建立置文件:
​       你可以手动新增设置文件,也可以将项目中的config.json文件直接上传到/flink-1.18.0/job目次下,如下展示新增过程:
  1. -> cd /flink-1.18.0/job
  2. -> vim config.json
复制代码
  1. {
  2.   "auditName": "sql-audit",
  3.   "job": {
  4.     "serverId": "",
  5.     "startMode": "latest",
  6.     "timestamp": ""
  7.   },
  8.   "source": {
  9.     "sourceType": "mysql",
  10.     "hostname": "127.0.0.1",
  11.     "port": 3306,
  12.     "username": "root",
  13.     "password": "root",
  14.     "databaseList": [],
  15.     "tableList": [],
  16.     "blackDatabaseList": []
  17.   },
  18.   "alarm": {
  19.     "alarmType": "WEBHOOK",
  20.     "webhook": "https://oapi.dingtalk.com/robot/send?access_token=${YOUR_TOKEN}",
  21.     "msgtype": "text",
  22.     "at": {
  23.       "isAtAll": false,
  24.       "atMobiles": [
  25.         "13400000000"
  26.       ]
  27.     }
  28.   }
  29. }
复制代码
设置文件内容形貌如下:
参数名是否必填默认值阐明auditName是sql-audit使命名称serverId否-mysql slave server_idstartMode是latestFlink启动模式(支持 initial,earliest,latest及timestamp四种模式)timestamp否Null如果启动模式设置为timestamp,则该值必须设置sourceType是mysql数据源范例,当前只支持mysqlhostname是-mysql毗连ipport是-mysql毗连端口username是-mysql毗连用户名password是-mysql毗连暗码databaseList否-Flinkcdc 监听的数据库列表tableList否-Flinkcdc 监听的数据表列表blackDatabaseList否-须要过滤掉的黑名单数据库列表alarmType是WEBHOOK告警范例,默以为钉钉,当前也只支持钉钉webhook是-webhook地点msgtype是text消息范例,钉钉消息范例支持markdown和textisAtAll是false是否@全部人atMobiles是@人手机号

  • 创建启动脚本
  1. -> vim start.sh
  2. -> sh start.sh
复制代码
  1. #!/bin/bash
  2. export JAVA_HOME=${YOUR_JAVA_PATH}/jdk1.8.0_181
  3. export PATH=$JAVA_HOME/bin:$PATH
  4. export HADOOP_CLASSPATH=`hadoop classpath`
  5. export HADOOP_USER_NAME=hdfs
  6. flink_home=$PWD/../../bin
  7. jar_path=$PWD/sql-audit.jar
  8. config_path=$PWD/config.json
  9. job_name=sql-audit-app
  10. $flink_home/flink run -c com.someway.MySQLAuditExec -m yarn-cluster -ynm $job_name  -p 1 -yjm 1024 -ytm 1024 -yD env.java.opts="-Dfile.encoding=UTF-8" ${jar_path} ${config_path}
复制代码

  • 告警测试
通过在数据库创建表,删除表,truncate表和alert表,可以或许捕捉如下审计告警:

  • Create 告警样例:

  • DROP 告警样例

  • Truncate 告警样例:

  • Alter 告警样例:

四、末了

当前项目属于V0.1版本,内里支持的数据源不是很全面,告警渠道也有限。如果有须要的小同伴,可以自行下载代码,然后做二次开开发。FlinkCDC 3.0版本提供了许多新本领,为数据同步提供了更多的保障机制和大概性,后续会继续探索其他新功能和新利用场景。有须要交换的小同伴,欢迎关注我的公众号,一起交换学习。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表