目录
1. 目的
2. 体系架构
3. 环境搭建
3.1 ElasticSearch部署
3.2 数据同步
3.2.1 Flink部署
3.2.2 创建同步任务
3.3 环境验证
4. 后续
1. 目的
实现将差别MySQL Schema实时同步至同一数据源以供其他数据分析应用作为数据源调用。
搭建范围包括:供数据分析应用调用的数据源搭建以及MySQL数据同步
2. 体系架构
Flink+ElasticSearch的部署设置为本文重点。
序号 | 产品 | 版本 | 备注 | 1 | CentOS | 7.8 64bit | 移动云ECS | 2 | MySQL | 8.0 | 移动云DB | 3 | ElasticSearch | 8.15.3 | | 4 | Flink | 1.20.0 | | 5 | Flink CDC
| 3.2.0 | 生成Flink Job | 3. 环境搭建
3.1 ElasticSearch部署
参考ElasticSearch8.13.0安装步骤
Step1. 创建ES群组、用户(ElasticSearch8 不允许root用户启动)
- groupadd es
- useradd es -g es -p XXXXXXX
复制代码 Step2. 下载ElasticSearch8.15.3安装包 (ElasticSearch官方安装步骤)
- wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.15.3-linux-x86_64.tar.gz
- wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.15.3-linux-x86_64.tar.gz.sha512
- shasum -a 512 -c elasticsearch-8.15.3-linux-x86_64.tar.gz.sha512
复制代码 盼望结果:
如果提示shasum command not found, 安装per-Digest-SHA后在执行
- yum install -y perl-Digest-SHA
复制代码 Step3. 解压并赋权
- tar -xzf elasticsearch-8.15.3-linux-x86_64.tar.gz
- chown -R es:es elasticsearch-8.15.3
- cd elasticsearch-8.15.3/
复制代码 Step4. 设置elasticsearch.yml
- # 允许外部访问
- network.host: 0.0.0.0
- # 关闭安全组件
- xpack.security.enabled: false
复制代码 Step5. 设置vm.max_map_count参数 (体系虚拟内存默认最大映射数为65530,无法满足ES体系要求,需要调解为262144以上。)
- vim /etc/sysctl.conf
- # 添加添加参数
- vm.max_map_count = 262144
- # 重新加载/etc/sysctl.conf配置\
- sysctl -p
复制代码 Step6. 启动elasticsearch
- su es
- # 首次运行建议使用如下命令,以发现启动问题
- bin/elasticsearch
- # 后台运行
- bin/elasticsearch -d -p pid
复制代码 首次启动记录elastic初始暗码,以供后续利用
Step7. 验证启动乐成
网页输入: https://ip:9200 提示输入用户名、暗码则启动乐成
Step8. 设置默认动态模板,关闭date_detection (当存在date字段,但该字段不是必填项时,开启date_detection可能会发生“cannot parse empty datetime”的报错)
- curl -XPUT [uesr]:[password]@[ip]:[port]/_template/template_default?pretty -H "Content-Type: application/json" --data-binary template_default.json
- # template_defualt.json
- {
- "index_patterns" : ["*"],
- "mappings" : {
- "date_detection": false
- }
- }
复制代码
3.2 数据同步
3.2.1 Flink部署
Step1 下载flink安装包
- wget https://dlcdn.apache.org/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
- wget https://dlcdn.apache.org/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz.sha512
- shasum -a 512 -c flink-1.20.0-bin-scala_2.12.tgz.sha512
复制代码 Step2 解压安装包
- tar -xzf flink-1.20.0-bin-scala_2.12.tgz
- cd flink-1.20.0-bin-scala_2.12/
复制代码 Step3 设置config.yml
- jobmanager:
- bind-host: 0.0.0.0
- 。。。
- memory:
- process:
- size: 2600m
- 。。。
- taskmanager:
- bind-host: 0.0.0.0
- host: 0.0.0.0
- numberOfTaskSlots: 2
- memory:
- process:
- size: 2728m
- flink:
- size: 2280m
- 。。。
- execution:
- checkpointing:
- interval: 3min
- 。。。
- rest:
- address: 0.0.0.0
- bind-address: 0.0.0.0
复制代码 Step4 启动flink
注:启动flink,需安装JAVA,环境变量JAVA_HOME不为空。
3.2.2 创建同步任务
Step1. 下载flink cdc3.2.0
- wget https://dlcdn.apache.org/flink/flink-cdc-3.2.0/flink-cdc-3.2.0-bin.tar.gz
- wget https://dlcdn.apache.org/flink/flink-cdc-3.2.0/flink-cdc-3.2.0-bin.tar.gz.sha512
- shasum -a 512 flink-cdc-3.2.0-bin.tar.gz.sha512
复制代码 Step2. 解压文件
- tar -zxf flink-cdc-3.2.0-bin.tar.gz
- cd flink-cdc-3.2.0/
复制代码 Step3. 编写CDC YAML文件
- ################################################################################
- # Description: Sync MySQL all tables to ElasticSearch
- ################################################################################
- source:
- type: mysql
- hostname: [DB_IP]
- port: [DB_PORT,DEFAULT 3306]
- username: [DB_USER]
- password: [DB_PASSWORD]
- tables: [同步的表,全库[schema].\.*]
- server-id: 5400-5404
- server-time-zone: Asia/Shanghai
- sink:
- type: elasticsearch
- name: ES Sink
- hosts: http://[ES_IP]:[ES_PORT]
- username: [ES_USER]
- password: [ES_PASSWORD]
- version: 8
- batch.size.max: 5
- inflight.requests.max: 1
- buffered.requests.max: 10
- batch.size.max.bytes: 52428800
- buffer.time.max.ms: 1000
- record.size.max.bytes: 10485760
- pipeline:
- name: Sync MySQL Database to ElasticSearch
- parallelism: 1
复制代码 Step4. 上传JAR包
将flink-cdc-pipeline-connector-elasticsearch-3.2.0.jar,flink-cdc-pipeline-connector-elasticsearch-3.2.0.jar 上传至 FLINK_CDC_HOME/lib目录下
将mysql-connector-java-8.0.27.jar 上传至FLINK_HOME/lib 目录下
Step5. 创建同步任务
- bin/flink-cdc.sh MySQLToES.yaml --flink-home /data/flink-1.20.0
复制代码 盼望结果:
3.3 环境验证
1. Flink任务验证
Jobs -> Running Jobs 找到之前创建的JOB,运行状态为Running
等候3min,切换至checkpoints页面,checkpoints history为completed
2. 查察ElasticSearch日记
首次同步会存在create index,create mapping日记
修改Mysql数据库表内容, ES对应INDEX下的记录有被同步修改
4. 后续
1. Flink CDC 设置文件 elasticsearch相关设置的含义
- batch.size.max: 5
- inflight.requests.max: 1
- buffered.requests.max: 10
- batch.size.max.bytes: 52428800
- buffer.time.max.ms: 1000
- record.size.max.bytes: 10485760
复制代码 2. elasticsearch.yml中 xpack.security.enabled: true时,外部api链接
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |