Flink WordCount实践

打印 上一主题 下一主题

主题 539|帖子 539|积分 1619

目次
前提条件
基本准备
批处理API实现WordCount
流处理API实现WordCount
数据源是文件
数据源是socket文本流
打包
提交到集群运行
下令行提交作业
Web UI提交作业
上传代码到gitee


前提条件

Windows安装好jdk8、Maven3、IDEA
Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式
 
基本准备

创建项目

利用IDEA创建一个新的Maven项目,项目名称,比方:flinkdemo



添加依赖

在项目的pom.xml文件中添加Flink的依赖。
  1.         <properties>
  2.         <flink.version>1.17.1</flink.version>
  3.     </properties>
  4.     <dependencies>
  5.         <dependency>
  6.             <groupId>org.apache.flink</groupId>
  7.             <artifactId>flink-streaming-java</artifactId>
  8.             <version>${flink.version}</version>
  9.         </dependency>
  10.         <dependency>
  11.             <groupId>org.apache.flink</groupId>
  12.             <artifactId>flink-clients</artifactId>
  13.             <version>${flink.version}</version>
  14.         </dependency>
  15.     </dependencies>
复制代码


刷新依赖

刷新依赖后,能看到干系依赖如下

刷新依赖过程必要等候一些时间来下载干系依赖。
如果依赖下载慢,可以设置阿里云仓库镜像:
 1.设置maven的settings.xml


在</mirrors>上面一行添加阿里云仓库镜像
  1.         <mirror>
  2.       <id>alimaven</id>
  3.       <name>aliyun maven</name>
  4.       <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  5.       <mirrorOf>central</mirrorOf>        
  6.     </mirror>
复制代码


2.IDEA设置maven



数据准备

在工程的根目次下,新建一个data文件夹

并在data文件夹下创建文本文件words.txt

内容如下
  1. hello world
  2. hello java
  3. hello flink
复制代码


新建包

右键src/main下的java,新建Package


填写包名org.example,包名与groupId的内容同等。



批处理API实现WordCount

在org.exmaple下新建wc包及BatchWordCount类

填写wc.BatchWordCount

结果如下

BatchWordCount.java代码如下:
  1. package org.example.wc;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. import org.apache.flink.api.java.operators.AggregateOperator;
  5. import org.apache.flink.api.java.operators.DataSource;
  6. import org.apache.flink.api.java.operators.FlatMapOperator;
  7. import org.apache.flink.api.java.operators.UnsortedGrouping;
  8. import org.apache.flink.api.java.tuple.Tuple2;
  9. import org.apache.flink.util.Collector;
  10. public class BatchWordCount {
  11.     public static void main(String[] args) throws Exception {
  12.         // 1. 创建执行环境
  13.         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  14.         // 2. 从文件读取数据 按行读取
  15.         DataSource<String> lineDS = env.readTextFile("data/words.txt");
  16.         // 3. 转换数据格式
  17.         FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
  18.             @Override
  19.             public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
  20.                 String[] words = line.split(" ");
  21.                 for (String word : words) {
  22.                     out.collect(Tuple2.of(word,1L));
  23.                 }
  24.             }
  25.         });
  26.         // 4. 按照 word 进行分组
  27.         UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
  28.         // 5. 分组内聚合统计
  29.         AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
  30.         // 6. 打印结果
  31.         sum.print();
  32.     }
  33. }
复制代码
运行步伐,检察结果

注意,以上代码的实现方式是基于DataSet API的,是批处理API。而Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。从Flink 1.12开始,官方推荐直接利用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:
  1. $ flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
复制代码



流处理API实现WordCount

数据源是文件

在org.example.wc包下新建Java类StreamWordCount,代码如下:
  1. package org.example.wc;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.util.Collector;
  8. public class StreamWordCount {
  9.     public static void main(String[] args) throws Exception {
  10.         // 1. 创建流式执行环境
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         // 2. 读取文件
  13.         DataStreamSource<String> lineStream = env.readTextFile("data/words.txt");
  14.         // 3. 转换、分组、求和,得到统计结果
  15.         SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
  16.                     @Override
  17.                     public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
  18.                         String[] words = line.split(" ");
  19.                         for (String word : words) {
  20.                             out.collect(Tuple2.of(word, 1L));
  21.                         }
  22.                     }
  23.                 }).keyBy(data -> data.f0)
  24.                 .sum(1);
  25.         // 4. 打印
  26.         sum.print();
  27.         // 5. 执行
  28.         env.execute();
  29.     }
  30. }
复制代码
运行结果


与批处理步伐BatchWordCount的区别:


  • 创建执行环境的不同,流处理步伐利用的是StreamExecutionEnvironment。
  • 转换处理之后,得到的数据对象类型不同。
  • 分组操纵调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。
  • 代码末尾必要调用env的execute方法,开始执行任务。

数据源是socket文本流

流处理的输入数据通常是流数据,将StreamWordCount代码中读取文件数据的readTextFile方法,更换成读取socket文本流的方法socketTextStream。
在org.example.wc包下新建Java类SocketStreamWordCount,代码如下:
  1. package org.example.wc;
  2. import org.apache.flink.api.common.typeinfo.Types;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.util.Collector;
  8. public class SocketStreamWordCount {
  9.     public static void main(String[] args) throws Exception {
  10.         // 1. 创建流式执行环境
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         // 2. 读取文本流:node2表示发送端主机名(根据实际情况修改)、7777表示端口号
  13.         DataStreamSource<String> lineStream = env.socketTextStream("node2", 7777);
  14.         // 3. 转换、分组、求和,得到统计结果
  15.         SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
  16.                     String[] words = line.split(" ");
  17.                     for (String word : words) {
  18.                         out.collect(Tuple2.of(word, 1L));
  19.                     }
  20.                 }).returns(Types.TUPLE(Types.STRING, Types.LONG))
  21.                 .keyBy(data -> data.f0)
  22.                 .sum(1);
  23.         // 4. 打印
  24.         sum.print();
  25.         // 5. 执行
  26.         env.execute();
  27.     }
  28. }
复制代码

进入node2终端,如果没有nc下令,必要先安装nc下令,安装nc下令如下:
  1. [hadoop@node2 ~]$ sudo yum install nc -y
复制代码

开启nc监听
  1. [hadoop@node2 ~]$ nc -lk 7777
复制代码

IDEA中,运行SocketStreamWordCount步伐。


往7777端口发送数据,比方发送hello world


控制台输出


继续往7777端口发送数据,比方发送hello flink


控制台输出


停止SocketStreamWordCount步伐。

按Ctrl+c停止nc下令。

打包

这里的打包是将写好的步伐打成jar包。
点击IDEA右侧的Maven,按住Ctrl键同时选中clean和package(第一次打包可以只选中package),点击执行打包。


打包乐成后,看到如下输出信息,生成的jar包在项目的target目次下


提交到集群运行

把jar包提交到flink集群运行有两种方式:
1.通过下令行提交作业   
2.通过Web UI提交作业

下令行提交作业

将jar包上传Linux


启动flink集群

  1. [hadoop@node2 ~]$ start-cluster.sh
  2. Starting cluster.
  3. Starting standalonesession daemon on host node2.
  4. Starting taskexecutor daemon on host node2.
  5. Starting taskexecutor daemon on host node3.
  6. Starting taskexecutor daemon on host node4.
复制代码
开启nc监听

  1. [hadoop@node2 ~]$ nc -lk 7777
复制代码
下令提交作业

开启另一个node2终端,利用flink run下令提交作业到flink集群
  1. [hadoop@node2 ~]$ flink run -m node2:8081 -c org.example.wc.SocketStreamWordCount flinkdemo-1.0-SNAPSHOT.jar
复制代码

-m指定提交到的JobManager,-c指定步伐入口类。

发送测试数据

在nc监听终端,往7777端口发送数据

检察结果

Web UI检察结果

浏览器访问
  1. node2:8081
复制代码
看到正在运行的作业如下


检察结果



继续发送测试数据

在nc终端继续发送数据


Web UI刷新结果



下令行检察结果

打开新的node2终端,检察结果
  1. [hadoop@node2 ~]$ cd $FLINK_HOME/log
  2. [hadoop@node2 log]$ ls
  3. flink-hadoop-client-node2.log                 flink-hadoop-standalonesession-0-node2.out
  4. flink-hadoop-standalonesession-0-node2.log    flink-hadoop-taskexecutor-0-node2.log
  5. flink-hadoop-standalonesession-0-node2.log.1  flink-hadoop-taskexecutor-0-node2.log.1
  6. flink-hadoop-standalonesession-0-node2.log.2  flink-hadoop-taskexecutor-0-node2.log.2
  7. flink-hadoop-standalonesession-0-node2.log.3  flink-hadoop-taskexecutor-0-node2.log.3
  8. flink-hadoop-standalonesession-0-node2.log.4  flink-hadoop-taskexecutor-0-node2.log.4
  9. flink-hadoop-standalonesession-0-node2.log.5  flink-hadoop-taskexecutor-0-node2.out
  10. [hadoop@node2 log]$ cat flink-hadoop-taskexecutor-0-node2.out
  11. (hello,1)
  12. (flink,1)
  13. (hello,2)
  14. (world,1)
复制代码


取消flink作业



点击Cancel Job取消作业 



停止nc监听

按Ctrl+c停止nc下令

Web UI提交作业


开启nc监听

开启nc监听发送数据
  1. [hadoop@node2 ~]$ nc -lk 7777
复制代码

Web UI提交作业

浏览器访问
  1. node2:8081
复制代码

点击Submit New Job

点击Add New

选择flink作业jar包所在路径


点击jar包名称


填写干系内容,点击Submit提交作业
Entry Class填写运行的主类,比方:org.example.wc.SocketStreamWordCount
Parallesim填写作业的并行度,比方:1


提交后,在Running Jobs里看到运行的作业


发送测试数据

往7777端口发送数据


检察结果



继续发送测试数据



刷新结果



取消作业



停止nc监听

按住Ctrl+c停止nc下令

关闭flink集群

  1. [hadoop@node2 ~]$ stop-cluster.sh
  2. Stopping taskexecutor daemon (pid: 2283) on host node2.
  3. Stopping taskexecutor daemon (pid: 1827) on host node3.
  4. Stopping taskexecutor daemon (pid: 1829) on host node4.
  5. Stopping standalonesession daemon (pid: 1929) on host node2.
复制代码


上传代码到gitee


登录gitee

  1. https://gitee.com/
复制代码
注意:如果还没有gitee账号,必要先注册;如果之前没有设置过SSH公钥,必要先设置SSH公钥。

创建仓库










提交接码

利用IDEA提交接码




提示有告诫,忽略告诫,继续提交












提交乐成后,IDEA显示如下


刷新浏览器检察gitee界面,看到代码已上传乐成




完成!enjoy it!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

南七星之家

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表