忿忿的泥巴坨 发表于 2024-11-22 15:25:06

【Flink】Flink 处理函数之根本处理函数(一),2024年最新步伐人生

stream.process(new MyProcessFunction())

这里 ProcessFunction 不是接口,而是一个抽象类,继承了 AbstractRichFunction;MyProcessFunction 是它的一个详细实现。以是全部的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。
代码实例:
public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(1);
      SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                              return element.getTimestamp();
                            }
                        })
                );
      stream.process(new ProcessFunction<Event, String>() {
            @Override
            public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
                out.collect(value.toString());
                if (value.getUser().equals("Mary")) {
                  out.collect(value.user + "click " + value.getUrl());
                } else if (value.getUser().equals("Alice")) {
                  out.collect(value.user);
                  out.collect(value.user);
                }
                System.out.println("timestamp:" + ctx.timestamp());
                System.out.println("watermark:" + ctx.timerService().currentWatermark());

                System.out.println(getRuntimeContext().getIndexOfThisSubtask());

            }

            @Override
            public void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
            }

            @Override
            public void close() throws Exception {
                super.close();
            }
      }).print();

      env.execute();
    }

运行结果:
https://i-blog.csdnimg.cn/blog_migrate/307c43b222ad5271a83998d187db1aff.png
这里第一次的水位线的值实在是个默认值,Long.MIN_VALUE + outOfOrdernessMillis + 1;
https://i-blog.csdnimg.cn/blog_migrate/3a8713cf9b3f3639b513bf38c34149dd.png
然后每次下一次的水位线都是上一次的timestamp - 1
2.1.2 ProcessFunction 解析

抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型范例参数:I 表示 Input,也就是输入的数据范例;O 表示 Output,也就是处理完成之后输出的数据范例。
内部单独界说了两个方法:一个是必须要实现的抽象方法.processElement();另一个黑白抽象方法.onTimer()。
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
...
public abstract void processElement(I value, Context ctx, Collector<O> out)
throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
throws Exception {}
...
}

2.1.2.1 抽象方法.processElement()

用于处理元素,界说了处理的焦点逻辑。这个方法对流中的每个元素都会调用一次,参数包罗三个: 输入数据值 value,上下文 ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器 out 来界说的。


[*]value: 当前流中的输入元素,也就是正在处理的数据,范例与流中数据范例划一。
[*]cts:范例是ProcessFunction中界说的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()。
Context 抽象类界说如下:
public abstract class Context {
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> outputTag, X value);
}




[*]out: “收集器”(范例为 Collector),用于返回输出数据。使用方式与 flatMap算子中的收集器完全一样,直接调用 out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。
ProcessFunction 可以轻松实现flatMap如许的根本转换功能(当然 map、filter 更不在话下);而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自界说状态(state)举行处理,这也就能实现聚合操纵的功能了。
2.1.2.2 非抽象方法.onTimer()

@Override


**自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。**

**深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!**

**因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。**
!(https://img-blog.csdnimg.cn/img_convert/675e320d67bf38b9616278d406e272be.png)
!(https://img-blog.csdnimg.cn/img_convert/7b748f90c663bc0691137163d229ee90.png)
!(https://img-blog.csdnimg.cn/img_convert/bce27bbb183424d80f14ec10f7c4ccf0.png)
!(https://img-blog.csdnimg.cn/img_convert/dd4d1547032c2a3e83f2f1af4a4009a7.png)
!(https://img-blog.csdnimg.cn/img_convert/3dac7852d6f513d30c3ea1afaa30571d.png)

**既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!**

**由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新**

**如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)**
!(https://img-blog.csdnimg.cn/img_convert/a114e3252cc1ac080a75778f55397ec7.png)

**一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**

)**
[外链图片转存中...(img-rMDrI5rA-1712996421603)]

**一个人可以走的很快,但一群人才能走的更远。不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎扫码加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 【Flink】Flink 处理函数之根本处理函数(一),2024年最新步伐人生