大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
点一下关注吧!!!非常感谢!!连续更新!!!现在已经更新到了:
[*]Hadoop(已更完)
[*]HDFS(已更完)
[*]MapReduce(已更完)
[*]Hive(已更完)
[*]Flume(已更完)
[*]Sqoop(已更完)
[*]Zookeeper(已更完)
[*]HBase(已更完)
[*]Redis (已更完)
[*]Kafka(已更完)
[*]Spark(已更完)
[*]Flink(正在更新!)
章节内容
上节我们完成了如下的内容:
[*]Sink 的基本概念等内容
[*]Sink的相关信息 配置与利用
[*]Sink案例写入Redis
https://i-blog.csdnimg.cn/direct/42194e4d58384d128aa9c1352de0f794.png
JDBC Sink
在 Apache Flink 中,通过 JDBC Sink,可以将处理后的数据写入到 MySQL 数据库中。这对于将实时处理的数据持久化或与其他系统进行集成非常有用。
Flink JDBC Sink 简介
Flink 提供了 JdbcSink,它是基于 JDBC 协议的 Sink,可以将数据写入各种关系型数据库,包罗 MySQL。在利用 JDBC Sink 时,必要提供数据库毗连信息和 SQL 语句,通过这些信息,Flink 将数据流中的纪录插入或更新到 MySQL 表中。
Flink 到 MySQL 的基本步调
将数据流写入 MySQL 的步调主要包罗以下几点:
[*]依赖库配置:确保在项目中引入了 Flink 和 MySQL 相关的依赖库,通常必要配置 Maven 或 Gradle。
[*]界说数据源和数据流:创建并处理数据流。
[*]配置 JDBC Sink:提供数据库的毗连信息和插入 SQL 语句。
[*]启动使命:将数据流写入 MySQL。
优化建议
在实际项目中,向 MySQL 插入大量数据时,应考虑以下优化策略:
[*]批量插入:通过 JdbcExecutionOptions 配置批量插入,可以大幅提拔写入性能。
[*]毗连池:对于高并发的写入操作,建议利用毗连池来减少数据库毗连开销。
[*]索引优化:为插入的表配置合适的索引,可以提高查询性能,但在大量写入时,索引可能会降低- 插入速率,因此必要权衡。
[*]数据分片:对于非常大规模的数据,可以考虑将数据分片并行写入不同的 MySQL 实例或分区表中。
案例:流数据下沉到MySQL
添加依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
编写代码
一个Person的类,对应MySQL中的一张表的字段。
模拟几条数据流,写入到 MySQL中。
package icu.wzk;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class SinkSqlTest {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Person> data = env.getJavaEnv().fromElements(
new Person("wzk", 18, 1),
new Person("icu", 20, 1),
new Person("wzkicu", 13, 2)
);
data.addSink(new MySqlSinkFunction());
env.execute();
}
public static class MySqlSinkFunction extends RichSinkFunction<Person> {
private PreparedStatement preparedStatement = null;
private Connection connection = null;
@Override
public void open(Configuration parameters) throws Exception {
String url = "jdbc:mysql://h122.wzk.icu:3306/flink-test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC";
String username = "hive";
String password = "hive@wzk.icu";
connection = DriverManager.getConnection(url, username, password);
String sql = "INSERT INTI PERSON(name, age, sex) VALUES(?, ?, ?)";
preparedStatement = connection.prepareStatement(sql);
}
@Override
public void invoke(Person value, Context context) throws Exception {
preparedStatement.setString(1, value.getName());
preparedStatement.setInt(2, value.getAge());
preparedStatement.setInt(3, value.getSex());
preparedStatement.executeUpdate();
}
@Override
public void close() throws Exception {
if (null != connection) {
connection.close();
}
if (null != preparedStatement) {
preparedStatement.close();
}
}
}
public static class Person {
private String name;
private Integer age;
private Integer sex;
public Person() {
}
public Person(String name, Integer age, Integer sex) {
this.name = name;
this.age = age;
this.sex = sex;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public Integer getSex() {
return sex;
}
public void setSex(Integer sex) {
this.sex = sex;
}
}
}
数据库配置
我们新建一张表出来,person表,里边有我们必要的字段。
https://i-blog.csdnimg.cn/direct/a0e59e5804fb42bc93005ed6704e7a44.png
运行代码
我们运行代码,等候运行结束。
https://i-blog.csdnimg.cn/direct/800059a310034cd5ba3e40d78b57151e.png
查察结果
查察数据库中的数据,我们可以看到刚才模拟的数据已经成功写入了。
https://i-blog.csdnimg.cn/direct/97cbec4fe59d4c858149c5214b1a97ab.png
案例:写入到Kafka
编写代码
package icu.wzk;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class SinkKafkaTest {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> data = env.socketTextStream("localhost", 9999, '\n', 0);
String brokerList = "h121.wzk.icu:9092";
String topic = "flink_test";
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema());
data.addSink(producer);
env.execute("SinkKafkaTest");
}
}
运行代码
启动一个 nc
nc -lk 9999
我们通过回车的方式,可以发送数据。
https://i-blog.csdnimg.cn/direct/14f6bef9651b4aaabddf77d632df3c7a.png
Java 步伐中等候
https://i-blog.csdnimg.cn/direct/e77cfe04e2bd4f9f9031fc10f7f0142a.png
查察结果
我们登录到服务器查察信息
./kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test --from-beginning
可以看到刚才的数据已经写入了:
https://i-blog.csdnimg.cn/direct/ec796a8f85f149d9ae15f71d94a5407c.png
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]