Flink实时统计单词【入门】

打印 上一主题 下一主题

主题 1011|帖子 1011|积分 3033

前言

单词统计【word count】是flink的最最最基础的入门案例,就犹如学习java的第一堂课是运行一个hello world程序同样告急。
这里通过使用netcat发送数据来模仿无界数据流。
代码案例

在代码案例之前,我们必要先介绍一下netcat这个工具。
netcat介绍

netcat(简称nc)是一个强盛的网络工具,被称之为网络瑞士军刀。他能通过TCP/UDP协议进行数据传输,支持一下核心功能


  • 端口扫描:测试端口是否开放
  • 数据传输:作为客户端、服务端发送或接收数据
  • 网络调试:模仿socket服务端或客户端
这个案例中,我们就使用netcat来模仿socket服务端来发送数据。
大多数的linux发行版本默认预装了netcat,直接使用nc命令就行了
  1. # 安装(如未预装)
  2. sudo apt install netcat  # Debian/Ubuntu
  3. sudo yum install nc      # CentOS/RHEL
  4. # 启动服务端(监听端口 9999)
  5. nc -lk 9999
复制代码
笔者使用的是腾讯云轻量级服务器ubuntu,已经预装了netcat,记得要在开放对应的端口权限,比如

创建maven工程

创建一个简单的maven工程,对应的pom文件如下
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2.   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3.   <modelVersion>4.0.0</modelVersion>
  4.   <groupId>com.tml</groupId>
  5.   <artifactId>flink-demo</artifactId>
  6.   <version>1.0-SNAPSHOT</version>
  7.   <packaging>jar</packaging>
  8.   <name>flink-demo</name>
  9.   <url>http://maven.apache.org</url>
  10.   <properties>
  11.     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  12.     <flink.version>1.18.0</flink.version> <!-- 根据你的 Flink 版本进行调整 -->
  13.     <maven.compiler.source>1.8</maven.compiler.source>
  14.     <maven.compiler.target>1.8</maven.compiler.target>
  15.   </properties>
  16.   <dependencies>
  17.     <dependency>
  18.       <groupId>junit</groupId>
  19.       <artifactId>junit</artifactId>
  20.       <version>3.8.1</version>
  21.       <scope>test</scope>
  22.     </dependency>
  23.     <dependency>
  24.       <groupId>org.apache.flink</groupId>
  25.       <artifactId>flink-java</artifactId>
  26.       <version>${flink.version}</version>
  27.     </dependency>
  28.     <!-- Flink Streaming API -->
  29.     <dependency>
  30.       <groupId>org.apache.flink</groupId>
  31.       <artifactId>flink-streaming-java</artifactId>
  32.       <version>${flink.version}</version>
  33.     </dependency>
  34.     <!-- Flink Table API and SQL -->
  35.     <dependency>
  36.       <groupId>org.apache.flink</groupId>
  37.       <artifactId>flink-clients</artifactId>
  38.       <version>${flink.version}</version>
  39.     </dependency>
  40.   </dependencies>
  41. </project>
复制代码
代码案例 

这里的需求就是,每来了一条消息,我就按照空字符进行切分,进行统计,比如hello world如许一条消息,我会根据空格来进行切分,对hello、world分别计数为1,最后进行累加,从而实时统计单词的数量
  1. package com.tml;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. /**
  7. * 从socket流读取数据
  8. */
  9. public class WordCountFromSocket {
  10.     public static void main(String[] args) throws Exception {
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         env.setParallelism(1);
  13.         DataStream<String> socketTextStream = env.socketTextStream("xxx", 9999);
  14.         //数据流处理
  15.         SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketTextStream.flatMap(new Tokenizer()).keyBy(t -> t.f0).sum(1);
  16.         sum.print();
  17.         env.execute("Socket Stream WordCount~");
  18.     }
  19. }
复制代码

真正的逻辑处理是在Tokenizer类中,对应的代码如下
  1. package com.tml;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.util.Collector;
  5. /**
  6. * 自定义 FlatMapFunction 实现单词拆分
  7. */
  8. public class Tokenizer implements FlatMapFunction<String, Tuple2<String,Integer>> {
  9.     @Override
  10.     public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
  11.         //按照空格或者制表符分割单词
  12.         String[] words = s.split("\\s+");
  13.         for (String word : words) {
  14.             collector.collect(new Tuple2<>(word, 1));
  15.         }
  16.     }
  17. }
复制代码
 运行案例

先启动服务端



再启动客户端

运行main主类,发现程序再阻塞中,等待数据的流入,从这里也可以看出,flink是基于变乱驱动的。

模仿socket输入


运行结果查看


 从控制台的结果输出可以看到,flink的实时统计的结果是没有题目的!
总结

flink入门级别的案例,从运行过程到运行结果,可以感受到flink实时计算的强盛!完整的代码已上传至github【flink-demo】,欢迎围观!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

道家人

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表