[flink 实时流基础] 输出算子(Sink)

莱莱  金牌会员 | 2024-8-12 06:11:35 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 574|帖子 574|积分 1722

学习笔记
Flink作为数据处置惩罚框架,终极还是要把计算处置惩罚的效果写入外部存储,为外部应用提供支持。

  

  
连接到外部体系

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource雷同,addSink方法对应着一个“Sink”算子,紧张就是用来实现与外部体系连接、并将数据提交写入的;Flink步伐中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12从前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSink方法同样必要传入一个参数,实现的是SinkFunction接口。在这个接口中只必要重写一个方法invoke(),用来将指定的值写入到外部体系中。这个方法在每条数据记录到来时都会调用。
Flink1.12开始,同样重构了Sink架构,
stream.sinkTo(…)
固然,Sink多数情况下同样并不必要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表现将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方体系连接器:
   https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/connectors/datastream/overview/

  我们可以看到,像Kafka之类流式体系,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储体系,则只提供了输出写入的sink连接器。
除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方体系与Flink的连接器。

除此以外,就必要用户自定义实现sink连接器了。
输出到文件

Flink专门提供了一个流式文件体系的连接器:FileSink,为批处置惩罚和流处置惩罚提供了一个同一的Sink,它可以将分区文件写入Flink支持的文件体系。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:


  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
  1. public class SinkFile {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         // 每个目录中,都有 并行度个数的 文件在写入
  5.         env.setParallelism(2);
  6.         
  7.         // 必须开启checkpoint,否则一直都是 .inprogress
  8.         env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
  9.         DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
  10.                 new GeneratorFunction<Long, String>() {
  11.                     @Override
  12.                     public String map(Long value) throws Exception {
  13.                         return "Number:" + value;
  14.                     }
  15.                 },
  16.                 Long.MAX_VALUE,
  17.                 RateLimiterStrategy.perSecond(1000),
  18.                 Types.STRING
  19.         );
  20.         DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");
  21.         // 输出到文件系统
  22.         FileSink<String> fieSink = FileSink
  23.                 // 输出行式存储的文件,指定路径、指定编码
  24.                 .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
  25.                 // 输出文件的一些配置: 文件名的前缀、后缀
  26.                 .withOutputFileConfig(
  27.                         OutputFileConfig.builder()
  28.                                 .withPartPrefix("atguigu-")
  29.                                 .withPartSuffix(".log")
  30.                                 .build()
  31.                 )
  32.                 // 按照目录分桶:如下,就是每个小时一个目录
  33.                 .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
  34.                 // 文件滚动策略:  1分钟 或 1m
  35.                 .withRollingPolicy(
  36.                         DefaultRollingPolicy.builder()
  37.                                 .withRolloverInterval(Duration.ofMinutes(1))
  38.                                 .withMaxPartSize(new MemorySize(1024*1024))
  39.                                 .build()
  40.                 )
  41.                 .build();
  42.         dataGen.sinkTo(fieSink);
  43.         env.execute();
  44.     }
  45. }
复制代码
输出到 Kafka

(1)添加Kafka 连接器依靠
由于我们已经测试过从Kafka数据源读取数据,连接器相关依靠已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码
  1. public class SinkKafka {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setParallelism(1);
  5.         // 如果是精准一次,必须开启checkpoint(后续章节介绍)
  6.         env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
  7.         SingleOutputStreamOperator<String> sensorDS = env
  8.                 .socketTextStream("hadoop102", 7777);
  9.         /**
  10.          * Kafka Sink:
  11.          * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
  12.          * 1、开启checkpoint(后续介绍)
  13.          * 2、设置事务前缀
  14.          * 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟
  15.          */
  16.         KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
  17.                 // 指定 kafka 的地址和端口
  18.                 .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
  19.                 // 指定序列化器:指定Topic名称、具体的序列化
  20.                 .setRecordSerializer(
  21.                         KafkaRecordSerializationSchema.<String>builder()
  22.                                 .setTopic("ws")
  23.                                 .setValueSerializationSchema(new SimpleStringSchema())
  24.                                 .build()
  25.                 )
  26.                 // 写到kafka的一致性级别: 精准一次、至少一次
  27.                 .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  28.                 // 如果是精准一次,必须设置 事务的前缀
  29.                 .setTransactionalIdPrefix("atguigu-")
  30.                 // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
  31.                 .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
  32.                 .build();
  33.         sensorDS.sinkTo(kafkaSink);
  34.         env.execute();
  35.     }
  36. }
复制代码
自定义序列化器,实现带key的record:
  1. public class SinkKafkaWithKey {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         env.setParallelism(1);
  5.         env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
  6.         env.setRestartStrategy(RestartStrategies.noRestart());
  7.         SingleOutputStreamOperator<String> sensorDS = env
  8.                 .socketTextStream("hadoop102", 7777);
  9.         /**
  10.          * 如果要指定写入kafka的key,可以自定义序列化器:
  11.          * 1、实现 一个接口,重写 序列化 方法
  12.          * 2、指定key,转成 字节数组
  13.          * 3、指定value,转成 字节数组
  14.          * 4、返回一个 ProducerRecord对象,把key、value放进去
  15.          */
  16.         KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
  17.                 .setBootstrapServers("hadoop102:9092,hadoop103:9092,
  18.         hadoop104:9092")
  19.                 .setRecordSerializer(
  20.                         new KafkaRecordSerializationSchema<String>() {
  21.                             @Nullable
  22.                             @Override
  23.                             public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
  24.                                 String[] datas = element.split(",");
  25.                                 byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
  26.                                 byte[] value = element.getBytes(StandardCharsets.UTF_8);
  27.                                 return new ProducerRecord<>("ws", key, value);
  28.                             }
  29.                         }
  30.                 )
  31.                 .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  32.                 .setTransactionalIdPrefix("atguigu-")
  33.                 .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
  34.                 .build();
  35.         sensorDS.sinkTo(kafkaSink);
  36.         env.execute();
  37.     }
  38. }
复制代码
输出到 mysql

写入数据的MySQL的测试步骤如下。
(1)添加依靠
添加MySQL驱动:
  1. <dependency>
  2.     <groupId>mysql</groupId>
  3.     <artifactId>mysql-connector-java</artifactId>
  4.     <version>8.0.27</version>
  5. </dependency>
复制代码
官方还未提供flink-connector-jdbc的1.17.0的正式依靠,暂时从apache snapshot堆栈下载,pom文件中指定堆栈路径:
  1. <repositories>
  2.     <repository>
  3.         <id>apache-snapshots</id>
  4.         <name>apache snapshots</name>
  5. <url>https://repository.apache.org/content/repositories/snapshots/</url>
  6.     </repository>
  7. </repositories>
复制代码
添加依靠:
  1. <dependency>
  2.     <groupId>org.apache.flink</groupId>
  3.     <artifactId>flink-connector-jdbc</artifactId>
  4.     <version>1.17-SNAPSHOT</version>
  5. </dependency>
复制代码
假如不见效,还必要修改本地maven的配置文件,mirrorOf中添加如下标红内容:
  1. <mirror>
  2.     <id>aliyunmaven</id>
  3.     <mirrorOf>*,!apache-snapshots</mirrorOf>
  4.     <name>阿里云公共仓库</name>
  5.     <url>https://maven.aliyun.com/repository/public</url>
  6. </mirror>
复制代码
(2)启动MySQL,在test库下建表ws
  1. mysql>
  2. CREATE TABLE ws (
  3. id varchar(100) NOT NULL,
  4. ts bigint(20) DEFAULT NULL,
  5. vc int(11) DEFAULT NULL,
  6. PRIMARY KEY (id)
  7. ) ENGINE=InnoDB DEFAULT CHARSET=utf8
复制代码
(3)编写输出到MySQL的示例代码
  1. public class SinkMySQL {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. env.setParallelism(1);
  5. SingleOutputStreamOperator<WaterSensor> sensorDS = env
  6. .socketTextStream("hadoop102", 7777)
  7. .map(new WaterSensorMapFunction());
  8. /**
  9.      * TODO 写入mysql
  10.      * 1、只能用老的sink写法: addsink
  11.      * 2、JDBCSink的4个参数:
  12.      *    第一个参数: 执行的sql,一般就是 insert into
  13.      *    第二个参数: 预编译sql, 对占位符填充值
  14.      *    第三个参数: 执行选项 ---》 攒批、重试
  15.      *    第四个参数: 连接选项 ---》 url、用户名、密码
  16.      */
  17. SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
  18.     "insert into ws values(?,?,?)",
  19.     new JdbcStatementBuilder<WaterSensor>() {
  20.         @Override
  21.         public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
  22.             //每收到一条WaterSensor,如何去填充占位符
  23.             preparedStatement.setString(1, waterSensor.getId());
  24.             preparedStatement.setLong(2, waterSensor.getTs());
  25.             preparedStatement.setInt(3, waterSensor.getVc());
  26.         }
  27.     },
  28.     JdbcExecutionOptions.builder()
  29.     .withMaxRetries(3) // 重试次数
  30.     .withBatchSize(100) // 批次的大小:条数
  31.     .withBatchIntervalMs(3000) // 批次的时间
  32.     .build(),
  33.     new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  34.     .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
  35.     .withUsername("root")
  36.     .withPassword("000000")
  37.     .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
  38.     .build()
  39. );
  40. sensorDS.addSink(jdbcSink);
  41. env.execute();
  42. }
  43. }
复制代码
(4)运行代码,用客户端连接MySQL,检察是否成功写入数据。
自定义 sink

假如我们想将数据存储到我们自己的存储装备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source雷同,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunction());
在实现SinkFunction的时候,必要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储体系都有效;不过自定义Sink想要实现状态一致性并不容易,以是一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

莱莱

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表