【Flink】Flink 处理函数之根本处理函数(一),2024年最新步伐人生 ...

打印 上一主题 下一主题

主题 796|帖子 796|积分 2388

  1. stream.process(new MyProcessFunction())
复制代码
这里 ProcessFunction 不是接口,而是一个抽象类,继承了 AbstractRichFunction;MyProcessFunction 是它的一个详细实现。以是全部的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。
代码实例:
  1. public static void main(String[] args) throws Exception {
  2.         StreamExecutionEnvironment env  = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         env.setParallelism(1);
  4.         SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource())
  5.                 .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
  6.                         .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
  7.                             @Override
  8.                             public long extractTimestamp(Event element, long recordTimestamp) {
  9.                                 return element.getTimestamp();
  10.                             }
  11.                         })
  12.                 );
  13.         stream.process(new ProcessFunction<Event, String>() {
  14.             @Override
  15.             public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
  16.                 out.collect(value.toString());
  17.                 if (value.getUser().equals("Mary")) {
  18.                     out.collect(value.user + "click " + value.getUrl());
  19.                 } else if (value.getUser().equals("Alice")) {
  20.                     out.collect(value.user);
  21.                     out.collect(value.user);
  22.                 }
  23.                 System.out.println("timestamp:" + ctx.timestamp());
  24.                 System.out.println("watermark:" + ctx.timerService().currentWatermark());
  25.                 System.out.println(getRuntimeContext().getIndexOfThisSubtask());
  26.             }
  27.             @Override
  28.             public void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
  29.                 super.onTimer(timestamp, ctx, out);
  30.             }
  31.             @Override
  32.             public void open(Configuration parameters) throws Exception {
  33.                 super.open(parameters);
  34.             }
  35.             @Override
  36.             public void close() throws Exception {
  37.                 super.close();
  38.             }
  39.         }).print();
  40.         env.execute();
  41.     }
复制代码
运行结果:

这里第一次的水位线的值实在是个默认值,Long.MIN_VALUE + outOfOrdernessMillis + 1;

然后每次下一次的水位线都是上一次的timestamp - 1
2.1.2 ProcessFunction 解析

抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型范例参数:I 表示 Input,也就是输入的数据范例;O 表示 Output,也就是处理完成之后输出的数据范例。
内部单独界说了两个方法:一个是必须要实现的抽象方法.processElement();另一个黑白抽象方法.onTimer()。
  1. public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
  2. ...
  3. public abstract void processElement(I value, Context ctx, Collector<O> out)
  4. throws Exception;
  5. public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
  6. throws Exception {}
  7. ...
  8. }
复制代码
2.1.2.1 抽象方法.processElement()

用于处理元素,界说了处理的焦点逻辑。这个方法对流中的每个元素都会调用一次,参数包罗三个: 输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来界说的。


  • value: 当前流中的输入元素,也就是正在处理的数据,范例与流中数据范例划一。
  • cts:范例是ProcessFunction中界说的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()。
Context 抽象类界说如下:
  1. public abstract class Context {
  2. public abstract Long timestamp();
  3. public abstract TimerService timerService();
  4. public abstract <X> void output(OutputTag<X> outputTag, X value);
  5. }
复制代码


  • out: “收集器”(范例为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。
ProcessFunction 可以轻松实现flatMap如许的根本转换功能(当然 map、filter 更不在话下);而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自界说状态(state)举行处理,这也就能实现聚合操纵的功能了。
2.1.2.2 非抽象方法.onTimer()

  1. @Override
  2. **自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。**
  3. **深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!**
  4. **因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。**
  5. ![img](https://img-blog.csdnimg.cn/img_convert/675e320d67bf38b9616278d406e272be.png)
  6. ![img](https://img-blog.csdnimg.cn/img_convert/7b748f90c663bc0691137163d229ee90.png)
  7. ![img](https://img-blog.csdnimg.cn/img_convert/bce27bbb183424d80f14ec10f7c4ccf0.png)
  8. ![img](https://img-blog.csdnimg.cn/img_convert/dd4d1547032c2a3e83f2f1af4a4009a7.png)
  9. ![img](https://img-blog.csdnimg.cn/img_convert/3dac7852d6f513d30c3ea1afaa30571d.png)
  10. **既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!**
  11. **由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新**
  12. **如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)**
  13. ![img](https://img-blog.csdnimg.cn/img_convert/a114e3252cc1ac080a75778f55397ec7.png)
  14. **一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
  15. )**
  16. [外链图片转存中...(img-rMDrI5rA-1712996421603)]
  17. **一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

忿忿的泥巴坨

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表