- stream.process(new MyProcessFunction())
复制代码 这里 ProcessFunction 不是接口,而是一个抽象类,继承了 AbstractRichFunction;MyProcessFunction 是它的一个详细实现。以是全部的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。
代码实例:
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource())
- .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
- .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
- @Override
- public long extractTimestamp(Event element, long recordTimestamp) {
- return element.getTimestamp();
- }
- })
- );
- stream.process(new ProcessFunction<Event, String>() {
- @Override
- public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
- out.collect(value.toString());
- if (value.getUser().equals("Mary")) {
- out.collect(value.user + "click " + value.getUrl());
- } else if (value.getUser().equals("Alice")) {
- out.collect(value.user);
- out.collect(value.user);
- }
- System.out.println("timestamp:" + ctx.timestamp());
- System.out.println("watermark:" + ctx.timerService().currentWatermark());
- System.out.println(getRuntimeContext().getIndexOfThisSubtask());
- }
- @Override
- public void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
- super.onTimer(timestamp, ctx, out);
- }
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- }
- @Override
- public void close() throws Exception {
- super.close();
- }
- }).print();
- env.execute();
- }
复制代码 运行结果:
这里第一次的水位线的值实在是个默认值,Long.MIN_VALUE + outOfOrdernessMillis + 1;
然后每次下一次的水位线都是上一次的timestamp - 1
2.1.2 ProcessFunction 解析
抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型范例参数:I 表示 Input,也就是输入的数据范例;O 表示 Output,也就是处理完成之后输出的数据范例。
内部单独界说了两个方法:一个是必须要实现的抽象方法.processElement();另一个黑白抽象方法.onTimer()。
- public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
- ...
- public abstract void processElement(I value, Context ctx, Collector<O> out)
- throws Exception;
- public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
- throws Exception {}
- ...
- }
复制代码 2.1.2.1 抽象方法.processElement()
用于处理元素,界说了处理的焦点逻辑。这个方法对流中的每个元素都会调用一次,参数包罗三个: 输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来界说的。
- value: 当前流中的输入元素,也就是正在处理的数据,范例与流中数据范例划一。
- cts:范例是ProcessFunction中界说的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()。
Context 抽象类界说如下:
- public abstract class Context {
- public abstract Long timestamp();
- public abstract TimerService timerService();
- public abstract <X> void output(OutputTag<X> outputTag, X value);
- }
复制代码
- out: “收集器”(范例为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。
ProcessFunction 可以轻松实现flatMap如许的根本转换功能(当然 map、filter 更不在话下);而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自界说状态(state)举行处理,这也就能实现聚合操纵的功能了。
2.1.2.2 非抽象方法.onTimer()
- @Override
- **自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。**
- **深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!**
- **因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。**
- ![img](https://img-blog.csdnimg.cn/img_convert/675e320d67bf38b9616278d406e272be.png)
- ![img](https://img-blog.csdnimg.cn/img_convert/7b748f90c663bc0691137163d229ee90.png)
- ![img](https://img-blog.csdnimg.cn/img_convert/bce27bbb183424d80f14ec10f7c4ccf0.png)
- ![img](https://img-blog.csdnimg.cn/img_convert/dd4d1547032c2a3e83f2f1af4a4009a7.png)
- ![img](https://img-blog.csdnimg.cn/img_convert/3dac7852d6f513d30c3ea1afaa30571d.png)
- **既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!**
- **由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新**
- **如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)**
- ![img](https://img-blog.csdnimg.cn/img_convert/a114e3252cc1ac080a75778f55397ec7.png)
- **一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
- )**
- [外链图片转存中...(img-rMDrI5rA-1712996421603)]
- **一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |