1、hadoop集群连接本地MySQL
1.1 首先测试集群是否可以ping通本地
捏造机可以ping通网关(192.168.10.2),但不能ping通192.168.10.1,这表明题目大概出在Windows主机的防火墙设置或VMware的网络配置上。
1.1.1 检查Windows防火墙设置
确保Windows防火墙没有阻止ICMP请求(ping)可以临时禁用Windows防火墙进行测试:
- 打开命令提示符(以管理员身份运行)
- 输入以下命令来禁用防火墙
- netsh advfirewall set allprofiles state off
复制代码 然后再次实验从捏造机ping 192.168.10.1。如果现在可以ping通,说明防火墙是题目所在。需要添加一个允许ICMP的规则。
1.1.2 重新启用防火墙并添加规则
输入以下命令来重新启用防火墙
- netsh advfirewall set allprofiles state on
复制代码 在命令提示符中输入以下命令来添加允许ICMP请求的规则
- netsh advfirewall firewall add rule name="Allow ICMPv4" protocol=icmpv4:8,any dir=in action=allow
复制代码 在命令提示符中输入以下命令来添加允许3306端口的入站规则
- netsh advfirewall firewall add rule name="Allow MySQL 3306" dir=in action=allow protocol=TCP localport=3306
复制代码 1.2 捏造机连接到物理机上的MySQL服务器
需要确保几个关键点:
- MySQL服务器配置:确保MySQL服务器允许长途连接。
- 网络配置:确保捏造机和物理机在同一网络中,并且可以相互通信。
- 防火墙设置:确保没有防火墙阻止MySQL的端口(默认是3306)。
- MySQL用户权限:确保有效户权限从捏造机的IP地址连接到MySQL服务器。
1.2.1 配置MySQL服务器以允许长途连接
- 登录MySQL服务器
- 查看当前的用户权限
- SHOW GRANTS FOR 'root'@'localhost';
复制代码 - 创建一个允许长途连接的用户
- CREATE USER 'remote_user'@'%' IDENTIFIED BY '123456';
复制代码 - 给这个用户授权:
- GRANT ALL PRIVILEGES ON *.* TO 'remote_user'@'%' WITH GRANT OPTION;
- FLUSH PRIVILEGES;
复制代码 - 确保MySQL配置文件(my.cnf或my.ini)允许长途连接:
- 找到MySQL的配置文件,通常在/etc/mysql/目录下。
- 确保bind-address设置为0.0.0.0或者被解释掉,如许MySQL服务器可以监听所有网络接口。
在捏造机上执行如下命令:
- mysql -h 192.168.10.1 -u remote_user -p
复制代码 已经乐成配置了从Hadoop集群的hadoop102节点访问本地MySQL服务器,并且已经解决了网络和防火墙的题目,现在可以使用SeaTunnel来全量同步news_data数据库的数据到HDFS上 。
2、seatunnel导入数据库全量数据到HDFS(单表)
2.1 下载并安装SeaTunnel
官方网站:Apache SeaTunnel | Apache SeaTunnel
导入到/opt/software下,执行如下命令解压
- tar -xzvf apache-seatunnel-2.3.8-bin.tar.gz -C /opt/module/
复制代码 2.2 安装Connector插件
从2.2.0-beta版本开始,二进制包默认不提供connector依赖。因此,需要执行以下命令来安装所需的connector插件:
- sh bin/install_plugin.sh $version
复制代码 2.3 驱动jar包
- 下载MySQL JDBC驱动: 确保下载了正确版本的MySQL JDBC驱动jar包。从MySQL官网下载。
- 将驱动添加到SeaTunnel的lib目录: 将下载的MySQL JDBC驱动jar包放到SeaTunnel的lib目录下。如许,当SeaTunnel启动时,它就能够加载所需的MySQL驱动类
2.4 配置SeaTunnel使命
配置文件mysql_to_hdfs.conf,并将其放置在SeaTunnel的config目录下。这个配置文件应该包含了源(MySQL)和吸收器(HDFS)的正确配置。
- env {
- execution.parallelism = 1 # 根据你的集群资源调整并行度
- }
- source {
- JDBC {
- driver = "com.mysql.cj.jdbc.Driver"
- url = "jdbc:mysql://192.168.10.1:3306/news_data?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
- user = "remote_user"
- password = "123456"
- query = "SELECT * FROM news_information" # 使用查询语句来导入数据
- }
- }
- sink {
- HdfsFile {
- fs.defaultFS = "hdfs://hadoop102:8020" # HDFS NameNode 地址
- path = "/news" # HDFS 目标目录路径
- file_type = "text" # 文件类型,例如 text, csv, orc 等
- field_delimiter = "," # 字段分隔符,根据实际情况调整
- file_format_type = "text" # 指定文件格式类型为文本
- }
- }
复制代码 选择执行模式
SeaTunnel支持多种执行模式,包罗本地模式(local)、独立模式(standalone)、集群模式(yarn/cluster)等。对于大多数大数据使命,推荐使用集群模式来充分利用Hadoop集群的资源。
2.5 环境变量配置
/opt/module/hadoop/etc/hadoop是包含Hadoop配置文件的目录。需要将这个目录路径设置为HADOOP_CONF_DIR环境变量,以便SeaTunnel的Spark作业能够正确地连接到YARN集群。
使用如下命令
- sudo vim /etc/profile.d/my_env.sh
复制代码 在环境变量最后添加如下
- export HADOOP_CONF_DIR=/opt/module/hadoop/etc/hadoop
复制代码 运行以下命令使变量立刻生效
- source /etc/profile.d/my_env.sh
复制代码 运行命令
- ./bin/start-seatunnel-spark-3-connector-v2.sh --config ./config/mysql_to_hdfs.conf --master yarn
复制代码 SeaTunnel作业通过Spark提交到YARN集群,启动一个SeaTunnel作业,从MySQL读取数据并将其写入到HDFS中,使用YARN作为资源管理器。
3、seatunnel导入数据库全量数据到HDFS(多表)
在seatunnel的安装目录下创建脚本mysql_full_hdfs.sh
- #!/bin/bash
- # 获取当前日期
- current_date=$(date +%Y-%m-%d)
- # 定义数据库连接信息
- DB_URL="jdbc:mysql://192.168.10.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
- DB_USER="remote_user"
- DB_PASSWORD="123456"
- # 定义HDFS路径
- HDFS_PATH="hdfs://hadoop102:8020"
- # 定义表名数组
- TABLES=("stu" "t_user")
- # 循环处理每个表
- for TABLE in "${TABLES[@]}"; do
- # 创建配置文件内容,包含当前日期
- conf_file="/opt/module/apache-seatunnel-2.3.8/config/${TABLE}_to_hdfs_${current_date}.conf"
- cat > "$conf_file" << EOF
- env {
- execution.parallelism = 1
- }
- source {
- JDBC {
- driver = "com.mysql.cj.jdbc.Driver"
- url = "${DB_URL}"
- user = "${DB_USER}"
- password = "${DB_PASSWORD}"
- query = "SELECT * FROM ${TABLE}"
- }
- }
- sink {
- HdfsFile {
- fs.defaultFS = "${HDFS_PATH}"
- path = "/news/${TABLE}_full/${current_date}" # HDFS 目标目录路径,每个表都有自己的目录,并且包含日期
- file_type = "parquet" # 文件类型改为Parquet
- file_format_type = "parquet" # 文件格式类型为Parquet
- parquet_compression_CODEC = "gzip" # 使用gzip压缩
- }
- }
- EOF
- # 运行Seatunnel任务
- /opt/module/apache-seatunnel-2.3.8/bin/start-seatunnel-spark-3-connector-v2.sh --config "$conf_file" --master yarn
- # 删除配置文件
- rm "$conf_file"
- done
复制代码 使用chmod +x 脚本,再运行脚本。
4、MySQL到Kafka的及时数据流和变更数据捕获(单节点)
Debezium在Kafka Connect集群中监控MySQL数据库变更
4.1 MySQL配置
创建用户:首先,需要使用 CREATE USER 命令来创建一个新的用户。
- CREATE USER 'debezium'@'192.168.10.102' IDENTIFIED BY '123456';
复制代码 授予权限:然后,可以使用 GRANT 命令来授予这个用户所需的权限。
- GRANT SUPER, REPLICATION CLIENT, SELECT ON *.* TO 'debezium'@'192.168.10.102';
复制代码 为用户 debezium 授予 LOCK TABLES 权限
请留意,授予 LOCK TABLES 权限大概会影响数据库的并发性,由于它允许用户锁定表。在授予权限时,应始终确保遵照最小权限原则,只授予用户完成其使命所需的权限。
- GRANT SUPER, REPLICATION CLIENT, SELECT ON *.* TO 'debezium'@'192.168.10.102';
复制代码 为用户 debezium 授予 RELOAD 权限
- GRANT RELOAD ON *.* TO 'debezium'@'192.168.10.102';
复制代码 请留意,授予 RELOAD 权限允许用户执行 FLUSH TABLES WITH READ LOCK 命令,这在 Debezium 连接器进行初始快照时是须要的。确保在授予权限时,用户 debezium 已经存在于 MySQL 中,并且使用的是正确的主机地址(在本例中为捏造机的 IP 地址 192.168.10.102)。
革新权限:最后,需要革新 MySQL 的权限设置,使新权限生效:
请留意,上述步调中的 '192.168.10.102' 应该更换为捏造机的 IP 地址,由于这是 Kafka Connect 实验连接到 MySQL 数据库的地址。
MySQL的配置文件:
- # 后续换为允许的IP
- bind-address = 0.0.0.0
- log-bin=mysql-bin
- binlog-format = ROW
- binlog-row-image = FULL
- expire_logs_days = 10
复制代码 4.2 下载安装Zookeeper
在zoo.cfg下配置如下信息:
- tickTime=2000
- initLimit=10
- syncLimit=5
- dataDir=/opt/module/zookeeper/zkData
- clientPort=2181
复制代码 在bin目录下执行
4.3 下载安装Kafka
在config目录的server.properties配置文件中,配置如下信息:
- broker.id=0
- advertised.listeners=PLAINTEXT://hadoop102:9092
- num.network.threads=3
- num.io.threads=8
- socket.send.buffer.bytes=102400
- socket.receive.buffer.bytes=102400
- socket.request.max.bytes=104857600
- log.dirs=/opt/module/kafka/datas
- num.partitions=1
- num.recovery.threads.per.data.dir=1
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
- log.retention.hours=168
- log.retention.check.interval.ms=300000
- zookeeper.connect=hadoop102:2181/kafka
- zookeeper.connection.timeout.ms=18000
- group.initial.rebalance.delay.ms=0
复制代码 在bin目录下执行
- ./kafka-server-start.sh /opt/module/kafka/config/server.properties
复制代码 4.4 下载安装Debezium
下载到/opt/software
- wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.5.Final/debezium-connector-mysql-1.9.5.Final-plugin.tar.gz
复制代码 解压到:/opt/module/kafka/libs 下
- tar -xzf debezium-connector-mysql-1.9.5.Final-plugin.tar.gz -C /opt/module/kafka/libs/
复制代码 把 debezium-connector-mysql 内里的jar包移动到libs目录下。
- cp debezium-connector-mysql/*.jar /opt/module/kafka/libs/
复制代码 在/opt/module/kafka/config目录下的connect-distributed.properties 配置文件中添加
- plugin.path=/opt/module/kafka/libs
复制代码 在config目录下创建配置文件mysql-connector.json,并且添加如下信息
- {"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"192.168.10.1","database.port":"3306","database.user":"debezium","database.password":"123456","database.server.id":"184054","database.server.name":"mydb","database.include.list":"test","database.history.kafka.bootstrap.servers":"hadoop102:9092","schema.history.internal.kafka.bootstrap.servers":"hadoop102:9092","schema.history.internal.kafka.topic":"schema-changes.test","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"false","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","database.history.kafka.topic":"dbhistory.test","database.ssl.mode":"REQUIRED","database.ssl.truststore.location":"/path/to/truststore.jks","database.ssl.truststore.password":"truststore-password","database.connectionTimeZone":"UTC"}}
复制代码 在config目录下创建配置文件debezium.properties,如下信息:
- bootstrap.servers=hadoop102:9092
- key.converter=org.apache.kafka.connect.json.JsonConverter
- value.converter=org.apache.kafka.connect.json.JsonConverter
- key.converter.schemas.enable=true
- value.converter.schemas.enable=true
- internal.key.converter=org.apache.kafka.connect.json.JsonConverter
- internal.value.converter=org.apache.kafka.connect.json.JsonConverter
- internal.key.converter.schemas.enable=false
- internal.value.converter.schemas.enable=false
- offset.storage.topic=connect-offsets
- offset.flush.interval.ms=10000
- # 设置 offset topic 的复制因子为 1
- config.storage.topic=connect-configs
- config.storage.replication.factor=1
- offset.storage.replication.factor=1
- status.storage.topic=connect-status
- status.storage.replication.factor=1
- status.flush.interval.ms=5000
- group.id=connect-cluster
- name=mysql-connector
- connector.class=io.debezium.connector.mysql.MySqlConnector
- tasks.max=1
- database.hostname=192.168.10.1
- database.port=3306
- database.user=debezium
- database.password=123456
- database.server.id=184054
- database.server.name=mydb
- database.history.kafka.bootstrap.servers=hadoop102:9092
- database.history.kafka.topic=dbhistory.mydb
- database.history.store.only.monitored.tables.ddl=true
- table.include.list=test.stu,test.t_user
复制代码 将zookeeper和kafka启动起来,再启动kafka-connect。
kafka的bin目录下
- ./connect-distributed.sh /opt/module/kafka/config/debezium.properties
复制代码 执行注册命令,kafka的config目录下
- curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://hadoop102:8083/connectors/ -d @mysql-connector.json
复制代码 查看状态
- curl -s http://hadoop102:8083/connectors/mysql-connector/status
复制代码 删除
- url -X DELETE localhost:8083/connectors/mysql-connector
复制代码 5、MySQL到Kafka的及时数据流和变更数据捕获(多节点)
5.1 Zookeeper配置
- tickTime=2000
- initLimit=10
- syncLimit=5
- dataDir=/opt/module/zookeeper/zkData
- clientPort=2181server.2=hadoop102:2888:3888server.3=hadoop103:2888:3888server.4=hadoop104:2888:3888
复制代码 /opt/module/zookeeper/zkData目录下有一个文件myid,内里的内容是:2
- tickTime=2000
- initLimit=10
- syncLimit=5
- dataDir=/opt/module/zookeeper/zkData
- clientPort=2181server.2=hadoop102:2888:3888server.3=hadoop103:2888:3888server.4=hadoop104:2888:3888
复制代码 /opt/module/zookeeper/zkData目录下有一个文件myid,内里的内容是:3
- tickTime=2000
- initLimit=10
- syncLimit=5
- dataDir=/opt/module/zookeeper/zkData
- clientPort=2181server.2=hadoop102:2888:3888server.3=hadoop103:2888:3888server.4=hadoop104:2888:3888
复制代码 /opt/module/zookeeper/zkData目录下有一个文件myid,内里的内容是:4
5.2 Kafka配置
- broker.id=0
- advertised.listeners=PLAINTEXT://hadoop102:9092
- num.network.threads=3
- num.io.threads=8
- socket.send.buffer.bytes=102400
- socket.receive.buffer.bytes=102400
- socket.request.max.bytes=104857600
- log.dirs=/opt/module/kafka/datas
- num.partitions=1
- num.recovery.threads.per.data.dir=1
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
- log.retention.hours=168
- log.retention.check.interval.ms=300000
- zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
- zookeeper.connection.timeout.ms=18000
- group.initial.rebalance.delay.ms=0
复制代码
只需要修改kafka/config/server.properties
- broker.id=1
- advertised.listeners=PLAINTEXT://hadoop103:9092
复制代码
只需要修改kafka/config/server.properties
- broker.id=2
- advertised.listeners=PLAINTEXT://hadoop104:9092
复制代码 5.3 Dezebium下载安装
hadoop102、hadoop103、hadoop104下载了debezium,并且jar包放到了/opt/module/kafka/libs目录。每个节点的/opt/module/kafka/config/connect-distributed.properties配置文件内容都是
- bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
- group.id=connect-cluster
- key.converter=org.apache.kafka.connect.json.JsonConverter
- value.converter=org.apache.kafka.connect.json.JsonConverter
- key.converter.schemas.enable=true
- value.converter.schemas.enable=true
- offset.storage.topic=connect-offsets
- offset.storage.replication.factor=3
- config.storage.topic=connect-configs
- config.storage.replication.factor=3
- status.storage.topic=connect-status
- status.storage.replication.factor=3
- offset.flush.interval.ms=10000
- plugin.path=/opt/module/kafka/libs
- internal.key.converter=org.apache.kafka.connect.json.JsonConverter
- internal.value.converter=org.apache.kafka.connect.json.JsonConverter
- internal.key.converter.schemas.enable=false
- internal.value.converter.schemas.enable=false
- status.flush.interval.ms=5000
复制代码 每个节点的/opt/module/kafka/config下创建了一个配置文件:mysql-connector.json,内容如下:
- {
- "name": "mysql-connector",
- "config": {
- "connector.class": "io.debezium.connector.mysql.MySqlConnector",
- "tasks.max": "1",
- "database.hostname": "192.168.10.1",
- "database.port": "3306",
- "database.user": "debezium",
- "database.password": "123456",
- "database.server.id": "184054",
- "database.server.name": "mydb",
- "database.include.list": "news_data",
- "database.history.kafka.bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092",
- "schema.history.internal.kafka.bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092",
- "schema.history.internal.kafka.topic": "schema-changes.news_data",
- "key.converter": "org.apache.kafka.connect.json.JsonConverter",
- "key.converter.schemas.enable": "false",
- "value.converter": "org.apache.kafka.connect.json.JsonConverter",
- "value.converter.schemas.enable": "false",
- "database.history.kafka.topic": "dbhistory.news_data",
- "database.ssl.mode": "REQUIRED",
- "database.ssl.truststore.location": "/path/to/truststore.jks",
- "database.ssl.truststore.password": "truststore-password",
- "database.connectionTimeZone": "UTC"
- }
- }
复制代码 启动Zookeeper、Kafka,在hadoop102、hadoop103、hadoop104节点上依次启动Kafka-connect,并且在其中一台(hadoop102)注册即可。
- curl -s http://hadoop102:8083/connectors/mysql-connector/status
复制代码
- curl -s http://hadoop103:8083/connectors/mysql-connector/status
复制代码
- curl -s http://hadoop104:8083/connectors/mysql-connector/status
复制代码 都得到如下结果:
- {
- "name": "mysql-connector",
- "connector": {
- "state": "RUNNING",
- "worker_id": "192.168.10.102:8083"
- },
- "tasks": [
- {
- "id": 0,
- "state": "RUNNING",
- "worker_id": "192.168.10.102:8083"
- }
- ],
- "type": "source"
- }
复制代码 当hadoop102的Kafka-connect关闭时
- curl -s http://hadoop103:8083/connectors/mysql-connector/status
复制代码
- curl -s http://hadoop104:8083/connectors/mysql-connector/status
复制代码 都得到如下结果:
- {
- "name": "mysql-connector",
- "connector": {
- "state": "RUNNING",
- "worker_id": "192.168.10.103:8083"
- },
- "tasks": [
- {
- "id": 0,
- "state": "RUNNING",
- "worker_id": "192.168.10.103:8083"
- }
- ],
- "type": "source"
- }
复制代码 乐成地在多节点上配置了Zookeeper、Kafka和Debezium连接器,并且能够在Kafka Connect集群中注册和运行Debezium连接器
6、kafka写入HDFS
在hadoop102节点上的flume下编写一个配置文件kafka_to_hdfs.conf,内容如下:
- # Define the sources, sinks, and channels
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- # # Configure the source
- a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
- a1.sources.r1.batchSize = 1000
- a1.sources.r1.batchDurationMillis = 2000
- a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
- a1.sources.r1.kafka.topics = mydb.news_data.comment_information,mydb.news_data.media_information,mydb.news_data.news_information,mydb.news_data.user_information,mydb.news_data.user_auth
- # # Interceptor configuration
- a1.sources.r1.interceptors = i1
- a1.sources.r1.interceptors.i1.type = org.atguigu.news.flume.interceptor.TimestampInterceptor$Builder
- # # Configure the channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 10000
- a1.channels.c1.transactionCapacity = 1000
- # # Configure the sink
- a1.sinks.k1.type = hdfs
- a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/news_data/%{table}_inc/%Y-%m-%d
- a1.sinks.k1.hdfs.filePrefix = events
- a1.sinks.k1.hdfs.fileType = DataStream
- a1.sinks.k1.hdfs.writeFormat = Text
- a1.sinks.k1.hdfs.rollInterval = 0
- a1.sinks.k1.hdfs.rollSize = 134217728
- a1.sinks.k1.hdfs.rollCount = 0
- a1.sinks.k1.hdfs.useLocalTimeStamp = true
- # # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
复制代码 编写拦截器代码打jar包放入hadoop102的flume的lib目录下,jar包代码如下:
- package org.atguigu.news.flume.interceptor;
- import com.alibaba.fastjson.JSONObject;
- import org.apache.flume.Context;
- import org.apache.flume.Event;
- import org.apache.flume.interceptor.Interceptor;
- import java.nio.charset.StandardCharsets;
- import java.util.List;
- import java.util.Map;
- public class TimestampInterceptor implements Interceptor {
- @Override
- public void initialize() {
- // Initialization logic, if needed
- }
- @Override
- public Event intercept(Event event) {
- // 1. Get header and body data
- Map<String, String> headers = event.getHeaders();
- String body = new String(event.getBody(), StandardCharsets.UTF_8);
- try {
- // 2. Parse body data as JSON
- JSONObject jsonObject = JSONObject.parseObject(body);
- // 3. Extract timestamp and table name
- String ts = jsonObject.getString("ts_ms");
- JSONObject source = jsonObject.getJSONObject("source");
- String tableName = source.getString("table");
- // 4. Put timestamp and table name into headers
- headers.put("timestamp", ts);
- headers.put("table", tableName);
- return event;
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
- }
- @Override
- public List<Event> intercept(List<Event> events) {
- for (Event event : events) {
- if (intercept(event) == null) {
- events.remove(event);
- }
- }
- return events;
- }
- @Override
- public void close() {
- // Clean up resources, if needed
- }
- public static class Builder implements Interceptor.Builder {
- @Override
- public Interceptor build() {
- return new TimestampInterceptor();
- }
- @Override
- public void configure(Context context) {
- // Configuration logic, if needed
- }
- }
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |