南七星之家 发表于 2024-7-15 00:17:30

Flink WordCount实践

目次
前提条件
基本准备
批处理API实现WordCount
流处理API实现WordCount
数据源是文件
数据源是socket文本流
打包
提交到集群运行
下令行提交作业
Web UI提交作业
上传代码到gitee

前提条件

Windows安装好jdk8、Maven3、IDEA
Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式
 
基本准备

创建项目

利用IDEA创建一个新的Maven项目,项目名称,比方:flinkdemo
https://img-blog.csdnimg.cn/direct/298c293b0451432ca13fb1fc02b4baed.png
https://img-blog.csdnimg.cn/direct/e7da8df27d8c415995d1e8f6ff73a7cb.png

添加依赖

在项目的pom.xml文件中添加Flink的依赖。
        <properties>
      <flink.version>1.17.1</flink.version>
    </properties>

    <dependencies>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
      </dependency>
    </dependencies> https://img-blog.csdnimg.cn/direct/04a6718ce8304efeb0e940ee2133c846.png

刷新依赖
https://img-blog.csdnimg.cn/direct/ba94d9f8a015441d8a48f0089898519c.png
刷新依赖后,能看到干系依赖如下
https://img-blog.csdnimg.cn/direct/dc1eaee64afb46e2b19d3ea452d652d6.png
刷新依赖过程必要等候一些时间来下载干系依赖。
如果依赖下载慢,可以设置阿里云仓库镜像:
 1.设置maven的settings.xml
https://img-blog.csdnimg.cn/direct/d028c1852ab247b38b339874b64eade7.png

在</mirrors>上面一行添加阿里云仓库镜像
        <mirror>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <mirrorOf>central</mirrorOf>      
    </mirror> https://img-blog.csdnimg.cn/direct/5f89d5b1e6c24ac1952340b639b6fe59.png

2.IDEA设置maven
https://img-blog.csdnimg.cn/direct/4afc6fda56f94cb1b174f90b364c1e25.png
https://img-blog.csdnimg.cn/direct/3fa42b40a0fd4bb8b38b3413f7083546.png

数据准备

在工程的根目次下,新建一个data文件夹
https://img-blog.csdnimg.cn/direct/eac98a3849364dc7b09ebeb255385f3d.png
并在data文件夹下创建文本文件words.txt
https://img-blog.csdnimg.cn/direct/7a9da91d9d2b40fe8a5cc06d75b3ad9a.png
内容如下
hello world
hello java
hello flink https://img-blog.csdnimg.cn/direct/969c92b45aa140a1bccb11b541de858b.png

新建包

右键src/main下的java,新建Package
https://img-blog.csdnimg.cn/direct/4b41031eb7174ce38e0281918ec4ca3e.png

填写包名org.example,包名与groupId的内容同等。
https://img-blog.csdnimg.cn/direct/4b5afe1da0664cebaddac519eb6396e6.png


批处理API实现WordCount

在org.exmaple下新建wc包及BatchWordCount类
https://img-blog.csdnimg.cn/direct/1324aae8d4cc4b9bb08477048b02d6ce.png
填写wc.BatchWordCount
https://img-blog.csdnimg.cn/direct/883c406f9f8041a2ad7086e8cb35720c.png
结果如下
https://img-blog.csdnimg.cn/direct/24c9b8055f0c46fcb40415b83a71ca06.png
BatchWordCount.java代码如下:
package org.example.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
      // 1. 创建执行环境
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // 2. 从文件读取数据 按行读取
      DataSource<String> lineDS = env.readTextFile("data/words.txt");

      // 3. 转换数据格式
      FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {

            @Override
            public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {

                String[] words = line.split(" ");

                for (String word : words) {
                  out.collect(Tuple2.of(word,1L));
                }
            }
      });

      // 4. 按照 word 进行分组
      UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

      // 5. 分组内聚合统计
      AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

      // 6. 打印结果
      sum.print();
    }
}
运行步伐,检察结果
https://img-blog.csdnimg.cn/direct/e8f2c287e8fa4995a215d5bb41b04e12.png
注意,以上代码的实现方式是基于DataSet API的,是批处理API。而Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。从Flink 1.12开始,官方推荐直接利用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:
$ flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar


流处理API实现WordCount

数据源是文件

在org.example.wc包下新建Java类StreamWordCount,代码如下:
package org.example.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
      // 1. 创建流式执行环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // 2. 读取文件
      DataStreamSource<String> lineStream = env.readTextFile("data/words.txt");

      // 3. 转换、分组、求和,得到统计结果
      SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                  @Override
                  public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {

                        String[] words = line.split(" ");

                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                  }
                }).keyBy(data -> data.f0)
                .sum(1);

      // 4. 打印
      sum.print();

      // 5. 执行
      env.execute();
    }
}
运行结果
https://img-blog.csdnimg.cn/direct/9003ec77c33c4131a523745af43bf8d4.png

与批处理步伐BatchWordCount的区别:


[*] 创建执行环境的不同,流处理步伐利用的是StreamExecutionEnvironment。
[*] 转换处理之后,得到的数据对象类型不同。
[*] 分组操纵调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。
[*] 代码末尾必要调用env的execute方法,开始执行任务。

数据源是socket文本流

流处理的输入数据通常是流数据,将StreamWordCount代码中读取文件数据的readTextFile方法,更换成读取socket文本流的方法socketTextStream。
在org.example.wc包下新建Java类SocketStreamWordCount,代码如下:
package org.example.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketStreamWordCount {
    public static void main(String[] args) throws Exception {
      // 1. 创建流式执行环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // 2. 读取文本流:node2表示发送端主机名(根据实际情况修改)、7777表示端口号
      DataStreamSource<String> lineStream = env.socketTextStream("node2", 7777);

      // 3. 转换、分组、求和,得到统计结果
      SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                  String[] words = line.split(" ");

                  for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                  }
                }).returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(data -> data.f0)
                .sum(1);

      // 4. 打印
      sum.print();

      // 5. 执行
      env.execute();
    }
}
进入node2终端,如果没有nc下令,必要先安装nc下令,安装nc下令如下:
$ sudo yum install nc -y
开启nc监听
$ nc -lk 7777
IDEA中,运行SocketStreamWordCount步伐。


往7777端口发送数据,比方发送hello world
https://img-blog.csdnimg.cn/direct/619848bdc5404c3ebd50842e8757c718.png

控制台输出
https://img-blog.csdnimg.cn/direct/72389fb8af1f411189a2a41ce4b37044.png

继续往7777端口发送数据,比方发送hello flink
https://img-blog.csdnimg.cn/direct/90184cdf6bf74a7a8a0745de6341a93b.png

控制台输出
https://img-blog.csdnimg.cn/direct/f9a5af6ef6d1471a888eb32fcf10f3a1.png

停止SocketStreamWordCount步伐。

按Ctrl+c停止nc下令。

打包

这里的打包是将写好的步伐打成jar包。
点击IDEA右侧的Maven,按住Ctrl键同时选中clean和package(第一次打包可以只选中package),点击执行打包。
https://img-blog.csdnimg.cn/direct/d7f83039e6bd499db44e0f52a1a126a3.png

打包乐成后,看到如下输出信息,生成的jar包在项目的target目次下
https://img-blog.csdnimg.cn/direct/67ed2952f87f40f1b323e55eacff0892.png

提交到集群运行

把jar包提交到flink集群运行有两种方式:
1.通过下令行提交作业   
2.通过Web UI提交作业

下令行提交作业

将jar包上传Linux

https://img-blog.csdnimg.cn/direct/ce4d4efcdcae4878a020c3b3dc9707be.png
启动flink集群

$ start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node2.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
Starting taskexecutor daemon on host node4.

开启nc监听

$ nc -lk 7777

下令提交作业

开启另一个node2终端,利用flink run下令提交作业到flink集群
$ flink run -m node2:8081 -c org.example.wc.SocketStreamWordCount flinkdemo-1.0-SNAPSHOT.jar https://img-blog.csdnimg.cn/direct/cec70b75fe2d439ab55cb03c45a07faf.png
-m指定提交到的JobManager,-c指定步伐入口类。

发送测试数据

在nc监听终端,往7777端口发送数据
https://img-blog.csdnimg.cn/direct/b5f91dbdf229435fab4ac690e61590db.png
检察结果

Web UI检察结果

浏览器访问
node2:8081 看到正在运行的作业如下
https://img-blog.csdnimg.cn/direct/2f8cd71b2a174d36a7900536474cd3ba.png

检察结果
https://img-blog.csdnimg.cn/direct/becb0cf3165f4e7998c4abd4dc305dcd.png
https://img-blog.csdnimg.cn/direct/0c3d2b5155944289946b4e8018b35771.png

继续发送测试数据

在nc终端继续发送数据
https://img-blog.csdnimg.cn/direct/442384511ec946b5867607fade465a62.png

Web UI刷新结果

https://img-blog.csdnimg.cn/direct/c2f368f925804df098fe49c4bc7617b7.png

下令行检察结果

打开新的node2终端,检察结果
$ cd $FLINK_HOME/log
$ ls
flink-hadoop-client-node2.log                 flink-hadoop-standalonesession-0-node2.out
flink-hadoop-standalonesession-0-node2.log  flink-hadoop-taskexecutor-0-node2.log
flink-hadoop-standalonesession-0-node2.log.1flink-hadoop-taskexecutor-0-node2.log.1
flink-hadoop-standalonesession-0-node2.log.2flink-hadoop-taskexecutor-0-node2.log.2
flink-hadoop-standalonesession-0-node2.log.3flink-hadoop-taskexecutor-0-node2.log.3
flink-hadoop-standalonesession-0-node2.log.4flink-hadoop-taskexecutor-0-node2.log.4
flink-hadoop-standalonesession-0-node2.log.5flink-hadoop-taskexecutor-0-node2.out
$ cat flink-hadoop-taskexecutor-0-node2.out
(hello,1)
(flink,1)
(hello,2)
(world,1)
​ https://img-blog.csdnimg.cn/direct/e675903a61e146ddb5a1e0382e26d07d.png

取消flink作业

https://img-blog.csdnimg.cn/direct/1c7e4dbcb6c44ef699f066ab2e6b59e2.png

点击Cancel Job取消作业 
https://img-blog.csdnimg.cn/direct/4f0b1bb41aa24967a74c38ed9efcca02.png
https://img-blog.csdnimg.cn/direct/12f99e1da8034c2899307a0cbb3cae1c.png

停止nc监听

按Ctrl+c停止nc下令

Web UI提交作业


开启nc监听

开启nc监听发送数据
$ nc -lk 7777
Web UI提交作业

浏览器访问
node2:8081
点击Submit New Job
https://img-blog.csdnimg.cn/direct/041e5d71ff0d41eb880458c3cbfea833.png
点击Add New
https://img-blog.csdnimg.cn/direct/0eed82e3a17845d6be0a1a3fb08e4427.png
选择flink作业jar包所在路径
https://img-blog.csdnimg.cn/direct/fad36ef31a234114a74422d0bf495258.png

点击jar包名称
https://img-blog.csdnimg.cn/direct/707fab351272465c90d5dccab07952f3.png

填写干系内容,点击Submit提交作业
Entry Class填写运行的主类,比方:org.example.wc.SocketStreamWordCount
Parallesim填写作业的并行度,比方:1
https://img-blog.csdnimg.cn/direct/6862eb48bf534059869a146e2e02b42a.png

提交后,在Running Jobs里看到运行的作业
https://img-blog.csdnimg.cn/direct/57701c6f9fb94f89a13c9c78c0075f30.png

发送测试数据

往7777端口发送数据
https://img-blog.csdnimg.cn/direct/1be071c4953b4e39ac0907a54bbde508.png

检察结果

https://img-blog.csdnimg.cn/direct/f669009c103f432f9cc6489571a9b459.png

继续发送测试数据

https://img-blog.csdnimg.cn/direct/3d7e2f55a8ea4cf591bd262eceab531d.png

刷新结果

https://img-blog.csdnimg.cn/direct/062c44feef45406bb2fb417e4f6feb9d.png

取消作业

https://img-blog.csdnimg.cn/direct/782f0d0d99054e80b43827a8e7fa5e53.png

停止nc监听

按住Ctrl+c停止nc下令

关闭flink集群

$ stop-cluster.sh
Stopping taskexecutor daemon (pid: 2283) on host node2.
Stopping taskexecutor daemon (pid: 1827) on host node3.
Stopping taskexecutor daemon (pid: 1829) on host node4.
Stopping standalonesession daemon (pid: 1929) on host node2.

上传代码到gitee


登录gitee

https://gitee.com/ 注意:如果还没有gitee账号,必要先注册;如果之前没有设置过SSH公钥,必要先设置SSH公钥。

创建仓库

https://img-blog.csdnimg.cn/direct/d2836d437fc24e03b906dc4c7362b05c.png



https://img-blog.csdnimg.cn/direct/783b57a957c341668a3aafeea531d8a2.png


https://img-blog.csdnimg.cn/direct/27a7a5d9df4e463dab732e233871cd28.png

提交接码

利用IDEA提交接码
https://img-blog.csdnimg.cn/direct/43242cf17d2d4f4cbf87746fe8c71ba2.png

https://img-blog.csdnimg.cn/direct/bee4aa8ebcd64e249f84d98ebbc6cf00.png

提示有告诫,忽略告诫,继续提交
https://img-blog.csdnimg.cn/direct/b6d052e7f6f1461a80f361c43d1e3af8.png

https://img-blog.csdnimg.cn/direct/0875a788ede04fceab476e0af8e65038.png

https://img-blog.csdnimg.cn/direct/c851d5c8ee4649048dcc534b8e32727d.png
https://img-blog.csdnimg.cn/direct/e3d5dd4da88b4701a417ef75c392b446.png

https://img-blog.csdnimg.cn/direct/77f5003ff47f4945a5f24fd33d176e0b.png

https://img-blog.csdnimg.cn/direct/ae4d60b6b5b54e998dcd195d541d60eb.png
https://img-blog.csdnimg.cn/direct/5d29ecd8794b404f81a8e3c1c7f9e796.png

提交乐成后,IDEA显示如下
https://img-blog.csdnimg.cn/direct/c98430763a4e45deb7f1dde083ef735d.png

刷新浏览器检察gitee界面,看到代码已上传乐成
https://img-blog.csdnimg.cn/direct/ae7d6865466447cf9be3a016548304ba.png



完成!enjoy it!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Flink WordCount实践