深入浅出Flink CEP丨如何通过Flink SQL作业动态更新Flink CEP作业 ...

十念  论坛元老 | 2024-12-19 12:36:36 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1031|帖子 1031|积分 3093

复杂变乱处理(CEP)是一种对变乱流进行分析的技能,它能够辨认出数据流中的变乱序列是否符合特定的模式,并允许用户对这些模式进行处理。Flink CEP 是 CEP 在 Apache Flink 中的具体实现,是 Apache Flink 的一个库,使用户可以在 Flink 的流处理引擎上进行复杂变乱的处理。
该技能广泛应用于金融风控、网络安全、物流跟踪和电商推荐系统等范畴。比如在风险用户辨认中,CEP可以读取并分析客户行为日志,将 5 分钟内转账次数超过 10 次且金额大于 10000 的客户辨认为异常用户。
而在袋鼠云实时开辟平台上,有更加高效的功能支持,允许通过Flink SQL作业动态更新Flink CEP作业。本文将通过产品Demo演示,展示如何在实时开辟平台中构建一个灵活加载最新规则的 Flink CEP 作业,处理来自上游 Kafka 的数据流,让大家更加直观地相识这一整套流程及其上风地点。
一、FLink CEP简介
1、CEP 的核心概念
模式(Pattern):这是定义复杂变乱的规则,它由一系列简单的变乱构成,这些变乱之间通过一定的逻辑关系相连。
变乱流(Event Stream):这是实时流入系统的数据流,CEP 会在这些流上寻找匹配的模式。
匹配(Match):当变乱流中的变乱序列符合定义的模式时,就会产生一个匹配。
2、Flink CEP 的使用流程
定义模式:使用 Flink CEP 的 API 定义感兴趣的变乱模式。
应用模式:将定义的模式应用到变乱流上,Flink CEP 会在流上寻找匹配该模式的变乱序列。
处理匹配变乱:对找到的匹配变乱进行处理,执行相应的业务逻辑。
Flink CEP 的上风在于能够处理高吞吐量、低延迟的实时数据流,而且提供了丰富的 API 来定义复杂的变乱模式。同时,Flink CEP 也支持基于变乱时间的处理,允许用户处理乱序到达的数据。
3、动态FLink CEP 背景介绍
在风险控制大概模式匹配等场景下,用户常常会想要在模式仍然能够提供服务的环境下,改变变乱必要匹配的模式。在目前的 Flink CEP 中,一个 CEP 算子有一个固定的 CEP 模式,不支持改变。因此,为了达到上述目的,用户必须重启 Flink 作业,并等候相对较长的更新时间。
另一种常见环境是,一个变乱流必要与多个模式匹配。虽然当前的 Flink CEP 不支持在一个 CEP 运算符中匹配多个模式,但用户必须为每个模式设置一个 Flink 作业或一个运算符。这可能会浪费内存和计算资源。为此我们支持了动态加载规则,支持任务不重启Flink 作用动态的匹配规则。
4、FLink CEP 的应用场景
Flink CEP 的应用场景包括:
风险控制:例如,检测用户行为模式,假如发现异常行为(如短时间内多次登录失败),则可以触发相应的风险控制步伐。
用户画像:通太过析用户的行为变乱流,可以构建用户画像,进而实现精准营销。
运维监控:在企业服务的运维管理中,CEP 可以用来配置复杂的监控规则,以实现对服务状态的实时监控。
二、ELink 动态CEP方案

PatternProcessorDiscoverer 在 Jobmanager上定期抓取规则更新信息并将规则更新信息推送给 taskmanager 上的 CEP 算子。CEP 算子获取到更新的规则后更新 NFA(非确定有限主动机)。
1、PatternProcessorDiscoverer
PatternProcessorDiscoverer内部维护一个定时线程,通过定时线程定期轮询 database 获取规则信息。假如感知到规则变更则会将规则变更信息通知给 PatternProcessorManager。
2、DynamicCepOperatorCoordinator
DynamicCepOperatorCoordinator实现了 PatternProcessorManager和 OperatorCoordinator接口。
DynamicCepOperatorCoordinator接受到来自 PatternProcessorManager的变更信息后会将变更信息通过UpdatePatternProcessorEvent发送给每一个 DynamicCep 算子。
3、DynamicCepOperator
DynamicCepOperator实现了OperatorEventHandler接口,接收来自DynamicCepOperatorCoordinator 的UpdatePatternProcessorEvent。对于每个UpdatePatternProcessorEvent,DynamicCepOperator会利用规则处理器将 JSON 大概 SQL 体现的规则转为Pattern,再由Pattern获得 NFA。多个Pattern 的环境下会生成多个 NFA。NFA 会与PatternProcessFunction一起被生存到 Map 中。
当上游算子的数据发送到DynamicCepOperator时,同一条数据会被多个 NFA 处理,当某一个NFA match到数据之后,会将数据发送给对应的PatternProcessFunction处理,被PatternProcessFunction处理完的数据则会被发送给下游算子。这里必要注意的是:当有多个规则与PatternProcessFunction 时,不同的PatternProcessFunction处理完的数据布局必要保持一致,因为数据都会统一发送到下游算子,下游算子能接受的数据布局是固定的。
4、PatternProcessFunction
PatternProcessFunction的作用是处理匹配到的查询。PatternProcessFunction会在jobmanager中初始化,然后通过序列化后发送到taskmanager中,因此必要注意下jobmanager和中taskmanager加载到的PatternProcessFunction来自同一个依赖包,否则会有序列化问题。目前不支持对包含了PatternProcessFunction的jar进行热加载,以是假如必要更换新jar来增加PatternProcessFunction,只能通过重启任务实现。
对于使用 Flink Dynamic CEP on SQL 模式:还必要提供一种手段将包含了 PatternProcessFunction的 jar 提交到jm和tm上,数栈上可以通过ADD JAR语法实现。
三、FLink 动态CEP DEMO演示
1、规则阐明
此规则定义了一个简单的流程,它有三个节点,每个节点都基于 action 的值来决定是否消费变乱。假如 action 的值依次为 0、1、2,那么变乱将依次通过 start、middle、end 节点。假如 action 的值不满足当前节点的条件,流程将跳至下一个节点。这个规则没有定义时间窗口,而且匹配到变乱模式后不会跳过任何变乱。
2、定义匹配规则
在Mysql 中进行SQL 编辑页面创建一张数据表,命名为 cep_demo,四个字段 id、version、pattern、function、type、is_deleted。id 和 version 用于标名唯一的版本和 id 信息,pattern 代表了序列化后的 Pattern Stream,function 用于指代规则 match 后执行的 function。规则 match 到数据后,数据会由该 function 处理然后发送到下游算子。type 用于代表规则体现类型。假如以 SQL 体现规则,则填 SQL;假如以 JSON 体现规则,则填写 JSON。然后编写 FlinkSQL 作业并提交到任务调度中运行。
{
"afterMatchStrategy": {
  1. "type": "NO_SKIP"
复制代码
},
"edges": [
  1. {
  2.        
  3.   "source": "middle",
  4.                
  5.   "target": "end",
  6.                
  7.   "type": "STRICT"
  8.                
  9. },
  10.        
  11. {
  12.        
  13.   "source": "start",
  14.                
  15.   "target": "middle",
  16.                
  17.   "type": "SKIP_TILL_NEXT"
  18.                
  19. }
复制代码
],
"name": "end",
"nodes": [
  1. {
  2.        
  3.   "condition": {
  4.                
  5.     "expression": "action == 2",
  6.                        
  7.     "type": "AVIATOR"
  8.                        
  9.   },
  10.                
  11.   "name": "end",
  12.                
  13.   "quantifier": {
  14.                
  15.     "consumingStrategy": "SKIP_TILL_NEXT",
  16.                        
  17.     "properties": [
  18.                        
  19.       "SINGLE"
  20.                                
  21.     ]
  22.                        
  23.   },
  24.                
  25.   "type": "ATOMIC"
  26.                
  27. },
  28.        
  29. {
  30.        
  31.        
  32.   "condition": {
  33.                
  34.     "expression": "action == 1",
  35.                        
  36.     "type": "AVIATOR"
  37.                        
  38.   },
  39.                
  40.   "name": "middle",
  41.                
  42.   "quantifier": {
  43.                
  44.     "consumingStrategy": "SKIP_TILL_NEXT",
  45.                        
  46.     "properties": [
  47.                        
  48.       "SINGLE"
  49.                                
  50.     ]
  51.                        
  52.   },
  53.                
  54.   "type": "ATOMIC"
  55.                
  56. },
  57.        
  58. {
  59.        
  60.   "condition": {
  61.                
  62.     "expression": "action == 0",
  63.                        
  64.     "type": "AVIATOR"
  65.                        
  66.   },
  67.                
  68.   "name": "start",
  69.                
  70.   "quantifier": {
  71.                
  72.     "consumingStrategy": "SKIP_TILL_NEXT",
  73.                        
  74.     "properties": [
  75.                        
  76.       "SINGLE"
  77.                                
  78.     ]
  79.                        
  80.   },
  81.                
  82.   "type": "ATOMIC"
  83.                
  84. }
复制代码
],
"quantifier": {
  1. "consumingStrategy": "SKIP_TILL_NEXT",
  2.        
  3. "properties": [
  4.        
  5.   "SINGLE"
  6.                
  7. ]
复制代码
},
"type": "COMPOSITE",
"version": 1,
"window": null
}
3、数据开辟编写FlinkSQL
FlinkSQL:
ADD JAR WITH /data/sftp/11_dynamic-cep-jar-1_dynamic-cep-jar-1.0-SNAPSHOT.jar AS functions.jar;
CREATE TABLE source (
  1. id  INT,
  2.        
  3. name      VARCHAR,
  4.        
  5. productionId INT,
  6.        
  7. action INT,
  8.        
  9. eventTime BIGINT,
  10.        
  11. procTime AS PROCTIME()
复制代码
) WITH (
  1. 'connector' = 'kafka-x',
  2.        
  3. 'topic'     = 'stream',
  4.        
  5. 'properties.group.id'     = 'stream',
  6.        
  7. 'scan.startup.mode'     = 'latest-offset',
  8.        
  9. 'properties.bootstrap.servers' = 'localhost:9092',
  10.        
  11. 'format'    = 'json'
复制代码
);
CREATE TABLE sink
(
  1. id          int
复制代码
) WITH (
  1.   'connector' = 'mysql-x',  
  2.                
  3.   'url' = 'jdbc:mysql://localhost:3306/stream',  
  4.                
  5.   'schema-name' = 'stream',  
  6.                
  7.   'table-name' = 'stream_cep_001',  
  8.                
  9.   'username' = 'drpeco',  
  10.                
  11.   'password' = '******',  
  12.                
  13.   'sink.buffer-flush.max-rows' = '1024',
  14.                
  15.   'sink.buffer-flush.interval' = '10000',
  16.                
  17.   'sink.all-replace' = 'true',
  18.                
  19.   'sink.parallelism' = '1'
  20.                
  21.   );
复制代码
INSERT INTO sink
SELECT id_total as id
FROM source
  1. DYNAMIC MATCH_RECOGNIZE (
  2.        
  3.     PARTITION BY productionId
  4.                        
  5.     ORDER BY procTime
  6.                        
  7.     OUTPUT (id_total int)  
  8.                        
  9.     WITH_PATTERN (
  10.                        
  11.         'tableName' = 'dynamic_cep',
  12.                                        
  13.         'user' = 'drpeco',
  14.                                        
  15.         'password' = '******',
  16.                                        
  17.                                        
  18.         'driver' = 'com.mysql.cj.jdbc.Driver',
  19.                                        
  20.         'jdbcUrl' = 'jdbc:mysql://localhost:3306/cep',
  21.                                        
  22.         'jdbcIntervalMillis' = '1000'
  23.                                        
  24.     )
  25.                        
  26. )  
  27.        
  28. AS T;
复制代码

4、提交调度运行
FlinkSQL任务语法检查成功后,提交调度任务运行中状态时,kafka 源表Topic 中依次打入Demo数据。
{
  1. "id": 1,
  2.        
  3. "name" : "middle",
  4.        
  5. "productionId" : 11,
  6.        
  7. "action" : 0,
  8.        
  9. "eventTime" : 1
复制代码
};
{
  1. "id": 2,
  2.        
  3. "name" : "middle",
  4.        
  5. "productionId" : 11,
  6.        
  7. "action" : 1,
  8.        
  9. "eventTime" : 1
复制代码
};
{
  1. "id": 3,
  2.        
  3. "name" : "middle",
  4.        
  5. "productionId" : 11,
  6.        
  7. "action" : 2,
  8.        
  9. "eventTime" : 1
复制代码
}

接下来我们检查匹配的结果,action 的值依次为 0、1、2。Jar包中的function为将匹配到的数据,id字段sum相加得出最后id_totle值作为输出条件。最后输出的结果 id_total 值为 6。

此时在任务不停止的状态下,直接操作Mysql数据库修改CEP Json 将规则匹配条件改为 action 的值依次为 0、1、3 为匹配条件。再次打入与上次同样的 kafka数据,此时匹配失败则不输出id_total值。
四、总结
本文和大家分享了什么是CEP,Flink CEP的原理与使用方法,并介绍了如何通过Flink SQL作业动态更新Flink CEP作业,可以说是干货满满
《数据资产管理白皮书》下载地址https://www.dtstack.com/resources/1073/?src=szsm
《行业指标体系白皮书》下载地址https://www.dtstack.com/resources/1057/?src=szsm
《数据治理行业实践白皮书》下载地址https://www.dtstack.com/resources/1001/?src=szsm
《数栈V6.0产品白皮书》下载地址https://www.dtstack.com/resources/1004/?src=szsm
想相识或咨询更多有关袋鼠云大数据产品、行业办理方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szsm
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技能群」,交换最新开源技能信息,群号码:30537511,项目地址:https://github.com/DTStack
《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057?src=szsm
《数栈产品白皮书》下载地址:https://www.dtstack.com/resources/1004?src=szsm
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001?src=szsm
想相识或咨询更多有关大数据产品、行业办理方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szbky

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

十念

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表