Flink -2-Flink 算子和java代码简单利用

打印 上一主题 下一主题

主题 495|帖子 495|积分 1485

Flink和Spark类似,也是一种一站式处置惩罚的框架;既可以进行批处置惩罚(DataSet),也可以进行实时处置惩罚(DataStream)。
以是下面将Flink的算子分为两大类:一类是DataSet,一类是DataStream。
DataSet 批处置惩罚算子

一、Source算子

1. fromCollection

fromCollection:从本地集合读取数据
例:
  1.         public static void main(String[] args) {        
  2.         StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStreamSource<String> stringDataStreamSource = streamEnv.fromCollection(Arrays.asList("1,张三", "2,李四", "3,王五", "4,赵六"));
  4.     }
复制代码
2. readTextFile

readTextFile:从文件中读取
  1. StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  2. DataStreamSource<String> stringDataStreamSource = streamEnv.readTextFile("/data/a.txt");
复制代码
3. readTextFile:读取压缩文件

对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别而且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。
压缩方法文件扩展名是否可并行读取DEFLATE.deflatenoGZip.gz .gzipnoBzip2.bz2noXZ.xzno
  1. StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  2. DataStreamSource<String> stringDataStreamSource = streamEnv.readTextFile("/data/file.gz");
复制代码
二、Transform转换算子

因为Transform算子基于Source算子操纵,以是起首构建Flink执行环境及Source算子,后续Transform算子操纵基于此:
  1.         public static void main(String[] args) {        
  2.         StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         DataStreamSource<String> stringDataStreamSource = streamEnv.fromCollection(Arrays.asList("1,张三", "2,李四", "3,王五", "4,赵六"));
  4.     }
复制代码
1: map

将DataSet中的每一个元素转换为另外一个元素
  1.         import org.apache.flink.api.common.functions.MapFunction;
  2. import org.apache.flink.api.java.DataSet;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. public class MapOperatorExample {
  5.     public static void main(String[] args) throws Exception {
  6.         // 获取 ExecutionEnvironment,用于创建数据集
  7.         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  8.         // 假设已经有一个包含整数的数据集
  9.         DataSet<Integer> inputDataSet = env.fromElements(1, 2, 3, 4, 5);
  10.         // 使用 Map 算子对数据集进行转换,将每个整数加上 10,并输出结果
  11.         DataSet<Integer> outputDataSet = inputDataSet.map(new MapFunction<Integer, Integer>() {
  12.             @Override
  13.             public Integer map(Integer value) throws Exception {
  14.                 // 对每个整数都加上 10,并将结果作为新的数据集
  15.                 return value + 10;
  16.             }
  17.         });
  18.         // 输出转换后的数据集
  19.         outputDataSet.print();
  20.     }
  21. }
复制代码
2:flatMap



  • 将DataSet中的每一个元素转换为0…n个元素。
  • FlatMap 算子是 Flink 中的一种数据转换算子,它将输入的每个元素通过用户自界说的函数进行处置惩罚,并生成零个、一个或多个新的元素。FlatMap 算子的底层逻辑是对数据集中的每个元素应用用户界说的函数,并将函数返回的多个元素平铺成新的数据集
  1. import org.apache.flink.api.common.functions.FlatMapFunction;
  2. import org.apache.flink.api.java.DataSet;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. import org.apache.flink.util.Collector;
  5. public class FlatMapOperatorExample {
  6.     public static void main(String[] args) throws Exception {
  7.         // 获取 ExecutionEnvironment,用于创建数据集
  8.         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  9.         // 假设已经有一个包含文本行的数据集
  10.         DataSet<String> inputDataSet = env.fromElements(
  11.             "Flink is a powerful framework for stream and batch processing",
  12.             "It provides support for event time processing"
  13.         );
  14.         // 使用 FlatMap 算子对数据集进行拆分并生成单词列表
  15.         DataSet<String> wordDataSet = inputDataSet.flatMap(new FlatMapFunction<String, String>() {
  16.             @Override
  17.             public void flatMap(String value, Collector<String> out) throws Exception {
  18.                 // 按空格拆分文本行,并将拆分后的单词逐个添加到输出集合
  19.                 String[] words = value.split(" ");
  20.                 for (String word : words) {
  21.                     out.collect(word);
  22.                 }
  23.             }
  24.         });
  25.         // 输出单词列表
  26.         wordDataSet.print();
  27.     }
  28. }
复制代码
3:Filter 算子



  • Filter 算子是 Flink 中的一种数据转换算子,它通过用户自界说的条件函数对数据集中的每个元素进行过滤,只保存满足条件的元素。
  • Filter 算子的底层逻辑是对数据集中的每个元素应用用户界说的条件函数,只保存函数返回值为 true 的元素,过滤掉返回值为 false 的元素。
  1. import org.apache.flink.api.common.functions.FilterFunction;
  2. import org.apache.flink.api.java.DataSet;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. public class FilterOperatorExample {
  5.     public static void main(String[] args) throws Exception {
  6.         // 获取 ExecutionEnvironment,用于创建数据集
  7.         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  8.         // 假设已经有一个包含整数的数据集
  9.         DataSet<Integer> inputDataSet = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  10.         // 使用 Filter 算子对数据集进行过滤,只保留偶数
  11.         DataSet<Integer> evenDataSet = inputDataSet.filter(new FilterFunction<Integer>() {
  12.             @Override
  13.             public boolean filter(Integer value) throws Exception {
  14.                 // 判断是否为偶数,保留返回 true 的元素
  15.                 return value % 2 == 0;
  16.             }
  17.         });
  18.         // 输出只包含偶数的数据集
  19.         evenDataSet.print();
  20.     }
  21. }
复制代码
4:Reduce 算子



  • 可以对一个dataset大概一个group来进行聚合盘算,终极聚合成一个元素
  • Reduce 算子是 Flink 中的一个基本聚合算子,用于对数据集中的元素进行二元聚合操纵。
  • Reduce 算子会将数据集中的元素两两配对,并利用用户提供的二元操纵函数对配对的元素进行聚合,然后将聚合结果继承与下一个元素配对,直至处置惩罚完所有元素。终极,Reduce 算子会返回一个单一的结果值。
  1. import org.apache.flink.api.common.functions.ReduceFunction;
  2. import org.apache.flink.api.java.DataSet;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. public class ReduceOperatorExample {
  5.     public static void main(String[] args) throws Exception {
  6.         // 获取 ExecutionEnvironment,用于创建数据集
  7.         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  8.         // 假设已经有一个包含整数的数据集
  9.         DataSet<Integer> inputDataSet = env.fromElements(1, 2, 3, 4, 5);
  10.         // 使用 Reduce 算子计算数据集中所有元素的总和
  11.         DataSet<Integer> resultDataSet = inputDataSet.reduce(new ReduceFunction<Integer>() {
  12.             @Override
  13.             public Integer reduce(Integer value1, Integer value2) throws Exception {
  14.                 return value1 + value2; // 将两个元素相加得到新的结果
  15.             }
  16.         });
  17.         // 输出计算结果
  18.         resultDataSet.print();
  19.     }
  20. }
复制代码
5:Aggregations

KeyedStream → DataStream


  • Aggregations 算子是 Flink 中用于对数据集进行聚合操纵的一组函数。它可以用于盘算数据集中的最小值、最大值、求和、均匀值等统计信息。Flink 提供了一系列内置的聚合函数,如 min、max、sum、avg 等。
  1. //算平均值
  2. import org.apache.flink.api.java.DataSet;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. import org.apache.flink.api.java.aggregation.Aggregations;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. public class AggregationsOperatorExample {
  7.     public static void main(String[] args) throws Exception {
  8.         // 获取 ExecutionEnvironment,用于创建数据集
  9.         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  10.         // 假设已经有一个包含学生姓名和成绩的数据集
  11.         DataSet<Tuple2<String, Double>> inputDataSet = env.fromElements(
  12.                 new Tuple2<>("Alice", 85.0),
  13.                 new Tuple2<>("Bob", 78.5),
  14.                 new Tuple2<>("Cathy", 92.5),
  15.                 new Tuple2<>("David", 65.0),
  16.                 new Tuple2<>("Eva", 88.5)
  17.         );
  18.         // 使用 Aggregations 算子计算成绩的平均值
  19.         double avgScore = inputDataSet.aggregate(Aggregations.SUM, 1).div(inputDataSet.count());
  20.         // 输出计算结果
  21.         System.out.println("平均成绩:" + avgScore);
  22.     }
  23. }
复制代码
重要的聚合方法:


  • keyedStream.sum(0);
  • keyedStream.sum(“key”);
  • keyedStream.min(0);
  • keyedStream.min(“key”);
  • keyedStream.max(0);
  • keyedStream.max(“key”);
  • keyedStream.minBy(0);
  • keyedStream.minBy(“key”);
  • keyedStream.maxBy(0);
  • keyedStream.maxBy(“key”);
