使用Apache Flink实现MySQL数据读取和写入的完备指南

打印 上一主题 下一主题

主题 554|帖子 554|积分 1662

1. 导言:

Apache Flink是一款功能强大的流式处理引擎,可用于实时处理大规模数据。本文将先容怎样使用Flink与MySQL数据库进行交互,以洗濯股票数据为例。
2. 情况准备:

起首,确保已安装Apache Flink并配置好MySQL数据库。导入相关依赖包,并创建必要的Table。同时必要提前创建好mysql表,一行source表,一张sink表。
  1. CREATE TABLE `re_stock_code_price` (
  2.   `id` bigint NOT NULL AUTO_INCREMENT,
  3.   `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
  4.   `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
  5.   `close` double DEFAULT NULL COMMENT '最新价',
  6.   `change_percent` double DEFAULT NULL COMMENT '涨跌幅',
  7.   `change` double DEFAULT NULL COMMENT '涨跌额',
  8.   `volume` double DEFAULT NULL COMMENT '成交量(手)',
  9.   `amount` double DEFAULT NULL COMMENT '成交额',
  10.   `amplitude` double DEFAULT NULL COMMENT '振幅',
  11.   `turnover_rate` double DEFAULT NULL COMMENT '换手率',
  12.   `peration` double DEFAULT NULL COMMENT '市盈率',
  13.   `volume_rate` double DEFAULT NULL COMMENT '量比',
  14.   `hign` double DEFAULT NULL COMMENT '最高',
  15.   `low` double DEFAULT NULL COMMENT '最低',
  16.   `open` double DEFAULT NULL COMMENT '今开',
  17.   `previous_close` double DEFAULT NULL COMMENT '昨收',
  18.   `pb` double DEFAULT NULL COMMENT '市净率',
  19.   `create_time` varchar(64) NOT NULL COMMENT '写入时间',
  20.   `rise` int NOT NULL,
  21.   PRIMARY KEY (`id`)
  22. ) ENGINE=InnoDB AUTO_INCREMENT=11207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
  23. CREATE TABLE `t_stock_code_price` (
  24.   `id` bigint NOT NULL AUTO_INCREMENT,
  25.   `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
  26.   `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
  27.   `close` double DEFAULT NULL COMMENT '最新价',
  28.   `change_percent` double DEFAULT NULL COMMENT '涨跌幅',
  29.   `change` double DEFAULT NULL COMMENT '涨跌额',
  30.   `volume` double DEFAULT NULL COMMENT '成交量(手)',
  31.   `amount` double DEFAULT NULL COMMENT '成交额',
  32.   `amplitude` double DEFAULT NULL COMMENT '振幅',
  33.   `turnover_rate` double DEFAULT NULL COMMENT '换手率',
  34.   `peration` double DEFAULT NULL COMMENT '市盈率',
  35.   `volume_rate` double DEFAULT NULL COMMENT '量比',
  36.   `hign` double DEFAULT NULL COMMENT '最高',
  37.   `low` double DEFAULT NULL COMMENT '最低',
  38.   `open` double DEFAULT NULL COMMENT '今开',
  39.   `previous_close` double DEFAULT NULL COMMENT '昨收',
  40.   `pb` double DEFAULT NULL COMMENT '市净率',
  41.   `create_time` varchar(64) NOT NULL COMMENT '写入时间',
  42.   PRIMARY KEY (`id`)
  43. ) ENGINE=InnoDB AUTO_INCREMENT=11207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
复制代码
  1. package org.east;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
  4. import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment;
  5. object TableETL {
  6.     def main(args: Array[String]): Unit = {
  7.         val senv = StreamExecutionEnvironment.getExecutionEnvironment
  8.             .setRuntimeMode(RuntimeExecutionMode.STREAMING)
  9.         val tEnv = StreamTableEnvironment.create(senv)
  10.         // 定义源表
  11.         val source_table =
  12.             """
  13.             CREATE TEMPORARY TABLE t_stock_code_price (
  14.                 id BIGINT NOT NULL,
  15.                 code STRING NOT NULL,
  16.                 -- 其他字段...
  17.                 create_time STRING NOT NULL,
  18.                 PRIMARY KEY (id) NOT ENFORCED
  19.             ) WITH (
  20.                 'connector' = 'jdbc',
  21.                 'url' = 'jdbc:mysql://localhost:3306/mydb',
  22.                 'driver' = 'com.mysql.cj.jdbc.Driver',
  23.                 'table-name' = 't_stock_code_price',
  24.                 'username' = 'root',
  25.                 'password' = '12345678'
  26.             )
  27.             """.stripMargin
  28.         // 定义目标表
  29.         val sink_table =
  30.             """
  31.             CREATE TEMPORARY TABLE re_stock_code_price (
  32.                 id BIGINT NOT NULL,
  33.                 code STRING NOT NULL,
  34.                 -- 其他字段...
  35.                 create_time STRING NOT NULL,
  36.                 rise INT,
  37.                 PRIMARY KEY (id) NOT ENFORCED
  38.             ) WITH (
  39.                 'connector' = 'jdbc',
  40.                 'url' = 'jdbc:mysql://localhost:3306/mydb',
  41.                 'driver' = 'com.mysql.cj.jdbc.Driver',
  42.                 'table-name' = 're_stock_code_price',
  43.                 'username' = 'root',
  44.                 'password' = '12345678'
  45.             )
  46.             """.stripMargin
  47.         tEnv.executeSql(source_table)
  48.         tEnv.executeSql(sink_table)
复制代码
在这段代码中,我们起首创建了Flink的流式执行情况和StreamTableEnvironment。然后,我们定义了两个临时表,用于存储原始股票数据和洗濯后的数据。
3. 数据洗濯:

接下来,我们执行数据洗濯操纵,并将结果写入目的表。
  1.         // 执行清洗操作,并将结果写入目标表
  2.         tEnv.executeSql("INSERT INTO re_stock_code_price " +
  3.             "SELECT *, CASE WHEN change_percent > 0 THEN 1 ELSE 0 END AS rise FROM t_stock_code_price")
复制代码
在这里,我们计算了股票涨跌情况,并将结果写入到目的表中。在这个例子中,我们假设change_percent字段表示股票代价的变革百分比,rise字段为1表示股票上涨,为0表示股票下跌。
4. 结果展示:

末了,我们查询目的表并打印结果。


5. 完备代码:

下面是完备的代码:
  1. package org.east;
  2. import org.apache.flink.api.common.RuntimeExecutionMode
  3. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  4. import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
  5. object TableETL {
  6.   def main(args: Array[String]): Unit = {
  7.     val senv = StreamExecutionEnvironment.getExecutionEnvironment
  8.       .setRuntimeMode(RuntimeExecutionMode.STREAMING)
  9.     val tEnv = StreamTableEnvironment.create(senv)
  10.     val source_table =
  11.       """
  12.         |CREATE TEMPORARY TABLE t_stock_code_price (
  13.         |  id BIGINT NOT NULL,
  14.         |  code STRING NOT NULL,
  15.         |  name STRING NOT NULL,
  16.         |  `close` DOUBLE,
  17.         |  change_percent DOUBLE,
  18.         |  change DOUBLE,
  19.         |  volume DOUBLE,
  20.         |  amount DOUBLE,
  21.         |  amplitude DOUBLE,
  22.         |  turnover_rate DOUBLE,
  23.         |  peration DOUBLE,
  24.         |  volume_rate DOUBLE,
  25.         |  hign DOUBLE,
  26.         |  low DOUBLE,
  27.         |  `open` DOUBLE,
  28.         |  previous_close DOUBLE,
  29.         |  pb DOUBLE,
  30.         |  create_time STRING NOT NULL,
  31.         |  PRIMARY KEY (id) NOT ENFORCED
  32.         |) WITH (
  33.         |   'connector' = 'jdbc',
  34.         |   'url' = 'jdbc:mysql://localhost:3306/mydb',
  35.         |   'driver' = 'com.mysql.cj.jdbc.Driver',
  36.         |   'table-name' = 't_stock_code_price',
  37.         |   'username' = 'root',
  38.         |   'password' = '12345678'
  39.         |)
  40.         |""".stripMargin
  41.     val sink_table =
  42.       """
  43.         |CREATE TEMPORARY TABLE re_stock_code_price (
  44.         |  id BIGINT NOT NULL,
  45.         |  code STRING NOT NULL,
  46.         |  name STRING NOT NULL,
  47.         |  `close` DOUBLE,
  48.         |  change_percent DOUBLE,
  49.         |  change DOUBLE,
  50.         |  volume DOUBLE,
  51.         |  amount DOUBLE,
  52.         |  amplitude DOUBLE,
  53.         |  turnover_rate DOUBLE,
  54.         |  peration DOUBLE,
  55.         |  volume_rate DOUBLE,
  56.         |  hign DOUBLE,
  57.         |  low DOUBLE,
  58.         |  `open` DOUBLE,
  59.         |  previous_close DOUBLE,
  60.         |  pb DOUBLE,
  61.         |  create_time STRING NOT NULL,
  62.         |  rise int,
  63.         |  PRIMARY KEY (id) NOT ENFORCED
  64.         |) WITH (
  65.         |   'connector' = 'jdbc',
  66.         |   'url' = 'jdbc:mysql://localhost:3306/mydb',
  67.         |   'driver' = 'com.mysql.cj.jdbc.Driver',
  68.         |   'table-name' = 're_stock_code_price',
  69.         |   'username' = 'root',
  70.         |   'password' = '12345678'
  71.         |)
  72.         |""".stripMargin
  73.     tEnv.executeSql(source_table)
  74.     tEnv.executeSql(sink_table)
  75.     tEnv.executeSql("insert into re_stock_code_price select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")
  76.     val user_DS = tEnv.executeSql("select * from re_stock_code_price")
  77.     user_DS.print()
  78.   }
  79. }
复制代码
如有遇到问题可以找小编沟通交换哦。别的小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型练习等。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

莫张周刘王

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表