Flink CDC 1.18.1 Oracle 数据同步到postgresql

打印 上一主题 下一主题

主题 889|帖子 889|积分 2667

1、下载flink-1.18.1-bin-scala_2.12.tgz,linux通过:
  wget https://archive.apache.org/dist/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
2、oracle11g客户端安装,下载:
instantclient-basic-linux.x64-11.2.0.4.0.zip
instantclient-sdk-linux.x64-11.2.0.4.0.zip
instantclient-sqlplus-linux.x64-11.2.0.4.0.zip
以上文件,在ORACLE网站下载。
3、配置oracle客户端:
  1. [root@wn1 ~]# ls /usr/local/
  2. [root@wn1 ~]# cp instantclient-* /usr/local
  3. [root@wn1 ~]# cd /usr/local
  4. [root@wn1 ~]# unzip instantclient-basic-linux.x64-11.2.0.4.0.zip
  5. [root@wn1 ~]# unzip instantclient-sdk-linux.x64-11.2.0.4.0.zip
  6. [root@wn1 ~]# unzip instantclient-sqlplus-linux.x64-11.2.0.4.0.zip
  7. [root@wn1 ~]# mv instantclient_11_2 oracle_11
  8. [root@wn1 ~]# rm instantclient-*
  9. [root@wn1 ~]# vi /etc/profile
  10. #增加以下内容
  11. export ORACLE_HOME=/usr/local/oracle_11
  12. export PATH=.:${PATH}:$ORACLE_HOME
  13. export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$ORACLE_HOME
  14. #保存退出,执行
  15. [root@wn1 ~]# source /etc/profile
  16. #修改ld配置
  17. [root@wn1 ~]#  vi /etc/ld.so.conf.d/oracle.conf
  18. #写入内容
  19. /usr/local/oracle_11
  20. #保存退出,执行
  21. [root@wn1 ~]#ldconfig
  22. #配置oracle连接参数
  23. [root@wn1 ~]# mkdir -p network/admin/
  24. [root@wn1 ~]# cd network/admin/
  25. #找一个tnsnames.ora文件,直接上传到服务器
  26. [root@wn1 ~]# cp /root/tnsnames.ora ./
  27. #测试连接
  28. [root@wn1 ~]# sqlplus sys/manager@//192.168.56.1/orcl
  29. SQL*Plus: Release 11.2.0.4.0 Production on Sun Mar 24 18:04:15 2024
  30. Copyright (c) 1982, 2013, Oracle.  All rights reserved.
  31. Connected to:
  32. Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production
  33. With the Partitioning, OLAP, Data Mining and Real Application Testing options
  34. SQL>
复制代码
4、配置oracle数据库,启用归档日志,这步必要参考:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/legacy-flink-cdc-sources/oracle-cdc/
5、下载oracle cdc 毗连器
wget https://maven.aliyun.com/repository/public/com/ververica/flink-sql-connector-oracle-cdc/3.0.1/flink-sql-connector-oracle-cdc-3.0.1.jar
解压:
tar zxvf flink-1.18.1-bin-scala_2.12.tgz
将flink-sql-connector-oracle-cdc-3.0.1.jar复制到flink-1.18.1/lib目录中
6、下载 flink-connector-jdbc-3.1.1-1.17.jar,postgresql-42.7.3.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
https://jdbc.postgresql.org/download/postgresql-42.7.3.jar
将jar包复制到flink-1.18.1/lib目录中
7、安装postgresql就不说了,相信你已经有了数据库了
8、修改Flink的配置文件 /home/flink/flink-1.18.1/conf/flink-conf.yaml ,主要是各种服务的绑定地址,默认为localhost,齐备改为0.0.0.0,如:rest.address: 0.0.0.0 #localhost
9、启动
  1. [flink@cn1 bin]$ ./start-cluster.sh
  2. [flink@cn1 bin]$ ./sql-client.sh
  3. Flink SQL>
  4. #创建ORACLE源表
  5. SET execution.checkpointing.interval = 3s;
  6. create table SYS_DIC_DEPT
  7. (
  8.   DEPT_CODE       STRING,
  9.   DEPT_NAME       STRING,
  10.   DEPT_ADDR       STRING,
  11.   DEPT_MEMO       STRING,
  12.   DEPT_FLAG       STRING,
  13.   DEPT_GZYZLTJFLAG STRING,
  14.   DEPT_UPPER       STRING,
  15.   PRIMARY KEY (DEPT_CODE) NOT ENFORCED
  16. )
  17. WITH (
  18.    'connector' = 'oracle-cdc',
  19.    'hostname' = '192.168.56.1',
  20.    'port' = '1521',
  21.    'username' = 'username',
  22.    'password' = '123456',
  23.    'database-name' = 'ORCL',
  24.    'schema-name' = 'schema-name',
  25.     'table-name' = 'table-name',
  26.     'debezium.log.mining.strategy'='online_catalog',
  27.         'debezium.log.mining.continuos.mine'='true',
  28.         'debezium.snapshot.mode' = 'initial',
  29.         'debezium.database.tablename.case.insensitive'='true'
  30. );
  31. Flink SQL> select * from SYS_DIC_DEPT;
复制代码
如果看不到数据,请检查ORACLE的字段是否全部大写
10、创建PG Sink:
  1. Flink SQL>
  2. create table sys_dic_dept_sink
  3. (
  4.   dept_code       STRING,
  5.   dept_name       STRING,
  6.   dept_addr       STRING,
  7.   dept_memo       STRING,
  8.   dept_flag       STRING,
  9.   dept_gzyzltjflag STRING,
  10.   dept_upper       STRING,
  11.   PRIMARY KEY (dept_code) NOT ENFORCED
  12. )
  13. with(
  14. 'connector' = 'jdbc',
  15. 'url' = 'jdbc:postgresql://192.168.56.90:5432/postgres?currentSchema=public',
  16. 'username' = 'postgres',
  17. 'password' = '123456',  
  18. 'table-name' = 'sys_dic_dept'
  19. );
复制代码
11、抽数据
  1. Flink SQL> insert into sys_dic_dept_sink select * from SYS_DIC_DEPT;
复制代码
12、查察任务执行 http://192.168.56.90:8081/#/job/running


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

罪恶克星

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表