【Flink入门修炼】1-3 Flink WordCount 入门实现

打印 上一主题 下一主题

主题 901|帖子 901|积分 2707

本篇文章将带大家运行 Flink 最简单的程序 WordCount。先实践后理论,对其基本输入输出、编程代码有初步了解,后续篇章再对 Flink 的各种概念和架构进行介绍。
下面将从创建项目开始,介绍如何创建出一个 Flink 项目;然后从 DataStream 流处理和 FlinkSQL 执行两种方式来带大家学习 WordCount 程序的开发。
Flink 各版本之间变化较多,之前版本的函数在后续版本可能不再支持。跟随学习时,请尽量选择和笔者同版本的 Flink。本文使用的 Flink 版本是 1.13.2。
一、创建项目

在很多其他教程中,会看到如下来创建 Flink 程序的方式。虽然简单方便,但对初学者来说,不知道初始化项目的时候做了什么,如果报错了也不知道该如何排查。
mvn   archetype:generate  
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.2
通过指定 Maven 工程的三要素,即 GroupId、ArtifactId、Version 来创建一个新的工程。同时 Flink 给我提供了更为方便的创建 Flink 工程的方法:
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.13.2
因此,我们手动来创建一个 Maven 项目,看看到底如何创建出一个 Flink 项目。
1、通过 IDEA 创建一个 Maven 项目

2、pom.xml 添加:
这里我们选择的是 Flink 1.13.2 版本(Flink 1.14 之后部分类和函数有变化,可自行探索)。
  1.     <properties>
  2.         <maven.compiler.source>8</maven.compiler.source>
  3.         <maven.compiler.target>8</maven.compiler.target>
  4.         <flink.version>1.13.2</flink.version>
  5.         <target.java.version>1.8</target.java.version>
  6.         <scala.binary.version>2.12</scala.binary.version>
  7.         <maven.compiler.source>${target.java.version}</maven.compiler.source>
  8.         <maven.compiler.target>${target.java.version}</maven.compiler.target>
  9.     </properties>
  10.     <dependencies>
  11.         <dependency>
  12.             <groupId>org.apache.flink</groupId>
  13.             <artifactId>flink-java</artifactId>
  14.             <version>${flink.version}</version>
  15.             <scope>provided</scope>
  16.         </dependency>
  17.         <dependency>
  18.             <groupId>org.apache.flink</groupId>
  19.             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  20.             <version>${flink.version}</version>
  21.             <scope>provided</scope>
  22.         </dependency>
  23.         <dependency>
  24.             <groupId>org.apache.flink</groupId>
  25.             <artifactId>flink-clients_${scala.binary.version}</artifactId>
  26.             <version>${flink.version}</version>
  27.             <scope>provided</scope>
  28.         </dependency>
  29.     </dependencies>
复制代码
二、DataStream WordCount

一)编写程序

基础项目环境已经搞好了,接下来我们模仿一个流式环境,监听本地的 Socket 端口,使用 Flink 统计流入的不同单词个数。
  1. import org.apache.flink.api.common.functions.FlatMapFunction;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.util.Collector;
  7. public class SocketTextStreamWordCount {
  8.     public static void main(String[] args) throws Exception {
  9.         //参数检查
  10.         if (args.length != 2) {
  11.             // System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
  12.             // return;
  13.             args = new String[]{"127.0.0.1", "9000"};
  14.         }
  15.         String hostname = args[0];
  16.         Integer port = Integer.parseInt(args[1]);
  17.         // 创建 streaming execution environment
  18.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  19.         // 获取数据
  20.         DataStreamSource<String> stream = env.socketTextStream(hostname, port);
  21.         // 计数
  22.         SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
  23.                 .keyBy(0)
  24.                 .sum(1);
  25.         sum.print();
  26.         env.execute("Java WordCount from SocketTextStream Example");
  27.     }
  28.     public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
  29.         @Override
  30.         public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
  31.             String[] tokens = s.toLowerCase().split("\\W+");
  32.             for (String token: tokens) {
  33.                 if (token.length() > 0) {
  34.                     collector.collect(new Tuple2<String, Integer>(token, 1));
  35.                 }
  36.             }
  37.         }
  38.     }
  39. }
复制代码
二)测试

接下来我们进行程序测试。
我们在本地使用 netcat 命令启动一个端口:
  1. nc -l 9000
复制代码
然后启动程序,能看到控制台一些输出:

接下来,在 nc 中输入:
  1. $ nc -l 9000
  2. hello world
  3. flink flink flink
复制代码
回到我们的程序,能看到统计的输出:
  1. 3> (hello,1)
  2. 6> (world,1)
  3. 8> (flink,1)
  4. 8> (flink,2)
  5. 8> (flink,3)
复制代码

三)如果有报错

如果出现执行报错:
  1. Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/io/TextInputFormat
  2.         at com.shuofxz.SocketTextStreamWordCount.main(SocketTextStreamWordCount.java:25)
  3. Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.java.io.TextInputFormat
  4.         at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
  5.         at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
  6.         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  7.         at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
  8.         ... 1 more
复制代码
在 IDE 中把 「Add dependencies with "rovided" scope to classpath」勾选上:

三、Flink Table & SQL WordCount

一)介绍 FlinkSQL

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
上面单词统计的逻辑可以转化为下面的 SQL。
直接来看这个 SQL:
  1. select word as word, sum(frequency) as frequency from WordCount group by word
复制代码

  • WordCount 是要进行单词统计的表,我们会先做一些处理,将输入的单词都存放到这个表中
  • 表我们定义为两列(word, frequency),初始转化输入每个单词占一行,frequency 都是 1
  • 然后,就可以按照 SQL 的逻辑来进行统计聚合了。
其中,WordCount 表数据如下:
wordfrequencyhello1world1flink1flink1flink1那么接下来我们看,如何写一个 FlinkSQL 的程序。
二)环境和程序

首先,添加 FlinkSQL 需要的依赖:
  1.         <dependency>
  2.             <groupId>org.apache.flink</groupId>
  3.             <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
  4.             <version>${flink.version}</version>
  5.         </dependency>
  6.         <dependency>
  7.             <groupId>org.apache.flink</groupId>
  8.             <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
  9.             <version>${flink.version}</version>
  10.         </dependency>
  11.         <dependency>
  12.             <groupId>org.apache.flink</groupId>
  13.             <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
  14.             <version>${flink.version}</version>
  15.         </dependency>
  16.         <dependency>
  17.             <groupId>org.apache.flink</groupId>
  18.             <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  19.             <version>${flink.version}</version>
  20.         </dependency>
复制代码
程序如下:
  1. public class SQLWordCount {
  2.     public static void main(String[] args) throws Exception {
  3.         // 创建上下文环境
  4.         ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
  5.         BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
  6.         // 读取一行模拟数据作为输入
  7.         String words = "hello world flink flink flink";
  8.         String[] split = words.split("\\W+");
  9.         ArrayList<WC> list = new ArrayList<>();
  10.         for (String word : split) {
  11.             WC wc = new WC(word, 1);
  12.             list.add(wc);
  13.         }
  14.         DataSource<WC> input = fbEnv.fromCollection(list);
  15.         // DataSet 转 SQL,指定字段名
  16.         Table table = fbTableEnv.fromDataSet(input, "word,frequency");
  17.         table.printSchema();
  18.         // 注册为一个表
  19.         fbTableEnv.createTemporaryView("WordCount", table);
  20.         Table table1 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount group by word");
  21.         DataSet<WC> ds1 = fbTableEnv.toDataSet(table1, WC.class);
  22.         ds1.printToErr();
  23.     }
  24.     public static class WC {
  25.         public String word;
  26.         public long frequency;
  27.         public WC() {}
  28.         public WC(String word, long frequency) {
  29.             this.word = word;
  30.             this.frequency = frequency;
  31.         }
  32.         @Override
  33.         public String toString() {
  34.             return  word + ", " + frequency;
  35.         }
  36.     }
  37. }
复制代码
执行,结果输出:
  1. (
  2.   `word` STRING,
  3.   `frequency` BIGINT
  4. )
  5. flink, 3
  6. world, 1
  7. hello, 1
复制代码

四、小结

本篇手把手的带大家搭建起 Flink Maven 项目,然后使用 DataStream 和 FlinkSQL 两种方式来学习 WordCount 单词计数这一最简单最经典的 Flink 程序开发。跟着步骤一步步执行下来,大家应该对 Flink 程序基本执行流程有个初步的了解,为后续的学习打下了基础。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

刘俊凯

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表