大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka ...

打印 上一主题 下一主题

主题 832|帖子 832|积分 2496

点一下关注吧!!!非常感谢!!连续更新!!!

现在已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)
章节内容

上节我们完成了如下的内容:


  • Sink 的基本概念等内容
  • Sink的相关信息 配置与利用
  • Sink案例写入Redis

JDBC Sink

在 Apache Flink 中,通过 JDBC Sink,可以将处理后的数据写入到 MySQL 数据库中。这对于将实时处理的数据持久化或与其他系统进行集成非常有用。
Flink JDBC Sink 简介

Flink 提供了 JdbcSink,它是基于 JDBC 协议的 Sink,可以将数据写入各种关系型数据库,包罗 MySQL。在利用 JDBC Sink 时,必要提供数据库毗连信息和 SQL 语句,通过这些信息,Flink 将数据流中的纪录插入或更新到 MySQL 表中。
Flink 到 MySQL 的基本步调

将数据流写入 MySQL 的步调主要包罗以下几点:


  • 依赖库配置:确保在项目中引入了 Flink 和 MySQL 相关的依赖库,通常必要配置 Maven 或 Gradle。
  • 界说数据源和数据流:创建并处理数据流。
  • 配置 JDBC Sink:提供数据库的毗连信息和插入 SQL 语句。
  • 启动使命:将数据流写入 MySQL。
优化建议

在实际项目中,向 MySQL 插入大量数据时,应考虑以下优化策略:


  • 批量插入:通过 JdbcExecutionOptions 配置批量插入,可以大幅提拔写入性能。
  • 毗连池:对于高并发的写入操作,建议利用毗连池来减少数据库毗连开销。
  • 索引优化:为插入的表配置合适的索引,可以提高查询性能,但在大量写入时,索引可能会降低- 插入速率,因此必要权衡。
  • 数据分片:对于非常大规模的数据,可以考虑将数据分片并行写入不同的 MySQL 实例或分区表中。
案例:流数据下沉到MySQL

添加依赖

  1. <dependency>
  2.     <groupId>mysql</groupId>
  3.     <artifactId>mysql-connector-java</artifactId>
  4.     <version>8.0.28</version>
  5. </dependency>
复制代码
编写代码

一个Person的类,对应MySQL中的一张表的字段。
模拟几条数据流,写入到 MySQL中。
  1. package icu.wzk;
  2. import org.apache.flink.configuration.Configuration;
  3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  4. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  5. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
  6. import java.sql.Connection;
  7. import java.sql.DriverManager;
  8. import java.sql.PreparedStatement;
  9. public class SinkSqlTest {
  10.     public static void main(String[] args) {
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         DataStreamSource<Person> data = env.getJavaEnv().fromElements(
  13.                 new Person("wzk", 18, 1),
  14.                 new Person("icu", 20, 1),
  15.                 new Person("wzkicu", 13, 2)
  16.         );
  17.         data.addSink(new MySqlSinkFunction());
  18.         env.execute();
  19.     }
  20.     public static class MySqlSinkFunction extends RichSinkFunction<Person> {
  21.         private PreparedStatement preparedStatement = null;
  22.         private Connection connection = null;
  23.         @Override
  24.         public void open(Configuration parameters) throws Exception {
  25.             String url = "jdbc:mysql://h122.wzk.icu:3306/flink-test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC";
  26.             String username = "hive";
  27.             String password = "hive@wzk.icu";
  28.             connection = DriverManager.getConnection(url, username, password);
  29.             String sql = "INSERT INTI PERSON(name, age, sex) VALUES(?, ?, ?)";
  30.             preparedStatement = connection.prepareStatement(sql);
  31.         }
  32.         @Override
  33.         public void invoke(Person value, Context context) throws Exception {
  34.             preparedStatement.setString(1, value.getName());
  35.             preparedStatement.setInt(2, value.getAge());
  36.             preparedStatement.setInt(3, value.getSex());
  37.             preparedStatement.executeUpdate();
  38.         }
  39.         @Override
  40.         public void close() throws Exception {
  41.             if (null != connection) {
  42.                 connection.close();
  43.             }
  44.             if (null != preparedStatement) {
  45.                 preparedStatement.close();
  46.             }
  47.         }
  48.     }
  49.     public static class Person {
  50.         private String name;
  51.         private Integer age;
  52.         private Integer sex;
  53.         public Person() {
  54.         }
  55.         public Person(String name, Integer age, Integer sex) {
  56.             this.name = name;
  57.             this.age = age;
  58.             this.sex = sex;
  59.         }
  60.         public String getName() {
  61.             return name;
  62.         }
  63.         public void setName(String name) {
  64.             this.name = name;
  65.         }
  66.         public Integer getAge() {
  67.             return age;
  68.         }
  69.         public void setAge(Integer age) {
  70.             this.age = age;
  71.         }
  72.         public Integer getSex() {
  73.             return sex;
  74.         }
  75.         public void setSex(Integer sex) {
  76.             this.sex = sex;
  77.         }
  78.     }
  79. }
复制代码
数据库配置

我们新建一张表出来,person表,里边有我们必要的字段。

运行代码

我们运行代码,等候运行结束。

查察结果

查察数据库中的数据,我们可以看到刚才模拟的数据已经成功写入了。

案例:写入到Kafka

编写代码

  1. package icu.wzk;
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  3. import org.apache.flink.streaming.api.scala.DataStream;
  4. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
  5. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
  6. public class SinkKafkaTest {
  7.     public static void main(String[] args) {
  8.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  9.         DataStream<String> data = env.socketTextStream("localhost", 9999, '\n', 0);
  10.         String brokerList = "h121.wzk.icu:9092";
  11.         String topic = "flink_test";
  12.         FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema());
  13.         data.addSink(producer);
  14.         env.execute("SinkKafkaTest");
  15.     }
  16. }
复制代码
运行代码

启动一个 nc
  1. nc -lk 9999
复制代码
我们通过回车的方式,可以发送数据。

Java 步伐中等候

查察结果

我们登录到服务器查察信息
  1. ./kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test --from-beginning
复制代码
可以看到刚才的数据已经写入了:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

吴旭华

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表