梦见你的名字 发表于 2024-9-5 00:05:15

Flink -2-Flink 算子和java代码简单利用

Flink和Spark类似,也是一种一站式处置惩罚的框架;既可以进行批处置惩罚(DataSet),也可以进行实时处置惩罚(DataStream)。
以是下面将Flink的算子分为两大类:一类是DataSet,一类是DataStream。
DataSet 批处置惩罚算子

一、Source算子

1. fromCollection

fromCollection:从本地集合读取数据
例:
        public static void main(String[] args) {      
      StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStreamSource<String> stringDataStreamSource = streamEnv.fromCollection(Arrays.asList("1,张三", "2,李四", "3,王五", "4,赵六"));
    }
2. readTextFile

readTextFile:从文件中读取
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stringDataStreamSource = streamEnv.readTextFile("/data/a.txt");
3. readTextFile:读取压缩文件

对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别而且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。
压缩方法文件扩展名是否可并行读取DEFLATE.deflatenoGZip.gz .gzipnoBzip2.bz2noXZ.xzno StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stringDataStreamSource = streamEnv.readTextFile("/data/file.gz");
二、Transform转换算子

因为Transform算子基于Source算子操纵,以是起首构建Flink执行环境及Source算子,后续Transform算子操纵基于此:
        public static void main(String[] args) {      
      StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStreamSource<String> stringDataStreamSource = streamEnv.fromCollection(Arrays.asList("1,张三", "2,李四", "3,王五", "4,赵六"));
    }
1: map

将DataSet中的每一个元素转换为另外一个元素
        import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class MapOperatorExample {

    public static void main(String[] args) throws Exception {
      // 获取 ExecutionEnvironment,用于创建数据集
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // 假设已经有一个包含整数的数据集
      DataSet<Integer> inputDataSet = env.fromElements(1, 2, 3, 4, 5);

      // 使用 Map 算子对数据集进行转换,将每个整数加上 10,并输出结果
      DataSet<Integer> outputDataSet = inputDataSet.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                // 对每个整数都加上 10,并将结果作为新的数据集
                return value + 10;
            }
      });

      // 输出转换后的数据集
      outputDataSet.print();
    }
}
2:flatMap



[*]将DataSet中的每一个元素转换为0…n个元素。
[*]FlatMap 算子是 Flink 中的一种数据转换算子,它将输入的每个元素通过用户自界说的函数进行处置惩罚,并生成零个、一个或多个新的元素。FlatMap 算子的底层逻辑是对数据集中的每个元素应用用户界说的函数,并将函数返回的多个元素平铺成新的数据集
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapOperatorExample {

    public static void main(String[] args) throws Exception {
      // 获取 ExecutionEnvironment,用于创建数据集
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // 假设已经有一个包含文本行的数据集
      DataSet<String> inputDataSet = env.fromElements(
            "Flink is a powerful framework for stream and batch processing",
            "It provides support for event time processing"
      );

      // 使用 FlatMap 算子对数据集进行拆分并生成单词列表
      DataSet<String> wordDataSet = inputDataSet.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                // 按空格拆分文本行,并将拆分后的单词逐个添加到输出集合
                String[] words = value.split(" ");
                for (String word : words) {
                  out.collect(word);
                }
            }
      });

      // 输出单词列表
      wordDataSet.print();
    }
}
3:Filter 算子



[*]Filter 算子是 Flink 中的一种数据转换算子,它通过用户自界说的条件函数对数据集中的每个元素进行过滤,只保存满足条件的元素。
[*]Filter 算子的底层逻辑是对数据集中的每个元素应用用户界说的条件函数,只保存函数返回值为 true 的元素,过滤掉返回值为 false 的元素。
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FilterOperatorExample {

    public static void main(String[] args) throws Exception {
      // 获取 ExecutionEnvironment,用于创建数据集
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // 假设已经有一个包含整数的数据集
      DataSet<Integer> inputDataSet = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

      // 使用 Filter 算子对数据集进行过滤,只保留偶数
      DataSet<Integer> evenDataSet = inputDataSet.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                // 判断是否为偶数,保留返回 true 的元素
                return value % 2 == 0;
            }
      });

      // 输出只包含偶数的数据集
      evenDataSet.print();
    }
}
4:Reduce 算子



[*]可以对一个dataset大概一个group来进行聚合盘算,终极聚合成一个元素
[*]Reduce 算子是 Flink 中的一个基本聚合算子,用于对数据集中的元素进行二元聚合操纵。
[*]Reduce 算子会将数据集中的元素两两配对,并利用用户提供的二元操纵函数对配对的元素进行聚合,然后将聚合结果继承与下一个元素配对,直至处置惩罚完所有元素。终极,Reduce 算子会返回一个单一的结果值。
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ReduceOperatorExample {

    public static void main(String[] args) throws Exception {
      // 获取 ExecutionEnvironment,用于创建数据集
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // 假设已经有一个包含整数的数据集
      DataSet<Integer> inputDataSet = env.fromElements(1, 2, 3, 4, 5);

      // 使用 Reduce 算子计算数据集中所有元素的总和
      DataSet<Integer> resultDataSet = inputDataSet.reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer value1, Integer value2) throws Exception {
                return value1 + value2; // 将两个元素相加得到新的结果
            }
      });

      // 输出计算结果
      resultDataSet.print();
    }
}
5:Aggregations

KeyedStream → DataStream


[*]Aggregations 算子是 Flink 中用于对数据集进行聚合操纵的一组函数。它可以用于盘算数据集中的最小值、最大值、求和、均匀值等统计信息。Flink 提供了一系列内置的聚合函数,如 min、max、sum、avg 等。
//算平均值
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;

public class AggregationsOperatorExample {

    public static void main(String[] args) throws Exception {
      // 获取 ExecutionEnvironment,用于创建数据集
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // 假设已经有一个包含学生姓名和成绩的数据集
      DataSet<Tuple2<String, Double>> inputDataSet = env.fromElements(
                new Tuple2<>("Alice", 85.0),
                new Tuple2<>("Bob", 78.5),
                new Tuple2<>("Cathy", 92.5),
                new Tuple2<>("David", 65.0),
                new Tuple2<>("Eva", 88.5)
      );

      // 使用 Aggregations 算子计算成绩的平均值
      double avgScore = inputDataSet.aggregate(Aggregations.SUM, 1).div(inputDataSet.count());

      // 输出计算结果
      System.out.println("平均成绩:" + avgScore);
    }
}
重要的聚合方法:


[*]keyedStream.sum(0);
[*]keyedStream.sum(“key”);
[*]keyedStream.min(0);
[*]keyedStream.min(“key”);
[*]keyedStream.max(0);
[*]keyedStream.max(“key”);
[*]keyedStream.minBy(0);
[*]keyedStream.minBy(“key”);
[*]keyedStream.maxBy(0);
[*]keyedStream.maxBy(“key”);
6:Distinct 算子



[*] Distinct 算子是 Flink 中的一个转换算子,它用于去除输入流中重复的元素,并将去重后的结果作为输出流。Distinct 算子是在整个数据流上进行去重操纵,不需要进行分组。
[*] 在底层,Distinct 算子通过维护一个状态来记录已经出现过的元素,当新的元素到达时,会与状态中的元素进行比较,假如状态中不存在该元素,则将其输出,并将其添加到状态中,以便后续去重。
Distinct 算子在很多场景下都很有用,比方:


[*] 数据去重:在流式盘算中,常常会有重复的数据到达,而我们只关心每个数据的第一次出现,可以利用 Distinct 算子往复重。
[*] 实时数据摘要:有时候需要根据某个字段的特性选择每个分组的代表元素,比方选择每个用户的初次登录信息作为数据摘要。
[*] 数据洗濯:在处置惩罚实时数据流时,可能会有一些无效或异常的数据需要洗濯,Distinct 算子可以帮助去除重复的无效数据。
package com.wenge.datagroup.storage;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DistinctOperator;

/**
* @author wangkanglu
* @version 1.0
* @description
* @date 2024-07-24 16:58
*/
public class TestFlink {

    public static void main(String[] args) throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // 模拟输入流,包含重复的元素
      DataSet<Integer> inputStream = env.fromElements(1, 2, 3, 2, 4, 5, 1, 6, 7, 5);

      // 对输入流进行去重操作
      DistinctOperator<Integer> distinct = inputStream.distinct();

      distinct.print();

      env.execute("Distinct Example");
    }


}

7:First 算子

First 算子是 Flink 中的一个转换算子,它用于从输入流中选择每个 Key 的第一个元素,并将其作为输出流中的结果。在流式盘算中,常常需要根据某个特定的字段进行分组,并选择每个分组中的第一个元素,这时可以利用 First 算子来实现这个功能。First 算子是 KeyedStream 上的操纵,以是在利用之前,需要先将数据流进行分组。
First 算子在许多场景下都很有用,比方:


[*]数据去重:假如数据流中可能包含重复的元素,而我们只关心每个元素的第一次出现,可以利用 First 算子往复重。
[*]实时数据摘要:在流式盘算中,有时需要根据某个字段的特性选择每个分组的代表元素,比方选择每个用户的初次登录信息作为数据摘要。
[*]时间窗口操纵:在流式盘算中,常常需要对窗口内的数据进行处置惩罚,而 First 算子可以用于选择每个窗口的起始元素。
package com.wenge.datagroup.storage;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.GroupReduceOperator;

/**
* @author wangkanglu
* @version 1.0
* @description
* @date 2024-07-24 16:58
*/
public class TestFlink {

    public static void main(String[] args) throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // 模拟输入流,包含重复的元素
      DataSet<Integer> inputStream = env.fromElements(1, 2, 3, 2, 4, 5, 1, 6, 7, 5);

      // 对输入流进行去重操作
      GroupReduceOperator<Integer, Integer> first = inputStream.first(2);
      //取前两个数
      first.print();

      env.execute("Distinct Example");
    }


}

8:Join 算子

Join 算子是 Flink 中用于将两个数据集进行连接操纵的一种算子。它通过指定连接的键(Key)将两个数据集中的元素按照某种条件进行关联,从而生成一个包含连接结果的新数据集。
应用场景


[*]Join 算子适用于需要将两个数据集进行关联的场景。常见的应用场景包罗关联用户信息和订单信息、关联商品信息和贩卖信息等。
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;

public class JoinOperatorExample {

    public static void main(String[] args) throws Exception {
      // 获取 ExecutionEnvironment,用于创建数据集
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // 假设已经有包含订单信息的数据集
      DataSet<Tuple3<String, String, Integer>> orders = env.fromElements(
                new Tuple3<>("Order001", "Product001", 2),
                new Tuple3<>("Order002", "Product002", 1),
                new Tuple3<>("Order003", "Product001", 3),
                new Tuple3<>("Order004", "Product003", 5)
      );

      // 假设已经有包含商品信息的数据集
      DataSet<Tuple2<String, String>> products = env.fromElements(
                new Tuple2<>("Product001", "Apple"),
                new Tuple2<>("Product002", "Banana"),
                new Tuple2<>("Product003", "Orange")
      );

      // 使用 Join 算子将订单信息和商品信息按照商品编号进行连接
      DataSet<Tuple3<String, String, Integer>> result = orders.join(products)
                .where(new OrderProductJoinKeySelector())
                .equalTo(0)
                .projectFirst(0, 1)
                .projectSecond(1);

      // 输出连接结果
      result.print();
    }

    // 自定义 KeySelector,用于指定连接的键(商品编号)
    public static class OrderProductJoinKeySelector implements KeySelector<Tuple3<String, String, Integer>, String> {
      @Override
      public String getKey(Tuple3<String, String, Integer> value) {
            return value.f1; // 商品编号是连接的键
      }
    }
}
   在上面的代码中,我们起首导入了 Flink 的相干类,然后创建了一个 ExecutionEnvironment 对象 env,用于创建数据集。接着,利用 env.fromElements() 方法创建了包含订单信息的数据集 orders 和包含商品信息的数据集 products。
然后,我们利用 join 方法对 orders 和 products 数据集进行连接。在 join 方法中,我们需要通过自界说 KeySelector 对象 OrderProductJoinKeySelector 指定连接的键(商品编号)。接着,我们通过 equalTo(0) 方法指定连接条件,体现连接键在 orders 数据集中的位置是 0,在 products 数据集中的位置也是 0。
末了,我们利用 projectFirst(0, 1) 方法和 projectSecond(1) 方法分别指定连接后要输出的字段,从而生成包含连接结果的新数据集 result。
终极,我们输出连接结果,即关联后的订单信息和商品名称。在本例中,输出结果为:
(Order001, Product001, 2) Apple
(Order002, Product002, 1) Banana
(Order003, Product001, 3) Apple
(Order004, Product003, 5) Orange
即订单信息和商品信息已按照商品编号进行连接,并输出了关联后的订单信息和商品名称。
9:Outer Join 算子

Outer Join 是 Flink 中用于进行外连接操纵的算子。外连接是关系型数据库中的概念,在 Flink 中,它允许将两个数据流中的元素按照指定的键进行连接,并返回所有的元素,包罗那些在其中一个数据流中存在而在另一个数据流中不存在的元素。外连接操纵可以分为左外连接、右外连接和全外连接。
在底层,Outer Join 算子会维护一个状态来记录两个数据流中的匹配关系,并根据指定的键进行匹配。对于左外连接和右外连接,当某个数据流中的元素找不到匹配项时,会生成一个空值大概指定的默认值。对于全外连接,无论两个数据流中是否存在匹配项,都会输出所有的元素。
应用场景


[*]外连接是一种常用的数据合并和关联操纵,适用于以下场景:
[*]合并数据:将两个数据流中的数据按照指定的键进行合并,可以用于数据的联合分析和展示。
[*]增补缺失信息:在关联操纵中,可能会有一些数据流中的元素在另一个数据流中找不到匹配项,利用外连接可以添补缺失信息。
[*]数据洗濯:有时候需要对两个数据流进行关联,去除不匹配的数据大概添加默认值。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OuterJoinExample {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // 模拟两个数据流
      DataStream<Tuple2<String, Integer>> stream1 = env.fromElements(
                new Tuple2<>("A", 1),
                new Tuple2<>("B", 2),
                new Tuple2<>("C", 3)
      );

      DataStream<Tuple2<String, String>> stream2 = env.fromElements(
                new Tuple2<>("A", "X"),
                new Tuple2<>("B", "Y"),
                new Tuple2<>("D", "Z")
      );

      // 对两个数据流进行外连接操作,连接键为第一个元素
      DataStream<Tuple3<String, Integer, String>> result = stream1
                .leftOuterJoin(stream2)
                .where(tuple -> tuple.f0) // 第一个数据流的连接键
                .equalTo(tuple -> tuple.f0) // 第二个数据流的连接键
                .with((tuple1, tuple2) -> { // 匹配成功的处理逻辑
                  if (tuple2 == null) { // 若tuple2为空,表示匹配失败,使用默认值"UNKNOWN"
                        return new Tuple3<>(tuple1.f0, tuple1.f1, "UNKNOWN");
                  } else {
                        return new Tuple3<>(tuple1.f0, tuple1.f1, tuple2.f1);
                  }
                });

      result.print();

      env.execute("Outer Join Example");
    }
}

   在上面的示例中,我们利用 env.fromElements 方法创建了两个模仿数据流 stream1 和 stream2,分别包含差别的元素。然后,我们调用 leftOuterJoin 方法对这两个数据流进行外连接操纵,连接键为第一个元素。在 with 方法中,我们界说了匹配成功的处置惩罚逻辑:当第二个数据流中找不到匹配项时,利用默认值"UNKNOWN"添补;否则,输出匹配成功的元素。在输出结果中,我们可以看到两个数据流中的元素都被连接在一起,而且在匹配失败的环境下添补了默认值"UNKNOWN"。
10:Cross 算子



[*]Cross 是 Flink 中的一个算子,用于将两个数据流中的所有元素进行两两组合,产生所有可能的组合结果。在底层,Cross 算子会维护两个数据流的状态,并对其中的每个元素进行遍历,将两个数据流的所有元素进行两两组合,并输出所有可能的组合结果。
[*]交叉操纵,通过形成这个数据集和其他数据集的笛卡尔积,创建一个新的数据集
[*]和join类似,但是这种交叉操纵会产生笛卡尔积,在数据比较大的时候,是非常斲丧内存的操纵
应用场景
Cross 算子在现实应用中相对较少,因为它会产生较大的输出结果。但是在某些特定场景下,它仍然有一些用途,比方:


[*] 分列组合:在某些场景下,需要对两个数据流中的元素进行分列组合,生成所有可能的组合结果。
[*] 笛卡尔积:对于两个数据流之间的笛卡尔积操纵,可以利用 Cross 算子进行实现。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CrossExample {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // 模拟两个数据流
      DataStream<Integer> stream1 = env.fromElements(1, 2, 3);
      DataStream<String> stream2 = env.fromElements("A", "B");

      // 对两个数据流进行笛卡尔积操作
      DataStream<String> result = stream1.cross(stream2)
                .with((num, str) -> num + "-" + str);

      result.print();

      env.execute("Cross Example");
    }
}
   在上面的示例中,我们利用 env.fromElements 方法创建了两个模仿数据流 stream1 和 stream2,其中 stream1 包含整数 1、2 和 3,stream2 包含字符串 “A” 和 “B”。然后,我们调用 cross 方法对这两个数据流进行笛卡尔积操纵,并在 with 方法中界说了组合结果的逻辑:将整数和字符串进行组合,用"-"分隔。在输出结果中,我们可以看到所有可能的组合结果,即整数和字符串之间的所有组合。请注意,Cross 算子会产生较大的输出结果,因此在现实应用中需要谨慎利用。
11:Union 算子

Union 算子是 Flink 中用于将多个数据集合并成一个新数据集的算子。它将多个数据集的元素合并在一起,形成一个新的数据集。
应用场景


[*]Union 算子适用于需要将多个数据集合并在一起的场景。比方,在流处置惩罚中,可能需要将多个数据流合并为一个数据流进行后续处置惩罚;在批处置惩罚中,可能需要将多个数据集合并在一起进行并行处置惩罚。
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnionOperatorExample {

    public static void main(String[] args) throws Exception {
      // 获取 ExecutionEnvironment,用于创建数据集
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // 假设已经有两个数据集,包含整数和字符串
      DataSet<Integer> dataSet1 = env.fromElements(1, 2, 3);
      DataSet<Integer> dataSet2 = env.fromElements(4, 5, 6);
      DataSet<String> dataSet3 = env.fromElements("A", "B", "C");

      // 使用 Union 算子将两个数据集合并成一个新数据集
      DataSet<Integer> mergedDataSet = dataSet1.union(dataSet2);

      // 使用 Union 算子将三个数据集合并成一个新数据集
      DataSet<Tuple2<Integer, String>> combinedDataSet = dataSet1.union(dataSet2).union(dataSet3);

      // 输出合并结果
      mergedDataSet.print();
      combinedDataSet.print();
    }
}
三、Sink算子

1. collect

将数据输出到本地集合
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class TransformationsFlatmap {
    public static void main(String []arv) throws Exception
    {
      StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
      DataStream dsSocket=env.socketTextStream("192.168.23.210",9000);
      //函数式
//输入spark,hive,hbase
      DataStream ds1=dsSocket.flatMap(new FlatMapFunction<String,Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String,Integer>> collector) throws Exception {
                String [] words=value.split(",");
                for(String word:words){
                  collector.collect(Tuple2.of(word,1));
                }
            }
      });
         ds1.print();
         env.execute("TransformationsMap");
    }
}
2. writeAsText

将数据输出到文件


[*]Flink支持多种存储装备上的文件,包罗本地文件,hdfs文件等
[*]Flink支持多种文件的存储格式,包罗text文件,CSV文件等
// 将数据写入本地文件
result.writeAsText("/data/a", WriteMode.OVERWRITE)

// 将数据写入HDFS
result.writeAsText("hdfs://node01:9000/data/a", WriteMode.OVERWRITE)
DataStream流处置惩罚算子

和DataSet一样,DataStream也包罗一系列的Transformation操纵
一、Source算子

Flink可以利用 StreamExecutionEnvironment.addSource(source) 来为我们的程序添加数据来源。
Flink 已经提供了若干实现好了的 source functions,当然我们也可以通过实现 SourceFunction 来自界说非并行的source大概实现 ParallelSourceFunction 接口大概扩展 RichParallelSourceFunction 来自界说并行的 source。
Flink在流处置惩罚上的source和在批处置惩罚上的source基本划一。大致有4大类:


[*]基于本地集合的source(Collection-based-source)
[*]基于文件的source(File-based-source)- 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回
[*]基于网络套接字的source(Socket-based-source)- 从 socket 读取。元素可以用分隔符切分。
[*]自界说的source(Custom-source)
1:基于文件



[*]readTextFile(path) - 读取文本文件,比方遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。
[*]readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。
[*]readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是前两个方法内部调用的方法。它基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的差别,source 可能定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),大概处置惩罚一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE)。利用 pathFilter,用户可以进一步排除正在处置惩罚的文件。
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* @author alanchan
*
*/
public class Source_File {

        /**
       * 一般用于学习测试 env.readTextFile(本地/HDFS文件/文件夹);//压缩文件也可以
       *
       * @param args
       * @throws Exception
       */
        public static void main(String[] args) throws Exception {
                // env
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

                // source
                DataStream<String> ds1 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");
                DataStream<String> ds2 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/input/distribute_cache_student");
                DataStream<String> ds3 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.tar.gz");
                DataStream<String> ds4 = env.readTextFile("hdfs://server2:8020///flinktest/wc-1688627439219");

                // transformation

                // sink
                ds1.print();
                ds2.print();
                ds3.print();
                ds4.print();

                // execute
                env.execute();

        }

}

2:基于套接字

socketTextStream - 从套接字读取。元素可以由分隔符分隔。
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
* @author alanchan
*         在server2上使用nc -lk 9999 向指定端口发送数据
*         nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据
*         如果没有该命令可以下安装 yum install -y nc
*         
*/
public class Source_Socket {

        /**
       * @param args
       * @throws Exception
       */
        public static void main(String[] args) throws Exception {
                //env
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

      //source
      DataStream<String> lines = env.socketTextStream("server2", 9999);
      
      //transformation
      /*SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(" ");
                for (String word : arr) {
                  out.collect(word);
                }
            }
      });

      words.map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value,1);
            }
      });*/

      //注意:下面的操作将上面的2步合成了1步,直接切割单词并记为1返回
//      SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
//            @Override
//            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                String[] arr = value.split(" ");
//                for (String word : arr) {
//                  out.collect(Tuple2.of(word, 1));
//                }
//            }
//      });
//
//      SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);

      //sink
      lines.print();

      //execute
      env.execute();
        }

}

3:基于集合



[*]fromCollection(Collection) - 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。
[*]fromCollection(Iterator, Class) - 从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。
[*]fromElements(T …) - 从给定的对象序列中创建数据流。所有的对象必须属于同一类型。
[*]fromParallelCollection(SplittableIterator, Class) - 从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。
[*]generateSequence(from, to) - 基于给定间隔内的数字序列并行生成数据流。
import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* @author alanchan
*
*/
public class Source_Collection {

        /**
       * 一般用于学习测试时编造数据时使用
       * 1.env.fromElements(可变参数);
       * 2.env.fromColletion(各种集合);
       * 3.env.generateSequence(开始,结束);
       * 4.env.fromSequence(开始,结束);
       *
       * @param args 基于集合
       * @throws Exception
       */
        public static void main(String[] args) throws Exception{
                // env
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

      // source
      DataStream<String> ds1 = env.fromElements("i am alanchan", "i like flink");
      DataStream<String> ds2 = env.fromCollection(Arrays.asList("i am alanchan", "i like flink"));
      DataStream<Long> ds3 = env.generateSequence(1, 10);//已过期,使用fromSequence方法
      DataStream<Long> ds4 = env.fromSequence(1, 100);

      // transformation

      // sink
      ds1.print();
      ds2.print();
      ds3.print();
      ds4.print();

      // execute
      env.execute();
        }

}

4:自界说

addSource - 关联一个新的 source function。比方,你可以利用 addSource(new FlinkKafkaConsumer<>(…)) 来从 Apache Kafka 获取数据。
kafka

package com.wenge.datagroup.storage;

import com.wenge.datagroup.storage.common.ArgsConstants;
import com.wenge.datagroup.storage.utils.ConfigUtil;
import com.wenge.datagroup.storage.utils.Funnel;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
* @author wangkanglu
* @version 1.0
* @description
* @date 2024-07-24 16:58
*/
public class TestFlink {
    public static void main(String[] args) {
      StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      int sourceParallelism =1;
      String topic = "topic_name";

      Properties properties = getParameters();
      FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                topic,
                new SimpleStringSchema(),
                properties
      );
      DataStreamSource<String> kafkaDataStreamSource = streamEnv.addSource(consumer);
      DataStream<String> dataStream = kafkaDataStreamSource.setParallelism(sourceParallelism).name("KafkaSource-" + topic);
      dataStream .print();
      // execute
                streamEnv .execute();
    }

   
    private static Properties getParameters(){
      Properties properties = new Properties();

      // 集群地址
      properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.1:9092");
      // 消费者组id
      properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "wkl_test");
      properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
      properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
      properties.setProperty("partition.discovery.interval.ms", "10000");//消费者定期发现动态创建的Kafka主题和分区的时间间隔
      // latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费
      // earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
      properties.setProperty("auto.offset.reset", "latest");
//      properties.setProperty("enable.auto.commit", "true");
//      properties.setProperty("auto.commit.interval.ms", "1000");//自动提交的时间间隔
      properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");//自动提交的时间间隔
      properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "20000");//自动提交的时间间隔
      //每次从kafka中获取的数量
      properties.setProperty("max.poll.records", "2");

      return properties;
    }
      
}

mysql

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.source_transformation_sink.bean.User;

/**
* @author alanchan
* 自定义数据源-MySQL
*/
public class Source_MySQL {

        /**
       * @param args
       * @throws Exception
       */
        public static void main(String[] args) throws Exception {
                // env
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

                // source
                DataStream<User> studentDS = env.addSource(new MySQLSource()).setParallelism(1);

                // transformation

                // sink
                studentDS.print();

                // execute
                env.execute();
        }

        private static class MySQLSource extends RichParallelSourceFunction<User> {
                private boolean flag = true;
                private Connection conn = null;
                private PreparedStatement ps = null;
                private ResultSet rs = null;

                // open只执行一次,适合开启资源
                @Override
                public void open(Configuration parameters) throws Exception {
                        conn = DriverManager.getConnection("jdbc:mysql://server4:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
                        String sql = "select id,name,pwd,email,age,balance from user";
                        ps = conn.prepareStatement(sql);
                }

                @Override
                public void run(SourceContext<User> ctx) throws Exception {
                        while (flag) {
                                rs = ps.executeQuery();
                                while (rs.next()) {
                                        User user = new User(
                                                rs.getInt("id"),
                                                rs.getString("name"),
                                                rs.getString("pwd"),
                                                rs.getString("email"),
                                                rs.getInt("age"),
                                                rs.getDouble("balance")
                                        );
                                        ctx.collect(user);
                                }
                                Thread.sleep(5000);
                        }
                }

                // 接收到cancel命令时取消数据生成
                @Override
                public void cancel() {
                        flag = false;
                }

                // close里面关闭资源
                @Override
                public void close() throws Exception {
                        if (conn != null)
                                conn.close();
                        if (ps != null)
                                ps.close();
                        if (rs != null)
                                rs.close();
                }

        }

}

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* @author alanchan
*
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
        private int id;
        private String name;
        private String pwd;
        private String email;
        private int age;
        private double balance;
}

二、Transform转换算子

Flink算子,实在就是”数据转换算子“,对数据进行处置惩罚的方法大概程序封装就是算子
https://i-blog.csdnimg.cn/direct/46a29cc59a694c9e97bf28e3a52fd661.png
1: map

Map算子,就是映射算子,将一个数据映射为另一个数据,与Java8 stream 流式操纵中的map划一
public static void main(String[] args) throws Exception {
    List<String> stringList = Arrays.asList("a", "b", "c");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 设置并行度为1 (1个线程执行,以便观察)
    env.setParallelism(1);
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    // 加载数据源
    DataStreamSource<String> stream = env.fromCollection(stringList);
    // 使用map算子
    SingleOutputStreamOperator<String> source = stream.map(new MapFunction<String, String>() {
      @Override
      public String map(String value) throws Exception {
            return value.toUpperCase()+ "aa";
      }
    });
    // Aaa Baa Caa
    source.print();
    env.execute();
}

2:FlatMap算子

FlatMap算子,可以将数据进行摊平化处置惩罚 比方 原本每一个元素都是集合大概数数组,我们利用FlatMap后,可以将(集合,数组)进行再次拆解取出其中的数据,再新组合为集合,与Java8 stream 流式操纵中的Flatmap功能划一
public static void main(String[] args) throws Exception {
    List<String> str1 = Arrays.asList("a", "b", "c");
    List<String> str2 = Arrays.asList("关羽","张飞","马超","黄忠","赵云");
    List<List<String>> originalData = new ArrayList<>();
    originalData.add(str1);
    originalData.add(str2);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 设置并行度为1 (1个线程执行,以便观察)
    env.setParallelism(1);
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    // 加载数据源
    DataStreamSource<List<String>> stream = env.fromCollection(originalData);
    // 使用flatMap算子
    SingleOutputStreamOperator<String> source = stream.flatMap(new FlatMapFunction<List<String>, String>() {
      @Override
      public void flatMap(List<String> value, Collector<String> out) throws Exception {
            for (String s : value) {
                out.collect(s);
            }
      }
    });
    // a b c 关羽 张飞 马超 黄忠 赵云
    source.print();
    env.execute();
}

3:Filter算子

Filter为筛选(过滤)算子,可以根据条件过滤数据源中数据,比方现有数据源 1,2,3,4,5 现在要过滤大于3的数据,过滤后,数据源中仅有 4 5 数据了,与Java8 stream 流式操纵中的filter功能划一
public static void main(String[] args) throws Exception {
    List<Integer> str1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 设置并行度为1 (1个线程执行,以便观察)
    env.setParallelism(1);
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    // 加载数据源
    DataStreamSource<Integer> stream = env.fromCollection(str1);
    // 使用filter算子
    SingleOutputStreamOperator<Integer> source = stream.filter(new FilterFunction<Integer>() {
      @Override
      public boolean filter(Integer value) throws Exception {
            return value > 3;
      }
    });
    // 4 5 6 7 8
    source.print();
    env.execute();
}

4:KeyBy算子

分组算子,根据数据源中元素某一特性进行分组,与Java8 stream 流式操纵中的groupBy功能划一
public static void main(String[] args) throws Exception {
      List<User> users = Arrays.asList(
                new User("张三", 12),
                new User("张三", 18),
                new User("李四", 22),
                new User("麻子", 35));
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // 设置并行度为1 (1个线程执行,以便观察)
      env.setParallelism(1);
      env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
      // 加载数据源
      DataStreamSource<User> stream = env.fromCollection(users);

      // 使用filter算子
      KeyedStream<User, String> keyedStream = stream.keyBy(new KeySelector<User, String>() {
            @Override
            public String getKey(User value) throws Exception {
                return value.getName();
            }
      });
      keyedStream.print();
      env.execute();
    }

5:Union算子

union :联合算子, 利用此算子,可对多个数据源进行合并操纵(数据源数据必须类型必须相同),其可合并多个,合并后可直接对数据进行处置惩罚 (盘算或输出)
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    env.setParallelism(1);
    DataStreamSource<String> source = env.fromElements("zs", "li", "we");
    DataStreamSource<String> source2 = env.fromElements("zs2", "li2", "we2");
    DataStreamSource<String> source3 = env.fromElements("zs3", "li3", "we3");
    //此操作将 source 、source2 、source3 三个数据源的数据联合起来了
    DataStream<String> union = source.union(source2, source3);
    SingleOutputStreamOperator<String> streamOperator = union.map(new MapFunction<String, String>() {
      @Override
      public String map(String value) throws Exception {
            return value.toUpperCase();
      }
    });
    streamOperator.print("union").setParallelism(1);
    env.execute();
}

6:Connect算子