6:Distinct 算子



  • Distinct 算子是 Flink 中的一个转换算子,它用于去除输入流中重复的元素,并将去重后的结果作为输出流。Distinct 算子是在整个数据流上进行去重操纵,不需要进行分组。
  • 在底层,Distinct 算子通过维护一个状态来记录已经出现过的元素,当新的元素到达时,会与状态中的元素进行比较,假如状态中不存在该元素,则将其输出,并将其添加到状态中,以便后续去重。
Distinct 算子在很多场景下都很有用,比方:


  • 数据去重:在流式盘算中,常常会有重复的数据到达,而我们只关心每个数据的第一次出现,可以利用 Distinct 算子往复重。
  • 实时数据摘要:有时候需要根据某个字段的特性选择每个分组的代表元素,比方选择每个用户的初次登录信息作为数据摘要。
  • 数据洗濯:在处置惩罚实时数据流时,可能会有一些无效或异常的数据需要洗濯,Distinct 算子可以帮助去除重复的无效数据。
  1. package com.wenge.datagroup.storage;
  2. import org.apache.flink.api.java.DataSet;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. import org.apache.flink.api.java.operators.DistinctOperator;
  5. /**
  6. * @author wangkanglu
  7. * @version 1.0
  8. * @description
  9. * @date 2024-07-24 16:58
  10. */
  11. public class TestFlink {
  12.     public static void main(String[] args) throws Exception {
  13.         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  14.         // 模拟输入流,包含重复的元素
  15.         DataSet<Integer> inputStream = env.fromElements(1, 2, 3, 2, 4, 5, 1, 6, 7, 5);
  16.         // 对输入流进行去重操作
  17.         DistinctOperator<Integer> distinct = inputStream.distinct();
  18.         distinct.print();
  19.         env.execute("Distinct Example");
  20.     }
  21. }
复制代码
7:First 算子

First 算子是 Flink 中的一个转换算子,它用于从输入流中选择每个 Key 的第一个元素,并将其作为输出流中的结果。在流式盘算中,常常需要根据某个特定的字段进行分组,并选择每个分组中的第一个元素,这时可以利用 First 算子来实现这个功能。First 算子是 KeyedStream 上的操纵,以是在利用之前,需要先将数据流进行分组。
First 算子在许多场景下都很有用,比方:


  • 数据去重:假如数据流中可能包含重复的元素,而我们只关心每个元素的第一次出现,可以利用 First 算子往复重。
  • 实时数据摘要:在流式盘算中,有时需要根据某个字段的特性选择每个分组的代表元素,比方选择每个用户的初次登录信息作为数据摘要。
  • 时间窗口操纵:在流式盘算中,常常需要对窗口内的数据进行处置惩罚,而 First 算子可以用于选择每个窗口的起始元素。
  1. package com.wenge.datagroup.storage;
  2. import org.apache.flink.api.java.DataSet;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. import org.apache.flink.api.java.operators.GroupReduceOperator;
  5. /**
  6. * @author wangkanglu
  7. * @version 1.0
  8. * @description
  9. * @date 2024-07-24 16:58
  10. */
  11. public class TestFlink {
  12.     public static void main(String[] args) throws Exception {
  13.         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  14.         // 模拟输入流,包含重复的元素
  15.         DataSet<Integer> inputStream = env.fromElements(1, 2, 3, 2, 4, 5, 1, 6, 7, 5);
  16.         // 对输入流进行去重操作
  17.         GroupReduceOperator<Integer, Integer> first = inputStream.first(2);
  18.         //取前两个数
  19.         first.print();
  20.         env.execute("Distinct Example");
  21.     }
  22. }
复制代码
8:Join 算子

Join 算子是 Flink 中用于将两个数据集进行连接操纵的一种算子。它通过指定连接的键(Key)将两个数据集中的元素按照某种条件进行关联,从而生成一个包含连接结果的新数据集。
应用场景


  • Join 算子适用于需要将两个数据集进行关联的场景。常见的应用场景包罗关联用户信息和订单信息、关联商品信息和贩卖信息等。
  1. import org.apache.flink.api.java.DataSet;
  2. import org.apache.flink.api.java.ExecutionEnvironment;
  3. import org.apache.flink.api.java.functions.KeySelector;
  4. import org.apache.flink.api.java.tuple.Tuple3;
  5. public class JoinOperatorExample {
  6.     public static void main(String[] args) throws Exception {
  7.         // 获取 ExecutionEnvironment,用于创建数据集
  8.         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  9.         // 假设已经有包含订单信息的数据集
  10.         DataSet<Tuple3<String, String, Integer>> orders = env.fromElements(
  11.                 new Tuple3<>("Order001", "Product001", 2),
  12.                 new Tuple3<>("Order002", "Product002", 1),
  13.                 new Tuple3<>("Order003", "Product001", 3),
  14.                 new Tuple3<>("Order004", "Product003", 5)
  15.         );
  16.         // 假设已经有包含商品信息的数据集
  17.         DataSet<Tuple2<String, String>> products = env.fromElements(
  18.                 new Tuple2<>("Product001", "Apple"),
  19.                 new Tuple2<>("Product002", "Banana"),
  20.                 new Tuple2<>("Product003", "Orange")
  21.         );
  22.         // 使用 Join 算子将订单信息和商品信息按照商品编号进行连接
  23.         DataSet<Tuple3<String, String, Integer>> result = orders.join(products)
  24.                 .where(new OrderProductJoinKeySelector())
  25.                 .equalTo(0)
  26.                 .projectFirst(0, 1)
  27.                 .projectSecond(1);
  28.         // 输出连接结果
  29.         result.print();
  30.     }
  31.     // 自定义 KeySelector,用于指定连接的键(商品编号)
  32.     public static class OrderProductJoinKeySelector implements KeySelector<Tuple3<String, String, Integer>, String> {
  33.         @Override
  34.         public String getKey(Tuple3<String, String, Integer> value) {
  35.             return value.f1; // 商品编号是连接的键
  36.         }
  37.     }
  38. }
复制代码
  在上面的代码中,我们起首导入了 Flink 的相干类,然后创建了一个 ExecutionEnvironment 对象 env,用于创建数据集。接着,利用 env.fromElements() 方法创建了包含订单信息的数据集 orders 和包含商品信息的数据集 products。
  然后,我们利用 join 方法对 orders 和 products 数据集进行连接。在 join 方法中,我们需要通过自界说 KeySelector 对象 OrderProductJoinKeySelector 指定连接的键(商品编号)。接着,我们通过 equalTo(0) 方法指定连接条件,体现连接键在 orders 数据集中的位置是 0,在 products 数据集中的位置也是 0。
末了,我们利用 projectFirst(0, 1) 方法和 projectSecond(1) 方法分别指定连接后要输出的字段,从而生成包含连接结果的新数据集 result。
终极,我们输出连接结果,即关联后的订单信息和商品名称。在本例中,输出结果为:
(Order001, Product001, 2) Apple
(Order002, Product002, 1) Banana
(Order003, Product001, 3) Apple
(Order004, Product003, 5) Orange
即订单信息和商品信息已按照商品编号进行连接,并输出了关联后的订单信息和商品名称。
9:Outer Join 算子

Outer Join 是 Flink 中用于进行外连接操纵的算子。外连接是关系型数据库中的概念,在 Flink 中,它允许将两个数据流中的元素按照指定的键进行连接,并返回所有的元素,包罗那些在其中一个数据流中存在而在另一个数据流中不存在的元素。外连接操纵可以分为左外连接、右外连接和全外连接。
在底层,Outer Join 算子会维护一个状态来记录两个数据流中的匹配关系,并根据指定的键进行匹配。对于左外连接和右外连接,当某个数据流中的元素找不到匹配项时,会生成一个空值大概指定的默认值。对于全外连接,无论两个数据流中是否存在匹配项,都会输出所有的元素。
应用场景


  • 外连接是一种常用的数据合并和关联操纵,适用于以下场景:
  • 合并数据:将两个数据流中的数据按照指定的键进行合并,可以用于数据的联合分析和展示。
  • 增补缺失信息:在关联操纵中,可能会有一些数据流中的元素在另一个数据流中找不到匹配项,利用外连接可以添补缺失信息。
  • 数据洗濯:有时候需要对两个数据流进行关联,去除不匹配的数据大概添加默认值。
  1. import org.apache.flink.streaming.api.datastream.DataStream;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. public class OuterJoinExample {
  4.     public static void main(String[] args) throws Exception {
  5.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  6.         // 模拟两个数据流
  7.         DataStream<Tuple2<String, Integer>> stream1 = env.fromElements(
  8.                 new Tuple2<>("A", 1),
  9.                 new Tuple2<>("B", 2),
  10.                 new Tuple2<>("C", 3)
  11.         );
  12.         DataStream<Tuple2<String, String>> stream2 = env.fromElements(
  13.                 new Tuple2<>("A", "X"),
  14.                 new Tuple2<>("B", "Y"),
  15.                 new Tuple2<>("D", "Z")
  16.         );
  17.         // 对两个数据流进行外连接操作,连接键为第一个元素
  18.         DataStream<Tuple3<String, Integer, String>> result = stream1
  19.                 .leftOuterJoin(stream2)
  20.                 .where(tuple -> tuple.f0) // 第一个数据流的连接键
  21.                 .equalTo(tuple -> tuple.f0) // 第二个数据流的连接键
  22.                 .with((tuple1, tuple2) -> { // 匹配成功的处理逻辑
  23.                     if (tuple2 == null) { // 若tuple2为空,表示匹配失败,使用默认值"UNKNOWN"
  24.                         return new Tuple3<>(tuple1.f0, tuple1.f1, "UNKNOWN");
  25.                     } else {
  26.                         return new Tuple3<>(tuple1.f0, tuple1.f1, tuple2.f1);
  27.                     }
  28.                 });
  29.         result.print();
  30.         env.execute("Outer Join Example");
  31.     }
  32. }
复制代码
  在上面的示例中,我们利用 env.fromElements 方法创建了两个模仿数据流 stream1 和 stream2,分别包含差别的元素。然后,我们调用 leftOuterJoin 方法对这两个数据流进行外连接操纵,连接键为第一个元素。在 with 方法中,我们界说了匹配成功的处置惩罚逻辑:当第二个数据流中找不到匹配项时,利用默认值"UNKNOWN"添补;否则,输出匹配成功的元素。在输出结果中,我们可以看到两个数据流中的元素都被连接在一起,而且在匹配失败的环境下添补了默认值"UNKNOWN"。
  10:Cross 算子



  • Cross 是 Flink 中的一个算子,用于将两个数据流中的所有元素进行两两组合,产生所有可能的组合结果。在底层,Cross 算子会维护两个数据流的状态,并对其中的每个元素进行遍历,将两个数据流的所有元素进行两两组合,并输出所有可能的组合结果。
  • 交叉操纵,通过形成这个数据集和其他数据集的笛卡尔积,创建一个新的数据集
  • 和join类似,但是这种交叉操纵会产生笛卡尔积,在数据比较大的时候,是非常斲丧内存的操纵
应用场景
Cross 算子在现实应用中相对较少,因为它会产生较大的输出结果。但是在某些特定场景下,它仍然有一些用途,比方:


  • 分列组合:在某些场景下,需要对两个数据流中的元素进行分列组合,生成所有可能的组合结果。
  • 笛卡尔积:对于两个数据流之间的笛卡尔积操纵,可以利用 Cross 算子进行实现。
  1. import org.apache.flink.streaming.api.datastream.DataStream;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. public class CrossExample {
  4.     public static void main(String[] args) throws Exception {
  5.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  6.         // 模拟两个数据流
  7.         DataStream<Integer> stream1 = env.fromElements(1, 2, 3);
  8.         DataStream<String> stream2 = env.fromElements("A", "B");
  9.         // 对两个数据流进行笛卡尔积操作
  10.         DataStream<String> result = stream1.cross(stream2)
  11.                 .with((num, str) -> num + "-" + str);
  12.         result.print();
  13.         env.execute("Cross Example");
  14.     }
  15. }
复制代码
  在上面的示例中,我们利用 env.fromElements 方法创建了两个模仿数据流 stream1 和 stream2,其中 stream1 包含整数 1、2 和 3,stream2 包含字符串 “A” 和 “B”。然后,我们调用 cross 方法对这两个数据流进行笛卡尔积操纵,并在 with 方法中界说了组合结果的逻辑:将整数和字符串进行组合,用"-"分隔。在输出结果中,我们可以看到所有可能的组合结果,即整数和字符串之间的所有组合。请注意,Cross 算子会产生较大的输出结果,因此在现实应用中需要谨慎利用。
  11:Union 算子

Union 算子是 Flink 中用于将多个数据集合并成一个新数据集的算子。它将多个数据集的元素合并在一起,形成一个新的数据集。
应用场景


  • Union 算子适用于需要将多个数据集合并在一起的场景。比方,在流处置惩罚中,可能需要将多个数据流合并为一个数据流进行后续处置惩罚;在批处置惩罚中,可能需要将多个数据集合并在一起进行并行处置惩罚。
  1. import org.apache.flink.api.java.DataSet;
  2. import org.apache.flink.api.java.ExecutionEnvironment;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. public class UnionOperatorExample {
  5.     public static void main(String[] args) throws Exception {
  6.         // 获取 ExecutionEnvironment,用于创建数据集
  7.         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  8.         // 假设已经有两个数据集,包含整数和字符串
  9.         DataSet<Integer> dataSet1 = env.fromElements(1, 2, 3);
  10.         DataSet<Integer> dataSet2 = env.fromElements(4, 5, 6);
  11.         DataSet<String> dataSet3 = env.fromElements("A", "B", "C");
  12.         // 使用 Union 算子将两个数据集合并成一个新数据集
  13.         DataSet<Integer> mergedDataSet = dataSet1.union(dataSet2);
  14.         // 使用 Union 算子将三个数据集合并成一个新数据集
  15.         DataSet<Tuple2<Integer, String>> combinedDataSet = dataSet1.union(dataSet2).union(dataSet3);
  16.         // 输出合并结果
  17.         mergedDataSet.print();
  18.         combinedDataSet.print();
  19.     }
  20. }
复制代码
三、Sink算子

1. collect

将数据输出到本地集合
  1. import org.apache.flink.api.common.functions.FlatMapFunction;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. import org.apache.flink.util.Collector;
  6. public class TransformationsFlatmap {
  7.     public static void main(String []arv) throws Exception
  8.     {
  9.         StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
  10.         DataStream dsSocket=env.socketTextStream("192.168.23.210",9000);
  11.         //函数式
  12. //输入spark,hive,hbase
  13.         DataStream ds1=dsSocket.flatMap(new FlatMapFunction<String,Tuple2<String,Integer>>() {
  14.             @Override
  15.             public void flatMap(String value, Collector<Tuple2<String,Integer>> collector) throws Exception {
  16.                 String [] words=value.split(",");
  17.                 for(String word:words){
  18.                     collector.collect(Tuple2.of(word,1));
  19.                 }
  20.             }
  21.         });
  22.          ds1.print();
  23.          env.execute("TransformationsMap");
  24.     }
  25. }
复制代码
2. writeAsText

将数据输出到文件


  • Flink支持多种存储装备上的文件,包罗本地文件,hdfs文件等
  • Flink支持多种文件的存储格式,包罗text文件,CSV文件等
  1. // 将数据写入本地文件
  2. result.writeAsText("/data/a", WriteMode.OVERWRITE)
  3. // 将数据写入HDFS
  4. result.writeAsText("hdfs://node01:9000/data/a", WriteMode.OVERWRITE)
复制代码
DataStream流处置惩罚算子

和DataSet一样,DataStream也包罗一系列的Transformation操纵
一、Source算子

Flink可以利用 StreamExecutionEnvironment.addSource(source) 来为我们的程序添加数据来源。
Flink 已经提供了若干实现好了的 source functions,当然我们也可以通过实现 SourceFunction 来自界说非并行的source大概实现 ParallelSourceFunction 接口大概扩展 RichParallelSourceFunction 来自界说并行的 source。
Flink在流处置惩罚上的source和在批处置惩罚上的source基本划一。大致有4大类:


  • 基于本地集合的source(Collection-based-source)
  • 基于文件的source(File-based-source)- 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回
  • 基于网络套接字的source(Socket-based-source)- 从 socket 读取。元素可以用分隔符切分。
  • 自界说的source(Custom-source)
1:基于文件



  • readTextFile(path) - 读取文本文件,比方遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。
  • readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是前两个方法内部调用的方法。它基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的差别,source 可能定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),大概处置惩罚一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE)。利用 pathFilter,用户可以进一步排除正在处置惩罚的文件。
  1. import org.apache.flink.api.common.RuntimeExecutionMode;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. /**
  5. * @author alanchan
  6. *
  7. */
  8. public class Source_File {
  9.         /**
  10.          * 一般用于学习测试 env.readTextFile(本地/HDFS文件/文件夹);//压缩文件也可以
  11.          *
  12.          * @param args
  13.          * @throws Exception
  14.          */
  15.         public static void main(String[] args) throws Exception {
  16.                 // env
  17.                 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  18.                 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  19.                 // source
  20.                 DataStream<String> ds1 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");
  21.                 DataStream<String> ds2 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/input/distribute_cache_student");
  22.                 DataStream<String> ds3 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.tar.gz");
  23.                 DataStream<String> ds4 = env.readTextFile("hdfs://server2:8020///flinktest/wc-1688627439219");
  24.                 // transformation
  25.                 // sink
  26.                 ds1.print();
  27.                 ds2.print();
  28.                 ds3.print();
  29.                 ds4.print();
  30.                 // execute
  31.                 env.execute();
  32.         }
  33. }
复制代码
2:基于套接字

socketTextStream - 从套接字读取。元素可以由分隔符分隔。
  1. import org.apache.flink.api.common.RuntimeExecutionMode;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.util.Collector;
  8. /**
  9. * @author alanchan
  10. *         在server2上使用nc -lk 9999 向指定端口发送数据
  11. *         nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据
  12. *         如果没有该命令可以下安装 yum install -y nc
  13. *         
  14. */
  15. public class Source_Socket {
  16.         /**
  17.          * @param args
  18.          * @throws Exception
  19.          */
  20.         public static void main(String[] args) throws Exception {
  21.                 //env
  22.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  23.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  24.         //source
  25.         DataStream<String> lines = env.socketTextStream("server2", 9999);
  26.         
  27.       //transformation
  28.         /*SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  29.             @Override
  30.             public void flatMap(String value, Collector<String> out) throws Exception {
  31.                 String[] arr = value.split(" ");
  32.                 for (String word : arr) {
  33.                     out.collect(word);
  34.                 }
  35.             }
  36.         });
  37.         words.map(new MapFunction<String, Tuple2<String,Integer>>() {
  38.             @Override
  39.             public Tuple2<String, Integer> map(String value) throws Exception {
  40.                 return Tuple2.of(value,1);
  41.             }
  42.         });*/
  43.         //注意:下面的操作将上面的2步合成了1步,直接切割单词并记为1返回
  44. //        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
  45. //            @Override
  46. //            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
  47. //                String[] arr = value.split(" ");
  48. //                for (String word : arr) {
  49. //                    out.collect(Tuple2.of(word, 1));
  50. //                }
  51. //            }
  52. //        });
  53. //
  54. //        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);
  55.         //sink
  56.         lines.print();
  57.         //execute
  58.         env.execute();
  59.         }
  60. }
复制代码
3:基于集合



  • fromCollection(Collection) - 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。
  • fromCollection(Iterator, Class) - 从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。
  • fromElements(T …) - 从给定的对象序列中创建数据流。所有的对象必须属于同一类型。
  • fromParallelCollection(SplittableIterator, Class) - 从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。
  • generateSequence(from, to) - 基于给定间隔内的数字序列并行生成数据流。
  1. import java.util.Arrays;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.streaming.api.datastream.DataStream;
  4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  5. /**
  6. * @author alanchan
  7. *
  8. */
  9. public class Source_Collection {
  10.         /**
  11.          * 一般用于学习测试时编造数据时使用
  12.          * 1.env.fromElements(可变参数);
  13.          * 2.env.fromColletion(各种集合);
  14.          * 3.env.generateSequence(开始,结束);
  15.          * 4.env.fromSequence(开始,结束);
  16.          *
  17.          * @param args 基于集合
  18.          * @throws Exception
  19.          */
  20.         public static void main(String[] args) throws Exception  {
  21.                 // env
  22.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  23.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  24.         // source
  25.         DataStream<String> ds1 = env.fromElements("i am alanchan", "i like flink");
  26.         DataStream<String> ds2 = env.fromCollection(Arrays.asList("i am alanchan", "i like flink"));
  27.         DataStream<Long> ds3 = env.generateSequence(1, 10);//已过期,使用fromSequence方法
  28.         DataStream<Long> ds4 = env.fromSequence(1, 100);
  29.         // transformation
  30.         // sink
  31.         ds1.print();
  32.         ds2.print();
  33.         ds3.print();
  34.         ds4.print();
  35.         // execute
  36.         env.execute();
  37.         }
  38. }
复制代码
4:自界说

addSource - 关联一个新的 source function。比方,你可以利用 addSource(new FlinkKafkaConsumer<>(…)) 来从 Apache Kafka 获取数据。
kafka

  1. package com.wenge.datagroup.storage;
  2. import com.wenge.datagroup.storage.common.ArgsConstants;
  3. import com.wenge.datagroup.storage.utils.ConfigUtil;
  4. import com.wenge.datagroup.storage.utils.Funnel;
  5. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  10. import org.apache.kafka.clients.consumer.ConsumerConfig;
  11. import java.util.Properties;
  12. /**
  13. * @author wangkanglu
  14. * @version 1.0
  15. * @description
  16. * @date 2024-07-24 16:58
  17. */
  18. public class TestFlink {
  19.     public static void main(String[] args) {
  20.         StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  21.         int sourceParallelism =  1;
  22.         String topic = "topic_name";
  23.         Properties properties = getParameters();
  24.         FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
  25.                 topic,
  26.                 new SimpleStringSchema(),
  27.                 properties
  28.         );
  29.         DataStreamSource<String> kafkaDataStreamSource = streamEnv.addSource(consumer);
  30.         DataStream<String> dataStream = kafkaDataStreamSource.setParallelism(sourceParallelism).name("KafkaSource-" + topic);
  31.         dataStream .print();
  32.         // execute
  33.                 streamEnv .execute();
  34.     }
  35.    
  36.     private static Properties getParameters(){
  37.         Properties properties = new Properties();
  38.         // 集群地址
  39.         properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.1:9092");
  40.         // 消费者组id
  41.         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "wkl_test");
  42.         properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  43.         properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  44.         properties.setProperty("partition.discovery.interval.ms", "10000");//消费者定期发现动态创建的Kafka主题和分区的时间间隔
  45.         // latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费
  46.         // earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
  47.         properties.setProperty("auto.offset.reset", "latest");
  48. //        properties.setProperty("enable.auto.commit", "true");
  49. //        properties.setProperty("auto.commit.interval.ms", "1000");//自动提交的时间间隔
  50.         properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");//自动提交的时间间隔
  51.         properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "20000");//自动提交的时间间隔
  52.         //每次从kafka中获取的数量
  53.         properties.setProperty("max.poll.records", "2");
  54.         return properties;
  55.     }
  56.       
  57. }
复制代码
mysql

  1. import java.sql.Connection;
  2. import java.sql.DriverManager;
  3. import java.sql.PreparedStatement;
  4. import java.sql.ResultSet;
  5. import org.apache.flink.api.common.RuntimeExecutionMode;
  6. import org.apache.flink.configuration.Configuration;
  7. import org.apache.flink.streaming.api.datastream.DataStream;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
  10. import org.source_transformation_sink.bean.User;
  11. /**
  12. * @author alanchan
  13. * 自定义数据源-MySQL
  14. */
  15. public class Source_MySQL {
  16.         /**
  17.          * @param args
  18.          * @throws Exception
  19.          */
  20.         public static void main(String[] args) throws Exception {
  21.                 // env
  22.                 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  23.                 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  24.                 // source
  25.                 DataStream<User> studentDS = env.addSource(new MySQLSource()).setParallelism(1);
  26.                 // transformation
  27.                 // sink
  28.                 studentDS.print();
  29.                 // execute
  30.                 env.execute();
  31.         }
  32.         private static class MySQLSource extends RichParallelSourceFunction<User> {
  33.                 private boolean flag = true;
  34.                 private Connection conn = null;
  35.                 private PreparedStatement ps = null;
  36.                 private ResultSet rs = null;
  37.                 // open只执行一次,适合开启资源
  38.                 @Override
  39.                 public void open(Configuration parameters) throws Exception {
  40.                         conn = DriverManager.getConnection("jdbc:mysql://server4:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
  41.                         String sql = "select id,name,pwd,email,age,balance from user";
  42.                         ps = conn.prepareStatement(sql);
  43.                 }
  44.                 @Override
  45.                 public void run(SourceContext<User> ctx) throws Exception {
  46.                         while (flag) {
  47.                                 rs = ps.executeQuery();
  48.                                 while (rs.next()) {
  49.                                         User user = new User(
  50.                                                 rs.getInt("id"),
  51.                                                 rs.getString("name"),
  52.                                                 rs.getString("pwd"),
  53.                                                 rs.getString("email"),
  54.                                                 rs.getInt("age"),
  55.                                                 rs.getDouble("balance")
  56.                                         );
  57.                                         ctx.collect(user);
  58.                                 }
  59.                                 Thread.sleep(5000);
  60.                         }
  61.                 }
  62.                 // 接收到cancel命令时取消数据生成
  63.                 @Override
  64.                 public void cancel() {
  65.                         flag = false;
  66.                 }
  67.                 // close里面关闭资源
  68.                 @Override
  69.                 public void close() throws Exception {
  70.                         if (conn != null)
  71.                                 conn.close();
  72.                         if (ps != null)
  73.                                 ps.close();
  74.                         if (rs != null)
  75.                                 rs.close();
  76.                 }
  77.         }
  78. }
  79. import lombok.AllArgsConstructor;
  80. import lombok.Data;
  81. import lombok.NoArgsConstructor;
  82. /**
  83. * @author alanchan
  84. *
  85. */
  86. @Data
  87. @AllArgsConstructor
  88. @NoArgsConstructor
  89. public class User {
  90.         private int id;
  91.         private String name;
  92.         private String pwd;
  93.         private String email;
  94.         private int age;
  95.         private double balance;
  96. }
复制代码
二、Transform转换算子

Flink算子,实在就是”数据转换算子“,对数据进行处置惩罚的方法大概程序封装就是算子

1: map

Map算子,就是映射算子,将一个数据映射为另一个数据,与Java8 stream 流式操纵中的map划一
  1. public static void main(String[] args) throws Exception {
  2.     List<String> stringList = Arrays.asList("a", "b", "c");
  3.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.     // 设置并行度为1 (1个线程执行,以便观察)
  5.     env.setParallelism(1);
  6.     env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  7.     // 加载数据源
  8.     DataStreamSource<String> stream = env.fromCollection(stringList);
  9.     // 使用map算子
  10.     SingleOutputStreamOperator<String> source = stream.map(new MapFunction<String, String>() {
  11.         @Override
  12.         public String map(String value) throws Exception {
  13.             return value.toUpperCase()+ "aa";
  14.         }
  15.     });
  16.     // Aaa Baa Caa
  17.     source.print();
  18.     env.execute();
  19. }
复制代码
2:FlatMap算子

FlatMap算子,可以将数据进行摊平化处置惩罚 比方 原本每一个元素都是集合大概数数组,我们利用FlatMap后,可以将(集合,数组)进行再次拆解取出其中的数据,再新组合为集合,与Java8 stream 流式操纵中的Flatmap功能划一
  1. public static void main(String[] args) throws Exception {
  2.     List<String> str1 = Arrays.asList("a", "b", "c");
  3.     List<String> str2 = Arrays.asList("关羽","张飞","马超","黄忠","赵云");
  4.     List<List<String>> originalData = new ArrayList<>();
  5.     originalData.add(str1);
  6.     originalData.add(str2);
  7.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8.     // 设置并行度为1 (1个线程执行,以便观察)
  9.     env.setParallelism(1);
  10.     env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  11.     // 加载数据源
  12.     DataStreamSource<List<String>> stream = env.fromCollection(originalData);
  13.     // 使用flatMap算子
  14.     SingleOutputStreamOperator<String> source = stream.flatMap(new FlatMapFunction<List<String>, String>() {
  15.         @Override
  16.         public void flatMap(List<String> value, Collector<String> out) throws Exception {
  17.             for (String s : value) {
  18.                 out.collect(s);
  19.             }
  20.         }
  21.     });
  22.     // a b c 关羽 张飞 马超 黄忠 赵云
  23.     source.print();
  24.     env.execute();
  25. }
复制代码
3:Filter算子

Filter为筛选(过滤)算子,可以根据条件过滤数据源中数据,比方现有数据源 1,2,3,4,5 现在要过滤大于3的数据,过滤后,数据源中仅有 4 5 数据了,与Java8 stream 流式操纵中的filter功能划一
  1. public static void main(String[] args) throws Exception {
  2.     List<Integer> str1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
  3.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.     // 设置并行度为1 (1个线程执行,以便观察)
  5.     env.setParallelism(1);
  6.     env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  7.     // 加载数据源
  8.     DataStreamSource<Integer> stream = env.fromCollection(str1);
  9.     // 使用filter算子
  10.     SingleOutputStreamOperator<Integer> source = stream.filter(new FilterFunction<Integer>() {
  11.         @Override
  12.         public boolean filter(Integer value) throws Exception {
  13.             return value > 3;
  14.         }
  15.     });
  16.     // 4 5 6 7 8
  17.     source.print();
  18.     env.execute();
  19. }
复制代码
4:KeyBy算子

分组算子,根据数据源中元素某一特性进行分组,与Java8 stream 流式操纵中的groupBy功能划一
  1. public static void main(String[] args) throws Exception {
  2.         List<User> users = Arrays.asList(
  3.                 new User("张三", 12),
  4.                 new User("张三", 18),
  5.                 new User("李四", 22),
  6.                 new User("麻子", 35));
  7.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8.         // 设置并行度为1 (1个线程执行,以便观察)
  9.         env.setParallelism(1);
  10.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  11.         // 加载数据源
  12.         DataStreamSource<User> stream = env.fromCollection(users);
  13.         // 使用filter算子
  14.         KeyedStream<User, String> keyedStream = stream.keyBy(new KeySelector<User, String>() {
  15.             @Override
  16.             public String getKey(User value) throws Exception {
  17.                 return value.getName();
  18.             }
  19.         });
  20.         keyedStream.print();
  21.         env.execute();
  22.     }
复制代码
5:Union算子

union :联合算子, 利用此算子,可对多个数据源进行合并操纵(数据源数据必须类型必须相同),其可合并多个,合并后可直接对数据进行处置惩罚 (盘算或输出)
  1. public static void main(String[] args) throws Exception {
  2.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.     env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  4.     env.setParallelism(1);
  5.     DataStreamSource<String> source = env.fromElements("zs", "li", "we");
  6.     DataStreamSource<String> source2 = env.fromElements("zs2", "li2", "we2");
  7.     DataStreamSource<String> source3 = env.fromElements("zs3", "li3", "we3");
  8.     //此操作将 source 、source2 、source3 三个数据源的数据联合起来了
  9.     DataStream<String> union = source.union(source2, source3);
  10.     SingleOutputStreamOperator<String> streamOperator = union.map(new MapFunction<String, String>() {
  11.         @Override
  12.         public String map(String value) throws Exception {
  13.             return value.toUpperCase();
  14.         }
  15.     });
  16.     streamOperator.print("union").setParallelism(1);
  17.     env.execute();
  18. }
复制代码
6:Connect算子

connect与union算子一样,都可以进行数据源合并处置惩罚,但与union差别的是,connect 可以合并差别类型的数据源,但最多只能合并两个数据流,且合并后无法直接操纵(盘算 输出),需要对连接流进行数据处置惩罚(选择终极合并后的数据类型,不符合终极数据类型的转换)
  1. public static void main(String[] args) throws Exception {
  2.     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.     env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  4.     env.setParallelism(1);
  5.     DataStreamSource<String> source = env.fromElements("zs", "li", "we");
  6.     DataStreamSource<Integer> source2 = env.fromElements(1, 2, 3, 4, 5, 6, 7);
  7.     ConnectedStreams<String, Integer> connect = source.connect(source2);
  8.     // 我这里是将最终合并类型定为String.
  9.     SingleOutputStreamOperator<String> streamOperator = connect.map(new CoMapFunction<String, Integer, String>() {
  10.         @Override
  11.         public String map1(String value) {
  12.             return value + "是字符串类型,直接加后缀";
  13.         }
  14.         @Override
  15.         public String map2(Integer value) {
  16.             return "原本是Integer类型:" + value + "现在也变为String";
  17.         }
  18.     });
  19.     streamOperator.print("connect");
  20.     env.execute();
  21. }
复制代码
7:算子链式调用

  1. public static void main(String[] args) throws Exception {
  2.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
  4.         env.setParallelism(1);
  5.         DataStreamSource<String> streamSource = env.socketTextStream("xx", 9999);
  6.         SingleOutputStreamOperator<Tuple2<String, Integer>> result = streamSource.flatMap(new FlatMapFunction<String, String>() {
  7.             @Override
  8.             public void flatMap(String s, Collector<String> collector) throws Exception {
  9.                 for (String s1 : s.split(",")) {
  10.                     collector.collect(s1);
  11.                 }
  12.             }
  13.         }).filter(s -> !s.equals("sb"))
  14.                 .map(new MapFunction<String, String>() {
  15.                     @Override
  16.                     public String map(String s) throws Exception {
  17.                         return s.toUpperCase();
  18.                     }
  19.                 }).map(new MapFunction<String, Tuple2<String, Integer>>() {
  20.                     @Override
  21.                     public Tuple2<String, Integer> map(String s) throws Exception {
  22.                         return Tuple2.of(s, 1);
  23.                     }
  24.                 }).keyBy(tp -> tp.f0)
  25.                 .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
  26.                     @Override
  27.                     public Tuple2<String, Integer> reduce(Tuple2<String, Integer> stringIntegerTuple2, Tuple2<String, Integer> t1) throws Exception {
  28.                         return Tuple2.of(t1.f0, t1.f1 + stringIntegerTuple2.f1);
  29.                     }
  30.                 });
  31.         result.print();
  32.         env.execute();
  33.     }
复制代码
8:异步IO调用

  1. package com.wenge.datagroup.storage.process;
  2. import cn.hutool.http.HttpRequest;
  3. import com.alibaba.fastjson.JSON;
  4. import com.alibaba.fastjson.JSONObject;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.commons.lang3.StringUtils;
  7. import org.apache.flink.api.common.functions.RichFilterFunction;
  8. import org.apache.flink.api.common.functions.RichMapFunction;
  9. import org.apache.flink.configuration.Configuration;
  10. import org.apache.flink.streaming.api.datastream.AsyncDataStream;
  11. import org.apache.flink.streaming.api.datastream.DataStream;
  12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  13. import org.apache.flink.streaming.api.functions.async.ResultFuture;
  14. import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
  15. import java.util.Collections;
  16. import java.util.Objects;
  17. import java.util.concurrent.CompletableFuture;
  18. import java.util.concurrent.ExecutorService;
  19. import java.util.concurrent.Executors;
  20. import java.util.concurrent.TimeUnit;
  21. import java.util.function.Supplier;
  22. @Slf4j
  23. public class AnalyerAsyncIOProcess   {
  24.     public static DataStream<JSONObject> process(DataStream<String> dataStream) {
  25.         //线程数
  26.         int asyncNum = 10;
  27.         //过滤不需要的数据
  28.         SingleOutputStreamOperator<JSONObject> filterDataStream = dataStream.filter(new RichFilterFunction<String>() {
  29.             @Override
  30.             public boolean filter(String str) {
  31.                 //TODO:根据业务逻辑进行判断
  32.                 JSONObject data = null;
  33.                 try {
  34.                     data = JSON.parseObject(str);
  35.                     String id = data.getString("id");
  36.                     if (Objects.isNull(id)) {
  37.                         return false;
  38.                     }else {
  39.                         return true;
  40.                     }
  41.                 } catch (Exception e) {
  42.                     return false;
  43.                 }
  44.             }
  45.         }).name("aysncIOFilter").setParallelism(1).map(new RichMapFunction<String, JSONObject>() {
  46.             @Override
  47.             public JSONObject map(String str) throws Exception {
  48.                 JSONObject data = null;
  49.                 try {
  50.                     data = JSON.parseObject(str);
  51.                     return data;
  52.                 } catch (Exception e) {
  53.                     log.error("数据处理异常:{}", str);
  54.                     return null;
  55.                 }
  56.             }
  57.         }).name("aysncIOMAP").setParallelism(2);
  58.         // 异步IO
  59.         RichAsyncFunction richAsyncFunction = new RichAsyncFunction<JSONObject, JSONObject>() {
  60.             private transient ExecutorService executorService;
  61.             @Override
  62.             public void open(Configuration parameters) {
  63.                 // 加载配置文件,每一个并行度只执行一次
  64.                 log.error("加载配置文件base");
  65.                 this.executorService = Executors.newFixedThreadPool(asyncNum);
  66.             }
  67.             @Override
  68.             public void close() throws Exception {
  69.                 // 关闭线程池
  70.                 if (executorService != null) {
  71.                     executorService.shutdown();
  72.                 }
  73.                 log.error("----------------------------线程池关闭----------------------");
  74.             }
  75.             @Override
  76.             public void timeout(JSONObject input, ResultFuture<JSONObject> resultFuture) {
  77.                 log.error("------------------------数据超时----------------------:{}", input);
  78.                 JSONObject data = input;
  79.                 //对超时数据进行处理
  80.                 resultFuture.complete(Collections.singleton(data));
  81.             }
  82.             @Override
  83.             public void asyncInvoke(JSONObject json, ResultFuture<JSONObject> resultFuture) {
  84.                 CompletableFuture.supplyAsync(new Supplier<JSONObject>() {
  85.                     @Override
  86.                     public JSONObject get() {
  87.                         //识别语种
  88.                         String postResult = new String();
  89.                         String id = json.getString("id");
  90.                         long start = System.currentTimeMillis();
  91.                         try {
  92.                             //TODO: 根据业务逻辑进行处理
  93.                             log.error("异步处理数据base:{}", json.getString("id"));
  94.                             postResult = HttpRequest.post("http://127.0.0.1:8080").body(new JSONObject().toJSONString()).execute().body();
  95.                             if (StringUtils.isNotBlank(postResult)) {
  96.                                 json.put("postResult", postResult);
  97.                             }
  98.                             long end = System.currentTimeMillis();
  99.                             log.error("id:{},请求接口:{},耗时:{} ms", id, postResult, (end - start));
  100.                             return json;
  101.                         } catch (Exception e) {
  102.                             log.error("----------语种识别异步IO处理异常:{},数据:{}", id, e);
  103.                             return json;
  104.                         }
  105.                     }
  106.                 }, executorService).thenAccept((JSONObject dbResult) -> {
  107.                     resultFuture.complete(Collections.singleton(dbResult));
  108.                 });
  109.             }
  110.         };
  111.         DataStream<JSONObject> downloadStream = AsyncDataStream.unorderedWait(
  112.                 filterDataStream,
  113.                 richAsyncFunction,
  114.                 5000,
  115.                 TimeUnit.MILLISECONDS,
  116.                 asyncNum).name("IO").setParallelism(2);
  117.         return downloadStream;
  118.     }
  119. }
复制代码
9:异步IO调用 sql

  1. package com.wenge.datagroup.storage.process;
  2. import com.alibaba.druid.pool.DruidDataSource;
  3. import com.alibaba.fastjson.JSONObject;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.commons.lang3.StringUtils;
  6. import org.apache.flink.configuration.Configuration;
  7. import org.apache.flink.streaming.api.datastream.AsyncDataStream;
  8. import org.apache.flink.streaming.api.datastream.DataStream;
  9. import org.apache.flink.streaming.api.functions.async.ResultFuture;
  10. import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
  11. import javax.sql.DataSource;
  12. import java.sql.Connection;
  13. import java.sql.PreparedStatement;
  14. import java.sql.ResultSet;
  15. import java.sql.SQLException;
  16. import java.util.Collections;
  17. import java.util.concurrent.CompletableFuture;
  18. import java.util.concurrent.ExecutorService;
  19. import java.util.concurrent.Executors;
  20. import java.util.concurrent.TimeUnit;
  21. import java.util.function.Supplier;
  22. @Slf4j
  23. public class AnalyerAsyncIOSQL1 {
  24.     public static DataStream<JSONObject> process(DataStream<JSONObject> dataStream) {
  25.         //sql 连接数
  26.         int dataAsyncNum =10;
  27.         //并行度
  28.         int asyncNum =10;
  29.         String databaseUrl  = "jdbc:mysql://127.0.0.1:3306/test_data?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8";
  30.         String databaseUsername  = "root";
  31.         String databasePassword  = "123456";
  32.         // 异步IO
  33.         RichAsyncFunction richAsyncFunction = new RichAsyncFunction<JSONObject, JSONObject>() {
  34.             private transient ExecutorService executorService;
  35.             private DruidDataSource dataSource;
  36.             @Override
  37.             public void open(Configuration parameters) {
  38.                 // 重新加载配置文件
  39.                 log.error("重新加载配置文件-SQL");
  40.                 this.executorService = Executors.newFixedThreadPool(dataAsyncNum);
  41.                 dataSource = new DruidDataSource();
  42.                 dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
  43.                 dataSource.setUrl(databaseUrl);
  44.                 dataSource.setUsername(databaseUsername);
  45.                 dataSource.setPassword(databasePassword);
  46.                 dataSource.setMaxActive(dataAsyncNum);
  47.             }
  48.             @Override
  49.             public void close() throws Exception {
  50.                 // 关闭线程池
  51.                 executorService.shutdown();
  52.                 dataSource.close();
  53.                 log.error("----------------------------线程池关闭----------------------");
  54.             }
  55.             @Override
  56.             public void timeout(JSONObject input, ResultFuture<JSONObject> resultFuture) {
  57.                 log.error("------------------------数据超时----------------------:{}", input);
  58.                 JSONObject data = input;
  59.                 //对超时数据进行处理
  60.                 resultFuture.complete(Collections.singleton(data));
  61.             }
  62.             @Override
  63.             public void asyncInvoke(JSONObject json, ResultFuture<JSONObject> resultFuture) {
  64.                 CompletableFuture.supplyAsync(new Supplier<JSONObject>() {
  65.                     @Override
  66.                     public JSONObject get() {
  67.                         String id = json.getString("id");
  68.                         long start =System.currentTimeMillis();
  69.                         try {
  70.                             //TODO: 根据业务逻辑进行处理
  71.                             String country_id = new AnalyerAsyncIOSQL1().queryFromMySql("name", dataSource);
  72.                             log.error("----------SQL,id:{},数据:{}", id, country_id);
  73.                             if(StringUtils.isNotBlank(country_id)){
  74.                                 json.put("country_id",country_id);
  75.                             }
  76.                             long end =System.currentTimeMillis();
  77.                             log.error("id:{},sql,耗时:{} ms", id,(end-start));
  78.                             return json;
  79.                         } catch (Exception e) {
  80.                             log.error("----------SQL异步IO处理异常:{},数据:{}", id, e);
  81.                             return json;
  82.                         }
  83.                     }
  84.                 }, executorService).thenAccept((JSONObject dbResult) -> {
  85.                     resultFuture.complete(Collections.singleton(dbResult));
  86.                 });
  87.             }
  88.         };
  89.         DataStream<JSONObject> downloadStream = AsyncDataStream.unorderedWait(
  90.                 dataStream,
  91.                 richAsyncFunction,
  92.                 5000,
  93.                 TimeUnit.MILLISECONDS,
  94.                 dataAsyncNum).name("IO_SQL").setParallelism(asyncNum);
  95.         return downloadStream;
  96.     }
  97.     /**
  98.      * SQL 查询代码实现
  99.      */
  100.     public String queryFromMySql(String name,DataSource dataSource) throws SQLException {
  101.         String sql = "select id,name,country_id ,status from nation_info where status =1 and name = ?";
  102.         String result = null;
  103.         Connection connection = null;
  104.         PreparedStatement stmt = null;
  105.         ResultSet rs = null;
  106.         try {
  107.             connection = dataSource.getConnection();
  108.             stmt = connection.prepareStatement(sql);
  109.             stmt.setString(1, name);
  110.             rs = stmt.executeQuery();
  111.             while (rs.next()) {
  112.                 result = rs.getString("country_id");
  113.             }
  114.         } finally {
  115.             if (rs != null) {
  116.                 rs.close();
  117.             }
  118.             if (stmt != null) {
  119.                 stmt.close();
  120.             }
  121.             if (connection != null) {
  122.                 connection.close();
  123.             }
  124.         }
  125.         return result;
  126.     }
  127. }
