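FlinkX Overview
FlinkX is a Flink-based distributed framework for offline (batch) and real-time data synchronization, widely used inside 袋鼠云 (DTStack). It moves data efficiently between many kinds of heterogeneous data sources.
As a data synchronization tool, FlinkX can collect static data, such as MySQL tables and HDFS files, as well as data that changes in real time, such as the MySQL binlog and Kafka.
Installing FlinkX
1. Upload and unzip the package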
unzip flinkX-1.10.zip -d /usr/local/soft/
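2. Set the environment variables so that the flinkx command in the installation's bin directory can be found on PATH
3. Give the bin/flinkx file execute permission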
chmod a+x flinkx
4. Edit the configuration file to set the port the service runs on
vim flinkconf/flink-conf.yaml
5. Set the port of the web service; if it is not specified, a random port is chosen
rest.bind-port: 8888
Basic usage of FlinkX
https://github.com/oceanos/flinkx/blob/1.8_release/README_OLD.
See README_CH.md on the official FlinkX site for the introductory documentation.
MySqlToHdfs
1. Configuration file
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "username": "root",
            "password": "12345678",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://master:3306/students?useUnicode=true&characterEncoding=utf8&useSSL=false"
                ],
                "table": [
                  "biaomin"
                ]
              }
            ],
            "column": [
              "*"
            ],
            "where": "id > 90",
            "requestAccumulatorInterval": 2
          },
          "name": "mysqlreader"
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "path": "hdfs://master:9000/bigdata30/flinkx/out1",
            "defaultFS": "hdfs://master:9000",
            "column": [
              {
                "name": "col1",
                "index": 0,
                "type": "string"
              },
              {
                "name": "col2",
                "index": 1,
                "type": "string"
              },
              {
                "name": "col3",
                "index": 2,
                "type": "string"
              },
              {
                "name": "col4",
                "index": 3,
                "type": "string"
              },
              {
                "name": "col5",
                "index": 4,
                "type": "string"
              },
              {
                "name": "col6",
                "index": 5,
                "type": "string"
              }
            ],
            "fieldDelimiter": ",",
            "fileType": "text",
            "writeMode": "append"
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "isRestore": false,
        "isStream": false
      },
      "errorLimit": {},
      "speed": {
        "channel": 1
      }
    }
  }
}
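A job file that is not valid JSON, for example because a stray editor keystroke was pasted into it, is an easy mistake to make with configs this long. The following is a minimal sketch of a pre-flight check, not part of FlinkX itself: the function name check_job and the two checks it performs (the file parses, and no reader or writer repeats a column name) are assumptions chosen for illustration.

```python
import json


def check_job(text):
    """Return a list of problems found in a FlinkX job definition string."""
    try:
        job = json.loads(text)
    except json.JSONDecodeError as exc:
        # Typical cause: a stray character pasted into the file.
        return [f"not valid JSON (line {exc.lineno}): {exc.msg}"]
    if not isinstance(job, dict):
        return ["top level must be a JSON object"]

    problems = []
    for i, item in enumerate(job.get("job", {}).get("content", [])):
        for role in ("reader", "writer"):
            plugin = item.get(role, {})
            if not plugin.get("name"):
                problems.append(f"content[{i}].{role} has no plugin name")
            columns = plugin.get("parameter", {}).get("column", [])
            # Columns may be plain strings ("*") or objects with a "name" key.
            names = [c["name"] for c in columns if isinstance(c, dict) and "name" in c]
            dupes = sorted({n for n in names if names.count(n) > 1})
            if dupes:
                problems.append(f"content[{i}].{role} repeats column names: {dupes}")
    return problems


# Hypothetical, shortened job definition used only to exercise the check.
sample = (
    '{"job": {"content": [{'
    '"reader": {"name": "mysqlreader", "parameter": {"column": ["*"]}}, '
    '"writer": {"name": "hdfswriter", "parameter": {"column": '
    '[{"name": "col1"}, {"name": "col1"}]}}}]}}'
)
for problem in check_job(sample):
    print(problem)
```

To check a real file, read it first, for example check_job(open("mysql2hdfs.json", encoding="utf-8").read()); an empty list means neither check found anything.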
2. Start the job
flinkx -mode local -job ./mysql2hdfs.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/
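When several jobs are launched from a script, it can help to assemble the command in one place. The sketch below builds the argument list using only the flags that appear in this post (-mode, -job, -pluginRoot, and the -flinkconf flag used by the later commands); the helper name flinkx_command is hypothetical, and the example only prints the command rather than running it.

```python
import shlex


def flinkx_command(job_file, plugin_root, flink_conf=None, mode="local"):
    """Build the flinkx argument list from the flags shown in this post."""
    args = ["flinkx", "-mode", mode, "-job", job_file, "-pluginRoot", plugin_root]
    if flink_conf is not None:
        # -flinkconf is optional here because the first command omits it.
        args += ["-flinkconf", flink_conf]
    return args


cmd = flinkx_command(
    "./mysql2hdfs.json",
    "/usr/local/soft/flinkx-1.10/syncplugins/",
)
# Print the command in a form that can be pasted into a shell.
print(shlex.join(cmd))
```

Passing the list to subprocess.run(cmd, check=True) would launch the job, assuming flinkx is on PATH.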
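3. Watch the log
After a FlinkX job starts, a nohup.out file is generated in the directory where the command was run.
The job's running status can also be checked through the web UI.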
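Scrolling through nohup.out by hand gets tedious, so a small filter can help. This is a rough sketch that assumes failures show up as lines containing "ERROR" or "Exception"; those markers and the sample log text are assumptions, since the post does not show what FlinkX actually writes to the file.

```python
def find_failures(log_text, markers=("ERROR", "Exception")):
    """Return (line_number, line) pairs for log lines containing a marker."""
    hits = []
    for number, line in enumerate(log_text.splitlines(), start=1):
        if any(marker in line for marker in markers):
            hits.append((number, line.strip()))
    return hits


# Hypothetical log excerpt, made up for illustration only.
sample_log = """\
INFO  job submitted
ERROR could not connect to master:3306
java.sql.SQLException: Access denied
INFO  job finished
"""
for number, line in find_failures(sample_log):
    print(f"{number}: {line}")
```

For a real run, pass in the file's contents, for example find_failures(open("nohup.out", encoding="utf-8").read()).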
MysqlToHive
Start the Hive metastore and hiveserver2
nohup hive --service metastore &
nohup hiveserver2 &
1. Configuration file
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "username": "root",
            "password": "12345678",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://master:3306/studentdb?useUnicode=true&characterEncoding=utf8&useSSL=false"
                ],
                "table": [
                  "jd_goods"
                ]
              }
            ],
            "column": [
              "*"
            ],
            "where": "id > 90",
            "requestAccumulatorInterval": 1
          },
          "name": "mysqlreader"
        },
        "writer": {
          "name": "hivewriter",
          "parameter": {
            "jdbcUrl": "jdbc:hive2://master:10000/bigdata30",
            "username": "",
            "password": "",
            "fileType": "text",
            "fieldDelimiter": ",",
            "writeMode": "overwrite",
            "charsetName": "UTF-8",
            "tablesColumn": "",
            "defaultFS": "hdfs://master:9000"
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "isRestore": false,
        "isStream": false
      },
      "errorLimit": {},
      "speed": {
        "channel": 1
      }
    }
  }
}
2. In Hive, create the testflinkx database and a partitioned table. The table itself is created in bigdata30, the database that the writer's jdbcUrl points to, so bigdata30 must already exist.
create database testflinkx;
use testflinkx;
CREATE TABLE `bigdata30`.`datax_tb1`(
  `id` STRING,
  `gname` STRING,
  `price` STRING,
  `commit` STRING,
  `shop` STRING,
  `icons` STRING)
PARTITIONED BY (
  `pt` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
3. Start the job
flinkx -mode local -job ./mysql2hive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
MysqlToHbase
1. Configuration file
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "username": "root",
            "password": "12345678",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://master:3306/studentdb?useUnicode=true&characterEncoding=utf8&useSSL=false"
                ],
                "table": [
                  "jd_goods"
                ]
              }
            ],
            "column": [
              "*"
            ],
            "where": "id > 90",
            "requestAccumulatorInterval": 2
          },
          "name": "mysqlreader"
        },
        "writer": {
          "name": "hbasewriter",
          "parameter": {
            "hbaseConfig": {
              "hbase.zookeeper.property.clientPort": "2181",
              "hbase.rootdir": "hdfs://master:9000/hbase",
              "hbase.cluster.distributed": "true",
              "hbase.zookeeper.quorum": "master,node1,node2",
              "zookeeper.znode.parent": "/hbase"
            },
            "table": "flinkx_jd_goods",
            "rowkeyColumn": "$(cf1:id)",
            "column": [
              {
                "name": "cf1:id",
                "type": "string"
              },
              {
                "name": "cf1:gname",
                "type": "string"
              },
              {
                "name": "cf1:price",
                "type": "string"
              },
              {
                "name": "cf1:commit",
                "type": "string"
              },
              {
                "name": "cf1:shop",
                "type": "string"
              },
              {
                "name": "cf1:icons",
                "type": "string"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "isRestore": false,
        "isStream": false
      },
      "errorLimit": {},
      "speed": {
        "channel": 1
      }
    }
  }
}
2. Start HBase and create the flinkx table (flinkx_jd_goods in the configuration above, with column family cf1)
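The writer configuration above names the target table flinkx_jd_goods and prefixes every column with the family cf1, so the table has to exist with that family before the job runs. As a small sketch, the hypothetical helper below derives the HBase shell create statement from an hbasewriter parameter block; it assumes every column name has the form family:qualifier, as in the config, and the shortened parameter dictionary is for illustration only.

```python
def hbase_create_statement(writer_parameter):
    """Derive an HBase shell `create` statement from an hbasewriter parameter block."""
    families = []
    for column in writer_parameter["column"]:
        family = column["name"].split(":", 1)[0]  # "cf1:id" -> "cf1"
        if family not in families:
            families.append(family)
    quoted = ", ".join(f"'{family}'" for family in families)
    return f"create '{writer_parameter['table']}', {quoted}"


# Shortened copy of the writer parameters from the job file above.
parameter = {
    "table": "flinkx_jd_goods",
    "column": [
        {"name": "cf1:id", "type": "string"},
        {"name": "cf1:gname", "type": "string"},
    ],
}
print(hbase_create_statement(parameter))
```

The printed statement can be pasted into the HBase shell once HBase is running.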
3. Start the job
flinkx -mode local -job ./mysql2hbase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
4. Check the log
MysqlToMysql
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              },
              {
                "name": "gender",
                "type": "string"
              },
              {
                "name": "clazz",
                "type": "string"
              }
            ],
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://master:3306/student?useSSL=false"
                ],
                "table": [
                  "student"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://master:3306/student?useSSL=false",
                "table": [
                  "student2"
                ]
              }
            ],
            "writeMode": "insert",
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              },
              {
                "name": "gender",
                "type": "string"
              },
              {
                "name": "clazz",
                "type": "string"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}
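In this job the writer's column list mirrors the reader's, entry for entry. The sketch below compares the two lists position by position so that a column edited in one place but not the other is caught before the job is submitted. It assumes, as the mirrored lists suggest, that reader and writer columns are matched by position; the helper name column_mismatches and the shortened sample item are hypothetical.

```python
def column_mismatches(content_item):
    """Compare reader and writer columns of one content entry by position."""
    reader = content_item["reader"]["parameter"]["column"]
    writer = content_item["writer"]["parameter"]["column"]
    problems = []
    if len(reader) != len(writer):
        problems.append(f"reader has {len(reader)} columns, writer has {len(writer)}")
    # zip stops at the shorter list; the length check above reports the rest.
    for position, (src, dst) in enumerate(zip(reader, writer)):
        if src != dst:
            problems.append(f"position {position}: {src} vs {dst}")
    return problems


# Shortened, deliberately inconsistent example for illustration.
item = {
    "reader": {"parameter": {"column": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
    ]}},
    "writer": {"parameter": {"column": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "int"},
    ]}},
}
for problem in column_mismatches(item):
    print(problem)
```

For the job file above, each entry of job["job"]["content"] can be passed to the function after loading the file with json.load.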