1. 导言:
Apache Flink是一款功能强大的流式处理引擎,可用于实时处理大规模数据。本文将先容怎样使用Flink与MySQL数据库进行交互,以洗濯股票数据为例。
2. 情况准备:
起首,确保已安装Apache Flink并配置好MySQL数据库。导入相关依赖包,并创建必要的Table。同时必要提前创建好mysql表,一行source表,一张sink表。
- CREATE TABLE `re_stock_code_price` (
- `id` bigint NOT NULL AUTO_INCREMENT,
- `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
- `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
- `close` double DEFAULT NULL COMMENT '最新价',
- `change_percent` double DEFAULT NULL COMMENT '涨跌幅',
- `change` double DEFAULT NULL COMMENT '涨跌额',
- `volume` double DEFAULT NULL COMMENT '成交量(手)',
- `amount` double DEFAULT NULL COMMENT '成交额',
- `amplitude` double DEFAULT NULL COMMENT '振幅',
- `turnover_rate` double DEFAULT NULL COMMENT '换手率',
- `peration` double DEFAULT NULL COMMENT '市盈率',
- `volume_rate` double DEFAULT NULL COMMENT '量比',
- `hign` double DEFAULT NULL COMMENT '最高',
- `low` double DEFAULT NULL COMMENT '最低',
- `open` double DEFAULT NULL COMMENT '今开',
- `previous_close` double DEFAULT NULL COMMENT '昨收',
- `pb` double DEFAULT NULL COMMENT '市净率',
- `create_time` varchar(64) NOT NULL COMMENT '写入时间',
- `rise` int NOT NULL,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=11207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
- CREATE TABLE `t_stock_code_price` (
- `id` bigint NOT NULL AUTO_INCREMENT,
- `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代码',
- `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名称',
- `close` double DEFAULT NULL COMMENT '最新价',
- `change_percent` double DEFAULT NULL COMMENT '涨跌幅',
- `change` double DEFAULT NULL COMMENT '涨跌额',
- `volume` double DEFAULT NULL COMMENT '成交量(手)',
- `amount` double DEFAULT NULL COMMENT '成交额',
- `amplitude` double DEFAULT NULL COMMENT '振幅',
- `turnover_rate` double DEFAULT NULL COMMENT '换手率',
- `peration` double DEFAULT NULL COMMENT '市盈率',
- `volume_rate` double DEFAULT NULL COMMENT '量比',
- `hign` double DEFAULT NULL COMMENT '最高',
- `low` double DEFAULT NULL COMMENT '最低',
- `open` double DEFAULT NULL COMMENT '今开',
- `previous_close` double DEFAULT NULL COMMENT '昨收',
- `pb` double DEFAULT NULL COMMENT '市净率',
- `create_time` varchar(64) NOT NULL COMMENT '写入时间',
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=11207 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
复制代码- package org.east;
- import org.apache.flink.api.common.RuntimeExecutionMode;
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
- import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment;
- object TableETL {
- def main(args: Array[String]): Unit = {
- val senv = StreamExecutionEnvironment.getExecutionEnvironment
- .setRuntimeMode(RuntimeExecutionMode.STREAMING)
- val tEnv = StreamTableEnvironment.create(senv)
- // 定义源表
- val source_table =
- """
- CREATE TEMPORARY TABLE t_stock_code_price (
- id BIGINT NOT NULL,
- code STRING NOT NULL,
- -- 其他字段...
- create_time STRING NOT NULL,
- PRIMARY KEY (id) NOT ENFORCED
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://localhost:3306/mydb',
- 'driver' = 'com.mysql.cj.jdbc.Driver',
- 'table-name' = 't_stock_code_price',
- 'username' = 'root',
- 'password' = '12345678'
- )
- """.stripMargin
- // 定义目标表
- val sink_table =
- """
- CREATE TEMPORARY TABLE re_stock_code_price (
- id BIGINT NOT NULL,
- code STRING NOT NULL,
- -- 其他字段...
- create_time STRING NOT NULL,
- rise INT,
- PRIMARY KEY (id) NOT ENFORCED
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://localhost:3306/mydb',
- 'driver' = 'com.mysql.cj.jdbc.Driver',
- 'table-name' = 're_stock_code_price',
- 'username' = 'root',
- 'password' = '12345678'
- )
- """.stripMargin
- tEnv.executeSql(source_table)
- tEnv.executeSql(sink_table)
复制代码 在这段代码中,我们起首创建了Flink的流式执行情况和StreamTableEnvironment。然后,我们定义了两个临时表,用于存储原始股票数据和洗濯后的数据。
3. 数据洗濯:
接下来,我们执行数据洗濯操纵,并将结果写入目的表。
- // 执行清洗操作,并将结果写入目标表
- tEnv.executeSql("INSERT INTO re_stock_code_price " +
- "SELECT *, CASE WHEN change_percent > 0 THEN 1 ELSE 0 END AS rise FROM t_stock_code_price")
复制代码 在这里,我们计算了股票涨跌情况,并将结果写入到目的表中。在这个例子中,我们假设change_percent字段表示股票代价的变革百分比,rise字段为1表示股票上涨,为0表示股票下跌。
4. 结果展示:
末了,我们查询目的表并打印结果。
5. 完备代码:
下面是完备的代码:
- package org.east;
- import org.apache.flink.api.common.RuntimeExecutionMode
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
- object TableETL {
- def main(args: Array[String]): Unit = {
- val senv = StreamExecutionEnvironment.getExecutionEnvironment
- .setRuntimeMode(RuntimeExecutionMode.STREAMING)
- val tEnv = StreamTableEnvironment.create(senv)
- val source_table =
- """
- |CREATE TEMPORARY TABLE t_stock_code_price (
- | id BIGINT NOT NULL,
- | code STRING NOT NULL,
- | name STRING NOT NULL,
- | `close` DOUBLE,
- | change_percent DOUBLE,
- | change DOUBLE,
- | volume DOUBLE,
- | amount DOUBLE,
- | amplitude DOUBLE,
- | turnover_rate DOUBLE,
- | peration DOUBLE,
- | volume_rate DOUBLE,
- | hign DOUBLE,
- | low DOUBLE,
- | `open` DOUBLE,
- | previous_close DOUBLE,
- | pb DOUBLE,
- | create_time STRING NOT NULL,
- | PRIMARY KEY (id) NOT ENFORCED
- |) WITH (
- | 'connector' = 'jdbc',
- | 'url' = 'jdbc:mysql://localhost:3306/mydb',
- | 'driver' = 'com.mysql.cj.jdbc.Driver',
- | 'table-name' = 't_stock_code_price',
- | 'username' = 'root',
- | 'password' = '12345678'
- |)
- |""".stripMargin
- val sink_table =
- """
- |CREATE TEMPORARY TABLE re_stock_code_price (
- | id BIGINT NOT NULL,
- | code STRING NOT NULL,
- | name STRING NOT NULL,
- | `close` DOUBLE,
- | change_percent DOUBLE,
- | change DOUBLE,
- | volume DOUBLE,
- | amount DOUBLE,
- | amplitude DOUBLE,
- | turnover_rate DOUBLE,
- | peration DOUBLE,
- | volume_rate DOUBLE,
- | hign DOUBLE,
- | low DOUBLE,
- | `open` DOUBLE,
- | previous_close DOUBLE,
- | pb DOUBLE,
- | create_time STRING NOT NULL,
- | rise int,
- | PRIMARY KEY (id) NOT ENFORCED
- |) WITH (
- | 'connector' = 'jdbc',
- | 'url' = 'jdbc:mysql://localhost:3306/mydb',
- | 'driver' = 'com.mysql.cj.jdbc.Driver',
- | 'table-name' = 're_stock_code_price',
- | 'username' = 'root',
- | 'password' = '12345678'
- |)
- |""".stripMargin
- tEnv.executeSql(source_table)
- tEnv.executeSql(sink_table)
- tEnv.executeSql("insert into re_stock_code_price select *,case when change_percent>0 then 1 else 0 end as rise from t_stock_code_price")
- val user_DS = tEnv.executeSql("select * from re_stock_code_price")
- user_DS.print()
- }
- }
复制代码 如有遇到问题可以找小编沟通交换哦。别的小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型练习等。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |