SpringBoot集成flink

打印 上一主题 下一主题

主题 554|帖子 554|积分 1662

Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。
最大亮点是流处理,最适合的应用场景是低时延的数据处理。
场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。
环境搭建:
①、安装flink
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/
②、安装Netcat
Netcat(又称为NC)是一个计算机网络工具,它可以在两台计算机之间创建 TCP/IP 或 UDP 连接。
用于测试网络中的端口,发送文件等操纵。
举行网络调试和探测,也可以举行加密连接和长途管理等高级网络操纵
  1. yum install -y nc # 安装nc命令
  2. nc -lk 8888 # 启动socket端口
复制代码
无界流之读取socket文本流

一、依靠
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.     <parent>
  6.         <artifactId>springboot-demo</artifactId>
  7.         <groupId>com.et</groupId>
  8.         <version>1.0-SNAPSHOT</version>
  9.     </parent>
  10.     <modelVersion>4.0.0</modelVersion>
  11.     <artifactId>flink</artifactId>
  12.     <properties>
  13.         <maven.compiler.source>8</maven.compiler.source>
  14.         <maven.compiler.target>8</maven.compiler.target>
  15.     </properties>
  16.     <dependencies>
  17.         <dependency>
  18.             <groupId>org.springframework.boot</groupId>
  19.             <artifactId>spring-boot-starter-web</artifactId>
  20.         </dependency>
  21.         <dependency>
  22.             <groupId>org.springframework.boot</groupId>
  23.             <artifactId>spring-boot-autoconfigure</artifactId>
  24.         </dependency>
  25.         <dependency>
  26.             <groupId>org.springframework.boot</groupId>
  27.             <artifactId>spring-boot-starter-test</artifactId>
  28.             <scope>test</scope>
  29.         </dependency>
  30.         <!-- 添加 Flink 依赖 -->
  31.         <dependency>
  32.             <groupId>org.apache.flink</groupId>
  33.             <artifactId>flink-streaming-java</artifactId>
  34.             <version>1.17.0</version>
  35.         </dependency>
  36.         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
  37.         <dependency>
  38.             <groupId>org.apache.flink</groupId>
  39.             <artifactId>flink-java</artifactId>
  40.             <version>1.17.0</version>
  41.         </dependency>
  42.         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
  43.         <dependency>
  44.             <groupId>org.apache.flink</groupId>
  45.             <artifactId>flink-clients</artifactId>
  46.             <version>1.17.0</version>
  47.         </dependency>
  48.         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
  49.         <dependency>
  50.             <groupId>org.apache.flink</groupId>
  51.             <artifactId>flink-connector-base</artifactId>
  52.             <version>1.17.0</version>
  53.         </dependency>
  54.         <dependency>
  55.             <groupId>org.apache.flink</groupId>
  56.             <artifactId>flink-connector-files</artifactId>
  57.             <version>1.17.0</version>
  58.         </dependency>
  59.         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
  60.         <dependency>
  61.             <groupId>org.apache.flink</groupId>
  62.             <artifactId>flink-connector-kafka</artifactId>
  63.             <version>1.17.0</version>
  64.         </dependency>
  65.         <dependency>
  66.             <groupId>org.apache.flink</groupId>
  67.             <artifactId>flink-runtime-web</artifactId>
  68.             <version>1.17.0</version>
  69.         </dependency>
  70.     </dependencies>
  71.     <build>
  72.         <plugins>
  73.             <plugin>
  74.                 <groupId>org.apache.maven.plugins</groupId>
  75.                 <artifactId>maven-shade-plugin</artifactId>
  76.                 <executions>
  77.                     <execution>
  78.                         <phase>package</phase>
  79.                         <goals>
  80.                             <goal>shade</goal>
  81.                         </goals>
  82.                         <configuration>
  83.                             <transformers>
  84.                                 <transformer
  85.                                         implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  86.                                     <resource>META-INF/spring.handlers</resource>
  87.                                 </transformer>
  88.                                 <transformer
  89.                                         implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
  90.                                     <resource>META-INF/spring.factories</resource>
  91.                                 </transformer>
  92.                                 <transformer
  93.                                         implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
  94.                                     <resource>META-INF/spring.schemas</resource>
  95.                                 </transformer>
  96.                                 <transformer
  97.                                         implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
  98.                                 <transformer
  99.                                         implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  100.                                     <mainClass>com.et.flink.job.SocketJob</mainClass>
  101.                                 </transformer>
  102.                             </transformers>
  103.                         </configuration>
  104.                     </execution>
  105.                 </executions>
  106.             </plugin>
  107.         </plugins>
  108.     </build>
  109. </project>
复制代码
二、SoketJob
  1. public class SocketJob{
  2.        
  3.         public static void main(String[] args)throws Exception{
  4.                
  5.                 // 创建执行环境
  6.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  7.         // 指定并行度,默认电脑线程数
  8.         env.setParallelism(3);
  9.         // 读取数据socket文本流 指定监听 IP 端口 只有在接收到数据才会执行任务
  10.         DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);
  11.         // 处理数据: 切换、转换、分组、聚合 得到统计结果
  12.         SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
  13.                 .flatMap(
  14.                         (String value, Collector<Tuple2<String, Integer>> out) -> {
  15.                             String[] words = value.split(" ");
  16.                             for (String word : words) {
  17.                                 out.collect(Tuple2.of(word, 1));
  18.                             }
  19.                         }
  20.                 )
  21.                 .setParallelism(2)
  22.                 // // 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式设置系统当前返回类型,才能正确解析出完整数据
  23.                 .returns(new TypeHint<Tuple2<String, Integer>>() {
  24.                 })
  25. //                .returns(Types.TUPLE(Types.STRING,Types.INT))
  26.                 .keyBy(value -> value.f0)
  27.                 .sum(1);
  28.         // 输出
  29.         sum.print();
  30.         // 执行
  31.         env.execute();
  32.         }
  33. }
复制代码
测试:
启动socket流:
  1. nc -l 8888
复制代码
本地执行:直接ideal启动main步伐,在socket流中输入
  1. abc bcd cde
  2. bcd cde fgh
  3. cde fgh hij
复制代码

集群执行:
执行maven打包,将打包的jar上传到集群中


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

尚未崩坏

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表