Syncing MySQL tables to a PostgreSQL database with Flink
https://i-blog.csdnimg.cn/direct/4a83044bfe874ca294ce39d754bdce43.png
1. Disable the firewall and SELinux
systemctl stop firewalld
systemctl disable firewalld
systemctl status firewalld
vi /etc/selinux/config
Set SELINUX=disabled (the change takes effect after a reboot)
https://i-blog.csdnimg.cn/direct/e654d1ddd4fc429aae34be838a02e3e3.png
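For a scripted install, the SELinux edit can be made without opening an editor. This is a minimal sketch: the helper name disable_selinux_in_file is made up for illustration, and it assumes GNU sed as shipped on CentOS/RHEL.

```shell
# Sketch: set SELINUX=disabled in an SELinux config file non-interactively.
# disable_selinux_in_file is a hypothetical helper name, not a system command.
disable_selinux_in_file() {
    local config_file
    config_file="$1"
    # Rewrite only the active SELINUX= line; comments and SELINUXTYPE= stay untouched.
    sed -i 's/^SELINUX=.*/SELINUX=disabled/' "$config_file"
}

# Typical use as root on the server:
#   disable_selinux_in_file /etc/selinux/config
#   setenforce 0   # optional: switch to permissive mode until the next reboot
```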
2. Install Java 8
yum list java-1.8*
yum install java-1.8.0-openjdk* -y
java -version
https://i-blog.csdnimg.cn/direct/57f5704167d245629ee5bf6d7761782d.png
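3. Download and deploy PostgreSQL
Download: http://www.postgresql.org/ftp/source/
I installed the latest version at the time (16.2); choose whichever PostgreSQL version fits your needs.
https://i-blog.csdnimg.cn/direct/481e898c42bb4a8b9084634957d58384.png
Upload the tarball to the server, extract it, and run configure from the source directory:
tar -zxvf postgresql-16.2.tar.gz
cd postgresql-16.2
./configure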
If configure fails with the error shown in the screenshot below, install gcc:
yum -y install gcc
https://i-blog.csdnimg.cn/direct/d90007ae16d04fe7bd40f00593b665ea.png
If it fails with the error shown in the screenshot below, install the missing libraries:
yum -y install libicu libicu-devel libunwind readline-devel zlib-devel
https://i-blog.csdnimg.cn/direct/48fdcf61260245919bc94113b0a71c2e.png
make
make install
groupadd postgres
useradd -g postgres postgres
id postgres
cd /usr/local/pgsql/
mkdir data
chown postgres:postgres data
cd /home/postgres/
ll -al
https://i-blog.csdnimg.cn/direct/1fd15e3fa50f46f8bd537c45a01c3235.png
vi .bash_profile
Add the following lines:
export PGHOME=/usr/local/pgsql/
export PGDATA=/usr/local/pgsql/data
PATH=$PATH:$HOME/bin:$PGHOME/bin
https://i-blog.csdnimg.cn/direct/0c69560fdd0a46d09d638a7873cea097.png
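If you would rather not edit .bash_profile by hand, the same three lines can be appended from a script. This is a sketch only: add_pg_env is a hypothetical helper name, and the guard simply checks whether a PGHOME export is already present so that re-running it does not duplicate the lines.

```shell
# Sketch: append the PostgreSQL environment variables to a profile file once.
# add_pg_env is a hypothetical helper name used for illustration.
add_pg_env() {
    local profile
    profile="$1"
    # Do nothing if the profile already exports PGHOME.
    if grep -q '^export PGHOME=' "$profile" 2>/dev/null; then
        return 0
    fi
    # Quoted heredoc delimiter: $PATH, $HOME and $PGHOME are written literally.
    cat >> "$profile" <<'EOF'
export PGHOME=/usr/local/pgsql/
export PGDATA=/usr/local/pgsql/data
export PATH=$PATH:$HOME/bin:$PGHOME/bin
EOF
}

# Typical use:
#   add_pg_env /home/postgres/.bash_profile
```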
source .bash_profile
su postgres
initdb
https://i-blog.csdnimg.cn/direct/b0bff4272c4a4e4bb6f243843de70228.png
cd /usr/local/pgsql/data/
vi postgresql.conf
Set listen_addresses = '*'
https://i-blog.csdnimg.cn/direct/de4aa1deec334350ab5a681f355f6e38.png
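Changing listen_addresses only makes the server listen on all interfaces. The pg_hba.conf that initdb generates normally admits local connections only, so a client connecting through the server's LAN address (as the Flink JDBC sink does later) usually also needs a host rule and a password for the postgres role. The sketch below is an assumption about what that could look like: the helper name configure_pg_remote_access and the 192.168.207.0/24 network are illustrative, not taken from the article.

```shell
# Sketch: open PostgreSQL to a given network. configure_pg_remote_access is a
# hypothetical helper name; the CIDR passed in is an assumption about your LAN.
configure_pg_remote_access() {
    local pgdata
    local cidr
    local rule
    pgdata="$1"
    cidr="$2"
    # Replace the listen_addresses line whether or not it is still commented out.
    sed -i -E "s/^#?[[:space:]]*listen_addresses[[:space:]]*=.*/listen_addresses = '*'/" \
        "$pgdata/postgresql.conf"
    # Allow password logins from the network; add the rule only once.
    rule="host    all    all    $cidr    scram-sha-256"
    grep -qxF "$rule" "$pgdata/pg_hba.conf" || printf '%s\n' "$rule" >> "$pgdata/pg_hba.conf"
}

# Typical use as the postgres user, followed by a restart and a password:
#   configure_pg_remote_access /usr/local/pgsql/data 192.168.207.0/24
#   pg_ctl -D /usr/local/pgsql/data restart
#   psql -c "ALTER USER postgres PASSWORD 'linux123';"
```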
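Start PostgreSQL. With a source build, pg_ctl is enough; service postgresql start only works if a service script has been installed.
pg_ctl -D /usr/local/pgsql/data -l logfile start
service postgresql start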
https://i-blog.csdnimg.cn/direct/3276f4c8791b4526bfda9b0b399f97d3.png
su - postgres
psql
https://i-blog.csdnimg.cn/direct/0b718275e2054529a8c5712be229a7f2.png
4. Download and deploy MySQL
yum -y install wget
wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz
https://i-blog.csdnimg.cn/direct/003a2358571846c5bb90ca76d0543417.png
mv mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz /usr/local/
cd /usr/local/
tar xvJf mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz
mv mysql-8.0.28-linux-glibc2.12-x86_64 mysql-8.0
mkdir mysql-8.0/data
groupadd mysql
useradd -r -g mysql -s /bin/false mysql
chown -R mysql:mysql /usr/local/mysql-8.0
cd mysql-8.0/bin/
./mysqld --user=mysql --basedir=/usr/local/mysql-8.0 --datadir=/usr/local/mysql-8.0/data/ --initialize
https://i-blog.csdnimg.cn/direct/182dcdb6e3b24a91bdc94e14afbe2ace.png
The circled part is the temporary initial password you will need to log in to the database later.
vi /etc/my.cnf
[mysqld]
basedir=/usr/local/mysql-8.0/
datadir=/usr/local/mysql-8.0/data/
socket=/tmp/mysql.sock
character-set-server=UTF8MB4
https://i-blog.csdnimg.cn/direct/5f5d52f144694fd8b8dff8651691b7d1.png
cp -a .././support-files/mysql.server /etc/init.d/mysql
chmod +x /etc/init.d/mysql
chkconfig --add mysql
service mysql status
service mysql start
ln -s /usr/local/mysql-8.0/bin/mysql /usr/bin
https://i-blog.csdnimg.cn/direct/76a7254c737343bc8fd158ed9ebf683f.png
Log in
mysql -u root -p
Enter the initial password printed during initialization
https://i-blog.csdnimg.cn/direct/7f81ee5ba497491b807979131825725f.png
Change the MySQL password
https://i-blog.csdnimg.cn/direct/523e0f91b93e4a7899b2a070378a7684.png
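The statement itself is only visible in the screenshot. A minimal sketch of a typical way to replace the expired temporary root password on MySQL 8.0 follows; it assumes the new password is linux123 (the value used in the Flink source table later) and that it contains no single quotes. The helper name root_password_sql is hypothetical.

```shell
# Sketch: build the ALTER USER statement for the root account.
# root_password_sql is a hypothetical helper; it does no escaping, so the
# password must not contain single quotes.
root_password_sql() {
    local new_password
    new_password="$1"
    printf "ALTER USER 'root'@'localhost' IDENTIFIED BY '%s';\n" "$new_password"
}

# Typical use (prompts for the temporary password; --connect-expired-password
# lets a non-interactive client session run with an expired password):
#   mysql -u root -p --connect-expired-password -e "$(root_password_sql 'linux123')"
```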
Log in again with the new password
https://i-blog.csdnimg.cn/direct/b2b7ca5030964b3f8b8f4c37b0681c71.png
5. Download and deploy Flink
Flink download: https://www.apache.org/dyn/closer.lua/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
https://i-blog.csdnimg.cn/direct/995b36341ef3492f86faa96cb5d154dc.png
Download the connector and driver JARs that Flink SQL needs
1. flink-connector-jdbc-3.1.1-1.17.jar Download: https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/
2. flink-sql-connector-mysql-cdc-3.0.1.jar Download: https://github.com/ververica/flink-cdc-connectors/releases
3. flink-sql-connector-postgres-cdc-3.0.1.jar Download: https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc/3.0.1
4. mysql-connector-java-8.0.28.jar Download: https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.28/
https://i-blog.csdnimg.cn/direct/4c7bd41f2ce44ea1a142bdb709495e61.png
https://i-blog.csdnimg.cn/direct/b06601cd02864d49a1346e92c213371e.png
https://i-blog.csdnimg.cn/direct/4c57c9b0285d4d7e8a61522a6478aa06.png
Upload the Flink archive and extract it
tar -zxvf flink-1.18.1-bin-scala_2.12.tgz
Go to Flink's lib directory and upload the four JARs
cd flink-1.18.1/lib/
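If the server has internet access, the JARs can be fetched straight into lib instead of being uploaded. The sketch below only builds Maven Central URLs from coordinates; maven_jar_url is a hypothetical helper, and it is an assumption that the two com.ververica CDC JARs are also published on Maven Central under those coordinates (the article links GitHub releases and mvnrepository for them).

```shell
# Sketch: turn Maven coordinates into a Maven Central download URL.
# maven_jar_url is a hypothetical helper name used for illustration.
maven_jar_url() {
    local group_id
    local artifact_id
    local version
    local group_path
    group_id="$1"
    artifact_id="$2"
    version="$3"
    # Maven Central stores artifacts under the group id with dots turned into slashes.
    group_path="$(printf '%s' "$group_id" | tr '.' '/')"
    printf 'https://repo1.maven.org/maven2/%s/%s/%s/%s-%s.jar\n' \
        "$group_path" "$artifact_id" "$version" "$artifact_id" "$version"
}

# Typical use from inside flink-1.18.1/lib/:
#   wget "$(maven_jar_url org.apache.flink flink-connector-jdbc 3.1.1-1.17)"
#   wget "$(maven_jar_url com.ververica flink-sql-connector-mysql-cdc 3.0.1)"
#   wget "$(maven_jar_url com.ververica flink-sql-connector-postgres-cdc 3.0.1)"
#   wget "$(maven_jar_url mysql mysql-connector-java 8.0.28)"
```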
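The Flink configuration file needs one change (the paths below are relative to the directory that contains flink-1.18.1, so run cd ../.. first).
Otherwise you run into the situation shown below: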
https://i-blog.csdnimg.cn/direct/44ee6a6e68ff42f4ba31bb58efc0f795.png
vi flink-1.18.1/conf/flink-conf.yaml
Change the value from the original localhost to the server's IP
https://i-blog.csdnimg.cn/direct/6fda68c50dd44fa5a5cab7724a7abdd5.png
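Which keys were changed is only visible in the screenshot. As an assumption, the sketch below touches the two REST settings that control whether the web UI is reachable from another machine: rest.address is set to the server IP and rest.bind-address to 0.0.0.0. The helper name set_flink_rest_address is hypothetical, and other localhost entries (for example jobmanager.rpc.address) are deliberately left alone.

```shell
# Sketch: make the Flink REST endpoint / web UI reachable from other hosts.
# set_flink_rest_address is a hypothetical helper; the choice of keys is an assumption.
set_flink_rest_address() {
    local conf_file
    local host_ip
    conf_file="$1"
    host_ip="$2"
    # Address that clients use to reach the REST endpoint.
    sed -i -E "s/^(rest\.address):[[:space:]]*localhost[[:space:]]*$/\1: ${host_ip}/" "$conf_file"
    # Interface the REST server binds to; 0.0.0.0 means all interfaces.
    sed -i -E "s/^(rest\.bind-address):[[:space:]]*localhost[[:space:]]*$/\1: 0.0.0.0/" "$conf_file"
}

# Typical use, then restart the cluster:
#   set_flink_rest_address flink-1.18.1/conf/flink-conf.yaml 192.168.207.193
```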
./flink-1.18.1/bin/start-cluster.sh
Open 192.168.207.193:8081 in a browser (8081 is the default port; it can be changed in the configuration file)
https://i-blog.csdnimg.cn/direct/29762da41a824004a99205149cfe27b1.png
6. Sync MySQL data to PostgreSQL in real time
In MySQL, first create a database, then create a table in it and insert some rows
create database ljq;
use ljq;
CREATE TABLE players (
player_id INT NOT NULL AUTO_INCREMENT,
team_id INT,
player_name VARCHAR(255),
height FLOAT(53),
PRIMARY KEY (player_id)
);
insert into players (player_id,team_id,player_name,height) values
(1001,1001,'韦德','1.93'),
(1002,1002,'雷吉','1.91'),
(1003,1003,'安德烈','2.11'),
(1004,1004,'索恩','2.16'),
(1005,1005,'兰斯顿','1.88'),
(1006,1006,'格伦','1.98'),
(1007,1007,'伊斯梅尔','1.83'),
(1008,1008,'扎扎','2.11'),
(1009,1009,'乔恩','2.08');
select * from players;
https://i-blog.csdnimg.cn/direct/8b4dc82f52b64641b8a725dc74fde1f0.png
https://i-blog.csdnimg.cn/direct/8cfef948014949ffbaf991f604204860.png
In PostgreSQL, create a database and an empty table in it (do not insert any rows)
create database ljq;
\c ljq
CREATE TABLE players3 (
player_id INT NOT NULL,
team_id INT,
player_name VARCHAR(255),
height FLOAT(53),
PRIMARY KEY (player_id)
);
https://i-blog.csdnimg.cn/direct/fa40f9a5a9254117afa026070d7f8eef.png
Start the Flink SQL client
./flink-1.18.1/bin/sql-client.sh embedded
Create the source table to match the data you want to sync
CREATE TABLE nbaplayers (
player_id INT,
team_id INT,
player_name VARCHAR,
height FLOAT,
PRIMARY KEY (player_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.207.193',
'port' = '3306',
'username' = 'root',
'password' = 'linux123',
'database-name' = 'ljq',
'table-name' = 'players',
'server-time-zone' = 'Asia/Shanghai'
);
https://i-blog.csdnimg.cn/direct/eb11e9e268c841caba5a05da8f6aedc8.png
Querying the table raises an error:
https://i-blog.csdnimg.cn/direct/3ef793474eb649f696d6c9272fe0a90b.png
Fix: in MySQL, allow root to connect from any IP
use mysql;
select user,host from user;
update user set host = '%' where user = 'root';
flush privileges;
select user,host from user;
https://i-blog.csdnimg.cn/direct/567aa31eed8e4242aaf979684463fafd.png
https://i-blog.csdnimg.cn/direct/38029bb425544148a4902fd8fcaa63c0.png
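Opening root to every host works for a test machine. A more conservative alternative, sketched here as an assumption rather than what the article does, is a dedicated account that carries only the privileges commonly listed for a MySQL CDC source. The helper name cdc_user_sql and the account name flink_cdc are hypothetical; if you use such an account, the username and password in the Flink source table must match it.

```shell
# Sketch: print SQL that creates a dedicated CDC account reachable from any host.
# cdc_user_sql and the flink_cdc account name are hypothetical; no escaping is
# done, so neither argument may contain single quotes.
cdc_user_sql() {
    local user
    local password
    user="$1"
    password="$2"
    cat <<EOF
CREATE USER '${user}'@'%' IDENTIFIED BY '${password}';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '${user}'@'%';
FLUSH PRIVILEGES;
EOF
}

# Typical use:
#   cdc_user_sql flink_cdc 'choose-a-password' | mysql -u root -p
```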
Query the table again
select * from nbaplayers;
https://i-blog.csdnimg.cn/direct/680958b99e574d889fb949c8c5df6a0d.png
Create the sink table
CREATE TABLE nba (
player_id INT,
team_id INT,
player_name VARCHAR,
height NUMERIC(3,2),
PRIMARY KEY (player_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://192.168.207.193:5432/ljq?currentSchema=public&reWriteBatchedInserts=true',
'username' = 'postgres',
'password' = 'linux123',
'table-name' = 'players3'
);
Run an INSERT from the source table into the sink table; this submits the sync job
INSERT INTO nba
SELECT
player_id,
team_id,
player_name,
height
FROM nbaplayers;
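The three statements can also be kept in one file and submitted in a single step, which makes the job easy to resubmit after a cluster restart. The sketch below is one way to do that under stated assumptions: write_sync_job_sql and the file name sync_players.sql are hypothetical, the checkpoint interval line is an addition that is not in the article (its value is illustrative), and the table definitions are the article's own with a single ? in the JDBC URL.

```shell
# Sketch: write the whole sync pipeline to a SQL file for sql-client.sh -f.
# write_sync_job_sql is a hypothetical helper. The file contains passwords,
# so keep its permissions tight.
write_sync_job_sql() {
    local out_file
    out_file="$1"
    cat > "$out_file" <<'EOF'
-- Assumption: periodic checkpoints are enabled; the interval is illustrative.
SET 'execution.checkpointing.interval' = '3s';

CREATE TABLE nbaplayers (
  player_id INT,
  team_id INT,
  player_name VARCHAR,
  height FLOAT,
  PRIMARY KEY (player_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.207.193',
  'port' = '3306',
  'username' = 'root',
  'password' = 'linux123',
  'database-name' = 'ljq',
  'table-name' = 'players',
  'server-time-zone' = 'Asia/Shanghai'
);

CREATE TABLE nba (
  player_id INT,
  team_id INT,
  player_name VARCHAR,
  height NUMERIC(3,2),
  PRIMARY KEY (player_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://192.168.207.193:5432/ljq?currentSchema=public&reWriteBatchedInserts=true',
  'username' = 'postgres',
  'password' = 'linux123',
  'table-name' = 'players3'
);

INSERT INTO nba
SELECT player_id, team_id, player_name, height
FROM nbaplayers;
EOF
}

# Typical use:
#   write_sync_job_sql sync_players.sql
#   ./flink-1.18.1/bin/sql-client.sh -f sync_players.sql
```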
Check the job in the web UI
https://i-blog.csdnimg.cn/direct/74a019928ff349659d76fa72ff2604e7.png
Check that the data has been synced to the players3 table in PostgreSQL
https://i-blog.csdnimg.cn/direct/8407045d6aee4d479b2c6b26fe2a20c9.png
Test whether inserts, deletes, and updates are synced
1. Insert
Run in MySQL:
insert into players (player_id,team_id,player_name,height) values (10001,10002,'韦德2','1.95');
https://i-blog.csdnimg.cn/direct/b9e1de2bd7f94c368cde560c69996452.png
Check in PostgreSQL:
https://i-blog.csdnimg.cn/direct/d513b0cf4f964525a04f712a252bc8e7.png
2. Delete
Run in MySQL:
delete from players where player_id = 10001;
https://i-blog.csdnimg.cn/direct/4a719b253ca944afa0e4f682587fbee0.png
Check in PostgreSQL:
https://i-blog.csdnimg.cn/direct/feb3129686524f07a2690600656cd6b5.png
3. Update
Run in MySQL:
update players set player_name='高大的姚明' where player_id=1001;
https://i-blog.csdnimg.cn/direct/629098f2c9ee4179919ceb0be2314bd8.png
Check in PostgreSQL:
https://i-blog.csdnimg.cn/direct/40592310dc9c4455bbdd58ad659d0e99.png
Note: watch out for decimal precision problems with the height values, which were inserted as strings
https://i-blog.csdnimg.cn/direct/bb0afdb947db4d42b4528414e116ca2c.png
https://i-blog.csdnimg.cn/direct/c217c67d28524b5d92b67b2bc31fee4d.png