学习笔记
Flink作为数据处置惩罚框架,终极还是要把计算处置惩罚的效果写入外部存储,为外部应用提供支持。
连接到外部体系
Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource雷同,addSink方法对应着一个“Sink”算子,紧张就是用来实现与外部体系连接、并将数据提交写入的;Flink步伐中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12从前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSink方法同样必要传入一个参数,实现的是SinkFunction接口。在这个接口中只必要重写一个方法invoke(),用来将指定的值写入到外部体系中。这个方法在每条数据记录到来时都会调用。
Flink1.12开始,同样重构了Sink架构,
stream.sinkTo(…)
固然,Sink多数情况下同样并不必要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表现将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方体系连接器:
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/connectors/datastream/overview/
我们可以看到,像Kafka之类流式体系,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储体系,则只提供了输出写入的sink连接器。
除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方体系与Flink的连接器。
除此以外,就必要用户自定义实现sink连接器了。
输出到文件
Flink专门提供了一个流式文件体系的连接器:FileSink,为批处置惩罚和流处置惩罚提供了一个同一的Sink,它可以将分区文件写入Flink支持的文件体系。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:
- 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
- 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
- public class SinkFile {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // 每个目录中,都有 并行度个数的 文件在写入
- env.setParallelism(2);
-
- // 必须开启checkpoint,否则一直都是 .inprogress
- env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
- DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
- new GeneratorFunction<Long, String>() {
- @Override
- public String map(Long value) throws Exception {
- return "Number:" + value;
- }
- },
- Long.MAX_VALUE,
- RateLimiterStrategy.perSecond(1000),
- Types.STRING
- );
- DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");
- // 输出到文件系统
- FileSink<String> fieSink = FileSink
- // 输出行式存储的文件,指定路径、指定编码
- .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
- // 输出文件的一些配置: 文件名的前缀、后缀
- .withOutputFileConfig(
- OutputFileConfig.builder()
- .withPartPrefix("atguigu-")
- .withPartSuffix(".log")
- .build()
- )
- // 按照目录分桶:如下,就是每个小时一个目录
- .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
- // 文件滚动策略: 1分钟 或 1m
- .withRollingPolicy(
- DefaultRollingPolicy.builder()
- .withRolloverInterval(Duration.ofMinutes(1))
- .withMaxPartSize(new MemorySize(1024*1024))
- .build()
- )
- .build();
- dataGen.sinkTo(fieSink);
- env.execute();
- }
- }
复制代码 输出到 Kafka
(1)添加Kafka 连接器依靠
由于我们已经测试过从Kafka数据源读取数据,连接器相关依靠已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码
- public class SinkKafka {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- // 如果是精准一次,必须开启checkpoint(后续章节介绍)
- env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
- SingleOutputStreamOperator<String> sensorDS = env
- .socketTextStream("hadoop102", 7777);
- /**
- * Kafka Sink:
- * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
- * 1、开启checkpoint(后续介绍)
- * 2、设置事务前缀
- * 3、设置事务超时时间: checkpoint间隔 < 事务超时时间 < max的15分钟
- */
- KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
- // 指定 kafka 的地址和端口
- .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
- // 指定序列化器:指定Topic名称、具体的序列化
- .setRecordSerializer(
- KafkaRecordSerializationSchema.<String>builder()
- .setTopic("ws")
- .setValueSerializationSchema(new SimpleStringSchema())
- .build()
- )
- // 写到kafka的一致性级别: 精准一次、至少一次
- .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
- // 如果是精准一次,必须设置 事务的前缀
- .setTransactionalIdPrefix("atguigu-")
- // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
- .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
- .build();
- sensorDS.sinkTo(kafkaSink);
- env.execute();
- }
- }
复制代码 自定义序列化器,实现带key的record:
- public class SinkKafkaWithKey {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
- env.setRestartStrategy(RestartStrategies.noRestart());
- SingleOutputStreamOperator<String> sensorDS = env
- .socketTextStream("hadoop102", 7777);
- /**
- * 如果要指定写入kafka的key,可以自定义序列化器:
- * 1、实现 一个接口,重写 序列化 方法
- * 2、指定key,转成 字节数组
- * 3、指定value,转成 字节数组
- * 4、返回一个 ProducerRecord对象,把key、value放进去
- */
- KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
- .setBootstrapServers("hadoop102:9092,hadoop103:9092,
- hadoop104:9092")
- .setRecordSerializer(
- new KafkaRecordSerializationSchema<String>() {
- @Nullable
- @Override
- public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
- String[] datas = element.split(",");
- byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
- byte[] value = element.getBytes(StandardCharsets.UTF_8);
- return new ProducerRecord<>("ws", key, value);
- }
- }
- )
- .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
- .setTransactionalIdPrefix("atguigu-")
- .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
- .build();
- sensorDS.sinkTo(kafkaSink);
- env.execute();
- }
- }
复制代码 输出到 mysql
写入数据的MySQL的测试步骤如下。
(1)添加依靠
添加MySQL驱动:
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.27</version>
- </dependency>
复制代码 官方还未提供flink-connector-jdbc的1.17.0的正式依靠,暂时从apache snapshot堆栈下载,pom文件中指定堆栈路径:
- <repositories>
- <repository>
- <id>apache-snapshots</id>
- <name>apache snapshots</name>
- <url>https://repository.apache.org/content/repositories/snapshots/</url>
- </repository>
- </repositories>
复制代码 添加依靠:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-jdbc</artifactId>
- <version>1.17-SNAPSHOT</version>
- </dependency>
复制代码 假如不见效,还必要修改本地maven的配置文件,mirrorOf中添加如下标红内容:
- <mirror>
- <id>aliyunmaven</id>
- <mirrorOf>*,!apache-snapshots</mirrorOf>
- <name>阿里云公共仓库</name>
- <url>https://maven.aliyun.com/repository/public</url>
- </mirror>
复制代码 (2)启动MySQL,在test库下建表ws
- mysql>
- CREATE TABLE ws (
- id varchar(100) NOT NULL,
- ts bigint(20) DEFAULT NULL,
- vc int(11) DEFAULT NULL,
- PRIMARY KEY (id)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8
复制代码 (3)编写输出到MySQL的示例代码
- public class SinkMySQL {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- SingleOutputStreamOperator<WaterSensor> sensorDS = env
- .socketTextStream("hadoop102", 7777)
- .map(new WaterSensorMapFunction());
- /**
- * TODO 写入mysql
- * 1、只能用老的sink写法: addsink
- * 2、JDBCSink的4个参数:
- * 第一个参数: 执行的sql,一般就是 insert into
- * 第二个参数: 预编译sql, 对占位符填充值
- * 第三个参数: 执行选项 ---》 攒批、重试
- * 第四个参数: 连接选项 ---》 url、用户名、密码
- */
- SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
- "insert into ws values(?,?,?)",
- new JdbcStatementBuilder<WaterSensor>() {
- @Override
- public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
- //每收到一条WaterSensor,如何去填充占位符
- preparedStatement.setString(1, waterSensor.getId());
- preparedStatement.setLong(2, waterSensor.getTs());
- preparedStatement.setInt(3, waterSensor.getVc());
- }
- },
- JdbcExecutionOptions.builder()
- .withMaxRetries(3) // 重试次数
- .withBatchSize(100) // 批次的大小:条数
- .withBatchIntervalMs(3000) // 批次的时间
- .build(),
- new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
- .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
- .withUsername("root")
- .withPassword("000000")
- .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
- .build()
- );
- sensorDS.addSink(jdbcSink);
- env.execute();
- }
- }
复制代码 (4)运行代码,用客户端连接MySQL,检察是否成功写入数据。
自定义 sink
假如我们想将数据存储到我们自己的存储装备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source雷同,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunction());
在实现SinkFunction的时候,必要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储体系都有效;不过自定义Sink想要实现状态一致性并不容易,以是一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |