大数据技术之Flink

打印 上一主题 下一主题

主题 1039|帖子 1039|积分 3117

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
第1章 Flink概述
1.1 Flink是什么
1.2 Flink特点
1.3 Flink vs SparkStreaming
表 Flink 和 Streaming对比
Flink Streaming
盘算模型 流盘算 微批处理处罚
时间语义 变乱时间、处理处罚时间 处理处罚时间
窗口 多、灵活 少、不灵活(窗口必须是批次的整数倍)
状态 有 没有
流式SQL 有 没有
1.4 Flink的应用场景
1.5 Flink分层API
第2章 Flink快速上手
2.1 创建项目
在准备好所有的开发环境之后,我们就可以开始开发自己的第一个Flink程序了。起首我们要做的,就是在IDEA中搭建一个Flink项目的骨架。我们会使用Java项目中常见的Maven来进行依赖管理。
1)创建工程
(1)打开IntelliJ IDEA,创建一个Maven工程。
(2)将这个Maven工程命名为FlinkTutorial。
(3)选定这个Maven工程地点存储路径,并点击Finish,Maven工程即创建成功。
2)添加项目依赖
在项目的pom文件中,添加Flink的依赖,包罗flink-java、flink-streaming-java,以及flink-clients(客户端,也可以省略)。

<flink.version>1.17.0</flink.version>

  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.apache.flink</groupId>
  4.         <artifactId>flink-streaming-java</artifactId>
  5.         <version>${flink.version}</version>
  6.     </dependency>
  7. <dependency>
  8.         <groupId>org.apache.flink</groupId>
  9.         <artifactId>flink-clients</artifactId>
  10.         <version>${flink.version}</version>
  11. </dependency>
复制代码
2.2 WordCount代码编写 需求:统计一段文字中,每个单词出现的频次。 环境准备:在src/main/java目录下,新建一个包,命名为com.atguigu.wc。 2.2.1 批处理处罚 批处理处罚根本思绪:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。 1)数据准备 (1)在工程根目录下新建一个input文件夹,并在下面创建文本文件words.txt (2)在words.txt中输入一些文字,例如: hello flink hello world hello java 2)代码编写 (1)在com.atguigu.wc包下新建Java类BatchWordCount,在静态main方法中编写代码。具体代码实现如下: import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class BatchWordCount {
  1. public static void main(String[] args) throws Exception {
  2.     // 1. 创建执行环境
  3.     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  4.    
  5.     // 2. 从文件读取数据  按行读取(存储的元素就是每行的文本)
  6.     DataSource<String> lineDS = env.readTextFile("input/words.txt");
  7.    
  8.     // 3. 转换数据格式
  9.     FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
  10.         @Override
  11.         public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
  12.             String[] words = line.split(" ");
  13.             for (String word : words) {
  14.                 out.collect(Tuple2.of(word,1L));
  15.             }
  16.         }
  17.     });
  18.     // 4. 按照 word 进行分组
  19.     UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
  20.    
  21.     // 5. 分组内聚合统计
  22.     AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
  23.     // 6. 打印结果
  24.     sum.print();
  25. }
复制代码
}
(2)输出
(flink,1)
(world,1)
(hello,3)
(java,1)
需要留意的是,这种代码的实现方式,是基于DataSet API的,也就是我们对数据的处理处罚转换,是看作数据集来进行操纵的。毕竟上Flink自己是流批同一的处理处罚架构,批量的数据集本质上也是流,没有须要用两套不同的API来实现。以是从Flink 1.12开始,官方保举的做法是直接使用DataStream API,在提交任务时通过将实验模式设为BATCH来进行批处理处罚:
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
如许,DataSet API就没什么用了,在实际应用中我们只要维护一套DataStream API就可以。这里只是为了方便各人理解,我们依然用DataSet API做了批处理处罚的实现。
2.2.2 流处理处罚
对于Flink而言,流才是整个处理处罚逻辑的底层核心,以是流批同一之后的DataStream API更增强盛,可以直接处理处罚批处理处罚和流处理处罚的所有场景。
下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理处罚。
1)读取文件
我们同样试图读取文档words.txt中的数据,并统计每个单词出现的频次。团体思绪与之前的批处理处罚非常类似,代码模式也根本同等。
在com.atguigu.wc包下新建Java类StreamWordCount,在静态main方法中编写代码。具体代码实现如下:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class StreamWordCount {
  1. public static void main(String[] args) throws Exception {
  2.     // 1. 创建流式执行环境
  3.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.    
  5.     // 2. 读取文件
  6.     DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");
  7.    
  8.     // 3. 转换、分组、求和,得到统计结果
  9.     SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
  10.         @Override
  11.         public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
  12.             String[] words = line.split(" ");
  13.             for (String word : words) {
  14.                 out.collect(Tuple2.of(word, 1L));
  15.             }
  16.         }
  17.     }).keyBy(data -> data.f0)
  18.        .sum(1);
  19.     // 4. 打印
  20.     sum.print();
  21.    
  22.     // 5. 执行
  23.     env.execute();
  24. }
复制代码
}
输出:
3> (java,1)
5> (hello,1)
5> (hello,2)
5> (hello,3)
13> (flink,1)
9> (world,1)
紧张观察与批处理处罚程序BatchWordCount的不同:
 创建实验环境的不同,流处理处罚程序使用的是StreamExecutionEnvironment。
 转换处理处罚之后,得到的数据对象类型不同。
 分组操纵调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。
 代码末尾需要调用env的execute方法,开始实验任务。
2)读取socket文本流
在实际的生产环境中,真正的数据流其实是无界的,有开始却没有竣事,这就要求我们需要持续地处理处罚捕获的数据。为了模拟这种场景,可以监听socket端口,然后向该端口不断的发送数据。
(1)将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取socket文本流的方法socketTextStream。具体代码实现如下:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class SocketStreamWordCount {
  1. public static void main(String[] args) throws Exception {
  2.     // 1. 创建流式执行环境
  3.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.    
  5.     // 2. 读取文本流:hadoop102表示发送端主机名、7777表示端口号
  6.     DataStreamSource<String> lineStream = env.socketTextStream("hadoop102", 7777);
  7.    
  8.     // 3. 转换、分组、求和,得到统计结果
  9.     SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
  10.         String[] words = line.split(" ");
  11.         for (String word : words) {
  12.             out.collect(Tuple2.of(word, 1L));
  13.         }
  14.     }).returns(Types.TUPLE(Types.STRING, Types.LONG))
  15.             .keyBy(data -> data.f0)
  16.             .sum(1);
  17.     // 4. 打印
  18.     sum.print();
  19.    
  20.     // 5. 执行
  21.     env.execute();
  22. }
复制代码
}
(2)在Linux环境的主机hadoop102上,实验下列命令,发送数据进行测试
[atguigu@hadoop102 ~]$ nc -lk 7777
留意:要先启动端口,后启动StreamWordCount程序,否则会报超时毗连异常。
(3)启动StreamWordCount程序
我们会发现程序启动之后没有任何输出、也不会退出。这是正常的,由于Flink的流处理处罚是变乱驱动的,当前程序会不停处于监听状态,只有接收到数据才会实验任务、输出统计结果。
(4)从hadoop102发送数据
①在hadoop102主机中,输入“hello flink”,输出如下内容
13> (flink,1)
5> (hello,1)
②再输入“hello world”,输出如下内容
2> (world,1)
5> (hello,2)
阐明:
Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,主动获取类型信息,从而得到对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),主动提取的信息是不敷精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重修出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或进步其性能。
由于对于flatMap里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地剖析出完整数据。
第3章 Flink部署
3.1 集群角色
3.2 Flink集群搭建
3.2.1 集群启动
0)集群规划
表3-1 集群角色分配
节点服务器 hadoop102 hadoop103 hadoop104
角色 JobManager
TaskManager TaskManager TaskManager
具体安装部署步骤如下:
1)下载并解压安装包
(1)下载安装包flink-1.17.0-bin-scala_2.12.tgz,将该jar包上传到hadoop102节点服务器的/opt/software路径上。
(2)在/opt/software路径上解压flink-1.17.0-bin-scala_2.12.tgz到/opt/module路径上。
[atguigu@hadoop102 software]$ tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/
2)修改集群设置
(1)进入conf路径,修改flink-conf.yaml文件,指定hadoop102节点服务器为JobManager
[atguigu@hadoop102 conf]$ vim flink-conf.yaml
修改如下内容:
JobManager节点地点.

jobmanager.rpc.address: hadoop102
jobmanager.bind-host: 0.0.0.0
rest.address: hadoop102
rest.bind-address: 0.0.0.0
TaskManager节点地点.需要设置为当前呆板名

taskmanager.bind-host: 0.0.0.0
taskmanager.host: hadoop102
(2)修改workers文件,指定hadoop102、hadoop103和hadoop104为TaskManager
[atguigu@hadoop102 conf]$ vim workers
修改如下内容:
hadoop102
hadoop103
hadoop104
(3)修改masters文件
[atguigu@hadoop102 conf]$ vim masters
修改如下内容:
hadoop102:8081
(4)另外,在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化设置,紧张设置项如下:
 jobmanager.memory.process.size:对JobManager进程可使用到的全部内存进行设置,包罗JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调解。
 taskmanager.memory.process.size:对TaskManager进程可使用到的全部内存进行设置,包罗JVM元空间和其他开销,默认为1728M,可以根据集群规模进行适当调解。
 taskmanager.numberOfTaskSlots:对每个TaskManager能够分配的Slot数目进行设置,默认为1,可根据TaskManager地点的呆板能够提供给Flink的CPU数目决定。所谓Slot就是TaskManager中具体运行一个任务所分配的盘算资源。
 parallelism.default:Flink任务实验的并行度,默认为1。优先级低于代码中进行的并行度设置和任务提交时使用参数指定的并行度数目。
关于Slot和并行度的概念,我们会在下一章做具体解说。
3)分发安装目录
(1)设置修改完毕后,将Flink安装目录发给另外两个节点服务器。
[atguigu@hadoop102 module]$ xsync flink-1.17.0/
(2)修改hadoop103的 taskmanager.host
[atguigu@hadoop103 conf]$ vim flink-conf.yaml
修改如下内容:
TaskManager节点地点.需要设置为当前呆板名

taskmanager.host: hadoop103
(3)修改hadoop104的 taskmanager.host
[atguigu@hadoop104 conf]$ vim flink-conf.yaml
修改如下内容:
TaskManager节点地点.需要设置为当前呆板名

taskmanager.host: hadoop104
4)启动集群
(1)在hadoop102节点服务器上实验start-cluster.sh启动Flink集群:
[atguigu@hadoop102 flink-1.17.0]$ bin/start-cluster.sh
(2)查看进程情况:
[atguigu@hadoop102 flink-1.17.0]$ jpsall
=============== hadoop102 ===============
4453 StandaloneSessionClusterEntrypoint
4458 TaskManagerRunner
4533 Jps
=============== hadoop103 ===============
2872 TaskManagerRunner
2941 Jps
=============== hadoop104 ===============
2948 Jps
2876 TaskManagerRunner
5)访问Web UI
启动成功后,同样可以访问http://hadoop102:8081对flink集群和任务进行监控管理。
这里可以明显看到,当前集群的TaskManager数目为3;由于默认每个TaskManager的Slot数目为1,以是总Slot数和可用Slot数都为3。
3.2.2 向集群提交作业
在上一章中,我们已经编写读取socket发送的单词并统计单词的个数程序案例。本节我们将以该程序为例,演示如何将任务提交到集群中进行实验。具体步骤如下。
1)环境准备
在hadoop102中实验以下命令启动netcat。
[atguigu@hadoop102 flink-1.17.0]$ nc -lk 7777
2)程序打包
(1)在我们编写的Flink入门程序的pom.xml文件中添加打包插件的设置,具体如下:



org.apache.maven.plugins
maven-shade-plugin
3.2.4


package

shade




com.google.code.findbugs:jsr305
org.slf4j:
log4j:





:

META-INF/.SF
META-INF/.DSA
META-INF/*.RSA





[INFO] BUILD SUCCESS

打包完成后,在target目录下即可找到所需JAR包,JAR包会有两个,FlinkTutorial-1.0-SNAPSHOT.jar和FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar,由于集群中已经具备任务运行所需的所有依赖,以是建议使用FlinkTutorial-1.0-SNAPSHOT.jar。
3)在Web UI上提交作业
(1)任务打包完成后,我们打开Flink的WEB UI页面,在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的JAR包,如下图所示。
JAR包上传完成,如下图所示:
(2)点击该JAR包,出现任务设置页面,进行相应设置。
紧张设置程序入口主类的全类名,任务运行的并行度,任务运行所需的设置参数和保存点路径等,如下图所示,设置完成后,即可点击按钮“Submit”,将任务提交到集群运行。
(3)任务提交成功之后,可点击左侧导航栏的“Running Jobs”查看程序运行列表情况。
(4)测试
①在socket端口中输入hello
[atguigu@hadoop102 flink-1.17.0]$ nc -lk 7777
hello
②先点击Task Manager,然后点击右侧的192.168.10.104服务器节点
  1. ③点击Stdout,就可以看到hello单词的统计
  2. 注意:如果hadoop104节点没有统计单词数据,可以去其他TaskManager节点查看。
复制代码
(4)点击该任务,可以查看任务运行的具体情况,也可以通过点击“Cancel Job”竣事任务运行。
4)命令行提交作业
除了通过WEB UI界面提交任务之外,也可以直接通过命令行来提交任务。这里为方便起见,我们可以先把jar包直接上传到目录flink-1.17.0下
(1)起首需要启动集群。
[atguigu@hadoop102 flink-1.17.0]$ bin/start-cluster.sh
(2)在hadoop102中实验以下命令启动netcat。
[atguigu@hadoop102 flink-1.17.0]$ nc -lk 7777
(3)将flink程序运行jar包上传到/opt/module/flink-1.17.0路径。
(4)进入到flink的安装路径下,在命令行使用flink run命令提交作业。
[atguigu@hadoop102 flink-1.17.0]$ bin/flink run -m hadoop102:8081 -c com.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
这里的参数 -m指定了提交到的JobManager,-c指定了入口类。
(5)在欣赏器中打开Web UI,http://hadoop102:8081查看应用实验情况。
用netcat输入数据,可以在TaskManager的尺度输出(Stdout)看到对应的统计结果。
(6)在/opt/module/flink-1.17.0/log路径中,可以查看TaskManager节点。
[atguigu@hadoop102 log]$ cat flink-atguigu-standalonesession-0-hadoop102.out
(hello,1)
(hello,2)
(flink,1)
(hello,3)
(scala,1)
3.3 部署模式
在一些应用场景中,对于集群资源分配和占用的方式,大概会有特定的需求。Flink为各种场景提供了不同的部署模式,紧张有以下三种:会话模式(Session Mode)、单作业模式(Per-Job Mode)、应用模式(Application Mode)。
它们的区别紧张在于:集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里实验——客户端(Client)照旧JobManager。
3.3.1 会话模式(Session Mode)
3.3.2 单作业模式(Per-Job Mode)
3.3.3 应用模式(Application Mode)
这里我们所讲到的部署模式,相对是比较抽象的概念。实际应用时,一般需要和资源管理平台团结起来,选择特定的模式来分配资源、部署应用。接下来,我们就针对不同的资源提供者的场景,具体介绍Flink的部署方式。
3.4 Standalone运行模式(了解)
独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,大概出现故障,没有主动扩展或重分配资源的保证,必须手动处理处罚。以是独立模式一般只用在开发测试或作业非常少的场景下。
3.4.1 会话模式部署
我们在第3.2节用的就是Standalone集群的会话模式部署。
提前启动集群,并通过Web页面客户端提交任务(可以多个任务,但是集群资源固定)。
3.4.2 单作业模式部署
Flink的Standalone集群并不支持单作业模式部署。由于单作业模式需要借助一些资源管理平台。
3.4.3 应用模式部署
应用模式下不会提前创建集群,以是不能调用start-cluster.sh脚本。我们可以使用同样在bin目录下的standalone-job.sh来创建一个JobManager。
具体步骤如下:
(0)环境准备。在hadoop102中实验以下命令启动netcat。
[atguigu@hadoop102 flink-1.17.0]$ nc -lk 7777
(1)进入到Flink的安装路径下,将应用程序的jar包放到lib/目录下。
[atguigu@hadoop102 flink-1.17.0]$ mv FlinkTutorial-1.0-SNAPSHOT.jar lib/
(2)实验以下命令,启动JobManager。
[atguigu@hadoop102 flink-1.17.0]$ bin/standalone-job.sh start --job-classname com.atguigu.wc.SocketStreamWordCount
这里我们直接指定作业入口类,脚本会到lib目录扫描所有的jar包。
(3)同样是使用bin目录下的脚本,启动TaskManager。
[atguigu@hadoop102 flink-1.17.0]$ bin/taskmanager.sh start
(4)在hadoop102上模拟发送单词数据。
[atguigu@hadoop102 ~]$ nc -lk 7777
hello
(5)在hadoop102:8081地点中观察输出数据
(6)如果希望停掉集群,同样可以使用脚本,命令如下。
[atguigu@hadoop102 flink-1.17.0]$ bin/taskmanager.sh stop
[atguigu@hadoop102 flink-1.17.0]$ bin/standalone-job.sh stop
3.5 YARN运行模式(重点)
YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数目动态分配TaskManager资源。
3.5.1 相关准备和设置
在将Flink任务部署至YARN集群之前,需要确认集群是否安装有Hadoop,保证Hadoop版本至少在2.2以上,并且集群中安装有HDFS服务。
具体设置步骤如下:
(1)设置环境变量,增长环境变量设置如下:
$ sudo vim /etc/profile.d/my_env.sh
HADOOP_HOME=/opt/module/hadoop-3.3.4
export PATH= P A T H : PATH: PATH:HADOOP_HOME/bin: H A D O O P H O M E / s b i n e x p o r t H A D O O P C O N F D I R = HADOOP_HOME/sbin export HADOOP_CONF_DIR= HADOOPH​OME/sbinexportHADOOPC​ONFD​IR={HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=hadoop classpath
(2)启动Hadoop集群,包罗HDFS和YARN。
[atguigu@hadoop102 hadoop-3.3.4]$ start-dfs.sh
[atguigu@hadoop103 hadoop-3.3.4]$ start-yarn.sh
(3)在hadoop102中实验以下命令启动netcat。
[atguigu@hadoop102 flink-1.17.0]$ nc -lk 7777
3.5.2 会话模式部署
YARN的会话模式与独立集群略有不同,需要起首申请一个YARN会话(YARN Session)来启动Flink集群。具体步骤如下:
1)启动集群
(1)启动Hadoop集群(HDFS、YARN)。
(2)实验脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群。
[atguigu@hadoop102 flink-1.17.0]$ bin/yarn-session.sh -nm test
可用参数解读:
 -d:分离模式,如果你不想让Flink YARN客户端不停前台运行,可以使用这个参数,纵然关掉当前对话窗口,YARN session也可以后台运行。
 -jm(–jobManagerMemory):设置JobManager所需内存,默认单位MB。
 -nm(–name):设置在YARN UI界面上显示的任务名。
 -qu(–queue):指定YARN队列名。
 -tm(–taskManager):设置每个TaskManager所使用内存。
留意:Flink1.11.0版本不再使用-n参数和-s参数分别指定TaskManager数目和slot数目,YARN会按照需求动态分配TaskManager和slot。以是从这个意义上讲,YARN的会话模式也不会把集群资源固定,同样是动态分配的。
YARN Session启动之后会给出一个Web UI地点以及一个YARN application ID,如下所示,用户可以通过Web UI大概命令行两种方式提交作业。
2022-11-17 15:20:52,711 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface hadoop104:40825 of application ‘application_1668668287070_0005’.
JobManager Web Interface: http://hadoop104:40825
2)提交作业
(1)通过Web UI提交作业
这种方式比较简朴,与上文所述Standalone部署模式根本雷同。
(2)通过命令行提交作业
① 将FlinkTutorial-1.0-SNAPSHOT.jar任务上传至集群。
② 实验以下命令将该任务提交到已经开启的Yarn-Session中运行。
[atguigu@hadoop102 flink-1.17.0]$ bin/flink run
-c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
客户端可以自行确定JobManager的地点,也可以通过-m大概-jobmanager参数指定JobManager的地点,JobManager的地点在YARN Session的启动页面中可以找到。
③ 任务提交成功后,可在YARN的Web UI界面查看运行情况。hadoop103:8088。
从上图中可以看到我们创建的Yarn-Session实际上是一个Yarn的Application,并且有唯一的Application ID。
④也可以通过Flink的Web UI页面查看提交任务的运行情况,如下图所示。
3.5.3 单作业模式部署
在YARN环境中,由于有了外部平台做资源调治,以是我们也可以直接向YARN提交一个单独的作业,从而启动一个Flink集群。
(1)实验命令提交作业。
[atguigu@hadoop102 flink-1.17.0]$ bin/flink run -d -t yarn-per-job -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
留意:如果启动过程中报如下异常。
Exception in thread “Thread-5” java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration ‘classloader.check-leaked-classloader’.
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders
办理办法:在flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml设置文件中设置
[atguigu@hadoop102 conf]$ vim flink-conf.yaml
classloader.check-leaked-classloader: false
(2)在YARN的ResourceManager界面查看实验情况。
点击可以打开Flink Web UI页面进行监控,如下图所示:
(3)可以使用命令行查看或取消作业,命令如下。
[atguigu@hadoop102 flink-1.17.0]$ bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
[atguigu@hadoop102 flink-1.17.0]$ bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
这里的application_XXXX_YY是当前应用的ID,是作业的ID。留意如果取消作业,整个Flink集群也会停掉。
3.5.4 应用模式部署
应用模式同样非常简朴,与单作业模式类似,直接实验flink run-application命令即可。
1)命令行提交
(1)实验命令提交作业。
[atguigu@hadoop102 flink-1.17.0]$ bin/flink run-application -t yarn-application -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
(2)在命令行中查看或取消作业。
[atguigu@hadoop102 flink-1.17.0]$ bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
[atguigu@hadoop102 flink-1.17.0]$ bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY
2)上传HDFS提交
可以通过yarn.provided.lib.dirs设置选项指定位置,将flink的依赖上传到远程。
(1)上传flink的lib和plugins到HDFS上
[atguigu@hadoop102 flink-1.17.0]$ hadoop fs -mkdir /flink-dist
[atguigu@hadoop102 flink-1.17.0]$ hadoop fs -put lib/ /flink-dist
[atguigu@hadoop102 flink-1.17.0]$ hadoop fs -put plugins/ /flink-dist
(2)上传自己的jar包到HDFS
[atguigu@hadoop102 flink-1.17.0]$ hadoop fs -mkdir /flink-jars
[atguigu@hadoop102 flink-1.17.0]$ hadoop fs -put FlinkTutorial-1.0-SNAPSHOT.jar /flink-jars
(3)提交作业
[atguigu@hadoop102 flink-1.17.0]$ bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs=“hdfs://hadoop102:8020/flink-dist” -c com.atguigu.wc.SocketStreamWordCount hdfs://hadoop102:8020/flink-jars/FlinkTutorial-1.0-SNAPSHOT.jar
这种方式下,flink自己的依赖和用户jar可以预先上传到HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了。
3.6 K8S 运行模式(了解)
容器化部署是现在业界流行的一项技术,基于Docker镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是Kubernetes(k8s),而Flink也在最近的版本中支持了k8s部署模式。根本原理与YARN是类似的,具体设置可以参见官网阐明,这里我们就不做过多解说了。
3.7 汗青服务器
运行 Flink job 的集群一旦制止,只能去 yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么。如果我们还没有 Metrics 监控的话,那么完全就只能通过日志去分析和定位题目了,以是如果能还原之前的 Web UI,我们可以通过 UI 发现和定位一些题目。
Flink提供了汗青服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。我们都知道只有看成业处于运行中的状态,才能够查看到相关的WebUI统计信息。通过 History Server 我们才能查询这些已完成作业的统计信息,无论是正常退出照旧异常退出。
此外,它对外提供了 REST API,它担当 HTTP 请求并使用 JSON 数据进行响应。Flink 任务制止后,JobManager 会将已经完成任务的统计信息进行存档,History Server 进程则在任务制止后可以对任务统计信息进行查询。比如:末了一次的 Checkpoint、任务运行时的相关设置。
1)创建存储目录
hadoop fs -mkdir -p /logs/flink-job
2)在 flink-config.yaml中添加如下设置
jobmanager.archive.fs.dir: hdfs://hadoop102:8020/logs/flink-job
historyserver.web.address: hadoop102
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://hadoop102:8020/logs/flink-job
historyserver.archive.fs.refresh-interval: 5000
3)启动汗青服务器
bin/historyserver.sh start
4)制止汗青服务器
bin/historyserver.sh stop
5)在欣赏器地点栏输入:http://hadoop102:8082 查看已经制止的 job 的统计信息
第4章 Flink运行时架构
4.1 系统架构
1)作业管理器(JobManager)
JobManager是一个Flink集群中任务管理和调治的核心,是控制应用实验的主进程。也就是说,每个应用都应该被唯一的JobManager所控制实验。
JobManger又包含3个不同的组件。
(1)JobMaster
JobMaster是JobManager中最核心的组件,负责处理处罚单独的作业(Job)。以是JobMaster和具体的Job是一一对应的,多个Job可以同时运行在一个Flink集群中, 每个Job都有一个自己的JobMaster。需要留意在早期版本的Flink中,没有JobMaster的概念;而JobManager的概念范围较小,实际指的就是现在所说的JobMaster。
在作业提交时,JobMaster会先接收到要实验的应用。JobMaster会把JobGraph转换成一个物理层面的数据流图,这个图被叫作“实验图”(ExecutionGraph),它包含了所有可以并发实验的任务。JobMaster会向资源管理器(ResourceManager)发出请求,申请实验任务须要的资源。一旦它获取到了足够的资源,就会将实验图分发到真正运行它们的TaskManager上。
而在运行过程中,JobMaster会负责所有需要中央协调的操纵,比如说检查点(checkpoints)的协调。
(2)资源管理器(ResourceManager)
ResourceManager紧张负责资源的分配和管理,在Flink 集群中只有一个。所谓“资源”,紧张是指TaskManager的任务槽(task slots)。任务槽就是Flink集群中的资源调配单元,包含了呆板用来实验盘算的一组CPU和内存资源。每一个任务(Task)都需要分配到一个slot上实验。
这里留意要把Flink内置的ResourceManager和其他资源管理平台(比如YARN)的ResourceManager区分开。
(3)分发器(Dispatcher)
Dispatcher紧张负责提供一个REST接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的JobMaster 组件。Dispatcher也会启动一个Web UI,用来方便地展示和监控作业实验的信息。Dispatcher在架构中并不是必需的,在不同的部署模式下大概会被忽略掉。
2)任务管理器(TaskManager)
TaskManager是Flink中的工作进程,数据流的具体盘算就是它来做的。Flink集群中必须至少有一个TaskManager;每一个TaskManager都包含了一定数目的任务槽(task slots)。Slot是资源调治的最小单位,slot的数目限制了TaskManager能够并行处理处罚的任务数目。
启动之后,TaskManager会向资源管理器注册它的slots;收到资源管理器的指令后,TaskManager就会将一个大概多个槽位提供给JobMaster调用,JobMaster就可以分配任务来实验了。
在实验过程中,TaskManager可以缓冲数据,还可以跟其他运行同一应用的TaskManager互换数据。
4.2 核心概念
4.2.1 并行度(Parallelism)
1)并行子任务和并行度
当要处理处罚的数据量非常大时,我们可以把一个算子操纵,“复制”多份到多个节点,数据来了之后就可以到其中任意一个实验。如许一来,一个算子任务就被拆分成了多个并行的“子任务”(subtasks),再将它们分发到不同节点,就真正实现了并行盘算。
在Flink实验过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地实验。
一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。如许,包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子大概具有不同的并行度。
例如:如上图所示,当前数据流中有source、map、window、sink四个算子,其中sink算子的并行度为1,其他算子的并行度都为2。以是这段流处理处罚程序的并行度就是2。
2)并行度的设置
在Flink中,可以用不同的方法来设置并行度,它们的有效范围和优先级别也是不同的。
(1)代码中设置
我们在代码中,可以很简朴地在算子后跟着调用setParallelism()方法,来设置当前算子的并行度:
stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
这种方式设置的并行度,只针对当前算子有效。
另外,我们也可以直接调用实验环境的setParallelism()方法,全局设定并行度:
env.setParallelism(2);
如许代码中所有算子,默认的并行度就都为2了。我们一般不会在程序中设置全局并行度,由于如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。
这里要留意的是,由于keyBy不是算子,以是无法对keyBy设置并行度。
(2)提交应用时设置
在使用flink run命令提交应用时,可以增长-p参数来指定当前应用程序实验的并行度,它的作用类似于实验环境的全局设置:
bin/flink run –p 2 –c com.atguigu.wc.SocketStreamWordCount
./FlinkTutorial-1.0-SNAPSHOT.jar
如果我们直接在Web UI上提交作业,也可以在对应输入框中直接添加并行度。
(3)设置文件中设置
我们还可以直接在集群的设置文件flink-conf.yaml中直接更改默认并行度:
parallelism.default: 2
这个设置对于整个集群上提交的所有作业有效,初始值为1。无论在代码中设置、照旧提交时的-p参数,都不是必须的;以是在没有指定并行度的时间,就会接纳设置文件中的集群默认并行度。在开发环境中,没有设置文件,默认并行度就是当前呆板的CPU核心数。
4.2.2 算子链(Operator Chain)
1)算子间的数据传输
一个数据流在算子之间传输数据的形式可以是一对一(one-to-one)的直通(forwarding)模式,也可以是打乱的重分区(redistributing)模式,具体是哪一种形式,取决于算子的种类。
(1)一对一(One-to-one,forwarding)
这种模式下,数据流维护着分区以及元素的顺序。比如图中的source和map算子,source算子读取数据之后,可以直接发送给map算子做处理处罚,它们之间不需要重新分区,也不需要调解数据的顺序。这就意味着map 算子的子任务,看到的元素个数温顺序跟source 算子的子任务产生的完全一样,保证着“一对一”的关系。map、filter、flatMap等算子都是这种one-to-one的对应关系。这种关系类似于Spark中的窄依赖。
(2)重分区(Redistributing)
在这种模式下,数据流的分区会发生改变。比如图中的map和背面的keyBy/window算子之间,以及keyBy/window算子和Sink算子之间,都是如许的关系。
每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。这些传输方式都会引起重分区的过程,这一过程类似于Spark中的shuffle。
2)合并算子链
在Flink中,并行度雷同的一对一(one to one)算子操纵,可以直接链接在一起形成一个“大”的任务(task),如许原来的算子就成为了真正任务里的一部分,如下图所示。每个task会被一个线程实验。如许的技术被称为“算子链”(Operator Chain)。
上图中Source和map之间满意了算子链的要求,以是可以直接合并在一起,形成了一个任务;由于并行度为2,以是合并后的任务也有两个并行子任务。如许,这个数据流图所表示的作业最终会有5个任务,由5个线程并行实验。
将算子链接成task黑白常有效的优化:可以淘汰线程之间的切换和基于缓存区的数据互换,在淘汰时延的同时提拔吞吐量。
Flink默认会按照算子链的原则进行链接合并,如果我们想要禁止合并大概自行界说,也可以在代码中对算子做一些特定的设置:
// 禁用算子链
.map(word -> Tuple2.of(word, 1L)).disableChaining();
// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()
4.2.3 任务槽(Task Slots)
1)任务槽(Task Slots)
Flink中每一个TaskManager都是一个JVM进程,它可以启动多个独立的线程,来并行实验多个子任务(subtask)。
很显然,TaskManager的盘算资源是有限的,并行的任务越多,每个线程的资源就会越少。那一个TaskManager到底能并行处理处罚多少个任务呢?为了控制并发量,我们需要在TaskManager上对每个任务运行所占用的资源做出明白的分别,这就是所谓的任务槽(task slots)。
每个任务槽(task slot)其实表示了TaskManager拥有盘算资源的一个固定大小的子集。这些资源就是用来独立实验一个子任务的。
2)任务槽数目的设置
在Flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml设置文件中,可以设置TaskManager的slot数目,默认是1个slot。
taskmanager.numberOfTaskSlots: 8
需要留意的是,slot现在仅仅用来隔离内存,不会涉及CPU的隔离。在具体应用时,可以将slot数目设置为呆板的CPU核心数,只管避免不同任务之间对CPU的竞争。这也是开发环境默认并行度设为呆板CPU数目的原因。
3)任务对任务槽的共享
默认情况下,Flink是允许子任务共享slot的。如果我们保持sink任务并行度为1稳定,而作业提交时设置全局并行度为6,那么前两个任务节点就会各自有6个并行子任务,整个流处理处罚程序则有13个子任务。如上图所示,只要属于同一个作业,那么对于不同任务节点(算子)的并行子任务,就可以放到同一个slot上实验。以是对于第一个任务节点source→map,它的6个并行子任务必须分到不同的slot上,而第二个任务节点keyBy/window/apply的并行子任务却可以和第一个任务节点共享slot。
当我们将资源麋集型和非麋集型的任务同时放到一个slot中,它们就可以自行分配对资源占用的比例,从而保证最重的活平均分配给所有的TaskManager。
slot共享另一个好处就是允许我们保存完整的作业管道。如许一来,纵然某个TaskManager出现故障宕机,其他节点也可以完全不受影响,作业的任务可以继承实验。
当然,Flink默认是允许slot共享的,如果希望某个算子对应的任务完全独占一个slot,大概只有某一部分算子共享slot,我们也可以通过设置“slot共享组”手动指定:
.map(word -> Tuple2.of(word, 1L)).slotSharingGroup(“1”);
如许,只有属于同一个slot共享组的子任务,才会开启slot共享;不同组之间的任务是完全隔离的,必须分配到不同的slot上。在这种场景下,总共需要的slot数目,就是各个slot共享组最大并行度的总和。
4.2.4 任务槽和并行度的关系
任务槽和并行度都跟程序的并行实验有关,但两者是完全不同的概念。简朴来说任务槽是静态的概念,是指TaskManager具有的并发实验能力,可以通过参数taskmanager.numberOfTaskSlots进行设置;而并行度是动态概念,也就是TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行设置。
举例阐明:假设一共有3个TaskManager,每一个TaskManager中的slot数目设置为3个,那么一共有9个task slot,表示集群最多能并行实验9个同一算子的子任务。
而我们界说word count程序的处理处罚操纵是四个转换算子:
source→ flatmap→ reduce→ sink
当所有算子并行度雷同时,轻易看出source和flatmap可以合并算子链,于是最终有三个任务节点。
通过这个例子也可以明白地看到,整个流处理处罚程序的并行度,就应该是所有算子并行度中最大的谁人,这代表了运行程序需要的slot数目。
4.3 作业提交流程
4.3.1 Standalone会话模式作业提交流程
4.3.2 逻辑流图/作业图/实验图/物理流图
我们已经彻底了解了由代码天生任务的过程,现在来做个梳理总结。
逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 实验图(ExecutionGraph)→ 物理图(Physical Graph)。
1)逻辑流图(StreamGraph)
这是根据用户通过 DataStream API编写的代码天生的最初的DAG图,用来表示程序的拓扑布局。这一步一般在客户端完成。
2)作业图(JobGraph)
StreamGraph颠末优化后天生的就是作业图(JobGraph),这是提交给 JobManager 的数据布局,确定了当前作业中所有任务的分别。紧张的优化为:将多个符合条件的节点链接在一起合并成一个任务节点,形成算子链,如许可以淘汰数据互换的消耗。JobGraph一般也是在客户端天生的,在作业提交时传递给JobMaster。
我们提交作业之后,打开Flink自带的Web UI,点击作业就能看到对应的作业图。
3)实验图(ExecutionGraph)
JobMaster收到JobGraph后,会根据它来天生实验图(ExecutionGraph)。ExecutionGraph是JobGraph的并行化版本,是调治层最核心的数据布局。与JobGraph最大的区别就是按照并行度对并行子任务进行了拆分,并明白了任务间数据传输的方式。
4)物理图(Physical Graph)
JobMaster天生实验图后,会将它分发给TaskManager;各个TaskManager会根据实验图部署任务,最终的物理实验过程也会形成一张“图”,一般就叫作物理图(Physical Graph)。这只是具体实验层面的图,并不是一个具体的数据布局。
物理图紧张就是在实验图的根本上,进一步确定数据存放的位置和收发的具体方式。有了物理图,TaskManager就可以对传递来的数据进行处理处罚盘算了。
4.3.3 Yarn应用模式作业提交流程
第5章 DataStream API
DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码根本上都由以下几部分构成:
5.1 实验环境(Execution Environment)
Flink程序可以在各种上下文环境中运行:我们可以在本地JVM中实验程序,也可以提交到远程集群上运行。
不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业实验盘算时,起首必须获取当前Flink的运行环境,从而建立起与Flink框架之间的接洽。
5.1.1 创建实验环境
我们要获取的实验环境,是StreamExecutionEnvironment类的对象,这是所有Flink程序的根本。在代码中创建实验环境的方式,就是调用这个类的静态方法,具体有以下三种。
1)getExecutionEnvironment
最简朴的方式,就是直接调用getExecutionEnvironment方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地实验环境;如果是创建了jar包,然后从命令行调用它并提交到集群实验,那么就返回集群的实验环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

这种方式,用起来简朴高效,是最常用的一种创建实验环境的方式。
2)createLocalEnvironment
这个方法返回一个本地实验环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
3)createRemoteEnvironment
这个方法返回集群实验环境。需要在调用时指定JobManager的主机名和端标语,并指定要在集群中运行的Jar包。
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment(
“host”, // JobManager主机名
1234, // JobManager进程端标语
“path/to/jarFile.jar” // 提交给JobManager的JAR包
);
在获取到程序实验环境后,我们还可以对实验环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以界说程序的时间语义、设置容错机制。
5.1.2 实验模式(Execution Mode)
从Flink 1.12开始,官方保举的做法是直接使用DataStream API,在提交任务时通过将实验模式设为BATCH来进行批处理处罚。不建议使用DataSet API。
// 流处理处罚环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream API实验模式包罗:流实验模式、批实验模式和主动模式。
 流实验模式(Streaming)
这是DataStream API最经典的模式,一般用于需要持续实时处理处罚的无界数据流。默认情况下,程序使用的就是Streaming实验模式。
 批实验模式(Batch)
专门用于批处理处罚的实验模式。
 主动模式(AutoMatic)
在这种模式下,将由程序根据输入数据源是否有界,来主动选择实验模式。
批实验模式的使用。紧张有两种方式:
(1)通过命令行设置
bin/flink run -Dexecution.runtime-mode=BATCH …
在提交作业时,增长execution.runtime-mode参数,指定值为BATCH。
(2)通过代码设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
在代码中,直接基于实验环境调用setRuntimeMode方法,传入BATCH模式。
实际应用中一般不会在代码中设置,而是使用命令行,如许更加灵活。
5.1.3 触发程序实验
需要留意的是,写完输出(sink)操纵并不代表程序已经竣事。由于当main()方法被调用时,其实只是界说了作业的每个实验操纵,然后添加到数据流图中;这时并没有真正处理处罚数据——由于数据大概还没来。Flink是由变乱驱动的,只有比及数据到来,才会触发真正的盘算,这也被称为“延迟实验”或“懒实验”。
以是我们需要显式地调用实验环境的execute()方法,来触发程序实验。execute()方法将不停等待作业完成,然后返回一个实验结果(JobExecutionResult)。
env.execute();
5.2 源算子(Source)
Flink可以从各种泉源获取数据,然后构建DataStream进行转换处理处罚。一般将数据的输入泉源称为数据源(data source),而读取数据的算子就是源算子(source operator)。以是,source就是我们整个处理处罚程序的输入端。
在Flink1.12以前,旧的添加source的方式,是调用实验环境的addSource()方法:
DataStream stream = env.addSource(…);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,紧张使用流批同一的新Source架构:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了许多预实现的接口,此外另有许多外部毗连工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。
5.2.1 准备工作
为了方便练习,这里使用WaterSensor作为数据模型。
字段名 数据类型 阐明
id String 水位传感器类型
ts Long 传感器记录时间戳
vc Integer 水位记录
具体代码如下:
public class WaterSensor {
public String id;
public Long ts;
public Integer vc;
  1. public WaterSensor() {
  2. }
  3. public WaterSensor(String id, Long ts, Integer vc) {
  4.     this.id = id;
  5.     this.ts = ts;
  6.     this.vc = vc;
  7. }
  8. public String getId() {
  9.     return id;
  10. }
  11. public void setId(String id) {
  12.     this.id = id;
  13. }
  14. public Long getTs() {
  15.     return ts;
  16. }
  17. public void setTs(Long ts) {
  18.     this.ts = ts;
  19. }
  20. public Integer getVc() {
  21.     return vc;
  22. }
  23. public void setVc(Integer vc) {
  24.     this.vc = vc;
  25. }
  26. @Override
  27. public String toString() {
  28.     return "WaterSensor{" +
  29.             "id='" + id + '\'' +
  30.             ", ts=" + ts +
  31.             ", vc=" + vc +
  32.             '}';
  33. }
  34. @Override
  35. public boolean equals(Object o) {
  36.     if (this == o) {
  37.         return true;
  38.     }
  39.     if (o == null || getClass() != o.getClass()) {
  40.         return false;
  41.     }
  42.     WaterSensor that = (WaterSensor) o;
  43.     return Objects.equals(id, that.id) &&
  44.             Objects.equals(ts, that.ts) &&
  45.             Objects.equals(vc, that.vc);
  46. }
  47. @Override
  48. public int hashCode() {
  49.     return Objects.hash(id, ts, vc);
  50. }
复制代码
}
这里需要留意,我们界说的WaterSensor,有如许几个特点:
 类是公有(public)的
 有一个无参的构造方法
 所有属性都是公有(public)的
 所有属性的类型都是可以序列化的
Flink会把如许的类作为一种特殊的POJO(Plain Ordinary Java Object简朴的Java对象,实际就是平常JavaBeans)数据类型来对待,方便数据的剖析和序列化。另外我们在类中还重写了toString方法,紧张是为了测试输出显示更清晰。
我们这里自界说的POJO类会在背面的代码中频仍使用,以是在背面的代码中遇到,把这里的POJO类导入就好了。
5.2.2 从聚集中读取数据
最简朴的读取数据的方式,就是在代码中直接创建一个Java聚集,然后调用实验环境的fromCollection方法进行读取。这相当于将数据暂时存储到内存中,形成特殊的数据布局后,作为数据源使用,一般用于测试。
public static void main(String[] args) throws Exception {
  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
复制代码
List data = Arrays.asList(1, 22, 3);
DataStreamSource ds = env.fromCollection(data);
  1. stream.print();
  2. env.execute();
复制代码
}
5.2.3 从文件读取数据
真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理处罚中最常见的读取方式。
读取文件,需要添加文件毗连器依赖:

org.apache.flink
flink-connector-files
${flink.version}

示例如下:
public static void main(String[] args) throws Exception {
  1.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2.     FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt")).build();    env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"file").print();    env.execute();
复制代码
}
阐明:
 参数可以是目录,也可以是文件;还可以从HDFS目录下读取,使用路径hdfs://…;
 路径可以是相对路径,也可以是绝对路径;
 相对路径是从系统属性user.dir获取路径:idea下是project的根目录,standalone模式下是集群节点根目录;
5.2.4 从Socket读取数据
不论从聚集照旧文件,我们读取的其实都是有界数据。在流处理处罚的场景中,数据通常是无界的。
我们之前用到的读取socket文本流,就是流处理处罚场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。
DataStream stream = env.socketTextStream(“localhost”, 7777);
5.2.5 从Kafka读取数据
Flink官方提供了毗连工具flink-connector-kafka,直接帮我们实现了一个消耗者FlinkKafkaConsumer,它就是用来读取Kafka数据的SourceFunction。
以是想要以Kafka作为数据源获取数据,我们只需要引入Kafka毗连器的依赖。Flink官方提供的是一个通用的Kafka毗连器,它会主动跟踪最新版本的Kafka客户端。现在最新版本只支持0.10.0版本以上的Kafka。这里我们需要导入的依赖如下。

org.apache.flink
flink-connector-kafka
${flink.version}

代码如下:
public class SourceKafka {
public static void main(String[] args) throws Exception {
  1.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2.     KafkaSource<String> kafkaSource = KafkaSource.<String>builder()        .setBootstrapServers("hadoop102:9092")        .setTopics("topic_1")        .setGroupId("atguigu")        .setStartingOffsets(OffsetsInitializer.latest())        .setValueOnlyDeserializer(new SimpleStringSchema())         .build();    DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");    stream.print("Kafka");    env.execute();}
复制代码
}
5.2.6 从数据天生器读取数据
Flink从1.11开始提供了一个内置的DataGen 毗连器,紧张是用于天生一些随机数,用于在没有数据源的时间,进行流任务的测试以及性能测试等。1.17提供了新的Source写法,需要导入依赖:

org.apache.flink
flink-connector-datagen
${flink.version}

代码如下:
public class DataGeneratorDemo {
public static void main(String[] args) throws Exception {
  1.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2.     env.setParallelism(1);    DataGeneratorSource<String> dataGeneratorSource =            new DataGeneratorSource<>(                    new GeneratorFunction<Long, String>() {                        @Override                        public String map(Long value) throws Exception {                            return "Number:"+value;                        }                    },                    Long.MAX_VALUE,                    RateLimiterStrategy.perSecond(10),                    Types.STRING            );    env            .fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator")            .print();    env.execute();}
复制代码
}
5.2.7 Flink支持的数据类型
1)Flink的类型系统
Flink使用“类型信息”(TypeInformation)来同一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些根本属性,并为每个数据类型天生特定的序列化器、反序列化器和比较器。
2)Flink支持的数据类型
对于常见的Java和Scala数据类型,Flink都是支持的。Flink在内部,Flink对支持不同的类型进行了分别,这些类型可以在Types工具类中找到:
(1)根本类型
所有Java根本类型及其包装类,再加上Void、String、Date、BigDecimal和BigInteger。
(2)数组类型
包罗根本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)。
(3)复合数据类型
 Java元组类型(TUPLE):这是Flink内置的元组类型,是Java API的一部分。最多25个字段,也就是从Tuple0~Tuple25,不支持空字段。
 Scala 样例类及Scala元组:不支持空字段。
 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
 POJO:Flink自界说的类似于Java bean模式的类。
(4)辅助类型
Option、Either、List、Map等。
(5)泛型类型(GENERIC)
Flink支持所有的Java类和Scala类。不外如果没有按照上面POJO类型的要求来界说,就会被Flink看成泛型类来处理处罚。Flink会把泛型类型看成黑盒,无法获取它们内部的属性;它们也不是由Flink自己序列化的,而是由Kryo序列化的。
在这些类型中,元组类型和POJO类型最为灵活,由于它们支持创建复杂类型。而相比之下,POJO还支持在键(key)的界说中直接使用字段名,这会让我们的代码可读性大大增长。以是,在项目实践中,通常会将流处理处罚程序中的元素类型定为Flink的POJO类型。
Flink对POJO类型的要求如下:
 类是公有(public)的
 有一个无参的构造方法
 所有属性都是公有(public)的
 所有属性的类型都是可以序列化的
3)类型提示(Type Hints)
Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,主动获取类型信息,从而得到对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),主动提取的信息是不敷精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重修出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或进步其性能。
为了办理这类题目,Java API提供了专门的“类型提示”(type hints)。
回忆一下之前的word count流处理处罚程序,我们在将String类型的每个词转换成(word, count)二元组后,就明白地用returns指定了返回的类型。由于对于map里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地剖析出完整数据。
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
Flink还专门提供了TypeHint类,它可以捕获泛型的类型信息,并且不停记录下来,为运行时提供足够的信息。我们同样可以通过.returns()方法,明白地指定转换之后的DataStream里元素的类型。
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
5.3 转换算子(Transformation)
数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。
5.3.1 根本转换算子(map/ filter/ flatMap)
5.3.1.1 映射(map)
map是各人非常熟悉的大数据操纵算子,紧张用于将数据流中的数据进行转换,形成新的数据流。简朴来说,就是一个“一一映射”,消耗一个元素就产出一个元素。
我们只需要基于DataStream调用map()方法就可以进行转换处理处罚。方法需要传入的参数是接口MapFunction的实现;返回值类型照旧DataStream,不外泛型(流中的元素类型)大概改变。
下面的代码用不同的方式,实现了提取WaterSensor中的id字段的功能。
public class TransMap {
public static void main(String[] args) throws Exception {
  1.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2.     DataStreamSource<WaterSensor> stream = env.fromElements(            new WaterSensor("sensor_1", 1, 1),            new WaterSensor("sensor_2", 2, 2)    );    // 方式一:传入匿名类,实现MapFunction    stream.map(new MapFunction<WaterSensor, String>() {        @Override        public String map(WaterSensor e) throws Exception {            return e.id;        }    }).print();    // 方式二:传入MapFunction的实现类    // stream.map(new UserMap()).print();    env.execute();}public static class UserMap implements MapFunction<WaterSensor, String> {    @Override    public String map(WaterSensor e) throws Exception {        return e.id;    }}
复制代码
}
上面代码中,MapFunction实现类的泛型类型,与输入数据类型和输出数据的类型有关。在实现MapFunction接口的时间,需要指定两个泛型,分别是输入变乱和输失变乱的类型,还需要重写一个map()方法,界说从一个输入变乱转换为另一个输失变乱的具体逻辑。
5.3.1.2 过滤(filter)
filter转换操纵,顾名思义是对数据流实验一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判定,若为true则元素正常输出,若为false则元素被过滤掉。
进行filter转换之后的新数据流的数据类型与原数据流是雷同的。filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。
案例需求:下面的代码会将数据流中传感器id为sensor_1的数据过滤出来。
public class TransFilter {
public static void main(String[] args) throws Exception {
  1.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2.     DataStreamSource<WaterSensor> stream = env.fromElements(
复制代码
new WaterSensor(“sensor_1”, 1, 1),
new WaterSensor(“sensor_1”, 2, 2),
new WaterSensor(“sensor_2”, 2, 2),
new WaterSensor(“sensor_3”, 3, 3)
);
  1.     // 方式一:传入匿名类实现FilterFunction
  2.     stream.filter(new FilterFunction<WaterSensor>() {
  3.         @Override
  4.         public boolean filter(WaterSensor e) throws Exception {
  5.             return e.id.equals("sensor_1");
  6.         }
  7.     }).print();
  8.     // 方式二:传入FilterFunction实现类
  9.     // stream.filter(new UserFilter()).print();
  10.    
  11.     env.execute();
  12. }
  13. public static class UserFilter implements FilterFunction<WaterSensor> {
  14.     @Override
  15.     public boolean filter(WaterSensor e) throws Exception {
  16.         return e.id.equals("sensor_1");
  17.     }
  18. }
复制代码
}
5.3.1.3 扁平映射(flatMap)
flatMap操纵又称为扁平映射,紧张是将数据流中的团体(一般是聚集类型)拆分成一个一个的个体使用。消耗一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操纵的团结,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理处罚。
同map一样,flatMap也可以使用Lambda表达式大概FlatMapFunction接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流雷同,也可以不同。
案例需求:如果输入的数据是sensor_1,只打印vc;如果输入的数据是sensor_2,既打印ts又打印vc。
实当代码如下:
public class TransFlatmap {
public static void main(String[] args) throws Exception {
  1.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2.     DataStreamSource<WaterSensor> stream = env.fromElements(
复制代码
new WaterSensor(“sensor_1”, 1, 1),
new WaterSensor(“sensor_1”, 2, 2),
new WaterSensor(“sensor_2”, 2, 2),
new WaterSensor(“sensor_3”, 3, 3)
  1.     );
  2.     stream.flatMap(new MyFlatMap()).print();
  3.     env.execute();
  4. }
  5. public static class MyFlatMap implements FlatMapFunction<WaterSensor, String> {
  6.     @Override
  7.     public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
  8.         if (value.id.equals("sensor_1")) {
  9.             out.collect(String.valueOf(value.vc));
  10.         } else if (value.id.equals("sensor_2")) {
  11.             out.collect(String.valueOf(value.ts));
  12.             out.collect(String.valueOf(value.vc));
  13.         }
  14.     }
  15. }
复制代码
}
5.3.2 聚合算子(Aggregation)
盘算的结果不但依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce中的reduce操纵。
5.3.2.1 按键分区(keyBy)
对于Flink而言,DataStream是没有直接进行聚合的API的。由于我们对海量数据做聚合肯定要进行分区并行处理处罚,如许才能进步效率。以是在Flink中,要做聚合,需要先进行分区;这个操纵就是通过keyBy来完成的。
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上分别成不同的分区(partitions)。这里所说的分区,其实就是并行处理处罚的子任务。
基于不同的key,流中的数据将被分配到不同的分区中去;如许一来,所有具有雷同的key的数据,都将被发往同一个分区。
在内部,是通过盘算key的哈希值(hash code),对分区数进行取模运算来实现的。以是这里key如果是POJO的话,必须要重写hashCode()方法。
keyBy()方法需要传入一个参数,这个参数指定了一个或一组key。有许多不同的方法来指定key:比如对于Tuple数据类型,可以指定字段的位置大概多个位置的组合;对于POJO类型,可以指定字段的名称(String);另外,还可以传入Lambda表达式大概实现一个键选择器(KeySelector),用于阐明从数据中提取key的逻辑。
我们可以以id作为key做一个分区操纵,代码实现如下:
public class TransKeyBy {
public static void main(String[] args) throws Exception {
  1.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2.     DataStreamSource<WaterSensor> stream = env.fromElements(
复制代码
new WaterSensor(“sensor_1”, 1, 1),
new WaterSensor(“sensor_1”, 2, 2),
new WaterSensor(“sensor_2”, 2, 2),
new WaterSensor(“sensor_3”, 3, 3)
);
  1.     // 方式一:使用Lambda表达式
  2.     KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id);
  3.     // 方式二:使用匿名类实现KeySelector
  4.     KeyedStream<WaterSensor, String> keyedStream1 = stream.keyBy(new KeySelector<WaterSensor, String>() {
  5.         @Override
  6.         public String getKey(WaterSensor e) throws Exception {
  7.             return e.id;
  8.         }
  9.     });
  10.     env.execute();
  11. }
复制代码
}
需要留意的是,keyBy得到的结果将不再是DataStream,而是会将DataStream转换为KeyedStream。KeyedStream可以认为是“分区流”大概“键控流”,它是对DataStream按照key的一个逻辑分区,以是泛型有两个类型:撤除当前流中的元素类型外,还需要指定key的类型。
KeyedStream也继承自DataStream,以是基于它的操纵也都归属于DataStream API。但它跟之前的转换操纵得到的SingleOutputStreamOperator不同,只是一个流的分区操纵,并不是一个转换算子。KeyedStream是一个非常紧张的数据布局,只有基于它才可以做后续的聚合操纵(比如sum,reduce)。
5.3.2.2 简朴聚合(sum/min/max/minBy/maxBy)
有了按键分区的数据流KeyedStream,我们就可以基于它进行聚合操纵了。Flink为我们内置实现了一些最根本、最简朴的聚合API,紧张有以下几种:
 sum():在输入流上,对指定的字段做叠加求和的操纵。
 min():在输入流上,对指定的字段求最小值。
 max():在输入流上,对指定的字段求最大值。
 minBy():与min()类似,在输入流上针对指定字段求最小值。不同的是,min()只盘算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。
 maxBy():与max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全同等。
简朴聚合算子使用非常方便,语义也非常明白。这些聚合方法调用时,也需要传入参数;但并不像根本转换算子那样需要实现自界说函数,只要阐明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。
对于元组类型的数据,可以使用这两种方式来指定字段。需要留意的是,元组中字段的名称,是以f0、f1、f2、…来命名的。
如果数据流的类型是POJO类,那么就只能通过字段名称来指定,不能通过位置来指定了。
public class TransAggregation {
  1. public static void main(String[] args) throws Exception {    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2.     DataStreamSource<WaterSensor> stream = env.fromElements(
复制代码
new WaterSensor(“sensor_1”, 1, 1),
new WaterSensor(“sensor_1”, 2, 2),
new WaterSensor(“sensor_2”, 2, 2),
new WaterSensor(“sensor_3”, 3, 3)
);
  1.     stream.keyBy(e -> e.id).max("vc");    // 指定字段名称
  2.     env.execute();
  3. }
复制代码
}
简朴聚合算子返回的,同样是一个SingleOutputStreamOperator,也就是从KeyedStream又转换成了通例的DataStream。以是可以如许理解:keyBy和聚合是成对出现的,先分区、后聚合,得到的依然是一个DataStream。而且颠末简朴聚合之后的数据流,元素的数据类型保持稳定。
一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。以是每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的变乱到下游算子。对于无界流来说,这些状态是永远不会被清除的,以是我们使用聚合算子,应该只用在含有有限个key的数据流上。
5.3.2.3 归约聚合(reduce)
reduce可以对已有的数据进行归约处理处罚,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合盘算。
reduce操纵也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,以是输出类型和输入类型是一样的。
调用KeyedStream的reduce方法时,需要传入一个参数,实现ReduceFunction接口。接口在源码中的界说如下:
public interface ReduceFunction extends Function, Serializable {
T reduce(T value1, T value2) throws Exception;
}
ReduceFunction接口里需要实现reduce()方法,这个方法接收两个输入变乱,颠末转换处理处罚之后输出一个雷同类型的变乱。在流处理处罚的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。
我们可以单独界说一个函数类实现ReduceFunction接口,也可以直接传入一个匿名类。当然,同样也可以通过传入Lambda表达式实现类似的功能。
为了方便后续使用,界说一个WaterSensorMapFunction:
public class WaterSensorMapFunction implements MapFunction<String,WaterSensor> {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(“,”);
return new WaterSensor(datas[0],Long.valueOf(datas[1]) ,Integer.valueOf(datas[2]) );
}
}
案例:使用reduce实现max和maxBy的功能。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env
.socketTextStream(“hadoop102”, 7777)
.map(new WaterSensorMapFunction())
.keyBy(WaterSensor::getId)
.reduce(new ReduceFunction()
{
@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
System.out.println(“Demo7_Reduce.reduce”);
  1.        int maxVc = Math.max(value1.getVc(), value2.getVc());
  2.        //实现max(vc)的效果  取最大值,其他字段以当前组的第一个为主
  3.        //value1.setVc(maxVc);
  4.        //实现maxBy(vc)的效果  取当前最大值的所有字段
  5.        if (value1.getVc() > value2.getVc()){
  6.            value1.setVc(maxVc);
  7.            return value1;
  8.        }else {
  9.            value2.setVc(maxVc);
  10.            return value2;
  11.        }
  12.    }
复制代码
})
.print();
env.execute();
reduce同简朴聚合算子一样,也要针对每一个key保存状态。由于状态不会清空,以是我们需要将reduce算子作用在一个有限key的流上。
5.3.3 用户自界说函数(UDF)
用户自界说函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自界说函数分为:函数类、匿名函数、富函数类。
5.3.3.1 函数类(Function Classes)
Flink袒露了所有UDF函数的接口,具体实现方式为接口大概抽象类,例如MapFunction、FilterFunction、ReduceFunction等。以是用户可以自界说一个函数类,实现对应的接口。
需求:用来从用户的点击数据中筛选包含“sensor_1”的内容:
方式一:实现FilterFunction接口
public class TransFunctionUDF {
  1. public static void main(String[] args) throws Exception {    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2.     DataStreamSource<WaterSensor> stream = env.fromElements(
复制代码
new WaterSensor(“sensor_1”, 1, 1),
new WaterSensor(“sensor_1”, 2, 2),
new WaterSensor(“sensor_2”, 2, 2),
new WaterSensor(“sensor_3”, 3, 3)
);
  1.     DataStream<String> filter = stream.filter(new UserFilter());
  2.   
  3.     filter.print();
  4.     env.execute();
  5. }
  6. public static class UserFilter implements FilterFunction<WaterSensor> {
  7.     @Override
  8.     public boolean filter(WaterSensor e) throws Exception {
  9.         return e.id.equals("sensor_1");
  10.     }
  11. }
复制代码
}
方式二:通过匿名类来实现FilterFunction接口:
DataStream stream = stream.filter(new FilterFunction< WaterSensor>() {
@Override
public boolean filter(WaterSensor e) throws Exception {
return e.id.equals(“sensor_1”);
}
});
方式二的优化:为了类可以更加通用,我们还可以将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去。
DataStreamSource stream = env.fromElements(
new WaterSensor(“sensor_1”, 1, 1),
new WaterSensor(“sensor_1”, 2, 2),
new WaterSensor(“sensor_2”, 2, 2),
new WaterSensor(“sensor_3”, 3, 3)
);
DataStream stream = stream.filter(new FilterFunctionImpl(“sensor_1”));
public static class FilterFunctionImpl implements FilterFunction {
private String id;
  1. FilterFunctionImpl(String id) { this.id=id; }
  2. @Override
  3. public boolean filter(WaterSensor value) throws Exception {
  4.     return thid.id.equals(value.id);
  5. }
复制代码
}
方式三:接纳匿名函数(Lambda)
public class TransFunctionUDF {
  1. public static void main(String[] args) throws Exception {    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2.     DataStreamSource<WaterSe
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

汕尾海湾

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表