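Topics covered so far in this series:
- Hadoop (complete)
- HDFS (complete)
- MapReduce (complete)
- Hive (complete)
- Flume (complete)
- Sqoop (complete)
- Zookeeper (complete)
- HBase (complete)
- Redis (complete)
- Kafka (complete)
- Spark (complete)
- Flink (in progress)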
Chapter contents
In the previous section we covered:
- The Flink CEP development workflow
- CEP development dependencies
- CEP case study: malicious login detection
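Flink CEP
Flink CEP was introduced earlier, but in case you missed it, here is a brief recap.
Basic concepts
Flink CEP (Complex Event Processing) is a library provided by Apache Flink for processing complex events in real time. With Flink CEP, developers can recognize specific event patterns in streaming data, which is useful in scenarios such as fraud detection, network security, real-time monitoring, and the Internet of Things.
At its core, Flink CEP detects complex event sequences in a stream by matching them against user-defined event patterns.
Specifically, CEP lets users:
- Define event patterns: describe the combinations of events of interest (such as consecutive events or delayed events).
- Match patterns: Flink CEP searches the stream for event sequences that match the defined pattern.
- Process match results: once a matching event sequence is found, user-defined logic decides how to handle it.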
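Basic components
- Pattern: describes the event sequence to match in the event stream, either a single event or a combination of events. Common pattern operations include next (strict contiguity: the matching event must come immediately after the previous one) and followedBy (relaxed contiguity: non-matching events may occur in between).
- PatternStream: the result of applying a pattern definition to an event stream.
- Select function: extracts the matched event sequences from the pattern stream.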
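To make the difference between next and followedBy concrete, here is a minimal plain-Java sketch that does not use the Flink API. The class ContiguityDemo and its methods matchNext and matchFollowedBy are hypothetical names introduced only for illustration. They model strict and relaxed contiguity for a two-step pattern over a list of strings, and ignore time windows and after-match skip strategies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not part of Flink: models "a next b" versus "a followedBy b".
class ContiguityDemo {

    // Strict contiguity: b must be the event immediately after a.
    static List<String> matchNext(List<String> events, String a, String b) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals(a) && events.get(i + 1).equals(b)) {
                matches.add(a + "@" + i + " -> " + b + "@" + (i + 1));
            }
        }
        return matches;
    }

    // Relaxed contiguity: events between a and b are skipped,
    // and each a is paired with the first b that comes after it.
    static List<String> matchFollowedBy(List<String> events, String a, String b) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals(a)) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).equals(b)) {
                    matches.add(a + "@" + i + " -> " + b + "@" + j);
                    break;
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("fail", "click", "fail", "fail", "success");
        System.out.println("next:       " + matchNext(events, "fail", "success"));
        System.out.println("followedBy: " + matchFollowedBy(events, "fail", "success"));
    }
}
```

On this input the strict version finds only the fail that sits directly before success, while the relaxed version pairs every earlier fail with that same success.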
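CEP development steps
The basic steps for developing a Flink CEP application are:
- Define the event stream: create a DataStream representing the raw event stream.
- Define the event pattern: use the Flink CEP API to describe the pattern, for example consecutive events or late events.
- Apply the pattern to the stream: applying the pattern to the event stream produces a PatternStream.
- Extract the matched events: use a select function to pull out the events that match the pattern and define how to handle them.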
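Use cases
- Fraud detection: CEP can identify abnormal behavior that occurs repeatedly, such as frequent login attempts.
- Network monitoring: detect specific network attack patterns within a period of time.
- Internet of Things: analyze sensor data to detect device anomalies, abnormal temperatures, and so on.
- User behavior analysis: analyze the sequence of a user's actions within a time window in order to make predictions or detect anomalies.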
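Case 2: detecting active trading users
Business requirement
The business needs to find users with at least 5 valid transactions within 24 hours (in the code below, a transaction counts as valid when its price is greater than 0).
The data source is as follows: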
- new CepActiveUserBean("100XX", 0.0D, 1597905234000L),
- new CepActiveUserBean("100XX", 100.0D, 1597905235000L),
- new CepActiveUserBean("100XX", 200.0D, 1597905236000L),
- new CepActiveUserBean("100XX", 300.0D, 1597905237000L),
- new CepActiveUserBean("100XX", 400.0D, 1597905238000L),
- new CepActiveUserBean("100XX", 500.0D, 1597905239000L),
- new CepActiveUserBean("101XX", 0.0D, 1597905240000L),
- new CepActiveUserBean("101XX", 100.0D, 1597905241000L)
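Implementation steps:
- Obtain the data source
- Assign timestamps and watermarks
- Key the stream with keyBy
- At least 5 times: timesOrMore(5)
- Within 24 hours: within(Time.hours(24))
- Apply the pattern
- Extract the successfully matched data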
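Before looking at the Flink version, it may help to see the rule itself in isolation. The following is a minimal plain-Java sketch, not Flink code: the class ActiveUserSketch, its nested Txn type, and the helper sampleData are hypothetical names introduced for illustration. It assumes events arrive in timestamp order and treats a transaction as valid when its price is greater than 0.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical plain-Java model of "at least 5 valid transactions within 24 hours".
class ActiveUserSketch {
    static final long WINDOW_MS = 24L * 60 * 60 * 1000;
    static final int MIN_COUNT = 5;

    static final class Txn {
        final String username;
        final double price;
        final long timestamp;

        Txn(String username, double price, long timestamp) {
            this.username = username;
            this.price = price;
            this.timestamp = timestamp;
        }
    }

    // Same records as the sample data source in this section.
    static Txn[] sampleData() {
        return new Txn[] {
            new Txn("100XX", 0.0, 1597905234000L),
            new Txn("100XX", 100.0, 1597905235000L),
            new Txn("100XX", 200.0, 1597905236000L),
            new Txn("100XX", 300.0, 1597905237000L),
            new Txn("100XX", 400.0, 1597905238000L),
            new Txn("100XX", 500.0, 1597905239000L),
            new Txn("101XX", 0.0, 1597905240000L),
            new Txn("101XX", 100.0, 1597905241000L)
        };
    }

    // Assumes txns are ordered by timestamp.
    static Set<String> activeUsers(Txn[] txns) {
        Map<String, Deque<Long>> recent = new HashMap<>();
        Set<String> active = new LinkedHashSet<>();
        for (Txn t : txns) {
            if (t.price <= 0) {
                continue; // not a valid transaction
            }
            Deque<Long> times = recent.computeIfAbsent(t.username, k -> new ArrayDeque<>());
            times.addLast(t.timestamp);
            // Drop timestamps that fall outside the 24-hour window ending at this event.
            while (t.timestamp - times.peekFirst() >= WINDOW_MS) {
                times.removeFirst();
            }
            if (times.size() >= MIN_COUNT) {
                active.add(t.username);
            }
        }
        return active;
    }

    public static void main(String[] args) {
        System.out.println(activeUsers(sampleData()));
    }
}
```

With the sample data, user 100XX has five transactions with a positive price inside the window and user 101XX has only one, so only 100XX qualifies.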
Code
- package icu.wzk;
- import org.apache.flink.api.common.eventtime.*;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.cep.CEP;
- import org.apache.flink.cep.PatternStream;
- import org.apache.flink.cep.functions.PatternProcessFunction;
- import org.apache.flink.cep.pattern.Pattern;
- import org.apache.flink.cep.pattern.conditions.SimpleCondition;
- import org.apache.flink.streaming.api.TimeCharacteristic;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.util.Collector;
- import java.util.List;
- import java.util.Map;
- public class FlinkCepActiveUser {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.setParallelism(1);
- DataStreamSource<CepActiveUserBean> data = env.fromElements(
- new CepActiveUserBean("100XX", 0.0D, 1597905234000L),
- new CepActiveUserBean("100XX", 100.0D, 1597905235000L),
- new CepActiveUserBean("100XX", 200.0D, 1597905236000L),
- new CepActiveUserBean("100XX", 300.0D, 1597905237000L),
- new CepActiveUserBean("100XX", 400.0D, 1597905238000L),
- new CepActiveUserBean("100XX", 500.0D, 1597905239000L),
- new CepActiveUserBean("101XX", 0.0D, 1597905240000L),
- new CepActiveUserBean("101XX", 100.0D, 1597905241000L)
- );
- SingleOutputStreamOperator<CepActiveUserBean> watermark = data
- .assignTimestampsAndWatermarks(new WatermarkStrategy<CepActiveUserBean>() {
- @Override
- public WatermarkGenerator<CepActiveUserBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
- return new WatermarkGenerator<CepActiveUserBean>() {
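- long maxOutOfOrderness = 500L;
- // Start from the low end so Math.max tracks the largest timestamp seen and the subtraction below cannot underflow
- long maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;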
- @Override
- public void onEvent(CepActiveUserBean event, long eventTimestamp, WatermarkOutput output) {
- maxTimestamp = Math.max(event.getTimestamp(), maxTimestamp);
- }
- @Override
- public void onPeriodicEmit(WatermarkOutput output) {
- output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
- }
- };
- }
- }.withTimestampAssigner((element, recordTimes) -> element.getTimestamp())
- );
- KeyedStream<CepActiveUserBean, String> keyed = watermark
- .keyBy(new KeySelector<CepActiveUserBean, String>() {
- @Override
- public String getKey(CepActiveUserBean value) throws Exception {
- return value.getUsername();
- }
- });
- Pattern<CepActiveUserBean, CepActiveUserBean> pattern = Pattern
- .<CepActiveUserBean>begin("start")
- .where(new SimpleCondition<CepActiveUserBean>() {
- @Override
- public boolean filter(CepActiveUserBean value) throws Exception {
- return value.getPrice() > 0;
- }
- })
- .timesOrMore(5)
- .within(Time.hours(24));
- PatternStream<CepActiveUserBean> parentStream = CEP.pattern(keyed, pattern);
- SingleOutputStreamOperator<CepActiveUserBean> process = parentStream
- .process(new PatternProcessFunction<CepActiveUserBean, CepActiveUserBean>() {
- @Override
- public void processMatch(Map<String, List<CepActiveUserBean>> map, Context context, Collector<CepActiveUserBean> collector) throws Exception {
- System.out.println("map: " + map);
- }
- });
- process.print();
- env.execute("FlinkCepActiveUser");
- }
- }
- class CepActiveUserBean {
- private String username;
- private Double price;
- private Long timestamp;
- public CepActiveUserBean(String username, Double price, Long timestamp) {
- this.username = username;
- this.price = price;
- this.timestamp = timestamp;
- }
- public String getUsername() {
- return username;
- }
- public void setUsername(String username) {
- this.username = username;
- }
- public Double getPrice() {
- return price;
- }
- public void setPrice(Double price) {
- this.price = price;
- }
- public Long getTimestamp() {
- return timestamp;
- }
- public void setTimestamp(Long timestamp) {
- this.timestamp = timestamp;
- }
- @Override
- public String toString() {
- return "CepActiveUserBean{" +
- "username='" + username + '\'' +
- ", price=" + price +
- ", timestamp=" + timestamp +
- '}';
- }
- }
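The watermark generator in the listing above is meant to track the largest event timestamp seen so far and emit that value minus an out-of-orderness allowance. Here is a minimal plain-Java sketch of that bookkeeping on its own, without the Flink interfaces. The class WatermarkSketch and its method names are hypothetical. Note that the running maximum has to start from the low end: if it started at Long.MAX_VALUE, Math.max would never move it.

```java
// Hypothetical plain-Java model of bounded-out-of-orderness watermark tracking.
class WatermarkSketch {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low enough that the subtraction in currentWatermark() cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Called once per event: remember the largest timestamp seen so far.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called periodically: events older than this value are considered late.
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        WatermarkSketch tracker = new WatermarkSketch(500L);
        long[] timestamps = {1597905234000L, 1597905236000L, 1597905235000L};
        for (long ts : timestamps) {
            tracker.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + tracker.currentWatermark());
        }
    }
}
```

The third event arrives out of order, so the watermark stays where the second event left it.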
Run result
- map: {start=[CepActiveUserBean{username='100XX', price=100.0, timestamp=1597905235000}, CepActiveUserBean{username='100XX', price=200.0, timestamp=1597905236000}, CepActiveUserBean{username='100XX', price=300.0, timestamp=1597905237000}, CepActiveUserBean{username='100XX', price=400.0, timestamp=1597905238000}, CepActiveUserBean{username='100XX', price=500.0, timestamp=1597905239000}]}
- Process finished with exit code 0
The run result is shown in the screenshot below:
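Case 3: unpaid order timeout
Business requirement
Find orders that have not been paid within 10 minutes of being placed. The data source is as follows: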
- new TimeOutPayBean(1L, "create", 1597905234000L),
- new TimeOutPayBean(1L, "pay", 1597905235000L),
- new TimeOutPayBean(2L, "create", 1597905236000L),
- new TimeOutPayBean(2L, "pay", 1597905237000L),
- new TimeOutPayBean(3L, "create", 1597905239000L)
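Implementation steps:
- Obtain the data source
- Assign timestamps and watermarks
- Key the stream with keyBy
- Build the Pattern (create followed by pay within 10 minutes; a partial match that times out is an unpaid order)
- Apply the pattern
- Extract the successfully matched data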
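The timeout rule can also be sketched in plain Java to show what the pattern is expected to find. This is a simplified model rather than Flink code: the class PayTimeoutSketch, its nested OrderEvent type, and the helper sampleData are hypothetical names. It assumes events arrive in timestamp order and that the stream has ended, so any order still waiting for payment counts as timed out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of "create followed by pay within 10 minutes".
class PayTimeoutSketch {
    static final long TIMEOUT_MS = 10L * 60 * 1000;

    static final class OrderEvent {
        final long orderId;
        final String operation;
        final long timestamp;

        OrderEvent(long orderId, String operation, long timestamp) {
            this.orderId = orderId;
            this.operation = operation;
            this.timestamp = timestamp;
        }
    }

    // Same records as the sample data source in this section.
    static List<OrderEvent> sampleData() {
        return Arrays.asList(
            new OrderEvent(1L, "create", 1597905234000L),
            new OrderEvent(1L, "pay", 1597905235000L),
            new OrderEvent(2L, "create", 1597905236000L),
            new OrderEvent(2L, "pay", 1597905237000L),
            new OrderEvent(3L, "create", 1597905239000L));
    }

    // Returns the ids of orders whose create event saw no pay event within TIMEOUT_MS.
    static List<Long> timedOutOrders(List<OrderEvent> events) {
        Map<Long, Long> pendingCreates = new LinkedHashMap<>(); // order id -> create timestamp
        List<Long> timedOut = new ArrayList<>();
        for (OrderEvent e : events) {
            if (e.operation.equals("create")) {
                pendingCreates.put(e.orderId, e.timestamp);
            } else if (e.operation.equals("pay")) {
                Long createdAt = pendingCreates.remove(e.orderId);
                if (createdAt != null && e.timestamp - createdAt >= TIMEOUT_MS) {
                    timedOut.add(e.orderId); // paid, but too late
                }
            }
        }
        // End of stream: whatever is still pending was never paid.
        timedOut.addAll(pendingCreates.keySet());
        return timedOut;
    }

    public static void main(String[] args) {
        System.out.println("timed out: " + timedOutOrders(sampleData()));
    }
}
```

In the sample data orders 1 and 2 are paid one second after creation, and order 3 has no pay event at all, so it is the only one reported.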
Code
- package icu.wzk;
- import org.apache.flink.api.common.eventtime.*;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.cep.CEP;
- import org.apache.flink.cep.PatternSelectFunction;
- import org.apache.flink.cep.PatternStream;
- import org.apache.flink.cep.PatternTimeoutFunction;
- import org.apache.flink.cep.pattern.Pattern;
- import org.apache.flink.cep.pattern.conditions.IterativeCondition;
- import org.apache.flink.streaming.api.TimeCharacteristic;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.util.OutputTag;
- import java.util.List;
- import java.util.Map;
- public class FlinkCepTimeOutPay {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.setParallelism(1);
- DataStreamSource<TimeOutPayBean> data = env.fromElements(
- new TimeOutPayBean(1L, "create", 1597905234000L),
- new TimeOutPayBean(1L, "pay", 1597905235000L),
- new TimeOutPayBean(2L, "create", 1597905236000L),
- new TimeOutPayBean(2L, "pay", 1597905237000L),
- new TimeOutPayBean(3L, "create", 1597905239000L)
- );
- DataStream<TimeOutPayBean> watermark = data
- .assignTimestampsAndWatermarks(new WatermarkStrategy<TimeOutPayBean>() {
- @Override
- public WatermarkGenerator<TimeOutPayBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
- return new WatermarkGenerator<TimeOutPayBean>() {
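- long maxOutOfOrderness = 500L;
- // Start from the low end so Math.max tracks the largest timestamp seen and the subtraction below cannot underflow
- long maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;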
- @Override
- public void onEvent(TimeOutPayBean event, long eventTimestamp, WatermarkOutput output) {
- maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
- }
- @Override
- public void onPeriodicEmit(WatermarkOutput output) {
- output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
- }
- };
- }
- }.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
- );
- KeyedStream<TimeOutPayBean, Long> keyedStream = watermark
- .keyBy(new KeySelector<TimeOutPayBean, Long>() {
- @Override
- public Long getKey(TimeOutPayBean value) throws Exception {
- return value.getUserId();
- }
- });
- // Processing logic: timed-out partial matches are routed to this side output
- OutputTag<TimeOutPayBean> orderTimeoutOutput = new OutputTag<TimeOutPayBean>("orderTimeout") {};
- Pattern<TimeOutPayBean, TimeOutPayBean> pattern = Pattern
- .<TimeOutPayBean>begin("begin")
- .where(new IterativeCondition<TimeOutPayBean>() {
- @Override
- public boolean filter(TimeOutPayBean timeOutPayBean, Context<TimeOutPayBean> context) throws Exception {
- return timeOutPayBean.getOperation().equals("create");
- }
- })
- .followedBy("pay")
- .where(new IterativeCondition<TimeOutPayBean>() {
- @Override
- public boolean filter(TimeOutPayBean timeOutPayBean, Context<TimeOutPayBean> context) throws Exception {
- return timeOutPayBean.getOperation().equals("pay");
- }
- })
- .within(Time.seconds(600));
- PatternStream<TimeOutPayBean> patternStream = CEP.pattern(keyedStream, pattern);
- SingleOutputStreamOperator<TimeOutPayBean> result = patternStream
- .select(orderTimeoutOutput, new PatternTimeoutFunction<TimeOutPayBean, TimeOutPayBean>() {
- @Override
- public TimeOutPayBean timeout(Map<String, List<TimeOutPayBean>> map, long l) throws Exception {
- return map.get("begin").get(0);
- }
- }, new PatternSelectFunction<TimeOutPayBean, TimeOutPayBean>() {
- @Override
- public TimeOutPayBean select(Map<String, List<TimeOutPayBean>> map) throws Exception {
- return map.get("pay").get(0);
- }
- });
- // Print the results (uncomment the next line to also print the paid orders from the main stream)
- // result.print();
- System.out.println("==============");
- DataStream<TimeOutPayBean> sideOutput = result
- .getSideOutput(orderTimeoutOutput);
- sideOutput.print();
- // Execute the job
- env.execute("FlinkCepTimeOutPay");
- }
- }
- class TimeOutPayBean {
- private Long userId;
- private String operation;
- private Long timestamp;
- public TimeOutPayBean(Long userId, String operation, Long timestamp) {
- this.userId = userId;
- this.operation = operation;
- this.timestamp = timestamp;
- }
- public Long getUserId() {
- return userId;
- }
- public void setUserId(Long userId) {
- this.userId = userId;
- }
- public String getOperation() {
- return operation;
- }
- public void setOperation(String operation) {
- this.operation = operation;
- }
- public Long getTimestamp() {
- return timestamp;
- }
- public void setTimestamp(Long timestamp) {
- this.timestamp = timestamp;
- }
- @Override
- public String toString() {
- return "TimeOutPayBean{" +
- "userId=" + userId +
- ", operation='" + operation + '\'' +
- ", timestamp=" + timestamp +
- '}';
- }
- }
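Run result
The console output is shown below. The pay records for IDs 1 and 2 are what the main stream emits, so this run appears to have been captured with result.print() enabled; the create record for ID 3 comes from the timeout side output.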
- ==============
- TimeOutPayBean{userId=1, operation='pay', timestamp=1597905235000}
- TimeOutPayBean{userId=3, operation='create', timestamp=1597905239000}
- TimeOutPayBean{userId=2, operation='pay', timestamp=1597905237000}
- Process finished with exit code 0
The corresponding screenshot is shown below: