本文档使用 flink-1.13.1 版本依赖
一、Flink 常用流处置处罚 API
getExecutionEnvironment
创建实行环境
- StreamExecutionEnvironment environment =
- StreamExecutionEnvironment.getExecutionEnvironment();
复制代码 setParallelism
为实行环境设置并行度
- environment.setParallelism(1);
复制代码 addSource
给实行环境指定数据来源
- // 设置 RabbitMQ 连接配置
- RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
- .setHost("192.168.117.4")
- .setPort(5672)
- .setVirtualHost("/")
- .setUserName("mix")
- .setPassword("jovision")
- .build();
- // 创建侧输出流
- OutputTag<TestDto> maxTestStream = new OutputTag<TestDto>("maxTest"){};
- // 创建 RabbitMQ 数据源,获取名为demo.in队列中的消息
- DataStream<String> rabbitMQStream = environment.addSource(new RMQSource<>(
- connectionConfig,
- "demo.in",
- true,
- new SimpleStringSchema()
- ));
复制代码 map
用于将吸收到的流转化成目的数据类型
- // 将所有接受的数据分出两个相同的流对象,侧输出流用于测试分组能力
- SingleOutputStreamOperator<TestDto> originStream = rabbitMQStream.map(new MapFunction<String, TestDto>() {
- @Override
- public TestDto map(String s) throws Exception {
- TestDto testDto = JSON.parseObject(s, TestDto.class);
- return testDto;
- }
- });
复制代码 OutputTag
侧输出流,可拷贝来源流并进行其他逻辑操作
- // 创建侧输出流
- OutputTag<TestDto> maxTestStream = new OutputTag<TestDto>("maxTest"){};
- // 将所有接受的数据分出两个流对象,侧输出流用于测试分组能力
- SingleOutputStreamOperator<TestDto> originStream = originStream.process(new ProcessFunction<TestDto, TestDto>() {
- @Override
- public void processElement(TestDto testDto, ProcessFunction<TestDto, TestDto>.Context context, Collector<TestDto> collector) throws Exception {
- // 通过age大小分流
- if(testDto.getAge() > 10){
- // 将流数据塞入上面创建的maxTestStream侧输出流
- context.output(maxTestStream, testDto);
- }else{
- // 将流数据依然放入当前输出流
- collector.collect(testDto);
- }
- }
- });
复制代码 print
打印流信息
- originStream.print("sumStream");
复制代码 filter
用于过滤吸收的流
- // 过滤接收的数据,只接收age=11的流数据放入sinkStream流对象
- SingleOutputStreamOperator<String> sinkStream = originStream.filter(new FilterFunction<TestDto>() {
- @Override
- public boolean filter(TestDto testDto) throws Exception {
- return testDto.getAge() == 11;
- }
- }).map(new MapFunction<TestDto, String>() {
- // 转成String
- @Override
- public String map(TestDto testDto) throws Exception {
- return JSON.toJSONString(testDto);
- }
- });
复制代码 getSideOutput
获取测输出流数据
- // 上面是在originStream流对象将流数据分给maxTestStream侧输出流的,所以需要如下调用
- DataStream<TestDto> sideOutput = originStream.getSideOutput(maxTestStream);
复制代码 keyBy
将获取到的流对指定属性分组
- // 按照对象的name进行分组
- KeyedStream<TestDto, Tuple> keyedStream = sideOutput.keyBy("name");
复制代码 max
分组后返回指定属性最大的值(返回的指定属性最大的值是正确的,但是其中的其他属性大概不准确)
- DataStream<TestDto> maxStream = keyedStream.max("age");
复制代码 maxBy
分组后返回指定属性最大的值(返回的最大值与相应的其他字段内容都是准确的)
- DataStream<TestDto> maxByStream = keyedStream.maxBy("age");
复制代码 sum
分组后返回指定属性累加值
- DataStream<TestDto> sumStream = keyedStream.sum("age");
复制代码 reduce
分组后可操作新数据及上一次数据,可进行累加、比较等逻辑操作
- // 每次收到数据会按照分组信息分组,并累加上一次的age数据后存入reduceStream流对象,也可实现其他逻辑
- SingleOutputStreamOperator<TestDto> reduceStream = keyedStream.reduce(new ReduceFunction<TestDto>() {
- @Override
- public TestDto reduce(TestDto beforeDto, TestDto nowDto) throws Exception {
- log.info("----------- reduce beforeDto:" + beforeDto);
- log.info("----------- reduce nowDto:" + nowDto);
- return beforeDto.setAge(beforeDto.getAge() + nowDto.getAge());
- }
- });
复制代码 connect
可归并两个不同数据类型的流
- // 连接两个不同数据类型的流,类型为String的流对象sinkStream与类型为TestDto的流对象sideOutput合并
- ConnectedStreams<String, TestDto> connectStream = sinkStream.connect(sideOutput);
- DataStream<Object> connectObjectStream = connectStream.map(new CoMapFunction<String, TestDto, Object>() {
- @Override
- public Object map1(String s) throws Exception {
- // 对第一个sinkStream流对象中的数据进行操作并返回
- return new Tuple2<String,String>("stringStream",s);
- }
- @Override
- public Object map2(TestDto testDto) throws Exception {
- // 对第二个sideOutput流对象中的数据进行操作并返回
- return new Tuple2<String,TestDto>("objectStream",testDto);
- }
- });
复制代码 union
可归并两个相同数据类型的流
- // union 的使用(同一种数据类型的流才能用这个方法)
- DataStream<TestDto> unionStream = sideOutput.union(originStream);
复制代码 addSink
为流对象指定数据发送目的
- // 设置 RabbitMQ 连接配置
- RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
- .setHost("192.168.117.4")
- .setPort(5672)
- .setVirtualHost("/")
- .setUserName("mix")
- .setPassword("jovision")
- .build();
- // 将数据发送至名为demo.out的队列
- sinkStream.addSink(new RMQSink<>(
- connectionConfig,
- "demo.out",
- new SimpleStringSchema()
- ));
复制代码 execute
运行实行环境(在 addSink 之后实行)
- // 执行
- environment.execute();
复制代码 二、Flink 连接器(数据源、数据写入)
在 Flink 官网中,当前自带的且常用的连接器如下:
RabbitMQ 已经有自带的【数据源连接器】以及【数据写入连接器】。
Redis、jdbc 只有自带的【数据写入连接器】,所以数据源连接器必要实现【SourceFunction】接口进行自定义。
RabbitMQ 连接器
数据源连接器(可实时斲丧消息)
- RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
- .setHost("192.168.117.4")
- .setPort(5672)
- .setVirtualHost("/")
- .setUserName("mix")
- .setPassword("jovision")
- .build();
- // 创建 RabbitMQ 数据源
- DataStream<String> rabbitMQStream = environment.addSource(new RMQSource<>(
- connectionConfig,
- "demo.in",
- true,
- new SimpleStringSchema()
- ));
复制代码 数据写入连接器
- rabbitMQStream.addSink(new RMQSink<>(
- connectionConfig,
- "demo.out",
- new SimpleStringSchema()
- ));
复制代码 Kafka 连接器
数据源连接器(可实时斲丧消息)
- Properties sourceProperties = new Properties();
- sourceProperties.setProperty("bootstrap.servers", "192.168.2.198:9092");
- sourceProperties.setProperty("group.id", "test");
- // 创建Kafka消费者
- FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
- "vse.unicom.payload.channel", // 源 topic
- new SimpleStringSchema(), // 数据序列化方式
- sourceProperties // Kafka消费者配置
- );
- DataStream<String> sourceStream = environment.addSource(consumer);
复制代码 数据写入连接器
- Properties sinkProperties = new Properties();
- sinkProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.198:9092");
- sinkProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "0");
- sinkProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
- sinkProperties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
- sinkProperties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
- sinkProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- sinkProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- sinkProperties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "4194304");
- FlinkKafkaProducer<String> channelProducer = new FlinkKafkaProducer<String>(
- "vse.unicom.payload.tenant", // 目标 topic
- new SimpleStringSchema(), // 序列化 schema
- sinkProperties);
- sourceStream.addSink(channelProducer);
复制代码 Redis 连接器
数据源连接器(需自定义,单次实行,不可监听 redis 数据变更)
- public static class RedisSource implements SourceFunction<Tuple3<String,String,String>> {
- @Override
- public void run(SourceContext<Tuple3<String,String,String>> sourceContext) throws Exception {
- // 连接 Redis
- Jedis jedis = new Jedis("192.168.117.4", 6379);
- // 设置密码(如果需要的话)
- jedis.auth("redis@Abc-1234");
- // 选择数据库
- jedis.select(0);
- for (int i = 0; i < 10; i++){
- String key = "flink:test" + i;
- String key2 = "flink:map" + i;
- jedis.set( key, String.valueOf(new Random().nextInt()));
- sourceContext.collect(new Tuple3<>(key,key2,jedis.get(key)));
- }
- }
- @Override
- public void cancel() {
- }
- }
复制代码 数据写入连接器
- public static class CoustomRedisSink implements RedisMapper<Tuple2<String, String>> {
- @Override
- public RedisCommandDescription getCommandDescription() {
- return new RedisCommandDescription(RedisCommand.SET);
- }
- @Override
- public String getKeyFromData(Tuple2<String, String> stringStringTuple2) {
- return stringStringTuple2._1();
- }
- @Override
- public String getValueFromData(Tuple2<String, String> stringStringTuple2) {
- return stringStringTuple2._2();
- }
- }
复制代码 jdbc 连接器
数据源连接器(需自定义,单次实行,不可监听 mysql 数据变更)
- public static class MysqlSource implements SourceFunction<MeshVcDto> {
- @Override
- public void run(SourceContext<MeshVcDto> sourceContext) throws Exception {
- // 定义数据库连接信息
- String dbURL = "jdbc:mysql://192.168.117.4:3306/jvs_tdms";
- String username = "root";
- String password = "Jo123@My";
- // 连接数据库
- Connection conn = DriverManager.getConnection(dbURL, username, password);
- // 执行查询
- String query = "SELECT id, device_id, mesh_vc, add_time FROM udms_mesh_vc_log where verification_code is null";
- Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery(query);
- // 处理查询结果
- while (rs.next()) {
- Long id = rs.getLong("id");
- String deviceId = rs.getString("device_id");
- String meshVc = rs.getString("mesh_vc");
- Date addTime = rs.getDate("add_time");
- sourceContext.collect(new MeshVcDto().setId(id).setMeshVc(meshVc).setDeviceId(deviceId).setAddTime(addTime));
- }
- }
- @Override
- public void cancel() {
- }
- }
复制代码 数据写入连接器
- dataStream.addSink(JdbcSink.sink("update udms_mesh_vc_log set device_id = ?, mesh_vc = ? where id = ?", new JdbcStatementBuilder<MeshVcDto>() {
- @Override
- public void accept(PreparedStatement preparedStatement, MeshVcDto meshVcDto) throws SQLException {
- preparedStatement.setString(1, meshVcDto.getDeviceId());
- preparedStatement.setString(2, meshVcDto.getMeshVc());
- preparedStatement.setLong(3, meshVcDto.getId());
- }
- },new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
- .withDriverName("com.mysql.jdbc.Driver")
- .withUrl("jdbc:mysql://192.168.117.4:3306/jvs_tdms")
- .withUsername("root")
- .withPassword("Jo123@My")
- .build()));
- dataStream.addSink(JdbcSink.sink("insert into udms_mesh_vc_log (device_id, mesh_vc) values (?,?)", new JdbcStatementBuilder<MeshVcDto>() {
- @Override
- public void accept(PreparedStatement preparedStatement, MeshVcDto meshVcDto) throws SQLException {
- preparedStatement.setString(1, meshVcDto.getDeviceId());
- preparedStatement.setString(2, meshVcDto.getMeshVc());
- }
- },new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
- .withDriverName("com.mysql.jdbc.Driver")
- .withUrl("jdbc:mysql://192.168.117.4:3306/jvs_tdms")
- .withUsername("root")
- .withPassword("Jo123@My")
- .build()));
复制代码 三、Flink 常用 Window
窗口计算,必要先分组,然后指定窗口类型,然后编写计算逻辑
- Window 可以分为两大类:
- CountWindow:按照指定的数据条数生产一个 Window,只有数据数目有关,分为如下两类
- 滚动计数窗口 ( Tumbling Count Window),窗口没有重叠
- 滑动计数窗口 ( Sliding Count Window),窗口有重叠
- TimeWindow:按照时间生成 Window,根据窗口实现原理的不同分为三类:
- 滚动时间窗口 ( Tumbling Time Window),窗口没有重叠
- 滑动时间窗口 ( Sliding Time Window),窗口有重叠
- 会话窗口 ( Session Window),窗口开始结束时间不固定,在一个固定的时间周期没有新的元素吸收,会自动关闭窗口
CountWindow
滚动计数窗口
- // 只有当前这个name的消息收到第3次,才会计算前三次的age和,否则不会计算并输出至mq
- SingleOutputStreamOperator<String> map1 = rabbitMQStream.map(new MapFunction<String, TestDto>() {
- @Override
- public TestDto map(String s) throws Exception {
- return JSON.parseObject(s, TestDto.class);
- }
- }).keyBy("name")
- .countWindow(3)
- .sum("age").map(new MapFunction<TestDto, String>() {
- @Override
- public String map(TestDto testDto) throws Exception {
- return JSON.toJSONString(testDto);
- }
- });
复制代码 滑动计数窗口
- // 只有当前这个name的消息收到第2次,才会计算前5次的和,否则不会计算并输出至mq
- SingleOutputStreamOperator<String> map2 = rabbitMQStream.map(new MapFunction<String, TestDto>() {
- @Override
- public TestDto map(String s) throws Exception {
- return JSON.parseObject(s, TestDto.class);
- }
- }).keyBy("name")
- .countWindow(5, 2)
- .sum("age").map(new MapFunction<TestDto, String>() {
- @Override
- public String map(TestDto testDto) throws Exception {
- return JSON.toJSONString(testDto);
- }
- });
复制代码 TimeWindow
滚动时间窗口
留意事项:
触发计算的动作与并行度巨细有关系,一个窗口只有吸收消息的数目达到并行度之后,才气触发上个窗口的计算并输出
比方:设置为 1 并行度时,[0,15)窗口已颠末去,但是不会立刻输出计算结果,来到[15,30)窗口,只有吸收了 1 条消息,才会触发[0,15)窗口的计算并输出
- // 迟到较晚的流数据存储至本侧输出流
- OutputTag<TestDto> testDtoOutputTag = new OutputTag<TestDto>("迟到了") {};
- // 只有当前这个name的消息一直收到15s,才会计算这15s时间段的age和,否则不会计算并输出至mq
- SingleOutputStreamOperator<TestDto> name = rabbitMQStream
- // window截止后继续等2秒,将window范围内的时间加入到计算
- .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TestDto>(Time.seconds(2)) {
- @Override
- public long extractTimestamp(TestDto testDto) {
- return testDto.getTimeStamp() * 1000;
- }
- })
- .keyBy("name")
- .window(TumblingEventTimeWindows.of(Time.seconds(15)))
- // 迟到数据,在事件时间窗口内的消息,窗口结束后的5秒内收到的消息都被允许
- .allowedLateness(Time.seconds(5))
- // 迟到太多的数据会被放到侧输出流中进行补偿处理
- .sideOutputLateData(testDtoOutputTag)
- .aggregate(new AggregateFunction<TestDto, TestDto, TestDto>() {
- // 创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
- @Override
- public TestDto createAccumulator() {
- return new TestDto().setAge(0);
- }
- // 将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。
- // 方法传入两个参数:当前新到的数据value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
- @Override
- public TestDto add(TestDto input, TestDto init) {
- return input.setAge(init.getAge() + input.getAge());
- }
- // 从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。
- // 比如之前我们提到的计算平均值,就可以把sum和count作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
- @Override
- public TestDto getResult(TestDto testDto) {
- return testDto.setTimeStamp(new Date().getTime() / 1000).setDate(LocalDateTime.now().toString());
- }
- // 合并两个累加器,并将合并后的状态作为一个累加器返回。
- // 这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。
- @Override
- public TestDto merge(TestDto testDto, TestDto acc1) {
- return null;
- }
- }, new ProcessWindowFunction<TestDto, TestDto, Tuple, TimeWindow>() {
- // 一个窗口结束的时候调用一次(一个分组执行一次),不适合大量数据,全量数据保存在内存中,会造成内存溢出
- @Override
- public void process(Tuple tuple, ProcessWindowFunction<TestDto, TestDto, Tuple, TimeWindow>.Context context, Iterable<TestDto> iterable, Collector<TestDto> collector) throws Exception {
- iterable.forEach(item -> {
- SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- item.setWindowStartTime(context.window().getStart());
- item.setWindowEndTime(context.window().getEnd());
- log.info("窗口:[{}, {}) 当前时间:{}, item: {}", format.format(new Date(context.window().getStart())),
- format.format(new Date(context.window().getEnd())), format.format(new Date()), item);
- collector.collect(item);
- });
- }
- });
复制代码 四、Flink 摆设
服务启动
● 进入 flink 的 bin 目次,启动 start-cluster.sh
● 进入 flink 的 conf 目次,在 flink-conf.yaml 文件中检察启动端口(rest.port 参数值)
● 输入服务器 ip 及端口
● 启动完成
页面操作使命
上传 jar 文件
将使用流处置处罚 API 实现的步伐打包为 jar 文件,并上传
上传乐成
配置启动类并启动使命
配置完之后点击【Submit】即可实行使命
制止使命
下令操作使命
进入 flink 的 bin 目次下
检察正在实行的使命列表
结果展示
取消使命
● 取消正在实行的指定使命
- ./flink cancel 【正在执行的任务的id值】-s 【保存点文件夹名】
- 或
- ./flink cancel 【正在执行的任务的id值】 (不会有保存点文件)
- 或
- ./flink cancel -s 【正在执行的任务的id值】
- 例:
- ./flink cancel 0fa34b7daa017c510adb3692e55d4c96 -s 1234
- 或
- ./flink cancel 0fa34b7daa017c510adb3692e55d4c96 (不会有保存点文件)
- 或
- ./flink cancel -s 0fa34b7daa017c510adb3692e55d4c96
复制代码 结果展示
启动使命
● 无安全点文件启动使命
- ./flink run -p 【Parallelism】 -c 【EntryClass】 【已上传jar包的路径】 【Program Arguments】
- 例:./flink run -p 1 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties
复制代码 ● 有安全点文件启动使命(前提是取消使命时有安全点文件保存)
- ./flink run -p 【Parallelism】 -s 【执行取消任务的安全点的目录】-c 【EntryClass】[--allowNonRestoredState] 【已上传jar包的路径】 【Program Arguments】
- 注: --allowNonRestoredState 非必填,可绕过保存点恢复的错误继续启动任务,绕过错误可能会丢失数据,可先不带此配置启动,报错后再加上执行启动命令也可
- 例:./flink run -p 1 -s file:/tmp/flink/flinkpoint/savepoints/savepoint-029d78-73d8367621d7 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties
复制代码 结果展示
启动使命时 jar 包路径可通过以下两种方式获取:
● 上传 jar 包至 linux 体系后直接使用
● 在页面上传文件,并通过以下步骤获取:
○ 在 flink 页面上传文件完成,检察如下内容
○ 在 linux 中检察上一步拿到的路径,该路径就是已上传的 jar 包的缓存文件
五、Flink 扩容
flink 扩容是对 TaskManager 数目的扩大,相对应将实行使命的并行度随之扩大。
flink 使命卡槽设置
- 必要在服务启动前配置
- flink-conf.yaml 中配置 key/value 的时候在“:”反面必要有一个空格,否则配置不会生效。
- 单个 TaskManager 的使命卡槽数目需根据服务器资源配置
进入 flink 的 conf 目次,在 flink-conf.yaml 文件中检察单个 TaskManager 的使命卡槽数目(taskmanager.numberOfTaskSlots 参数值):
flink 使命并行度设置
- 必要在使命启动时配置
- 修改并行度时不能实时生效,必要重启使命
页面设置
下令设置
- ./flink run -p 【Parallelism】 -c 【EntryClass】 【已上传jar包的路径】 【Program Arguments】
- 例:./flink run -p 1 -c com.jovision.pass.flink.task.DataMetering2Starter /tmp/flink/jobjar/flink-web-upload/c99eb1e1-bf9d-455f-a989-635714370802_jvs_flink-jar-with-dependencies.jar --propPath /opt/middle/jobconfig/dataMeteringJob.properties
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |