FlinkX安装与利用

打印 上一主题 下一主题

主题 891|帖子 891|积分 2673

FlinkX概述

FlinkX是在袋鼠云内部广泛利用的基于flink的分布式离线和实时的数据同步框架,实现了多种异构数据源之间高效的数据迁徙。
FlinkX是一个数据同步工具,既可以采集静态的数据,好比MySQL,HDFS等,也可以采集实时变化的数据,好比MySQL ,binlog,Kafka等
FlinkX的安装

1.上传并解压
unzip flinkX-1.10.zip -d /usr/local/soft/
2.设置环境变量
3.给bin/flinkX这个文件加上实行权限
chmod a+x flinkx
4.修改设置文件,设置运行端口
vim flinkconf/flink-conf.yaml
5.web服务端口,不指定会随机生成一个
rest.bind-port:8888

FlinkX的简单利用

https://github.com/oceanos/flinkx/blob/1.8_release/README_OLD.
在flinkX官网中README_CH.md里面查看先容文档
MySqlToHdfs

1.设置文件
  1. {
  2.    "job": {
  3.        "content": [
  4.            {
  5.                "reader": {
  6.                    "parameter": {
  7.                        "username": "root",
  8.                        "password": "12345678",
  9.                        "connection": [
  10.                            {
  11.                                "jdbcUrl": [
  12.                                    "jdbc:mysql://master:3306/students?useUnicode=true&characterEncoding=utf8&useSSL=false"
  13.                                ],
  14.                                "table": [
  15.                                    "biaomin"
  16.                                ]
  17.                            }
  18.                        ],
  19.                        "column": [
  20.                            "*"
  21.                        ],
  22.                        "where": "id > 90",
  23.                        "requestAccumulatorInterval": 2
  24.                    },
  25.                    "name": "mysqlreader"
  26.                },
  27.                "writer": {
  28.                    "name": "hdfswriter",
  29.                    "parameter": {
  30.                        "path": "hdfs://master:9000/bigdata30/flinkx/out1",
  31.                        "defaultFS": "hdfs://master:9000",
  32.                        "column": [
  33.                            {
  34.                                "name": "col1",
  35.                                "index": 0,
  36.                                "type": "string"
  37.                            },{
  38.                                "name": "col2",
  39.                                "index": 1,
  40.                                "type": "string"
  41.                            },{
  42.                                "name": "col3",
  43.                                "index": 2,
  44.                                "type": "string"
  45.                            },{
  46.                                "name": "col4",
  47.                                "index": 3,
  48.                                "type": "string"
  49.                            },{
  50.                                "name": "col1",
  51.                                "index": 4,
  52.                                "type": "string"
  53.                            },{
  54.                                "name": "col2",
  55.                                "index": 5,
  56.                                "type": "string"
  57.                            }
  58.                        ],
  59.                        "fieldDelimiter": ",",
  60.                        "fileType": "text",
  61.                        "writeMode": "append"
  62.                    }
  63.                }
  64.            }
  65.        ],
  66.        "setting": {
  67.            "restore": {
  68.                "isRestore": false,
  69.                "isStream": false
  70.            },
  71.            "errorLimit": {},
  72.            "speed": {
  73.                "channel": 1
  74.            }:wq
  75.        }
  76.    }
  77. }
复制代码
2.启动任务
  1. flinkx -mode local -job ./mysql2hdfs.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/
复制代码
3.监听日记
flinkx 任务启动后,会在实行命令的目次下生成一个nohup.out文件
  1. tail -F nohup.out
复制代码
通过web界面查看任务运行环境
  1. http://master:8888
复制代码

MysqlToHive

启动hiveserver2
  1. nohup hive --service metastore &
  2. nohup hiveserver2 &
复制代码
1.设置文件
  1. {
  2.    "job": {
  3.        "content": [
  4.            {
  5.                "reader": {
  6.                    "parameter": {
  7.                        "username": "root",
  8.                        "password": "12345678",
  9.                        "connection": [
  10.                            {
  11.                                "jdbcUrl": [
  12.                                    "jdbc:mysql://master:3306/studentdb?useUnicode=true&characterEncoding=utf8&useSSL=false"
  13.                                ],
  14.                                "table": [
  15.                                    "jd_goods"
  16.                                ]
  17.                            }
  18.                        ],
  19.                        "column": [
  20.                            "*"
  21.                        ],
  22.                        "where": "id > 90",
  23.                        "requestAccumulatorInterval": 1
  24.                    },
  25.                    "name": "mysqlreader"
  26.                },
  27.                "writer": {
  28.                    "name": "hivewriter",
  29.                    "parameter": {
  30.                        "jdbcUrl": "jdbc:hive2://master:10000/bigdata30",
  31.                        "username": "",
  32.                        "password": "",
  33.                        "fileType": "text",
  34.                        "fieldDelimiter": ",",
  35.                        "writeMode": "overwrite",
  36.                        "charsetName": "UTF-8",
  37.                            "tablesColumn": "",
  38.                        "defaultFS": "hdfs://master:9000"
  39.                    }
  40.                }
  41.            }
  42.        ],
  43.        "setting": {
  44.            "restore": {
  45.                "isRestore": false,
  46.                "isStream": false
  47.            },
  48.            "errorLimit": {},
  49.            "speed": {
  50.                "channel": 1
  51.            }
  52.        }
  53.    }
  54. }
复制代码
2.在hive中创建testflinX数据库,并创建分区表
  1. create database testflinkx;
  2. use testflinkx;
  3. CREATE TABLE `bigdata30`.`datax_tb1`(
  4.    `id` STRING,
  5.    `gname` STRING,
  6.    `price` STRING,
  7.    `commit` STRING,
  8.    `shop` STRING,
  9.    `icons` STRING)
  10. PARTITIONED BY (
  11.   `pt` string)
  12. ROW FORMAT DELIMITED
  13.   FIELDS TERMINATED BY ',';
复制代码
3.启动任务
  1. flinkx -mode local -job ./mysql2hive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
复制代码

MysqlToHbase

1.设置文件
  1. {
  2.    "job": {
  3.        "content": [
  4.            {
  5.                "reader": {
  6.                    "parameter": {
  7.                        "username": "root",
  8.                        "password": "12345678",
  9.                        "connection": [
  10.                            {
  11.                                "jdbcUrl": [
  12.                                    "jdbc:mysql://master:3306/studentdb?useUnicode=true&characterEncoding=utf8&useSSL=false"
  13.                                ],
  14.                                "table": [
  15.                                    "jd_goods"
  16.                                ]
  17.                            }
  18.                        ],
  19.                        "column": [
  20.                            "*"
  21.                        ],
  22.                        "where": "id > 90",
  23.                        "requestAccumulatorInterval": 2
  24.                    },
  25.                    "name": "mysqlreader"
  26.                },
  27.                "writer": {
  28.                    "name": "hbasewriter",
  29.                    "parameter": {
  30.                        "hbaseConfig": {
  31.                            "hbase.zookeeper.property.clientPort": "2181",
  32.                            "hbase.rootdir": "hdfs://master:9000/hbase",
  33.                            "hbase.cluster.distributed": "true",
  34.                            "hbase.zookeeper.quorum": "master,node1,node2",
  35.                            "zookeeper.znode.parent": "/hbase"
  36.                        },
  37.                        "table": "flinkx_jd_goods",
  38.                        "rowkeyColumn": "$(cf1:id)",
  39.                        "column": [
  40.                            {
  41.                                "name": "cf1:id",
  42.                                "type": "string"
  43.                            },
  44.                            {
  45.                                "name": "cf1:gname",
  46.                                "type": "string"
  47.                            },
  48.                            {
  49.                                "name": "cf1:price",
  50.                                "type": "string"
  51.                            },
  52.                            {
  53.                                "name": "cf1:commit",
  54.                                "type": "string"
  55.                            },
  56.                            {
  57.                                "name": "cf1:shop",
  58.                                "type": "string"
  59.                            },
  60.                            {
  61.                                "name": "cf1:icons",
  62.                                "type": "string"
  63.                            }
  64.                        ]
  65.                    }
  66.                }
  67.            }
  68.        ],
  69.        "setting": {
  70.            "restore": {
  71.                "isRestore": false,
  72.                "isStream": false
  73.            },
  74.            "errorLimit": {},
  75.            "speed": {
  76.                "channel": 1
  77.            }
  78.        }
  79.    }
  80. }
复制代码
2.启动hbase 并创建flinkx表
  1. create 'flinkX','lie1'
复制代码
3.启动任务
  1. flinkx -mode local -job ./mysql2hbase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
复制代码
4.查看日记
  1. tail -F nohup.out
复制代码

MysqlToMysql

  1. {
  2.    "job": {
  3.      "content": [
  4.        {
  5.          "reader": {
  6.            "name": "mysqlreader",
  7.            "parameter": {
  8.              "column": [
  9.                {
  10.                  "name": "id",
  11.                  "type": "int"
  12.                },
  13.                {
  14.                  "name": "name",
  15.                  "type": "string"
  16.                },
  17.                {
  18.                  "name": "age",
  19.                  "type": "int"
  20.                },
  21.                {
  22.                  "name": "gender",
  23.                  "type": "string"
  24.                },
  25.                {
  26.                  "name": "clazz",
  27.                  "type": "string"
  28.                }
  29.              ],
  30.              "username": "root",
  31.              "password": "123456",
  32.              "connection": [
  33.                {
  34.                  "jdbcUrl": [
  35.                    "jdbc:mysql://master:3306/student?useSSL=false"
  36.                  ],
  37.                  "table": [
  38.                    "student"
  39.                  ]
  40.                }
  41.              ]
  42.            }
  43.          },
  44.          "writer": {
  45.            "name": "mysqlwriter",
  46.            "parameter": {
  47.              "username": "root",
  48.              "password": "123456",
  49.              "connection": [
  50.                {
  51.                  "jdbcUrl": "jdbc:mysql://master:3306/student?useSSL=false",
  52.                  "table": [
  53.                    "student2"
  54.                  ]
  55.                }
  56.              ],
  57.              "writeMode": "insert",
  58.              "column": [
  59.                {
  60.                    "name": "id",
  61.                    "type": "int"
  62.                  },
  63.                  {
  64.                    "name": "name",
  65.                    "type": "string"
  66.                  },
  67.                  {
  68.                    "name": "age",
  69.                    "type": "int"
  70.                  },
  71.                  {
  72.                    "name": "gender",
  73.                    "type": "string"
  74.                  },
  75.                  {
  76.                    "name": "clazz",
  77.                    "type": "string"
  78.                  }
  79.              ]
  80.            }
  81.          }
  82.        }
  83.      ],
  84.      "setting": {
  85.        "speed": {
  86.          "channel": 1,
  87.          "bytes": 0
  88.        }
  89.      }
  90.    }
  91.   }
复制代码



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

九天猎人

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表