目次
前提条件
基本准备
批处理API实现WordCount
流处理API实现WordCount
数据源是文件
数据源是socket文本流
打包
提交到集群运行
下令行提交作业
Web UI提交作业
上传代码到gitee
前提条件
Windows安装好jdk8、Maven3、IDEA
Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式
基本准备
创建项目
利用IDEA创建一个新的Maven项目,项目名称,比方:flinkdemo
添加依赖
在项目的pom.xml文件中添加Flink的依赖。
- <properties>
- <flink.version>1.17.1</flink.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${flink.version}</version>
- </dependency>
- </dependencies>
复制代码
刷新依赖
刷新依赖后,能看到干系依赖如下
刷新依赖过程必要等候一些时间来下载干系依赖。
如果依赖下载慢,可以设置阿里云仓库镜像:
1.设置maven的settings.xml
在</mirrors>上面一行添加阿里云仓库镜像
- <mirror>
- <id>alimaven</id>
- <name>aliyun maven</name>
- <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
- <mirrorOf>central</mirrorOf>
- </mirror>
复制代码
2.IDEA设置maven
数据准备
在工程的根目次下,新建一个data文件夹
并在data文件夹下创建文本文件words.txt
内容如下
- hello world
- hello java
- hello flink
复制代码
新建包
右键src/main下的java,新建Package
填写包名org.example,包名与groupId的内容同等。
批处理API实现WordCount
在org.exmaple下新建wc包及BatchWordCount类
填写wc.BatchWordCount
结果如下
BatchWordCount.java代码如下:
- package org.example.wc;
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.ExecutionEnvironment;
- import org.apache.flink.api.java.operators.AggregateOperator;
- import org.apache.flink.api.java.operators.DataSource;
- import org.apache.flink.api.java.operators.FlatMapOperator;
- import org.apache.flink.api.java.operators.UnsortedGrouping;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.util.Collector;
- public class BatchWordCount {
- public static void main(String[] args) throws Exception {
- // 1. 创建执行环境
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- // 2. 从文件读取数据 按行读取
- DataSource<String> lineDS = env.readTextFile("data/words.txt");
- // 3. 转换数据格式
- FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
- @Override
- public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
- String[] words = line.split(" ");
- for (String word : words) {
- out.collect(Tuple2.of(word,1L));
- }
- }
- });
- // 4. 按照 word 进行分组
- UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
- // 5. 分组内聚合统计
- AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
- // 6. 打印结果
- sum.print();
- }
- }
复制代码 运行步伐,检察结果
注意,以上代码的实现方式是基于DataSet API的,是批处理API。而Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。从Flink 1.12开始,官方推荐直接利用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:
- $ flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
复制代码
流处理API实现WordCount
数据源是文件
在org.example.wc包下新建Java类StreamWordCount,代码如下:
- package org.example.wc;
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
- public class StreamWordCount {
- public static void main(String[] args) throws Exception {
- // 1. 创建流式执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // 2. 读取文件
- DataStreamSource<String> lineStream = env.readTextFile("data/words.txt");
- // 3. 转换、分组、求和,得到统计结果
- SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
- @Override
- public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
- String[] words = line.split(" ");
- for (String word : words) {
- out.collect(Tuple2.of(word, 1L));
- }
- }
- }).keyBy(data -> data.f0)
- .sum(1);
- // 4. 打印
- sum.print();
- // 5. 执行
- env.execute();
- }
- }
复制代码 运行结果
与批处理步伐BatchWordCount的区别:
- 创建执行环境的不同,流处理步伐利用的是StreamExecutionEnvironment。
- 转换处理之后,得到的数据对象类型不同。
- 分组操纵调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。
- 代码末尾必要调用env的execute方法,开始执行任务。
数据源是socket文本流
流处理的输入数据通常是流数据,将StreamWordCount代码中读取文件数据的readTextFile方法,更换成读取socket文本流的方法socketTextStream。
在org.example.wc包下新建Java类SocketStreamWordCount,代码如下:
- package org.example.wc;
- import org.apache.flink.api.common.typeinfo.Types;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
- public class SocketStreamWordCount {
- public static void main(String[] args) throws Exception {
- // 1. 创建流式执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // 2. 读取文本流:node2表示发送端主机名(根据实际情况修改)、7777表示端口号
- DataStreamSource<String> lineStream = env.socketTextStream("node2", 7777);
- // 3. 转换、分组、求和,得到统计结果
- SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
- String[] words = line.split(" ");
- for (String word : words) {
- out.collect(Tuple2.of(word, 1L));
- }
- }).returns(Types.TUPLE(Types.STRING, Types.LONG))
- .keyBy(data -> data.f0)
- .sum(1);
- // 4. 打印
- sum.print();
- // 5. 执行
- env.execute();
- }
- }
复制代码
进入node2终端,如果没有nc下令,必要先安装nc下令,安装nc下令如下:
- [hadoop@node2 ~]$ sudo yum install nc -y
复制代码
开启nc监听
- [hadoop@node2 ~]$ nc -lk 7777
复制代码
IDEA中,运行SocketStreamWordCount步伐。
往7777端口发送数据,比方发送hello world
控制台输出
继续往7777端口发送数据,比方发送hello flink
控制台输出
停止SocketStreamWordCount步伐。
按Ctrl+c停止nc下令。
打包
这里的打包是将写好的步伐打成jar包。
点击IDEA右侧的Maven,按住Ctrl键同时选中clean和package(第一次打包可以只选中package),点击执行打包。
打包乐成后,看到如下输出信息,生成的jar包在项目的target目次下
提交到集群运行
把jar包提交到flink集群运行有两种方式:
1.通过下令行提交作业
2.通过Web UI提交作业
下令行提交作业
将jar包上传Linux
启动flink集群
- [hadoop@node2 ~]$ start-cluster.sh
- Starting cluster.
- Starting standalonesession daemon on host node2.
- Starting taskexecutor daemon on host node2.
- Starting taskexecutor daemon on host node3.
- Starting taskexecutor daemon on host node4.
-
复制代码 开启nc监听
- [hadoop@node2 ~]$ nc -lk 7777
-
复制代码 下令提交作业
开启另一个node2终端,利用flink run下令提交作业到flink集群
- [hadoop@node2 ~]$ flink run -m node2:8081 -c org.example.wc.SocketStreamWordCount flinkdemo-1.0-SNAPSHOT.jar
复制代码
-m指定提交到的JobManager,-c指定步伐入口类。
发送测试数据
在nc监听终端,往7777端口发送数据
检察结果
Web UI检察结果
浏览器访问
看到正在运行的作业如下
检察结果
继续发送测试数据
在nc终端继续发送数据
Web UI刷新结果
下令行检察结果
打开新的node2终端,检察结果
- [hadoop@node2 ~]$ cd $FLINK_HOME/log
- [hadoop@node2 log]$ ls
- flink-hadoop-client-node2.log flink-hadoop-standalonesession-0-node2.out
- flink-hadoop-standalonesession-0-node2.log flink-hadoop-taskexecutor-0-node2.log
- flink-hadoop-standalonesession-0-node2.log.1 flink-hadoop-taskexecutor-0-node2.log.1
- flink-hadoop-standalonesession-0-node2.log.2 flink-hadoop-taskexecutor-0-node2.log.2
- flink-hadoop-standalonesession-0-node2.log.3 flink-hadoop-taskexecutor-0-node2.log.3
- flink-hadoop-standalonesession-0-node2.log.4 flink-hadoop-taskexecutor-0-node2.log.4
- flink-hadoop-standalonesession-0-node2.log.5 flink-hadoop-taskexecutor-0-node2.out
- [hadoop@node2 log]$ cat flink-hadoop-taskexecutor-0-node2.out
- (hello,1)
- (flink,1)
- (hello,2)
- (world,1)
-
复制代码
取消flink作业
点击Cancel Job取消作业
停止nc监听
按Ctrl+c停止nc下令
Web UI提交作业
开启nc监听
开启nc监听发送数据
- [hadoop@node2 ~]$ nc -lk 7777
复制代码
Web UI提交作业
浏览器访问
点击Submit New Job
点击Add New
选择flink作业jar包所在路径
点击jar包名称
填写干系内容,点击Submit提交作业
Entry Class填写运行的主类,比方:org.example.wc.SocketStreamWordCount
Parallesim填写作业的并行度,比方:1
提交后,在Running Jobs里看到运行的作业
发送测试数据
往7777端口发送数据
检察结果
继续发送测试数据
刷新结果
取消作业
停止nc监听
按住Ctrl+c停止nc下令
关闭flink集群
- [hadoop@node2 ~]$ stop-cluster.sh
- Stopping taskexecutor daemon (pid: 2283) on host node2.
- Stopping taskexecutor daemon (pid: 1827) on host node3.
- Stopping taskexecutor daemon (pid: 1829) on host node4.
- Stopping standalonesession daemon (pid: 1929) on host node2.
复制代码
上传代码到gitee
登录gitee
注意:如果还没有gitee账号,必要先注册;如果之前没有设置过SSH公钥,必要先设置SSH公钥。
创建仓库
提交接码
利用IDEA提交接码
提示有告诫,忽略告诫,继续提交
提交乐成后,IDEA显示如下
刷新浏览器检察gitee界面,看到代码已上传乐成
完成!enjoy it!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |