基于Flink CDC实现ElasticSearch同步MySQL环境搭建条记

打印 上一主题 下一主题

主题 889|帖子 889|积分 2667

目录
1. 目的
2. 体系架构
3. 环境搭建
3.1 ElasticSearch部署 
3.2 数据同步
3.2.1 Flink部署
3.2.2 创建同步任务
3.3 环境验证
4. 后续



1. 目的

实现将差别MySQL Schema实时同步至同一数据源以供其他数据分析应用作为数据源调用。
搭建范围包括:供数据分析应用调用的数据源搭建以及MySQL数据同步
2. 体系架构

Flink+ElasticSearch的部署设置为本文重点。

序号产品版本备注
1CentOS7.8 64bit移动云ECS
2MySQL8.0移动云DB
3ElasticSearch8.15.3
4Flink1.20.0
5 Flink CDC
3.2.0生成Flink Job
3. 环境搭建

3.1 ElasticSearch部署 

参考ElasticSearch8.13.0安装步骤
   Step1. 创建ES群组、用户(ElasticSearch8 不允许root用户启动)
  1. groupadd es
  2. useradd es -g es -p XXXXXXX
复制代码
Step2. 下载ElasticSearch8.15.3安装包 (ElasticSearch官方安装步骤)
  1. wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.15.3-linux-x86_64.tar.gz
  2. wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.15.3-linux-x86_64.tar.gz.sha512
  3. shasum -a 512 -c elasticsearch-8.15.3-linux-x86_64.tar.gz.sha512
复制代码
盼望结果:
  

  如果提示shasum command not found, 安装per-Digest-SHA后在执行
  1. yum install -y perl-Digest-SHA
复制代码
Step3. 解压并赋权
  1. tar -xzf elasticsearch-8.15.3-linux-x86_64.tar.gz
  2. chown -R es:es elasticsearch-8.15.3
  3. cd elasticsearch-8.15.3/
复制代码
Step4. 设置elasticsearch.yml 
  1. # 允许外部访问
  2. network.host: 0.0.0.0
  3. # 关闭安全组件
  4. xpack.security.enabled: false
复制代码
Step5. 设置vm.max_map_count参数 (体系虚拟内存默认最大映射数为65530,无法满足ES体系要求,需要调解为262144以上。)
  1. vim /etc/sysctl.conf
  2. # 添加添加参数
  3. vm.max_map_count = 262144
  4. # 重新加载/etc/sysctl.conf配置\
  5. sysctl -p
复制代码
Step6. 启动elasticsearch
  1. su es
  2. # 首次运行建议使用如下命令,以发现启动问题
  3. bin/elasticsearch
  4. # 后台运行
  5. bin/elasticsearch -d -p pid
复制代码
首次启动记录elastic初始暗码,以供后续利用
  

  
  Step7. 验证启动乐成
  网页输入: https://ip:9200 提示输入用户名、暗码则启动乐成
  
  Step8. 设置默认动态模板,关闭date_detection (当存在date字段,但该字段不是必填项时,开启date_detection可能会发生“cannot parse empty datetime”的报错)
  1. curl -XPUT  [uesr]:[password]@[ip]:[port]/_template/template_default?pretty -H "Content-Type: application/json" --data-binary template_default.json
  2. # template_defualt.json
  3. {
  4.     "index_patterns" : ["*"],
  5.     "mappings" : {
  6.         "date_detection": false
  7.     }
  8. }
复制代码

  3.2 数据同步

3.2.1 Flink部署

   Step1 下载flink安装包
  1. wget https://dlcdn.apache.org/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
  2. wget https://dlcdn.apache.org/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz.sha512
  3. shasum -a 512 -c flink-1.20.0-bin-scala_2.12.tgz.sha512
复制代码
Step2 解压安装包
  1. tar -xzf flink-1.20.0-bin-scala_2.12.tgz
  2. cd flink-1.20.0-bin-scala_2.12/
复制代码
Step3 设置config.yml
  1. jobmanager:
  2.   bind-host: 0.0.0.0
  3. 。。。
  4.   memory:
  5.     process:
  6.       size: 2600m
  7. 。。。
  8. taskmanager:
  9.   bind-host: 0.0.0.0
  10.   host: 0.0.0.0
  11.   numberOfTaskSlots: 2
  12.   memory:
  13.     process:
  14.       size: 2728m
  15.     flink:
  16.       size: 2280m
  17. 。。。
  18. execution:
  19.   checkpointing:
  20.     interval: 3min
  21. 。。。
  22. rest:
  23.   address: 0.0.0.0
  24.   bind-address: 0.0.0.0
复制代码
Step4 启动flink
  1. bin/start-cluster.sh
复制代码
注:启动flink,需安装JAVA,环境变量JAVA_HOME不为空。
  3.2.2 创建同步任务

   Step1. 下载flink cdc3.2.0
  1. wget https://dlcdn.apache.org/flink/flink-cdc-3.2.0/flink-cdc-3.2.0-bin.tar.gz
  2. wget https://dlcdn.apache.org/flink/flink-cdc-3.2.0/flink-cdc-3.2.0-bin.tar.gz.sha512
  3. shasum -a 512 flink-cdc-3.2.0-bin.tar.gz.sha512
复制代码
Step2. 解压文件
  1. tar -zxf flink-cdc-3.2.0-bin.tar.gz
  2. cd flink-cdc-3.2.0/
复制代码
Step3. 编写CDC YAML文件
  1. ################################################################################
  2. # Description: Sync MySQL all tables to ElasticSearch
  3. ################################################################################
  4. source:
  5.   type: mysql
  6.   hostname: [DB_IP]
  7.   port: [DB_PORT,DEFAULT 3306]
  8.   username: [DB_USER]
  9.   password: [DB_PASSWORD]
  10.   tables: [同步的表,全库[schema].\.*]
  11.   server-id: 5400-5404
  12.   server-time-zone: Asia/Shanghai
  13. sink:
  14.   type: elasticsearch
  15.   name: ES Sink
  16.   hosts: http://[ES_IP]:[ES_PORT]
  17.   username: [ES_USER]
  18.   password: [ES_PASSWORD]
  19.   version: 8
  20.   batch.size.max: 5
  21.   inflight.requests.max: 1
  22.   buffered.requests.max: 10
  23.   batch.size.max.bytes: 52428800
  24.   buffer.time.max.ms:  1000
  25.   record.size.max.bytes: 10485760
  26. pipeline:
  27.   name: Sync MySQL Database to ElasticSearch
  28.   parallelism: 1
复制代码
Step4. 上传JAR包
  将flink-cdc-pipeline-connector-elasticsearch-3.2.0.jar,flink-cdc-pipeline-connector-elasticsearch-3.2.0.jar 上传至 FLINK_CDC_HOME/lib目录下
  将mysql-connector-java-8.0.27.jar 上传至FLINK_HOME/lib 目录下
  Step5. 创建同步任务
  1. bin/flink-cdc.sh MySQLToES.yaml --flink-home /data/flink-1.20.0
复制代码
盼望结果:
  

  3.3 环境验证

   1. Flink任务验证
  Jobs -> Running Jobs 找到之前创建的JOB,运行状态为Running
  

  等候3min,切换至checkpoints页面,checkpoints history为completed
  

  2. 查察ElasticSearch日记 
  首次同步会存在create index,create mapping日记
  

  修改Mysql数据库表内容, ES对应INDEX下的记录有被同步修改
  4. 后续

   1. Flink CDC 设置文件 elasticsearch相关设置的含义
  1.   batch.size.max: 5
  2.   inflight.requests.max: 1
  3.   buffered.requests.max: 10
  4.   batch.size.max.bytes: 52428800
  5.   buffer.time.max.ms:  1000
  6.   record.size.max.bytes: 10485760
复制代码
2. elasticsearch.yml中 xpack.security.enabled: true时,外部api链接 
  
  


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

滴水恩情

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表