复制代码
三、Data Sinks

Data sinks 利用 DataStream 并将它们转发到文件、套接字、外部体系或打印它们。Flink 自带了多种内置的输特别式,这些格式相干的实现封装在 DataStreams 的算子里:


  • writeAsText() / TextOutputFormat - 将元素按行写成字符串。通过调用每个元素的 toString() 方法获得字符串。
  • writeAsCsv(…) / CsvOutputFormat - 将元组写成逗号分隔值文件。行和字段的分隔符是可配置的。每个字段的值来自对象的 toString() 方法。
  • print() / printToErr() - 在尺度输出/尺度错误流上打印每个元素的 toString() 值。 可选地,可以提供一个前缀(msg)附加到输出。这有助于区分差别的 print 调用。假如并行度大于1,输出结果将附带输出任务标识符的前缀。
  • writeUsingOutputFormat() / FileOutputFormat - 自界说文件输出的方法和基类。支持自界说 object 到 byte 的转换。
  • writeToSocket - 根据 SerializationSchema 将元素写入套接字。
    addSink - 调用自界说 sink function。Flink 捆绑了连接到其他体系(比方 Apache Kafka)的连接器,这些连接器被实现为 sink functions。
注意,DataStream 的 write*() 方法重要用于调试目的。它们不到场 Flink 的 checkpointing,这意味着这些函数通常具有至少有一次语义。革新到目的体系的数据取决于 OutputFormat 的实现。这意味着并非所有发送到 OutputFormat 的元素都会立刻显示在目的体系中。别的,在失败的环境下,这些记录可能会丢失。
为了将流可靠地、精准一次地传输到文件体系中,请利用 FileSink。别的,通过 .addSink(…) 方法调用的自界说实现也可以到场 Flink 的 checkpointing,以实现精准一次的语义。
kafka

  1. import org.apache.kafka.common.serialization.ByteArrayDeserializer;
  2. import java.util.Properties;
  3. import org.apache.flink.api.common.RuntimeExecutionMode;
  4. import org.apache.flink.api.common.functions.FilterFunction;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  9. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
  10. import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
  11. /**
  12. * @author alanchan
  13. *
  14. */
  15. public class SinkKafka {
  16.         public static void main(String[] args) throws Exception {
  17.                 // env
  18.                 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  19.                 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  20.                 // source
  21.                 // 准备kafka连接参数
  22.                 Properties props = new Properties();
  23.                 // 集群地址
  24.                 props.setProperty("bootstrap.servers", "server1:9092");
  25.                 // 消费者组id
  26.                 props.setProperty("group.id", "flink");
  27.                 // latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费
  28.                 // earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
  29.                 props.setProperty("auto.offset.reset", "latest");
  30.                 // 会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测
  31.                 props.setProperty("flink.partition-discovery.interval-millis", "5000");
  32.                 // 自动提交
  33.                 props.setProperty("enable.auto.commit", "true");
  34.                 // 自动提交的时间间隔
  35.                 props.setProperty("auto.commit.interval.ms", "2000");
  36.                 // 使用连接参数创建FlinkKafkaConsumer/kafkaSource
  37.                 FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>(
  38.                 "t_kafkasource",
  39.                 new SimpleStringSchema(),
  40.                 props);
  41.                
  42.                 // 使用kafkaSource
  43.                 DataStream<String> kafkaDS = env.addSource(kafkaSource);
  44.                 // transformation算子,业务计算
  45.                 //以alan作为结尾
  46.                 SingleOutputStreamOperator<String> etlDS = kafkaDS.filter(new FilterFunction<String>() {
  47.                         @Override
  48.                         public boolean filter(String value) throws Exception {
  49.                                 return value.contains("alan");
  50.                         }
  51.                 });
  52.                 // sink
  53.                 etlDS.print();
  54.                 Properties props2 = new Properties();
  55.                 props2.setProperty("bootstrap.servers", "server1:9092");
  56.                 props2.setProperty(ConsumerConfig.GROUP_ID_CONFIG, grouId);
  57.         props2.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  58.         props2.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  59.         props2.setProperty("enable.idempotence", "false");
  60.         
  61.                 FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
  62.                 "send_topic",
  63.                 new SimpleStringSchema(),
  64.                 props2);
  65.                
  66.                 etlDS.addSink(kafkaSink);
  67.                 // execute
  68.                 env.execute();
  69.         }
  70. }
复制代码
flie

  1. import org.apache.flink.api.common.RuntimeExecutionMode;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. /**
  5. * @author alanchan
  6. */
  7. public class SinkDemo {
  8.         public static void main(String[] args) throws Exception {
  9.                 // env
  10.                 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11.                 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  12.                 // source
  13.                 DataStream<String> ds = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");
  14.                 System.setProperty("HADOOP_USER_NAME", "alanchan");
  15.                 // transformation
  16.                 // sink
  17. //                ds.print();
  18. //                ds.print("输出标识");
  19. //                ds.printToErr();// 会在控制台上以红色输出
  20. //                ds.printToErr("输出标识");// 会在控制台上以红色输出
  21.                 // 并行度与写出的文件个数有关,一个并行度写一个文件,多个并行度写多个文件
  22. //                ds.writeAsText("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/output/result1").setParallelism(1);
  23.                 ds.writeAsText("hdfs://server2:8020///flinktest/words").setParallelism(2);
  24.                 // execute
  25.                 env.execute();
  26.         }
  27. }
复制代码
mysql

  1. import java.sql.Connection;
  2. import java.sql.DriverManager;
  3. import java.sql.PreparedStatement;
  4. import org.apache.flink.api.common.RuntimeExecutionMode;
  5. import org.apache.flink.configuration.Configuration;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  9. import org.source_transformation_sink.bean.User;
  10. /**
  11. * @author alanchan
  12. *
  13. */
  14. public class SinkToMySQL {
  15.         public static void main(String[] args) throws Exception {
  16.                 // 0.env
  17.                 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  18.                 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  19.                 // 1.source
  20.                 DataStream<User> studentDS = env.fromElements(new User(1, "alanchan", "sink mysql", "alan.chan.chn@163.com", 19, 800));
  21.                 // 2.transformation
  22.                
  23.                 // 3.sink
  24.                 studentDS.addSink(new MySQLSink());
  25.                 // 4.execute
  26.                 env.execute();
  27.         }
  28.         private static class MySQLSink extends RichSinkFunction<User> {
  29.                 private Connection conn = null;
  30.                 private PreparedStatement ps = null;
  31.                 @Override
  32.                 public void open(Configuration parameters) throws Exception {
  33.                         conn = DriverManager.getConnection(
  34.                                         "jdbc:mysql://server4:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "123456");
  35. //                        private int id;
  36. //                        private String name;
  37. //                        private String pwd;
  38. //                        private String email;
  39. //                        private int age;
  40. //                        private double balance;
  41.                         String sql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
  42.                         ps = conn.prepareStatement(sql);
  43.                 }
  44.                 @Override
  45.                 public void invoke(User value, Context context) throws Exception {
  46.                         // 设置?占位符参数值
  47.                         ps.setString(1, value.getName());
  48.                         ps.setString(2, value.getPwd());
  49.                         ps.setString(3, value.getEmail());
  50.                         ps.setInt(4, value.getAge());
  51.                         ps.setDouble(5, value.getBalance());
  52.                         // 执行sql
  53.                         ps.executeUpdate();
  54.                 }
  55.                 @Override
  56.                 public void close() throws Exception {
  57.                         if (conn != null)
  58.                                 conn.close();
  59.                         if (ps != null)
  60.                                 ps.close();
  61.                 }
  62.         }
  63. }
  64. import lombok.AllArgsConstructor;
  65. import lombok.Data;
  66. import lombok.NoArgsConstructor;
  67. /**
  68. * @author alanchan
  69. *
  70. */
  71. @Data
  72. @AllArgsConstructor
  73. @NoArgsConstructor
  74. public class User {
  75.         private int id;
  76.         private String name;
  77.         private String pwd;
  78.         private String email;
  79.         private int age;
  80.         private double balance;
  81. }
复制代码
ES

  1. package com.wenge.datagroup.storage;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.wenge.datagroup.storage.utils.*;
  4. import org.apache.commons.lang3.StringUtils;
  5. import org.apache.flink.api.common.RuntimeExecutionMode;
  6. import org.apache.flink.api.common.functions.RuntimeContext;
  7. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  8. import org.apache.flink.streaming.api.datastream.DataStream;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
  11. import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
  12. import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
  13. import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
  14. import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
  15. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  16. import org.apache.http.HttpHost;
  17. import org.apache.kafka.clients.consumer.ConsumerConfig;
  18. import org.elasticsearch.action.update.UpdateRequest;
  19. import java.net.MalformedURLException;
  20. import java.net.URL;
  21. import java.util.ArrayList;
  22. import java.util.List;
  23. import java.util.Properties;
  24. /**
  25. * @author wangkanglu
  26. * @version 1.0
  27. * @description
  28. * @date 2024-07-24 16:58
  29. */
  30. public class TestFlink {
  31.     static String  topic = "test_topic";
  32.     static Integer bulkSize = 100;
  33.     static Integer sinkParallelism = 2;
  34.     public static void main(String[] args) {
  35.         // env
  36.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  37.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  38.         // source
  39.         // 准备kafka连接参数
  40.         Properties props = getParameters();
  41.         // 使用连接参数创建FlinkKafkaConsumer/kafkaSource
  42.         FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>(
  43.                 topic,
  44.                 new SimpleStringSchema(),
  45.                 props);
  46.         // 使用kafkaSource
  47.         DataStream<String> kafkaDS = env.addSource(kafkaSource);
  48.         //更新进ES
  49.         List<HttpHost> esAddresses = null;
  50.         try {
  51.             esAddresses = getEsAddresses("192.168.1.1:9200,192.168.1.2:9200");
  52.         } catch (MalformedURLException e) {
  53.             log.error("解析ES地址报错", e);
  54.             e.printStackTrace();
  55.         }
  56.         ElasticsearchSinkFunction<JSONObject> elasticsearchSinkUpdateFunction = new ElasticsearchSinkFunction<JSONObject>() {
  57.             @Override
  58.             public void process(JSONObject jsonObject, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
  59.                 //TODO:这里按照发布时间存到不同的索引,需要根据项目自行修改
  60.                 String index = "writeIndex";
  61.                 String uuid = jsonObject.getString("uuid");
  62.                 jsonObject.remove("_index");
  63.                 jsonObject.put("topic_flag",topic);
  64.                 jsonObject.put("es_insert_time", DateUtils.getCurrentDateTime());
  65.                 if(StringUtils.isEmpty(index)){
  66.                     log.error("UUID:{} index为空,不再更新ES", uuid);
  67.                     return;
  68.                 }else {
  69.                     log.error("索引:{}中更新数据:UUID:{} ",index, uuid);
  70.                 }
  71.                 UpdateRequest updateRequest = new UpdateRequest(index, uuid)
  72.                         .docAsUpsert(true)
  73.                         .doc(jsonObject)
  74.                         //版本冲突中的重试次数
  75.                         .retryOnConflict(5);
  76.                 requestIndexer.add(updateRequest);
  77.             }
  78.         };
  79.         ESSinkUtil.addUpdateSink(esAddresses, bulkSize, sinkParallelism, kafkaDS, elasticsearchSinkUpdateFunction, "saveEs");
  80.     }
  81.     /**
  82.      * 解析配置文件的 es hosts
  83.      *
  84.      * @param hosts hosts字符串
  85.      * @throws MalformedURLException 地址异常
  86.      */
  87.     public static List<HttpHost> getEsAddresses(String hosts) throws MalformedURLException {
  88.         String[] hostList = hosts.split(",");
  89.         List<HttpHost> addresses = new ArrayList<>();
  90.         for (String host : hostList) {
  91.             if (host.startsWith("http")) {
  92.                 URL url = new URL(host);
  93.                 addresses.add(new HttpHost(url.getHost(), url.getPort()));
  94.             } else {
  95.                 String[] parts = host.split(":", 2);
  96.                 if (parts.length > 1) {
  97.                     addresses.add(new HttpHost(parts[0], Integer.parseInt(parts[1])));
  98.                 } else {
  99.                     throw new MalformedURLException("invalid elasticsearch hosts format");
  100.                 }
  101.             }
  102.         }
  103.         return addresses;
  104.     }
  105.     /**
  106.      * es sink
  107.      *
  108.      * @param hosts               es hosts
  109.      * @param bulkFlushMaxActions bulk flush size
  110.      * @param parallelism         并行数
  111.      * @param dataStream          数据
  112.      * @param func                es写入方法
  113.      * @param <T>                 泛型
  114.      * @param sinkName            算子名称
  115.      */
  116.     public static <T> void addUpdateSink(List<HttpHost> hosts, int bulkFlushMaxActions, int parallelism,
  117.                                          DataStream<T> dataStream, ElasticsearchSinkFunction<T> func,
  118.                                          String sinkName) {
  119.         ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, func);
  120.         esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
  121.         new RetryRequestFailureHandler();
  122.         RestClientFactory restClientFactory = ESConstant.restClientFactory;
  123.         //设置用户名密码
  124.         esSinkBuilder.setRestClientFactory(restClientFactory);
  125.         esSinkBuilder.setFailureHandler(new UpdateRetryRequestFailureHandler());
  126.         //Bulk刷新间隔
  127.         esSinkBuilder.setBulkFlushInterval(1000);
  128.         //重试次数
  129.         esSinkBuilder.setBulkFlushBackoffRetries(10);
  130.         //重试间隔
  131.         esSinkBuilder.setBulkFlushBackoffDelay(5000);
  132.         //重试类型
  133.         esSinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.CONSTANT);
  134.         dataStream.addSink(esSinkBuilder.build()).name(sinkName).setParallelism(parallelism);
  135.     }
  136.     private static Properties getParameters(){
  137.         Properties properties = new Properties();
  138.         // 集群地址
  139.         properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.1:9092");
  140.         // 消费者组id
  141.         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "wkl_test");
  142.         properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  143.         properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  144.         properties.setProperty("partition.discovery.interval.ms", "10000");//消费者定期发现动态创建的Kafka主题和分区的时间间隔
  145.         // latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费
  146.         // earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
  147.         properties.setProperty("auto.offset.reset", "latest");
  148. //        properties.setProperty("enable.auto.commit", "true");
  149. //        properties.setProperty("auto.commit.interval.ms", "1000");//自动提交的时间间隔
  150.         properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");//自动提交的时间间隔
  151.         properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "20000");//自动提交的时间间隔
  152.         //每次从kafka中获取的数量
  153.         properties.setProperty("max.poll.records", "2");
  154.         return properties;
  155.     }
  156.    
  157. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦见你的名字

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表