【Flink】Flink 处理函数之根本处理函数(一),2024年最新步伐人生
stream.process(new MyProcessFunction())这里 ProcessFunction 不是接口,而是一个抽象类,继承了 AbstractRichFunction;MyProcessFunction 是它的一个详细实现。以是全部的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。
代码实例:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
})
);
stream.process(new ProcessFunction<Event, String>() {
@Override
public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
out.collect(value.toString());
if (value.getUser().equals("Mary")) {
out.collect(value.user + "click " + value.getUrl());
} else if (value.getUser().equals("Alice")) {
out.collect(value.user);
out.collect(value.user);
}
System.out.println("timestamp:" + ctx.timestamp());
System.out.println("watermark:" + ctx.timerService().currentWatermark());
System.out.println(getRuntimeContext().getIndexOfThisSubtask());
}
@Override
public void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void close() throws Exception {
super.close();
}
}).print();
env.execute();
}
运行结果:
https://i-blog.csdnimg.cn/blog_migrate/307c43b222ad5271a83998d187db1aff.png
这里第一次的水位线的值实在是个默认值,Long.MIN_VALUE + outOfOrdernessMillis + 1;
https://i-blog.csdnimg.cn/blog_migrate/3a8713cf9b3f3639b513bf38c34149dd.png
然后每次下一次的水位线都是上一次的timestamp - 1
2.1.2 ProcessFunction 解析
抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型范例参数:I 表示 Input,也就是输入的数据范例;O 表示 Output,也就是处理完成之后输出的数据范例。
内部单独界说了两个方法:一个是必须要实现的抽象方法.processElement();另一个黑白抽象方法.onTimer()。
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
...
public abstract void processElement(I value, Context ctx, Collector<O> out)
throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
throws Exception {}
...
}
2.1.2.1 抽象方法.processElement()
用于处理元素,界说了处理的焦点逻辑。这个方法对流中的每个元素都会调用一次,参数包罗三个: 输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来界说的。
[*]value: 当前流中的输入元素,也就是正在处理的数据,范例与流中数据范例划一。
[*]cts:范例是ProcessFunction中界说的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()。
Context 抽象类界说如下:
public abstract class Context {
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
[*]out: “收集器”(范例为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。
ProcessFunction 可以轻松实现flatMap如许的根本转换功能(当然 map、filter 更不在话下);而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自界说状态(state)举行处理,这也就能实现聚合操纵的功能了。
2.1.2.2 非抽象方法.onTimer()
@Override
**自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。**
**深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!**
**因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。**
!(https://img-blog.csdnimg.cn/img_convert/675e320d67bf38b9616278d406e272be.png)
!(https://img-blog.csdnimg.cn/img_convert/7b748f90c663bc0691137163d229ee90.png)
!(https://img-blog.csdnimg.cn/img_convert/bce27bbb183424d80f14ec10f7c4ccf0.png)
!(https://img-blog.csdnimg.cn/img_convert/dd4d1547032c2a3e83f2f1af4a4009a7.png)
!(https://img-blog.csdnimg.cn/img_convert/3dac7852d6f513d30c3ea1afaa30571d.png)
**既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!**
**由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新**
**如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)**
!(https://img-blog.csdnimg.cn/img_convert/a114e3252cc1ac080a75778f55397ec7.png)
**一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
)**
[外链图片转存中...(img-rMDrI5rA-1712996421603)]
**一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]