Flink CEP能够从事件流中匹配特定的事件模式 (事件中定义的一组条件和规则), 并对其举行相应的操作和处置惩罚。例如:在金融范畴中检测欺诈交易行为时,必要考虑多笔交易之间的时序和金额关系。在物联网范畴中必要在设备运行状态事件流中实时监测设备状态变化及设备之间协作。
1. Flink CEP开辟步骤
利用Flink CEP对事件举行模式匹配时,从四个步骤入手
- #1.定义输入事件流
- DataStream<Event> input = ...;
- #2.定义模式匹配规则
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
- .where(SimpleCondition.of(event -> event.getId() == 42))
- .next("middle")
- .subtype(SubEvent.class)
- .where(SimpleCondition.of(subEvent -> subEvent.getVolume() >= 10.0))
- .followedBy("end")
- .where(SimpleCondition.of(event -> event.getName().equals("end")));
- #3.模式匹配应用在事件流上检测
- PatternStream<Event> patternStream = CEP.pattern(input, pattern);
- #4.对匹配的复杂事件进行结果输出
- DataStream<Alert> result = patternStream.process(
- new PatternProcessFunction<Event, Alert>() {
- @Override
- public void processMatch(Map<String, List<Event>> pattern,
- Context ctx,Collector<Alert> out) throws Exception {
- out.collect(createAlertFrom(pattern));
- }
- });
复制代码 2.Pattern API设置事件模式
利用 量词和 条件 设置事件模式
2.1 量词
oneOrMore()
指定模式期望匹配到的事件至少出现一次。假设pattern表示匹配a事件,pattern.oneOrMore()表示匹配a事件一次到多次。
times(#ofTimes)
指定模式期望匹配到的事件恰好出现的次数。假设pattern表示匹配a事件,pattern.times(2)表示匹配a事件2次。
times(#fromTimes,$toTimes)
指定模式期望匹配到的事件出现次数在#fromTimes和#toTimes之间。假设pattern表示匹配a事件,pattern.times(2,4)表示匹配a事件2、3、4次。
timesOrMore(#times)
指定模式期望匹配到的事件至少出现times次。假设pattern表示匹配a事件,pattern.timesOrMore(2)表示匹配a事件2次到多次。
optional()
指定该模式是可选的,表示该模式可以出现也可以不出现,对前面这些量词都实用,在多个单独模式组合成组合模式中利用才故意义。假设pattern表示匹配a事件,pattern.times(2,4).optional()表示匹配a事件0、2、3、4次。
greedy()
指定该模式是贪心的,会重复尽可能多的次数,只对量词实用,不支持模式组,在 多个单独模式组合成组合模式 中利用才故意义。假设pattern表示匹配a事件,pattern.times(2,4).greedy()表示尽可能多的匹配a事件,如果输入4次a,那么2次a和3次a事件都不算匹配事件
2.2 条件
2.2.1 简单条件
简单条件非常简单,从事件本身中获取信息来举行判断,只必要考虑当前事件本身即可。
- start.where(SimpleCondition.of(value -> value.getName().startsWith("foo")));
复制代码 2.2.2 组合条件
组合条件是将简单条件举行归并,通常情况下也可以利用where方法举行条件的组合,默认每个条件通过AND逻辑相连。如果必要利用OR逻辑,直接利用or方法毗连条件即可。
- pattern
- .where(SimpleCondition.of(value -> ... /*一些判断条件*/))
- .or(SimpleCondition.of(value -> ... /*一些判断条件*/));
复制代码 2.2.3 迭代条件
迭代条件必要在 当前事件处置惩罚中获取到该模式先前匹配的事件 举行对比或处置惩罚才能决定当前事件是否被匹配,这种必要对该模式先前匹配事件举行处置惩罚作为判断当前事件是否匹配模式的条件叫做迭代条件。
- middle.oneOrMore()
- .subtype(SubEvent.class)
- .where(new IterativeCondition<SubEvent>() {
- @Override
- public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
- //判断当前事件是否以foo开头
- if (!value.getName().startsWith("foo")) {
- return false;
- }
- double sum = value.getPrice();
- //获取当前模式先前已经匹配的事件,计算sum总和
- for (Event event : ctx.getEventsForPattern("middle")) {
- sum += event.getPrice();
- }
- return Double.compare(sum, 5.0) < 0;
- }
- });
复制代码 3. 事件模式的种类
3.1 单例模式
一个事件的匹配,比如:基站日志通话数据流匹配通话时长大于10s的事件
3.2 组合模式
多个单独模式组合在一起就形成了模式序列,即:组合模式。
可以对单独模式通过调用next、followedBy、followedByAny方法举行毗连,不同方法表示了事件之间不同的连续战略
分别对应严格邻近(Strict Contiguity)、宽松邻近(Relaxed Contiguity)、非确定宽松邻近(Non-Deterministic Relaxed Contiguity)三种事件连续战略。
3.2.1 严格邻近
严格邻近中期望所有匹配的事件严格一个接一个出现,中心没有任何不匹配的事件
3.2.2 宽松邻近
宽松邻近并不会像严格邻近这么严格,两个匹配的事件之间可以有其他不匹配的事件出现,也就是说宽松邻近会忽略没有成功匹配的事件直到下一个匹配事件出现为止。
3.2.3 非确定宽松邻近
非确定宽松邻近战略相比于宽松邻近战略更加宽松,可以忽略已经匹配的事件,也就是说可以重复利用匹配过的事件,非确定宽松邻近比宽松邻近匹配的结果每每更多。
3.3 循环模式
某个单独模式利用了oneOrMore()/times这些量词形成循环模式,默认循环中匹配各个事件之间连续战略为宽松邻近战略
4. 跳过战略
Flink CEP中对于给定的一个模式匹配,同一个事件可能会分配到多个匹配结果中。例如,复杂事件模式为a+ b (表示a事件可以有1到多个,再往后可以跟上1个b事件),输入数据流:a1,a2,b 输出结果会有{a1,a2,b、a1,b、a2,b} ,a1事件就会输出到多个匹配结果中,一些情况下我们希望对这些匹配的结果举行精简,可以利用Flink CEP中提供的“匹配后跳过战略”(After Match Skip Strategy)。
- AfterMatchSkipStrategy.noSkip()
- AfterMatchSkipStrategy.skipToNext()
- AfterMatchSkipStrategy.skipPastLastEvent()
- AfterMatchSkipStrategy.skipToFirst(patternName)
- AfterMatchSkipStrategy.skipToLast(patternName)
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |