本文基于FlinkCDC 3.1.1和Flink 1.18版本,对Mysql数据库的binlog增量监听,实现对DDL语句审计告警的本领。
一、项目本领:
基于FlinkCDC-3.1.1和Flink-1.18技能栈,采取Flink on yarn摆设模式,实现增量监听Mysql数据库,对Mysql中的DDL语句做审计告警,将告警消息发送至钉钉告警群。
二、紧张技能架构:
技能栈版本java1.8FlinkCDC3.1.1Flink1.18.0Hadoop (yarn)2.8.3三、摆设流程:
- 安装Java环境和Hadoop环境,步调略;
- 安装Flink环境:
- -> wget https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
- -> tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
- -> mkdir ./flink-1.18.0/job
复制代码
- 下载Flink与Hadoop整合干系依赖jar包:
- -> cd flink-1.18.0/lib
- -> wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
复制代码
- 设置flink-conf.yaml文件,紧张设置如下参数:
设置项value阐明state.backendfilesystem状态后端存储模式state.checkpoints.dirhdfs://${HOST_NAME}:8020/flink/flink-checkpointscheckpoint地点,基于hdfsstate.savepoints.dirhdfs://${HOST_NAME}:8020/flink/flink-savepointssavepoint地点,同样基于hdfsenv.java.opts“-Dfile.encoding=UTF-8”全局编码(**紧张设置,必须要加引号)
- 创建钉钉群,并创建告警呆板人。
创建过程略;(注意:当前版本只能处理处罚未加签呆板人,关键字设置为告警,审计);
- 打包并上传:
- -> git clone
- -> mvn clean package -Dmaven.test.skip=ture
复制代码 将jar包上传到/flink-1.18.0/job目次下。
你可以手动新增设置文件,也可以将项目中的config.json文件直接上传到/flink-1.18.0/job目次下,如下展示新增过程:- -> cd /flink-1.18.0/job
- -> vim config.json
复制代码- {
- "auditName": "sql-audit",
- "job": {
- "serverId": "",
- "startMode": "latest",
- "timestamp": ""
- },
- "source": {
- "sourceType": "mysql",
- "hostname": "127.0.0.1",
- "port": 3306,
- "username": "root",
- "password": "root",
- "databaseList": [],
- "tableList": [],
- "blackDatabaseList": []
- },
- "alarm": {
- "alarmType": "WEBHOOK",
- "webhook": "https://oapi.dingtalk.com/robot/send?access_token=${YOUR_TOKEN}",
- "msgtype": "text",
- "at": {
- "isAtAll": false,
- "atMobiles": [
- "13400000000"
- ]
- }
- }
- }
复制代码 设置文件内容形貌如下:
参数名是否必填默认值阐明auditName是sql-audit使命名称serverId否-mysql slave server_idstartMode是latestFlink启动模式(支持 initial,earliest,latest及timestamp四种模式)timestamp否Null如果启动模式设置为timestamp,则该值必须设置sourceType是mysql数据源范例,当前只支持mysqlhostname是-mysql毗连ipport是-mysql毗连端口username是-mysql毗连用户名password是-mysql毗连暗码databaseList否-Flinkcdc 监听的数据库列表tableList否-Flinkcdc 监听的数据表列表blackDatabaseList否-须要过滤掉的黑名单数据库列表alarmType是WEBHOOK告警范例,默以为钉钉,当前也只支持钉钉webhook是-webhook地点msgtype是text消息范例,钉钉消息范例支持markdown和textisAtAll是false是否@全部人atMobiles是@人手机号- -> vim start.sh
- -> sh start.sh
复制代码- #!/bin/bash
- export JAVA_HOME=${YOUR_JAVA_PATH}/jdk1.8.0_181
- export PATH=$JAVA_HOME/bin:$PATH
- export HADOOP_CLASSPATH=`hadoop classpath`
- export HADOOP_USER_NAME=hdfs
- flink_home=$PWD/../../bin
- jar_path=$PWD/sql-audit.jar
- config_path=$PWD/config.json
- job_name=sql-audit-app
- $flink_home/flink run -c com.someway.MySQLAuditExec -m yarn-cluster -ynm $job_name -p 1 -yjm 1024 -ytm 1024 -yD env.java.opts="-Dfile.encoding=UTF-8" ${jar_path} ${config_path}
复制代码 通过在数据库创建表,删除表,truncate表和alert表,可以或许捕捉如下审计告警:
- Create 告警样例:
- DROP 告警样例:
- Truncate 告警样例:
- Alter 告警样例:
四、末了
当前项目属于V0.1版本,内里支持的数据源不是很全面,告警渠道也有限。如果有须要的小同伴,可以自行下载代码,然后做二次开开发。FlinkCDC 3.0版本提供了许多新本领,为数据同步提供了更多的保障机制和大概性,后续会继续探索其他新功能和新利用场景。有须要交换的小同伴,欢迎关注我的公众号,一起交换学习。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金 |