connect与union算子一样,都可以进行数据源合并处置惩罚,但与union差别的是,connect 可以合并差别类型的数据源,但最多只能合并两个数据流,且合并后无法直接操纵(盘算 输出),需要对连接流进行数据处置惩罚(选择终极合并后的数据类型,不符合终极数据类型的转换)
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    env.setParallelism(1);
    DataStreamSource<String> source = env.fromElements("zs", "li", "we");
    DataStreamSource<Integer> source2 = env.fromElements(1, 2, 3, 4, 5, 6, 7);
    ConnectedStreams<String, Integer> connect = source.connect(source2);
    // 我这里是将最终合并类型定为String.
    SingleOutputStreamOperator<String> streamOperator = connect.map(new CoMapFunction<String, Integer, String>() {
      @Override
      public String map1(String value) {
            return value + "是字符串类型,直接加后缀";
      }

      @Override
      public String map2(Integer value) {
            return "原本是Integer类型:" + value + "现在也变为String";
      }
    });
    streamOperator.print("connect");
    env.execute();
}

7:算子链式调用

public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
      env.setParallelism(1);
      DataStreamSource<String> streamSource = env.socketTextStream("xx", 9999);
      SingleOutputStreamOperator<Tuple2<String, Integer>> result = streamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                for (String s1 : s.split(",")) {
                  collector.collect(s1);
                }
            }
      }).filter(s -> !s.equals("sb"))
                .map(new MapFunction<String, String>() {
                  @Override
                  public String map(String s) throws Exception {
                        return s.toUpperCase();
                  }
                }).map(new MapFunction<String, Tuple2<String, Integer>>() {
                  @Override
                  public Tuple2<String, Integer> map(String s) throws Exception {
                        return Tuple2.of(s, 1);
                  }
                }).keyBy(tp -> tp.f0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                  @Override
                  public Tuple2<String, Integer> reduce(Tuple2<String, Integer> stringIntegerTuple2, Tuple2<String, Integer> t1) throws Exception {
                        return Tuple2.of(t1.f0, t1.f1 + stringIntegerTuple2.f1);
                  }
                });
      result.print();
      env.execute();

    }

8:异步IO调用

package com.wenge.datagroup.storage.process;

import cn.hutool.http.HttpRequest;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@Slf4j
public class AnalyerAsyncIOProcess   {

    public static DataStream<JSONObject> process(DataStream<String> dataStream) {

      //线程数
      int asyncNum = 10;

      //过滤不需要的数据
      SingleOutputStreamOperator<JSONObject> filterDataStream = dataStream.filter(new RichFilterFunction<String>() {
            @Override
            public boolean filter(String str) {
                //TODO:根据业务逻辑进行判断
                JSONObject data = null;
                try {
                  data = JSON.parseObject(str);
                  String id = data.getString("id");

                  if (Objects.isNull(id)) {
                        return false;
                  }else {
                        return true;
                  }
                } catch (Exception e) {
                  return false;
                }
            }
      }).name("aysncIOFilter").setParallelism(1).map(new RichMapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String str) throws Exception {
                JSONObject data = null;
                try {
                  data = JSON.parseObject(str);
                  return data;
                } catch (Exception e) {
                  log.error("数据处理异常:{}", str);
                  return null;
                }
            }
      }).name("aysncIOMAP").setParallelism(2);

      // 异步IO
      RichAsyncFunction richAsyncFunction = new RichAsyncFunction<JSONObject, JSONObject>() {
            private transient ExecutorService executorService;

            @Override
            public void open(Configuration parameters) {
                // 加载配置文件,每一个并行度只执行一次
                log.error("加载配置文件base");
                this.executorService = Executors.newFixedThreadPool(asyncNum);
            }

            @Override
            public void close() throws Exception {
                // 关闭线程池
                if (executorService != null) {
                  executorService.shutdown();
                }
                log.error("----------------------------线程池关闭----------------------");
            }

            @Override
            public void timeout(JSONObject input, ResultFuture<JSONObject> resultFuture) {
                log.error("------------------------数据超时----------------------:{}", input);
                JSONObject data = input;
                //对超时数据进行处理
                resultFuture.complete(Collections.singleton(data));
            }

            @Override
            public void asyncInvoke(JSONObject json, ResultFuture<JSONObject> resultFuture) {

                CompletableFuture.supplyAsync(new Supplier<JSONObject>() {

                  @Override
                  public JSONObject get() {
                        //识别语种
                        String postResult = new String();
                        String id = json.getString("id");
                        long start = System.currentTimeMillis();
                        try {
                            //TODO: 根据业务逻辑进行处理
                            log.error("异步处理数据base:{}", json.getString("id"));

                            postResult = HttpRequest.post("http://127.0.0.1:8080").body(new JSONObject().toJSONString()).execute().body();

                            if (StringUtils.isNotBlank(postResult)) {
                              json.put("postResult", postResult);
                            }
                            long end = System.currentTimeMillis();
                            log.error("id:{},请求接口:{},耗时:{} ms", id, postResult, (end - start));
                            return json;
                        } catch (Exception e) {
                            log.error("----------语种识别异步IO处理异常:{},数据:{}", id, e);
                            return json;
                        }
                  }
                }, executorService).thenAccept((JSONObject dbResult) -> {
                  resultFuture.complete(Collections.singleton(dbResult));
                });
            }
      };
      DataStream<JSONObject> downloadStream = AsyncDataStream.unorderedWait(
                filterDataStream,
                richAsyncFunction,
                5000,
                TimeUnit.MILLISECONDS,
                asyncNum).name("IO").setParallelism(2);

      return downloadStream;
    }


}

9:异步IO调用 sql

package com.wenge.datagroup.storage.process;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@Slf4j
public class AnalyerAsyncIOSQL1 {

    public static DataStream<JSONObject> process(DataStream<JSONObject> dataStream) {

      //sql 连接数
      int dataAsyncNum =10;
      //并行度
      int asyncNum =10;

      String databaseUrl= "jdbc:mysql://127.0.0.1:3306/test_data?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8";
      String databaseUsername= "root";
      String databasePassword= "123456";

      // 异步IO
      RichAsyncFunction richAsyncFunction = new RichAsyncFunction<JSONObject, JSONObject>() {

            private transient ExecutorService executorService;
            private DruidDataSource dataSource;

            @Override
            public void open(Configuration parameters) {
                // 重新加载配置文件
                log.error("重新加载配置文件-SQL");
                this.executorService = Executors.newFixedThreadPool(dataAsyncNum);

                dataSource = new DruidDataSource();
                dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
                dataSource.setUrl(databaseUrl);
                dataSource.setUsername(databaseUsername);
                dataSource.setPassword(databasePassword);
                dataSource.setMaxActive(dataAsyncNum);
            }

            @Override
            public void close() throws Exception {
                // 关闭线程池
                executorService.shutdown();
                dataSource.close();
                log.error("----------------------------线程池关闭----------------------");
            }

            @Override
            public void timeout(JSONObject input, ResultFuture<JSONObject> resultFuture) {
                log.error("------------------------数据超时----------------------:{}", input);
                JSONObject data = input;
                //对超时数据进行处理
                resultFuture.complete(Collections.singleton(data));
            }

            @Override
            public void asyncInvoke(JSONObject json, ResultFuture<JSONObject> resultFuture) {

                CompletableFuture.supplyAsync(new Supplier<JSONObject>() {

                  @Override
                  public JSONObject get() {
                        String id = json.getString("id");
                        long start =System.currentTimeMillis();
                        try {
                            //TODO: 根据业务逻辑进行处理
                            String country_id = new AnalyerAsyncIOSQL1().queryFromMySql("name", dataSource);
                            log.error("----------SQL,id:{},数据:{}", id, country_id);
                            if(StringUtils.isNotBlank(country_id)){
                              json.put("country_id",country_id);
                            }
                            long end =System.currentTimeMillis();
                            log.error("id:{},sql,耗时:{} ms", id,(end-start));

                            return json;
                        } catch (Exception e) {
                            log.error("----------SQL异步IO处理异常:{},数据:{}", id, e);
                            return json;
                        }
                  }
                }, executorService).thenAccept((JSONObject dbResult) -> {
                  resultFuture.complete(Collections.singleton(dbResult));
                });
            }
      };

      DataStream<JSONObject> downloadStream = AsyncDataStream.unorderedWait(
                dataStream,
                richAsyncFunction,
                5000,
                TimeUnit.MILLISECONDS,
                dataAsyncNum).name("IO_SQL").setParallelism(asyncNum);

      return downloadStream;
    }


    /**
   * SQL 查询代码实现
   */
    public String queryFromMySql(String name,DataSource dataSource) throws SQLException {
      String sql = "select id,name,country_id ,status from nation_info where status =1 and name = ?";

      String result = null;
      Connection connection = null;
      PreparedStatement stmt = null;
      ResultSet rs = null;

      try {
            connection = dataSource.getConnection();
            stmt = connection.prepareStatement(sql);
            stmt.setString(1, name);

            rs = stmt.executeQuery();

            while (rs.next()) {
                result = rs.getString("country_id");
            }
      } finally {
            if (rs != null) {
                rs.close();
            }
            if (stmt != null) {
                stmt.close();
            }
            if (connection != null) {
                connection.close();
            }
      }
      return result;
    }


}

三、Data Sinks

Data sinks 利用 DataStream 并将它们转发到文件、套接字、外部体系或打印它们。Flink 自带了多种内置的输特别式,这些格式相干的实现封装在 DataStreams 的算子里:


[*]writeAsText() / TextOutputFormat - 将元素按行写成字符串。通过调用每个元素的 toString() 方法获得字符串。
[*]writeAsCsv(…) / CsvOutputFormat - 将元组写成逗号分隔值文件。行和字段的分隔符是可配置的。每个字段的值来自对象的 toString() 方法。
[*]print() / printToErr() - 在尺度输出/尺度错误流上打印每个元素的 toString() 值。 可选地,可以提供一个前缀(msg)附加到输出。这有助于区分差别的 print 调用。假如并行度大于1,输出结果将附带输出任务标识符的前缀。
[*]writeUsingOutputFormat() / FileOutputFormat - 自界说文件输出的方法和基类。支持自界说 object 到 byte 的转换。
[*]writeToSocket - 根据 SerializationSchema 将元素写入套接字。
addSink - 调用自界说 sink function。Flink 捆绑了连接到其他体系(比方 Apache Kafka)的连接器,这些连接器被实现为 sink functions。
注意,DataStream 的 write*() 方法重要用于调试目的。它们不到场 Flink 的 checkpointing,这意味着这些函数通常具有至少有一次语义。革新到目的体系的数据取决于 OutputFormat 的实现。这意味着并非所有发送到 OutputFormat 的元素都会立刻显示在目的体系中。别的,在失败的环境下,这些记录可能会丢失。
为了将流可靠地、精准一次地传输到文件体系中,请利用 FileSink。别的,通过 .addSink(…) 方法调用的自界说实现也可以到场 Flink 的 checkpointing,以实现精准一次的语义。
kafka

import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.util.Properties;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
* @author alanchan
*
*/
public class SinkKafka {

        public static void main(String[] args) throws Exception {
                // env
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

                // source
                // 准备kafka连接参数
                Properties props = new Properties();
                // 集群地址
                props.setProperty("bootstrap.servers", "server1:9092");
                // 消费者组id
                props.setProperty("group.id", "flink");
                // latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费
                // earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
                props.setProperty("auto.offset.reset", "latest");

                // 会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测
                props.setProperty("flink.partition-discovery.interval-millis", "5000");
                // 自动提交
                props.setProperty("enable.auto.commit", "true");
                // 自动提交的时间间隔
                props.setProperty("auto.commit.interval.ms", "2000");
                // 使用连接参数创建FlinkKafkaConsumer/kafkaSource
                FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>(
                "t_kafkasource",
                new SimpleStringSchema(),
                props);
               
                // 使用kafkaSource
                DataStream<String> kafkaDS = env.addSource(kafkaSource);

                // transformation算子,业务计算
                //以alan作为结尾
                SingleOutputStreamOperator<String> etlDS = kafkaDS.filter(new FilterFunction<String>() {
                        @Override
                        public boolean filter(String value) throws Exception {
                                return value.contains("alan");
                        }
                });

                // sink
                etlDS.print();

                Properties props2 = new Properties();
                props2.setProperty("bootstrap.servers", "server1:9092");
                props2.setProperty(ConsumerConfig.GROUP_ID_CONFIG, grouId);
      props2.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
      props2.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
      props2.setProperty("enable.idempotence", "false");
      
                FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
                "send_topic",
                new SimpleStringSchema(),
                props2);
               
                etlDS.addSink(kafkaSink);

                // execute
                env.execute();
        }

}

flie

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* @author alanchan
*/
public class SinkDemo {

        public static void main(String[] args) throws Exception {
                // env
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

                // source
                DataStream<String> ds = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");
                System.setProperty("HADOOP_USER_NAME", "alanchan");
                // transformation
                // sink
//                ds.print();
//                ds.print("输出标识");
//                ds.printToErr();// 会在控制台上以红色输出
//                ds.printToErr("输出标识");// 会在控制台上以红色输出
                // 并行度与写出的文件个数有关,一个并行度写一个文件,多个并行度写多个文件
//                ds.writeAsText("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/output/result1").setParallelism(1);
                ds.writeAsText("hdfs://server2:8020///flinktest/words").setParallelism(2);

                // execute
                env.execute();
        }

}

mysql

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.source_transformation_sink.bean.User;

/**
* @author alanchan
*
*/
public class SinkToMySQL {

        public static void main(String[] args) throws Exception {
                // 0.env
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

                // 1.source
                DataStream<User> studentDS = env.fromElements(new User(1, "alanchan", "sink mysql", "alan.chan.chn@163.com", 19, 800));
                // 2.transformation
               
                // 3.sink
                studentDS.addSink(new MySQLSink());

                // 4.execute
                env.execute();
        }

        private static class MySQLSink extends RichSinkFunction<User> {
                private Connection conn = null;
                private PreparedStatement ps = null;

                @Override
                public void open(Configuration parameters) throws Exception {
                        conn = DriverManager.getConnection(
                                        "jdbc:mysql://server4:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "123456");
//                        private int id;
//                        private String name;
//                        private String pwd;
//                        private String email;
//                        private int age;
//                        private double balance;
                        String sql = "INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
                        ps = conn.prepareStatement(sql);
                }

                @Override
                public void invoke(User value, Context context) throws Exception {
                        // 设置?占位符参数值
                        ps.setString(1, value.getName());
                        ps.setString(2, value.getPwd());
                        ps.setString(3, value.getEmail());
                        ps.setInt(4, value.getAge());
                        ps.setDouble(5, value.getBalance());
                        // 执行sql
                        ps.executeUpdate();
                }

                @Override
                public void close() throws Exception {
                        if (conn != null)
                                conn.close();
                        if (ps != null)
                                ps.close();
                }

        }

}

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* @author alanchan
*
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
        private int id;
        private String name;
        private String pwd;
        private String email;
        private int age;
        private double balance;
}


ES

package com.wenge.datagroup.storage;

import com.alibaba.fastjson.JSONObject;
import com.wenge.datagroup.storage.utils.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.elasticsearch.action.update.UpdateRequest;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
* @author wangkanglu
* @version 1.0
* @description
* @date 2024-07-24 16:58
*/
public class TestFlink {
    static Stringtopic = "test_topic";
    static Integer bulkSize = 100;
    static Integer sinkParallelism = 2;

    public static void main(String[] args) {
      // env
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

      // source
      // 准备kafka连接参数
      Properties props = getParameters();

      // 使用连接参数创建FlinkKafkaConsumer/kafkaSource
      FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>(
                topic,
                new SimpleStringSchema(),
                props);

      // 使用kafkaSource
      DataStream<String> kafkaDS = env.addSource(kafkaSource);

      //更新进ES
      List<HttpHost> esAddresses = null;
      try {
            esAddresses = getEsAddresses("192.168.1.1:9200,192.168.1.2:9200");
      } catch (MalformedURLException e) {
            log.error("解析ES地址报错", e);
            e.printStackTrace();
      }

      ElasticsearchSinkFunction<JSONObject> elasticsearchSinkUpdateFunction = new ElasticsearchSinkFunction<JSONObject>() {

            @Override
            public void process(JSONObject jsonObject, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                //TODO:这里按照发布时间存到不同的索引,需要根据项目自行修改
                String index = "writeIndex";
                String uuid = jsonObject.getString("uuid");
                jsonObject.remove("_index");
                jsonObject.put("topic_flag",topic);
                jsonObject.put("es_insert_time", DateUtils.getCurrentDateTime());
                if(StringUtils.isEmpty(index)){
                  log.error("UUID:{} index为空,不再更新ES", uuid);
                  return;
                }else {
                  log.error("索引:{}中更新数据:UUID:{} ",index, uuid);
                }

                UpdateRequest updateRequest = new UpdateRequest(index, uuid)
                        .docAsUpsert(true)
                        .doc(jsonObject)
                        //版本冲突中的重试次数
                        .retryOnConflict(5);

                requestIndexer.add(updateRequest);
            }


      };

      ESSinkUtil.addUpdateSink(esAddresses, bulkSize, sinkParallelism, kafkaDS, elasticsearchSinkUpdateFunction, "saveEs");


    }


    /**
   * 解析配置文件的 es hosts
   *
   * @param hosts hosts字符串
   * @throws MalformedURLException 地址异常
   */
    public static List<HttpHost> getEsAddresses(String hosts) throws MalformedURLException {
      String[] hostList = hosts.split(",");
      List<HttpHost> addresses = new ArrayList<>();
      for (String host : hostList) {
            if (host.startsWith("http")) {
                URL url = new URL(host);
                addresses.add(new HttpHost(url.getHost(), url.getPort()));
            } else {
                String[] parts = host.split(":", 2);
                if (parts.length > 1) {
                  addresses.add(new HttpHost(parts, Integer.parseInt(parts)));
                } else {
                  throw new MalformedURLException("invalid elasticsearch hosts format");
                }
            }
      }
      return addresses;
    }

    /**
   * es sink
   *
   * @param hosts               es hosts
   * @param bulkFlushMaxActions bulk flush size
   * @param parallelism         并行数
   * @param dataStream          数据
   * @param func                es写入方法
   * @param <T>               泛型
   * @param sinkName            算子名称
   */
    public static <T> void addUpdateSink(List<HttpHost> hosts, int bulkFlushMaxActions, int parallelism,
                                       DataStream<T> dataStream, ElasticsearchSinkFunction<T> func,
                                       String sinkName) {
      ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, func);
      esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);

      new RetryRequestFailureHandler();

      RestClientFactory restClientFactory = ESConstant.restClientFactory;
      //设置用户名密码
      esSinkBuilder.setRestClientFactory(restClientFactory);
      esSinkBuilder.setFailureHandler(new UpdateRetryRequestFailureHandler());
      //Bulk刷新间隔
      esSinkBuilder.setBulkFlushInterval(1000);
      //重试次数
      esSinkBuilder.setBulkFlushBackoffRetries(10);
      //重试间隔
      esSinkBuilder.setBulkFlushBackoffDelay(5000);
      //重试类型
      esSinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.CONSTANT);


      dataStream.addSink(esSinkBuilder.build()).name(sinkName).setParallelism(parallelism);
    }


    private static Properties getParameters(){
      Properties properties = new Properties();

      // 集群地址
      properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.1:9092");
      // 消费者组id
      properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "wkl_test");
      properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
      properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
      properties.setProperty("partition.discovery.interval.ms", "10000");//消费者定期发现动态创建的Kafka主题和分区的时间间隔
      // latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费
      // earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
      properties.setProperty("auto.offset.reset", "latest");
//      properties.setProperty("enable.auto.commit", "true");
//      properties.setProperty("auto.commit.interval.ms", "1000");//自动提交的时间间隔
      properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");//自动提交的时间间隔
      properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "20000");//自动提交的时间间隔
      //每次从kafka中获取的数量
      properties.setProperty("max.poll.records", "2");

      return properties;
    }



   
}


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Flink -2-Flink 算子和java代码简单利用