tsx81429 发表于 5 天前

Flink使用详解

本文档使用 flink-1.13.1 版本依赖
一、Flink 常用流处置处罚 API

getExecutionEnvironment

创建实行环境
StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
setParallelism

为实行环境设置并行度
environment.setParallelism(1);
addSource

给实行环境指定数据来源
// 设置 RabbitMQ 连接配置
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
      .setHost("192.168.117.4")
      .setPort(5672)
      .setVirtualHost("/")
      .setUserName("mix")
      .setPassword("jovision")
      .build();
// 创建侧输出流
OutputTag<TestDto> maxTestStream = new OutputTag<TestDto>("maxTest"){};

// 创建 RabbitMQ 数据源,获取名为demo.in队列中的消息
DataStream<String> rabbitMQStream = environment.addSource(new RMQSource<>(
      connectionConfig,
      "demo.in",
      true,
      new SimpleStringSchema()
));
map

用于将吸收到的流转化成目的数据类型
// 将所有接受的数据分出两个相同的流对象,侧输出流用于测试分组能力
SingleOutputStreamOperator<TestDto> originStream = rabbitMQStream.map(new MapFunction<String, TestDto>() {
    @Override
    public TestDto map(String s) throws Exception {
      TestDto testDto = JSON.parseObject(s, TestDto.class);
      return testDto;
    }
});
OutputTag

侧输出流,可拷贝来源流并进行其他逻辑操作
// 创建侧输出流
OutputTag<TestDto> maxTestStream = new OutputTag<TestDto>("maxTest"){};
// 将所有接受的数据分出两个流对象,侧输出流用于测试分组能力
SingleOutputStreamOperator<TestDto> originStream = originStream.process(new ProcessFunction<TestDto, TestDto>() {
    @Override
    public void processElement(TestDto testDto, ProcessFunction<TestDto, TestDto>.Context context, Collector<TestDto> collector) throws Exception {
      // 通过age大小分流
      if(testDto.getAge() > 10){
            // 将流数据塞入上面创建的maxTestStream侧输出流
            context.output(maxTestStream, testDto);
      }else{
            // 将流数据依然放入当前输出流
            collector.collect(testDto);
      }
    }
});
print

打印流信息
originStream.print("sumStream");
filter

用于过滤吸收的流
// 过滤接收的数据,只接收age=11的流数据放入sinkStream流对象
SingleOutputStreamOperator<String> sinkStream = originStream.filter(new FilterFunction<TestDto>() {
    @Override
    public boolean filter(TestDto testDto) throws Exception {
      return testDto.getAge() == 11;
    }
}).map(new MapFunction<TestDto, String>() {
    // 转成String
    @Override
    public String map(TestDto testDto) throws Exception {
      return JSON.toJSONString(testDto);
    }
});
getSideOutput

获取测输出流数据
// 上面是在originStream流对象将流数据分给maxTestStream侧输出流的,所以需要如下调用
DataStream<TestDto> sideOutput = originStream.getSideOutput(maxTestStream);
keyBy

将获取到的流对指定属性分组
// 按照对象的name进行分组
KeyedStream<TestDto, Tuple> keyedStream = sideOutput.keyBy("name");
max

分组后返回指定属性最大的值(返回的指定属性最大的值是正确的,但是其中的其他属性大概不准确)
DataStream<TestDto> maxStream = keyedStream.max("age");
maxBy

分组后返回指定属性最大的值(返回的最大值与相应的其他字段内容都是准确的)
DataStream<TestDto> maxByStream = keyedStream.maxBy("age");
sum

分组后返回指定属性累加值
DataStream<TestDto> sumStream = keyedStream.sum("age");
reduce

分组后可操作新数据及上一次数据,可进行累加、比较等逻辑操作
// 每次收到数据会按照分组信息分组,并累加上一次的age数据后存入reduceStream流对象,也可实现其他逻辑
SingleOutputStreamOperator<TestDto> reduceStream = keyedStream.reduce(new ReduceFunction<TestDto>() {
    @Override
    public TestDto reduce(TestDto beforeDto, TestDto nowDto) throws Exception {
      log.info("----------- reduce beforeDto:" + beforeDto);
      log.info("----------- reduce nowDto:" + nowDto);
      return beforeDto.setAge(beforeDto.getAge() + nowDto.getAge());
    }
});
connect

可归并两个不同数据类型的流
// 连接两个不同数据类型的流,类型为String的流对象sinkStream与类型为TestDto的流对象sideOutput合并
ConnectedStreams<String, TestDto> connectStream = sinkStream.connect(sideOutput);
DataStream<Object> connectObjectStream = connectStream.map(new CoMapFunction<String, TestDto, Object>() {
    @Override
    public Object map1(String s) throws Exception {
      // 对第一个sinkStream流对象中的数据进行操作并返回
      return new Tuple2<String,String>("stringStream",s);
    }

    @Override
    public Object map2(TestDto testDto) throws Exception {
      // 对第二个sideOutput流对象中的数据进行操作并返回
      return new Tuple2<String,TestDto>("objectStream",testDto);
    }
});
union

可归并两个相同数据类型的流
// union 的使用(同一种数据类型的流才能用这个方法)
DataStream<TestDto> unionStream = sideOutput.union(originStream);
addSink

为流对象指定数据发送目的
// 设置 RabbitMQ 连接配置
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
      .setHost("192.168.117.4")
      .setPort(5672)
      .setVirtualHost("/")
      .setUserName("mix")
      .setPassword("jovision")
      .build();
// 将数据发送至名为demo.out的队列
sinkStream.addSink(new RMQSink<>(
      connectionConfig,
      "demo.out",
      new SimpleStringSchema()
));
execute

运行实行环境(在 addSink 之后实行)
// 执行
environment.execute();
二、Flink 连接器(数据源、数据写入)

在 Flink 官网中,当前自带的且常用的连接器如下:
https://i-blog.csdnimg.cn/direct/7e07046d057a4d2aab80048abebbeac4.png
RabbitMQ 已经有自带的【数据源连接器】以及【数据写入连接器】。
Redis、jdbc 只有自带的【数据写入连接器】,所以数据源连接器必要实现【SourceFunction】接口进行自定义。
RabbitMQ 连接器

数据源连接器(可实时斲丧消息)

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
      .setHost("192.168.117.4")
      .setPort(5672)
      .setVirtualHost("/")
      .setUserName("mix")
      .setPassword("jovision")
      .build();

// 创建 RabbitMQ 数据源
DataStream<String> rabbitMQStream = environment.addSource(new RMQSource<>(
      connectionConfig,
      "demo.in",
      true,
      new SimpleStringSchema()
));
数据写入连接器

rabbitMQStream.addSink(new RMQSink<>(
      connectionConfig,
      "demo.out",
      new SimpleStringSchema()
));
Kafka 连接器

数据源连接器(可实时斲丧消息)

Properties sourceProperties = new Properties();
sourceProperties.setProperty("bootstrap.servers", "192.168.2.198:9092");
sourceProperties.setProperty("group.id", "test");

// 创建Kafka消费者
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
      "vse.unicom.payload.channel", // 源 topic
      new SimpleStringSchema(),   // 数据序列化方式
      sourceProperties            // Kafka消费者配置
);

DataStream<String> sourceStream = environment.addSource(consumer);
数据写入连接器

Properties sinkProperties = new Properties();
sinkProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.198:9092");
sinkProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "0");
sinkProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
sinkProperties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
sinkProperties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
sinkProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
sinkProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
sinkProperties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "4194304");
FlinkKafkaProducer<String> channelProducer = new FlinkKafkaProducer<String>(
      "vse.unicom.payload.tenant", // 目标 topic
      new SimpleStringSchema(),    // 序列化 schema
      sinkProperties);

sourceStream.addSink(channelProducer);
Redis 连接器

数据源连接器(需自定义,单次实行,不可监听 redis 数据变更)

public static class RedisSource implements SourceFunction<Tuple3<String,String,String>> {
    @Override
    public void run(SourceContext<Tuple3<String,String,String>> sourceContext) throws Exception {
      // 连接 Redis
      Jedis jedis = new Jedis("192.168.117.4", 6379);

      // 设置密码(如果需要的话)
      jedis.auth("redis@Abc-1234");
      // 选择数据库
      jedis.select(0);
      for (int i = 0; i < 10; i++){
            String key = "flink:test" + i;
            String key2 = "flink:map" + i;
            jedis.set( key, String.valueOf(new Random().nextInt()));
            sourceContext.collect(new Tuple3<>(key,key2,jedis.get(key)));
      }
    }

    @Override
    public void cancel() {

    }
}
数据写入连接器

public static class CoustomRedisSink implements RedisMapper<Tuple2<String, String>> {

    @Override
    public RedisCommandDescription getCommandDescription() {
      return new RedisCommandDescription(RedisCommand.SET);
    }

    @Override
    public String getKeyFromData(Tuple2<String, String> stringStringTuple2) {
      return stringStringTuple2._1();
    }

    @Override
    public String getValueFromData(Tuple2<String, String> stringStringTuple2) {
      return stringStringTuple2._2();
    }
}
jdbc 连接器

数据源连接器(需自定义,单次实行,不可监听 mysql 数据变更)

public static class MysqlSource implements SourceFunction<MeshVcDto> {
    @Override
    public void run(SourceContext<MeshVcDto> sourceContext) throws Exception {
      // 定义数据库连接信息
      String dbURL = "jdbc:mysql://192.168.117.4:3306/jvs_tdms";
      String username = "root";
      String password = "Jo123@My";

      // 连接数据库
      Connection conn = DriverManager.getConnection(dbURL, username, password);

      // 执行查询
      String query = "SELECT id, device_id, mesh_vc, add_time FROM udms_mesh_vc_log where verification_code is null";
      Statement stmt = conn.createStatement();
      ResultSet rs = stmt.executeQuery(query);

      // 处理查询结果
      while (rs.next()) {
            Long id = rs.getLong("id");
            String deviceId = rs.getString("device_id");
            String meshVc = rs.getString("mesh_vc");
            Date addTime = rs.getDate("add_time");
            sourceContext.collect(new MeshVcDto().setId(id).setMeshVc(meshVc).setDeviceId(deviceId).setAddTime(addTime));
      }
    }

    @Override
    public void cancel() {

    }
}
数据写入连接器

dataStream.addSink(JdbcSink.sink("update udms_mesh_vc_log set device_id = ?, mesh_vc = ? where id = ?", new JdbcStatementBuilder<MeshVcDto>() {
      @Override
      public void accept(PreparedStatement preparedStatement, MeshVcDto meshVcDto) throws SQLException {
            preparedStatement.setString(1, meshVcDto.getDeviceId());
            preparedStatement.setString(2, meshVcDto.getMeshVc());
            preparedStatement.setLong(3, meshVcDto.getId());
      }
    },new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withDriverName("com.mysql.jdbc.Driver")
            .withUrl("jdbc:mysql://192.168.117.4:3306/jvs_tdms")
            .withUsername("root")
            .withPassword("Jo123@My")
            .build()));


dataStream.addSink(JdbcSink.sink("insert into udms_mesh_vc_log (device_id, mesh_vc) values (?,?)", new JdbcStatementBuilder<MeshVcDto>() {
      @Override
      public void accept(PreparedStatement preparedStatement, MeshVcDto meshVcDto) throws SQLException {
            preparedStatement.setString(1, meshVcDto.getDeviceId());
            preparedStatement.setString(2, meshVcDto.getMeshVc());
      }
    },new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                  .withDriverName("com.mysql.jdbc.Driver")
                  .withUrl("jdbc:mysql://192.168.117.4:3306/jvs_tdms")
                  .withUsername("root")
                  .withPassword("Jo123@My")
                  .build()));
三、Flink 常用 Window

窗口计算,必要先分组,然后指定窗口类型,然后编写计算逻辑


[*]Window 可以分为两大类:

[*]CountWindow:按照指定的数据条数生产一个 Window,只有数据数目有关,分为如下两类

[*]滚动计数窗口 ( Tumbling Count Window),窗口没有重叠
[*]滑动计数窗口 ( Sliding Count Window),窗口有重叠

[*]TimeWindow:按照时间生成 Window,根据窗口实现原理的不同分为三类:

[*]滚动时间窗口 ( Tumbling Time Window),窗口没有重叠
[*]滑动时间窗口 ( Sliding Time Window),窗口有重叠
[*]会话窗口 ( Session Window),窗口开始结束时间不固定,在一个固定的时间周期没有新的元素吸收,会自动关闭窗口


CountWindow

滚动计数窗口

// 只有当前这个name的消息收到第3次,才会计算前三次的age和,否则不会计算并输出至mq
SingleOutputStreamOperator<String> map1 = rabbitMQStream.map(new MapFunction<String, TestDto>() {
    @Override
    public TestDto map(String s) throws Exception {
      return JSON.parseObject(s, TestDto.class);
    }
}).keyBy("name")
.countWindow(3)
.sum("age").map(new MapFunction<TestDto, String>() {
    @Override
    public String map(TestDto testDto) throws Exception {
      return JSON.toJSONString(testDto);
    }
});
滑动计数窗口

// 只有当前这个name的消息收到第2次,才会计算前5次的和,否则不会计算并输出至mq
SingleOutputStreamOperator<String> map2 = rabbitMQStream.map(new MapFunction<String, TestDto>() {
    @Override
    public TestDto map(String s) throws Exception {
      return JSON.parseObject(s, TestDto.class);
    }
}).keyBy("name")
.countWindow(5, 2)
.sum("age").map(new MapFunction<TestDto, String>() {
    @Override
    public String map(TestDto testDto) throws Exception {
      return JSON.toJSONString(testDto);
    }
});
TimeWindow

滚动时间窗口

   留意事项:
触发计算的动作与并行度巨细有关系,一个窗口只有吸收消息的数目达到并行度之后,才气触发上个窗口的计算并输出
比方:设置为 1 并行度时,[0,15)窗口已颠末去,但是不会立刻输出计算结果,来到[15,30)窗口,只有吸收了 1 条消息,才会触发[0,15)窗口的计算并输出
// 迟到较晚的流数据存储至本侧输出流
OutputTag<TestDto> testDtoOutputTag = new OutputTag<TestDto>("迟到了") {};
// 只有当前这个name的消息一直收到15s,才会计算这15s时间段的age和,否则不会计算并输出至mq
SingleOutputStreamOperator<TestDto> name = rabbitMQStream
      // window截止后继续等2秒,将window范围内的时间加入到计算
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TestDto>(Time.seconds(2)) {
            @Override
            public long extractTimestamp(TestDto testDto) {
                return testDto.getTimeStamp() * 1000;
            }
      })
      .keyBy("name")
      .window(TumblingEventTimeWindows.of(Time.seconds(15)))
      // 迟到数据,在事件时间窗口内的消息,窗口结束后的5秒内收到的消息都被允许
      .allowedLateness(Time.seconds(5))
      // 迟到太多的数据会被放到侧输出流中进行补偿处理
      .sideOutputLateData(testDtoOutputTag)
      .aggregate(new AggregateFunction<TestDto, TestDto, TestDto>() {
            // 创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
            @Override
            public TestDto createAccumulator() {
                return new TestDto().setAge(0);
            }
            // 将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。
            // 方法传入两个参数:当前新到的数据value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
            @Override
            public TestDto add(TestDto input, TestDto init) {
                return input.setAge(init.getAge() + input.getAge());
            }
            // 从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。
            // 比如之前我们提到的计算平均值,就可以把sum和count作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
            @Override
            public TestDto getResult(TestDto testDto) {
                return testDto.setTimeStamp(new Date().getTime() / 1000).setDate(LocalDateTime.now().toString());
            }

            // 合并两个累加器,并将合并后的状态作为一个累加器返回。
            // 这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。
            @Override
            public TestDto merge(TestDto testDto, TestDto acc1) {
                return null;
            }
      }, new ProcessWindowFunction<TestDto, TestDto, Tuple, TimeWindow>() {
            // 一个窗口结束的时候调用一次(一个分组执行一次),不适合大量数据,全量数据保存在内存中,会造成内存溢出
            @Override
            public void process(Tuple tuple, ProcessWindowFunction<TestDto, TestDto, Tuple, TimeWindow>.Context context, Iterable<TestDto> iterable, Collector<TestDto> collector) throws Exception {
                iterable.forEach(item -> {
                  SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                  item.setWindowStartTime(context.window().getStart());
                  item.setWindowEndTime(context.window().getEnd());
                  log.info("窗口:[{}, {}) 当前时间:{}, item: {}", format.format(new Date(context.window().getStart())),
                            format.format(new Date(context.window().getEnd())), format.format(new Date()), item);
                  collector.collect(item);
                });
            }
      });
四、Flink 摆设

服务启动

● 进入 flink 的 bin 目次,启动 start-cluster.sh
./start-cluster.sh
● 进入 flink 的 conf 目次,在 flink-conf.yaml 文件中检察启动端口(rest.port 参数值)
vi flink-conf.yaml

● 输入服务器 ip 及端口
https://i-blog.csdnimg.cn/direct/c1ae8bc8888a441cb49be3c82a258c41.png
● 启动完成
页面操作使命

上传 jar 文件

将使用流处置处罚 API 实现的步伐打包为 jar 文件,并上传
https://i-blog.csdnimg.cn/direct/d6d36eefe89e433ab7819eefb9d8a0f0.png
上传乐成
https://i-blog.csdnimg.cn/direct/d59c8379823e43ceb359e1b795345668.png
配置启动类并启动使命

https://i-blog.csdnimg.cn/direct/5831e3b221eb4c63ad74089a95bcc4bf.png
配置完之后点击【Submit】即可实行使命
https://i-blog.csdnimg.cn/direct/b37d8d6b31074d25aea01e0f52bfc1cf.png
制止使命

https://i-blog.csdnimg.cn/direct/495ada886ffa49b4a05cc13cc9328680.png
https://i-blog.csdnimg.cn/direct/14ac3d91f9714921bb44223ab974f623.png
下令操作使命

进入 flink 的 bin 目次下
检察正在实行的使命列表

./flink list
结果展示
https://i-blog.csdnimg.cn/direct/0cf24bfce84d40b0a733984dc847d031.png
取消使命

● 取消正在实行的指定使命
./flink cancel 【正在执行的任务的id值】-s 【保存点文件夹名】

./flink cancel 【正在执行的任务的id值】 (不会有保存点文件)

./flink cancel-s 【正在执行的任务的id值】

例:
./flink cancel 0fa34b7daa017c510adb3692e55d4c96 -s 1234

./flink cancel 0fa34b7daa017c510adb3692e55d4c96 (不会有保存点文件)

./flink cancel -s 0fa34b7daa017c510adb3692e55d4c96
结果展示
https://i-blog.csdnimg.cn/direct/c235321c64874794a18b5a62aed6fc7f.png
启动使命

● 无安全点文件启动使命
./flink run -p 【Parallelism】 -c 【EntryClass】 【已上传jar包的路径】 【Program Arguments】

例:./flink run -p 1 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties
● 有安全点文件启动使命(前提是取消使命时有安全点文件保存)
./flink run -p 【Parallelism】 -s 【执行取消任务的安全点的目录】-c 【EntryClass】[--allowNonRestoredState] 【已上传jar包的路径】 【Program Arguments】

注: --allowNonRestoredState 非必填,可绕过保存点恢复的错误继续启动任务,绕过错误可能会丢失数据,可先不带此配置启动,报错后再加上执行启动命令也可

例:./flink run -p 1 -s file:/tmp/flink/flinkpoint/savepoints/savepoint-029d78-73d8367621d7 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties
结果展示
https://i-blog.csdnimg.cn/direct/72731fceb6b34f1c99ac61c1ef49e67f.png
启动使命时 jar 包路径可通过以下两种方式获取:
● 上传 jar 包至 linux 体系后直接使用
● 在页面上传文件,并通过以下步骤获取:
○ 在 flink 页面上传文件完成,检察如下内容
https://i-blog.csdnimg.cn/direct/b54e39c094da485e99c948fb6f5b9cd6.png
○ 在 linux 中检察上一步拿到的路径,该路径就是已上传的 jar 包的缓存文件
https://i-blog.csdnimg.cn/direct/325bf367e55f42279cd9502941e72a2a.png
五、Flink 扩容

flink 扩容是对 TaskManager 数目的扩大,相对应将实行使命的并行度随之扩大。
https://i-blog.csdnimg.cn/direct/84f45d5cf1574953aff964776a78e308.png
flink 使命卡槽设置


[*]必要在服务启动前配置
[*]flink-conf.yaml 中配置 key/value 的时候在“:”反面必要有一个空格,否则配置不会生效。
[*]单个 TaskManager 的使命卡槽数目需根据服务器资源配置
进入 flink 的 conf 目次,在 flink-conf.yaml 文件中检察单个 TaskManager 的使命卡槽数目(taskmanager.numberOfTaskSlots 参数值):
vi flink-conf.yaml

flink 使命并行度设置


[*]必要在使命启动时配置
[*]修改并行度时不能实时生效,必要重启使命
页面设置

https://i-blog.csdnimg.cn/direct/e862517893b2499a8c265b850c69fd7f.png
下令设置

./flink run -p 【Parallelism】 -c 【EntryClass】 【已上传jar包的路径】 【Program Arguments】

例:./flink run -p 1 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Flink使用详解