一、Flink部署
1.1、JAVA环境
- vi /etc/profile
- export JAVA_HOME=/data/flinkcdc/jdk1.8.0_181
- export CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
- export PATH=$JAVA_HOME/bin:$PATH
- source /etc/profile
- vi ~/.bash_profile
- export FLINK_HOME=/data/flinkcdc/flink-1.17.0
- export PATH=$PATH:$FLINK_HOME/bin
- source ~/.bash_profile
复制代码 1.2、设置Flink
- vim conf/flink-conf.yaml
- 添加配置:env.java.home=/data/flinkcdc/jdk1.8.0_181
- ①、localhost 修改为IP地址
- rest.port: 8088
- rest.address: 192.168.33.231
- ②、关闭防火墙
- systemctl status firewalld
- systemctl stop firewalld
复制代码 1.3、Flink CDC Jar包
CDC jar放到Flink安装包解压之后的lib目次
1.4、启动flink
- bin/start-cluster.sh
- Flink Web-UI
- http://192.168.33.231:8088
复制代码 1.5、启动 Flink SQL CLI
二、达梦数据库搭建
2.1、docker dm8
- docker run -d \
- --name dm8 \
- --restart=always \
- --privileged=true \
- -e LD_LIBRARY_PATH=/opt/dmdbms/bin \
- -e PAGE_SIZE=16 \
- -e EXTENT_SIZE=32 \
- -e LOG_SIZE=1024 \
- -e CASE_SENSITIVE=0 \
- -e UNICODE_FLAG=1 \
- -e INSTANCE_NAME=DM8_CDC \
- -e SYSDBA_PWD=SYSDBA001 \
- -v /docker/dm8_data_cdc:/opt/dmdbms/data \
- -p 5236:5236 \
- dm8_flinkcdc:dm8
复制代码 查察容器运行情况
- 查看数据库容器
- lsof -i:5236
- docker logs -f dm8
- docker exec -it dm8 bash
复制代码 2.2、开启达梦日志归档
- ##查看当前数据库是否开启归档
- select arch_mode from v$database;
- ##查询有哪些归档日志
- SELECT NAME , FIRST_TIME , NEXT_TIME , FIRST_CHANGE# , NEXT_CHANGE# FROM V$ARCHIVED_LOG;
- SELECT * FROM V$ARCH_FILE
- ##修改数据库实例的 /dmdata/DAMEGN/dm.ini文件中 ARCH_INI 参数值
- vi /dmdata/DAMENG/dm.ini
- ##将 ARCH_INI 值改为 1,保存后退出
- ARCH_INI = 1 #开启归档功能
- RLOG_APPEND_LOGIC = 1
- ##新增文件dmarch.ini
- vi /dmdata/DAMENG/dmarch.ini
- ##新增如下内容
- [ARCHIVE_LOCAL1]
- ARCH_TYPE = LOCAL
- ARCH_DEST = /dmarch
- ARCH_FILE_SIZE = 2048
- ARCH_SPACE_LIMIT = 102400
- ##最后重启数据库完成归档配置
- #DaMeng Database Archive Configuration file
- #this is comments
- ARCH_WAIT_APPLY = 0
- [ARCHIVE_LOCAL1]
- ARCH_TYPE = LOCAL
- ARCH_DEST = /opt/dmdbms/data/DAMENG/arch
- ARCH_FILE_SIZE = 1024
- ARCH_SPACE_LIMIT = 51200
- ARCH_FLUSH_BUF_SIZE = 0
- ARCH_HANG_FLAG = 1
复制代码 2.3、重启dm8数据库
三、实时同步测试
- ##达梦
- CREATE TABLE t_source_dm (
- id INT,
- name VARCHAR,
- insert_date DATE,
- PRIMARY KEY (id) NOT ENFORCED
- ) WITH (
- 'connector' = 'dm',
- 'startupOptions' = 'Initial',
- 'hostname' = '192.168.33.231',
- 'port' = '5236',
- 'username' = 'SYSDBA',
- 'password' = 'SYSDBA001',
- 'database' = 'DM8_CDC',
- 'schema' = 'SYSDBA',
- 'table' = 'dm_flinkcdc'
- );
- ##MYSQL
- CREATE TABLE sink_mysql_test (
- id int,
- name STRING,
- insert_date date,
- PRIMARY KEY (id) NOT ENFORCED
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://192.168.33.231:3306/flinkcdc',
- 'driver' = 'com.mysql.cj.jdbc.Driver',
- 'username' = 'root',
- 'password' = 'YTP1101102233',
- 'table-name' = 'dm_to_mysql'
- );
- insert into sink_mysql_test
- select * from t_source_dm;
复制代码 四、FlinkCDC 达梦数据库 所需文件下载
4.1、所需Jar包
4.2、支持JAVA步伐和SQL
4.3、完成步伐和说明文档下载地址
https://download.csdn.net/download/ytp552200ytp/90103896
如果着实没有CSDN积分,配景接洽我留下邮箱,我看到后私信发到邮箱,支持共享。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |