大数据-131 - Flink CEP 案例:检测交易活泼用户、超时未交付 ...

莱莱  论坛元老 | 2024-9-16 17:36:27 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1029|帖子 1029|积分 3087

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)
章节内容

上节我们完成了如下的内容:


  • Flink CEP 开发的流程
  • CEP 开发依靠
  • CEP 案例:恶意登录检测实现

Fline CEP

之前已经介绍过,但是防止各人没看到,这里再简朴介绍以下。
基本概念

Flink CEP(Complex Event Processing)是Apache Flink提供的一个扩展库,用于实时复杂变乱处理。通过Flink CEP,开发者可以从流数据中识别出特定的变乱模式。这在敲诈检测、网络安全、实时监控、物联网等场景中非常有效。
Flink CEP的核心是通过定义变乱模式,从流中检测复杂变乱序列。
详细来说,CEP允许用户:


  • 定义变乱模式:用户可以形貌感爱好的变乱组合(如连续变乱、延迟变乱等)。
  • 匹配模式:Flink CEP从流中搜刮与定义模式相匹配的变乱序列。
  • 处理匹配效果:一旦找到符合模式的变乱序列,用户可以定义如何处理这些匹配。
基本构成部门



  • Pattern(模式):形貌要在变乱流中匹配的变乱序列。可以是单个变乱或多个变乱的组合。常用的模式操纵包括next(紧邻)、followedBy(接续)等。
  • PatternStream(模式流):通过应用模式定义,将变乱流变化为模式流。
  • Select函数:用于从模式流中提取匹配的变乱序列
CEP开发步骤

开发Flink CEP应用的基本步骤包括:
定义变乱流:创建一个DataStream,表示原始的变乱流。
定义变乱模式:利用Flink CEP的API定义变乱模式,比方连续变乱、迟到变乱等。
将模式应用到流中:将定义好的模式应用到变乱流上,生成模式流PatternStream。
提取匹配变乱:利用select函数提取匹配模式的变乱,并定义如何处理这些变乱。
利用场景



  • 敲诈检测:可以通过CEP识别连续发生的非常行为,如频仍的登录实验等。
  • 网络监控:检测一段时间内的特定网络攻击模式。
  • 物联网:分析传感器数据,检测设备非常、温度非常等。
  • 用户行为分析:分析用户在某一时间段内的行为序列,从而作出预测或检测非常。
案例2:检测交易活泼用户

业务需求

业务上必要找出24小时内,至少5次有效交易的用户。
数据源如下:
  1. new CepActiveUserBean("100XX", 0.0D, 1597905234000L),
  2. new CepActiveUserBean("100XX", 100.0D, 1597905235000L),
  3. new CepActiveUserBean("100XX", 200.0D, 1597905236000L),
  4. new CepActiveUserBean("100XX", 300.0D, 1597905237000L),
  5. new CepActiveUserBean("100XX", 400.0D, 1597905238000L),
  6. new CepActiveUserBean("100XX", 500.0D, 1597905239000L),
  7. new CepActiveUserBean("101XX", 0.0D, 1597905240000L),
  8. new CepActiveUserBean("101XX", 100.0D, 1597905241000L)
复制代码


  • 获取数据源
  • Watermark转化
  • keyBy转化
  • 至少5次:timeOrMore(5)
  • 24小时之内:within(Time.hours(24))
  • 模式匹配
  • 提取匹配成功的数据
编写代码

  1. package icu.wzk;
  2. import org.apache.flink.api.common.eventtime.*;
  3. import org.apache.flink.api.java.functions.KeySelector;
  4. import org.apache.flink.cep.CEP;
  5. import org.apache.flink.cep.PatternStream;
  6. import org.apache.flink.cep.functions.PatternProcessFunction;
  7. import org.apache.flink.cep.pattern.Pattern;
  8. import org.apache.flink.cep.pattern.conditions.SimpleCondition;
  9. import org.apache.flink.streaming.api.TimeCharacteristic;
  10. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  11. import org.apache.flink.streaming.api.datastream.KeyedStream;
  12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  14. import org.apache.flink.streaming.api.windowing.time.Time;
  15. import org.apache.flink.util.Collector;
  16. import java.util.List;
  17. import java.util.Map;
  18. public class FlinkCepActiveUser {
  19.     public static void main(String[] args) throws Exception {
  20.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  21.         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  22.         env.setParallelism(1);
  23.         DataStreamSource<CepActiveUserBean> data = env.fromElements(
  24.                 new CepActiveUserBean("100XX", 0.0D, 1597905234000L),
  25.                 new CepActiveUserBean("100XX", 100.0D, 1597905235000L),
  26.                 new CepActiveUserBean("100XX", 200.0D, 1597905236000L),
  27.                 new CepActiveUserBean("100XX", 300.0D, 1597905237000L),
  28.                 new CepActiveUserBean("100XX", 400.0D, 1597905238000L),
  29.                 new CepActiveUserBean("100XX", 500.0D, 1597905239000L),
  30.                 new CepActiveUserBean("101XX", 0.0D, 1597905240000L),
  31.                 new CepActiveUserBean("101XX", 100.0D, 1597905241000L)
  32.         );
  33.         SingleOutputStreamOperator<CepActiveUserBean> watermark = data
  34.                 .assignTimestampsAndWatermarks(new WatermarkStrategy<CepActiveUserBean>() {
  35.                     @Override
  36.                     public WatermarkGenerator<CepActiveUserBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
  37.                         return new WatermarkGenerator<CepActiveUserBean>() {
  38.                             long maxTimestamp = Long.MAX_VALUE;
  39.                             long maxOutOfOrderness = 500L;
  40.                             @Override
  41.                             public void onEvent(CepActiveUserBean event, long eventTimestamp, WatermarkOutput output) {
  42.                                 maxTimestamp = Math.max(event.getTimestamp(), maxTimestamp);
  43.                             }
  44.                             @Override
  45.                             public void onPeriodicEmit(WatermarkOutput output) {
  46.                                 output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
  47.                             }
  48.                         };
  49.                     }
  50.                 }.withTimestampAssigner((element, recordTimes) -> element.getTimestamp())
  51.                 );
  52.         KeyedStream<CepActiveUserBean, String> keyed = watermark
  53.                 .keyBy(new KeySelector<CepActiveUserBean, String>() {
  54.                     @Override
  55.                     public String getKey(CepActiveUserBean value) throws Exception {
  56.                         return value.getUsername();
  57.                     }
  58.                 });
  59.         Pattern<CepActiveUserBean, CepActiveUserBean> pattern = Pattern
  60.                 .<CepActiveUserBean>begin("start")
  61.                 .where(new SimpleCondition<CepActiveUserBean>() {
  62.                     @Override
  63.                     public boolean filter(CepActiveUserBean value) throws Exception {
  64.                         return value.getPrice() > 0;
  65.                     }
  66.                 })
  67.                 .timesOrMore(5)
  68.                 .within(Time.hours(24));
  69.         PatternStream<CepActiveUserBean> parentStream = CEP.pattern(keyed, pattern);
  70.         SingleOutputStreamOperator<CepActiveUserBean> process = parentStream
  71.                 .process(new PatternProcessFunction<CepActiveUserBean, CepActiveUserBean>() {
  72.                     @Override
  73.                     public void processMatch(Map<String, List<CepActiveUserBean>> map, Context context, Collector<CepActiveUserBean> collector) throws Exception {
  74.                         System.out.println("map: " + map);
  75.                     }
  76.                 });
  77.         process.print();
  78.         env.execute("FlinkCepActiveUser");
  79.     }
  80. }
  81. class CepActiveUserBean {
  82.     private String username;
  83.     private Double price;
  84.     private Long timestamp;
  85.     public CepActiveUserBean(String username, Double price, Long timestamp) {
  86.         this.username = username;
  87.         this.price = price;
  88.         this.timestamp = timestamp;
  89.     }
  90.     public String getUsername() {
  91.         return username;
  92.     }
  93.     public void setUsername(String username) {
  94.         this.username = username;
  95.     }
  96.     public Double getPrice() {
  97.         return price;
  98.     }
  99.     public void setPrice(Double price) {
  100.         this.price = price;
  101.     }
  102.     public Long getTimestamp() {
  103.         return timestamp;
  104.     }
  105.     public void setTimestamp(Long timestamp) {
  106.         this.timestamp = timestamp;
  107.     }
  108.     @Override
  109.     public String toString() {
  110.         return "CepActiveUserBean{" +
  111.                 "username='" + username + '\'' +
  112.                 ", price=" + price +
  113.                 ", timestamp=" + timestamp +
  114.                 '}';
  115.     }
  116. }
复制代码
运行效果

  1. map: {start=[CepActiveUserBean{username='100XX', price=100.0, timestamp=1597905235000}, CepActiveUserBean{username='100XX', price=200.0, timestamp=1597905236000}, CepActiveUserBean{username='100XX', price=300.0, timestamp=1597905237000}, CepActiveUserBean{username='100XX', price=400.0, timestamp=1597905238000}, CepActiveUserBean{username='100XX', price=500.0, timestamp=1597905239000}]}
  2. Process finished with exit code 0
复制代码
运行效果如下图所示:

案例3:超时未支付

业务需求

找出下单后10分钟没有支付的订单,数据源如下:
  1. new TimeOutPayBean(1L, "create", 1597905234000L),
  2. new TimeOutPayBean(1L, "pay", 1597905235000L),
  3. new TimeOutPayBean(2L, "create", 1597905236000L),
  4. new TimeOutPayBean(2L, "pay", 1597905237000L),
  5. new TimeOutPayBean(3L, "create", 1597905239000L)
复制代码


  • 获取数据源
  • 转 Watermark
  • keyBy 转化
  • 做出 Pattern (下单以后10分钟未支付)
  • 模式匹配
  • 取出匹配成功的数据
编写代码

  1. package icu.wzk;
  2. import org.apache.flink.api.common.eventtime.*;
  3. import org.apache.flink.api.java.functions.KeySelector;
  4. import org.apache.flink.cep.CEP;
  5. import org.apache.flink.cep.PatternSelectFunction;
  6. import org.apache.flink.cep.PatternStream;
  7. import org.apache.flink.cep.PatternTimeoutFunction;
  8. import org.apache.flink.cep.pattern.Pattern;
  9. import org.apache.flink.cep.pattern.conditions.IterativeCondition;
  10. import org.apache.flink.streaming.api.TimeCharacteristic;
  11. import org.apache.flink.streaming.api.datastream.DataStream;
  12. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  13. import org.apache.flink.streaming.api.datastream.KeyedStream;
  14. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  15. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  16. import org.apache.flink.streaming.api.windowing.time.Time;
  17. import org.apache.flink.util.OutputTag;
  18. import java.util.List;
  19. import java.util.Map;
  20. public class FlinkCepTimeOutPay {
  21.     public static void main(String[] args) throws Exception {
  22.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  23.         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  24.         env.setParallelism(1);
  25.         DataStreamSource<TimeOutPayBean> data = env.fromElements(
  26.                 new TimeOutPayBean(1L, "create", 1597905234000L),
  27.                 new TimeOutPayBean(1L, "pay", 1597905235000L),
  28.                 new TimeOutPayBean(2L, "create", 1597905236000L),
  29.                 new TimeOutPayBean(2L, "pay", 1597905237000L),
  30.                 new TimeOutPayBean(3L, "create", 1597905239000L)
  31.         );
  32.         DataStream<TimeOutPayBean> watermark = data
  33.                 .assignTimestampsAndWatermarks(new WatermarkStrategy<TimeOutPayBean>() {
  34.                     @Override
  35.                     public WatermarkGenerator<TimeOutPayBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
  36.                         return new WatermarkGenerator<TimeOutPayBean>() {
  37.                             long maxTimestamp = Long.MAX_VALUE;
  38.                             long maxOutOfOrderness = 500L;
  39.                             @Override
  40.                             public void onEvent(TimeOutPayBean event, long eventTimestamp, WatermarkOutput output) {
  41.                                 maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
  42.                             }
  43.                             @Override
  44.                             public void onPeriodicEmit(WatermarkOutput output) {
  45.                                 output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
  46.                             }
  47.                         };
  48.                     }
  49.                 }.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
  50.                 );
  51.         KeyedStream<TimeOutPayBean, Long> keyedStream = watermark
  52.                 .keyBy(new KeySelector<TimeOutPayBean, Long>() {
  53.                     @Override
  54.                     public Long getKey(TimeOutPayBean value) throws Exception {
  55.                         return value.getUserId();
  56.                     }
  57.                 });
  58.         // 逻辑处理代码
  59.         OutputTag<TimeOutPayBean> orderTimeoutOutput = new OutputTag<>("orderTimeout") {};
  60.         Pattern<TimeOutPayBean, TimeOutPayBean> pattern = Pattern
  61.                 .<TimeOutPayBean>begin("begin")
  62.                 .where(new IterativeCondition<TimeOutPayBean>() {
  63.                     @Override
  64.                     public boolean filter(TimeOutPayBean timeOutPayBean, Context<TimeOutPayBean> context) throws Exception {
  65.                         return timeOutPayBean.getOperation().equals("create");
  66.                     }
  67.                 })
  68.                 .followedBy("pay")
  69.                 .where(new IterativeCondition<TimeOutPayBean>() {
  70.                     @Override
  71.                     public boolean filter(TimeOutPayBean timeOutPayBean, Context<TimeOutPayBean> context) throws Exception {
  72.                         return timeOutPayBean.getOperation().equals("pay");
  73.                     }
  74.                 })
  75.                 .within(Time.seconds(600));
  76.         PatternStream<TimeOutPayBean> patternStream = CEP.pattern(keyedStream, pattern);
  77.         SingleOutputStreamOperator<TimeOutPayBean> result = patternStream
  78.                 .select(orderTimeoutOutput, new PatternTimeoutFunction<TimeOutPayBean, TimeOutPayBean>() {
  79.                     @Override
  80.                     public TimeOutPayBean timeout(Map<String, List<TimeOutPayBean>> map, long l) throws Exception {
  81.                         return map.get("begin").get(0);
  82.                     }
  83.                 }, new PatternSelectFunction<TimeOutPayBean, TimeOutPayBean>() {
  84.                     @Override
  85.                     public TimeOutPayBean select(Map<String, List<TimeOutPayBean>> map) throws Exception {
  86.                         return map.get("pay").get(0);
  87.                     }
  88.                 });
  89.         // 输出结果
  90.         // result.print();
  91.         System.out.println("==============");
  92.         DataStream<TimeOutPayBean> sideOutput = result
  93.                 .getSideOutput(orderTimeoutOutput);
  94.         sideOutput.print();
  95.         // 执行
  96.         env.execute("FlinkCepTimeOutPay");
  97.     }
  98. }
  99. class TimeOutPayBean {
  100.     private Long userId;
  101.     private String operation;
  102.     private Long timestamp;
  103.     public TimeOutPayBean(Long userId, String operation, Long timestamp) {
  104.         this.userId = userId;
  105.         this.operation = operation;
  106.         this.timestamp = timestamp;
  107.     }
  108.     public Long getUserId() {
  109.         return userId;
  110.     }
  111.     public void setUserId(Long userId) {
  112.         this.userId = userId;
  113.     }
  114.     public String getOperation() {
  115.         return operation;
  116.     }
  117.     public void setOperation(String operation) {
  118.         this.operation = operation;
  119.     }
  120.     public Long getTimestamp() {
  121.         return timestamp;
  122.     }
  123.     public void setTimestamp(Long timestamp) {
  124.         this.timestamp = timestamp;
  125.     }
  126.     @Override
  127.     public String toString() {
  128.         return "TimeOutPayBean{" +
  129.                 "userId=" + userId +
  130.                 ", operation='" + operation + '\'' +
  131.                 ", timestamp=" + timestamp +
  132.                 '}';
  133.     }
  134. }
复制代码
运行效果

控制台输出为:
  1. ==============
  2. TimeOutPayBean{userId=1, operation='pay', timestamp=1597905235000}
  3. TimeOutPayBean{userId=3, operation='create', timestamp=1597905239000}
  4. TimeOutPayBean{userId=2, operation='pay', timestamp=1597905237000}
  5. Process finished with exit code 0
复制代码
对应截图如下:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

莱莱

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表