Debezium和SeaTunnel实现MySQL到Hadoop的及时数据流和全量同步(基于尚硅谷 ...

锦通  金牌会员 | 2024-10-26 15:48:15 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 894|帖子 894|积分 2682

1、hadoop集群连接本地MySQL

1.1 首先测试集群是否可以ping通本地

捏造机可以ping通网关(192.168.10.2),但不能ping通192.168.10.1,这表明题目大概出在Windows主机的防火墙设置或VMware的网络配置上。
1.1.1 检查Windows防火墙设置

确保Windows防火墙没有阻止ICMP请求(ping)可以临时禁用Windows防火墙进行测试:


  • 打开命令提示符(以管理员身份运行)
  • 输入以下命令来禁用防火墙
  1. netsh advfirewall set allprofiles state off
复制代码
然后再次实验从捏造机ping 192.168.10.1。如果现在可以ping通,说明防火墙是题目所在。需要添加一个允许ICMP的规则。
1.1.2 重新启用防火墙并添加规则

输入以下命令来重新启用防火墙
  1. netsh advfirewall set allprofiles state on
复制代码
在命令提示符中输入以下命令来添加允许ICMP请求的规则
  1. netsh advfirewall firewall add rule name="Allow ICMPv4" protocol=icmpv4:8,any dir=in action=allow
复制代码
在命令提示符中输入以下命令来添加允许3306端口的入站规则
  1. netsh advfirewall firewall add rule name="Allow MySQL 3306" dir=in action=allow protocol=TCP localport=3306
复制代码
1.2 捏造机连接到物理机上的MySQL服务器

需要确保几个关键点:

  • MySQL服务器配置:确保MySQL服务器允许长途连接。
  • 网络配置:确保捏造机和物理机在同一网络中,并且可以相互通信。
  • 防火墙设置:确保没有防火墙阻止MySQL的端口(默认是3306)。
  • MySQL用户权限:确保有效户权限从捏造机的IP地址连接到MySQL服务器。
1.2.1 配置MySQL服务器以允许长途连接


  • 登录MySQL服务器
    1. mysql -uroot -p123456
    复制代码
  • 查看当前的用户权限
    1. SHOW GRANTS FOR 'root'@'localhost';
    复制代码
  • 创建一个允许长途连接的用户
    1. CREATE USER 'remote_user'@'%' IDENTIFIED BY '123456';
    复制代码
  • 给这个用户授权
    1. GRANT ALL PRIVILEGES ON *.* TO 'remote_user'@'%' WITH GRANT OPTION;
    2. FLUSH PRIVILEGES;
    复制代码
  • 确保MySQL配置文件(my.cnf或my.ini)允许长途连接

    • 找到MySQL的配置文件,通常在/etc/mysql/目录下。
    • 确保bind-address设置为0.0.0.0或者被解释掉,如许MySQL服务器可以监听所有网络接口。


在捏造机上执行如下命令:
  1. mysql -h 192.168.10.1 -u remote_user -p
复制代码
已经乐成配置了从Hadoop集群的hadoop102节点访问本地MySQL服务器,并且已经解决了网络和防火墙的题目,现在可以使用SeaTunnel来全量同步news_data数据库的数据到HDFS上 。

2、seatunnel导入数据库全量数据到HDFS(单表)

2.1 下载并安装SeaTunnel

官方网站:Apache SeaTunnel | Apache SeaTunnel

导入到/opt/software下,执行如下命令解压
  1. tar -xzvf apache-seatunnel-2.3.8-bin.tar.gz -C /opt/module/
复制代码
2.2 安装Connector插件

从2.2.0-beta版本开始,二进制包默认不提供connector依赖。因此,需要执行以下命令来安装所需的connector插件:
  1. sh bin/install_plugin.sh $version
复制代码
 2.3 驱动jar包



  • 下载MySQL JDBC驱动: 确保下载了正确版本的MySQL JDBC驱动jar包。从MySQL官网下载。
  • 将驱动添加到SeaTunnel的lib目录: 将下载的MySQL JDBC驱动jar包放到SeaTunnel的lib目录下。如许,当SeaTunnel启动时,它就能够加载所需的MySQL驱动类
2.4 配置SeaTunnel使命

配置文件mysql_to_hdfs.conf,并将其放置在SeaTunnel的config目录下。这个配置文件应该包含了源(MySQL)和吸收器(HDFS)的正确配置。
  1. env {
  2.   execution.parallelism = 1 # 根据你的集群资源调整并行度
  3. }
  4. source {
  5.   JDBC {
  6.     driver = "com.mysql.cj.jdbc.Driver"
  7.     url = "jdbc:mysql://192.168.10.1:3306/news_data?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
  8.     user = "remote_user"
  9.     password = "123456"
  10.     query = "SELECT * FROM news_information"  # 使用查询语句来导入数据
  11.   }
  12. }
  13. sink {
  14.   HdfsFile {
  15.     fs.defaultFS = "hdfs://hadoop102:8020"  # HDFS NameNode 地址
  16.     path = "/news"  # HDFS 目标目录路径
  17.     file_type = "text"  # 文件类型,例如 text, csv, orc 等
  18.     field_delimiter = ","  # 字段分隔符,根据实际情况调整
  19.     file_format_type = "text"  # 指定文件格式类型为文本
  20.   }
  21. }
复制代码
选择执行模式
 SeaTunnel支持多种执行模式,包罗本地模式(local)、独立模式(standalone)、集群模式(yarn/cluster)等。对于大多数大数据使命,推荐使用集群模式来充分利用Hadoop集群的资源。
2.5 环境变量配置

/opt/module/hadoop/etc/hadoop是包含Hadoop配置文件的目录。需要将这个目录路径设置为HADOOP_CONF_DIR环境变量,以便SeaTunnel的Spark作业能够正确地连接到YARN集群。
使用如下命令
  1. sudo vim /etc/profile.d/my_env.sh
复制代码
在环境变量最后添加如下
  1. export HADOOP_CONF_DIR=/opt/module/hadoop/etc/hadoop
复制代码
运行以下命令使变量立刻生效
  1. source /etc/profile.d/my_env.sh
复制代码
运行命令
  1. ./bin/start-seatunnel-spark-3-connector-v2.sh --config ./config/mysql_to_hdfs.conf --master yarn
复制代码
SeaTunnel作业通过Spark提交到YARN集群,启动一个SeaTunnel作业,从MySQL读取数据并将其写入到HDFS中,使用YARN作为资源管理器。
 3、seatunnel导入数据库全量数据到HDFS(多表)

在seatunnel的安装目录下创建脚本mysql_full_hdfs.sh
  1. #!/bin/bash
  2. # 获取当前日期
  3. current_date=$(date +%Y-%m-%d)
  4. # 定义数据库连接信息
  5. DB_URL="jdbc:mysql://192.168.10.1:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
  6. DB_USER="remote_user"
  7. DB_PASSWORD="123456"
  8. # 定义HDFS路径
  9. HDFS_PATH="hdfs://hadoop102:8020"
  10. # 定义表名数组
  11. TABLES=("stu" "t_user")
  12. # 循环处理每个表
  13. for TABLE in "${TABLES[@]}"; do
  14.   # 创建配置文件内容,包含当前日期
  15.   conf_file="/opt/module/apache-seatunnel-2.3.8/config/${TABLE}_to_hdfs_${current_date}.conf"
  16.   cat > "$conf_file" << EOF
  17. env {
  18.   execution.parallelism = 1
  19. }
  20. source {
  21.   JDBC {
  22.     driver = "com.mysql.cj.jdbc.Driver"
  23.     url = "${DB_URL}"
  24.     user = "${DB_USER}"
  25.     password = "${DB_PASSWORD}"
  26.     query = "SELECT * FROM ${TABLE}"
  27.   }
  28. }
  29. sink {
  30.   HdfsFile {
  31.     fs.defaultFS = "${HDFS_PATH}"
  32.     path = "/news/${TABLE}_full/${current_date}"  # HDFS 目标目录路径,每个表都有自己的目录,并且包含日期
  33.     file_type = "parquet"  # 文件类型改为Parquet
  34.     file_format_type = "parquet"  # 文件格式类型为Parquet
  35.     parquet_compression_CODEC = "gzip"  # 使用gzip压缩
  36.   }
  37. }
  38. EOF
  39.   # 运行Seatunnel任务
  40.   /opt/module/apache-seatunnel-2.3.8/bin/start-seatunnel-spark-3-connector-v2.sh --config "$conf_file" --master yarn
  41.   # 删除配置文件
  42.   rm "$conf_file"
  43. done
复制代码
使用chmod +x 脚本,再运行脚本。
4、MySQL到Kafka的及时数据流和变更数据捕获(单节点)

Debezium在Kafka Connect集群中监控MySQL数据库变更
4.1 MySQL配置

创建用户:首先,需要使用 CREATE USER 命令来创建一个新的用户。
  1. CREATE USER 'debezium'@'192.168.10.102' IDENTIFIED BY '123456';
复制代码
授予权限:然后,可以使用 GRANT 命令来授予这个用户所需的权限。
  1. GRANT SUPER, REPLICATION CLIENT, SELECT ON *.* TO 'debezium'@'192.168.10.102';
复制代码
为用户 debezium 授予 LOCK TABLES 权限
请留意,授予 LOCK TABLES 权限大概会影响数据库的并发性,由于它允许用户锁定表。在授予权限时,应始终确保遵照最小权限原则,只授予用户完成其使命所需的权限。
  1. GRANT SUPER, REPLICATION CLIENT, SELECT ON *.* TO 'debezium'@'192.168.10.102';
复制代码
为用户 debezium 授予 RELOAD 权限 
  1. GRANT RELOAD ON *.* TO 'debezium'@'192.168.10.102';
复制代码
请留意,授予 RELOAD 权限允许用户执行 FLUSH TABLES WITH READ LOCK 命令,这在 Debezium 连接器进行初始快照时是须要的。确保在授予权限时,用户 debezium 已经存在于 MySQL 中,并且使用的是正确的主机地址(在本例中为捏造机的 IP 地址 192.168.10.102)。
革新权限:最后,需要革新 MySQL 的权限设置,使新权限生效:
  1. FLUSH PRIVILEGES;
复制代码
请留意,上述步调中的 '192.168.10.102' 应该更换为捏造机的 IP 地址,由于这是 Kafka Connect 实验连接到 MySQL 数据库的地址。
MySQL的配置文件:
  1. # 后续换为允许的IP
  2. bind-address = 0.0.0.0
  3. log-bin=mysql-bin
  4. binlog-format = ROW
  5. binlog-row-image = FULL
  6. expire_logs_days = 10
复制代码
4.2 下载安装Zookeeper

在zoo.cfg下配置如下信息:
  1. tickTime=2000
  2. initLimit=10
  3. syncLimit=5
  4. dataDir=/opt/module/zookeeper/zkData
  5. clientPort=2181
复制代码
在bin目录下执行
  1. ./zkServer.sh start 
复制代码
4.3 下载安装Kafka

在config目录的server.properties配置文件中,配置如下信息:
  1. broker.id=0
  2. advertised.listeners=PLAINTEXT://hadoop102:9092
  3. num.network.threads=3
  4. num.io.threads=8
  5. socket.send.buffer.bytes=102400
  6. socket.receive.buffer.bytes=102400
  7. socket.request.max.bytes=104857600
  8. log.dirs=/opt/module/kafka/datas
  9. num.partitions=1
  10. num.recovery.threads.per.data.dir=1
  11. offsets.topic.replication.factor=1
  12. transaction.state.log.replication.factor=1
  13. transaction.state.log.min.isr=1
  14. log.retention.hours=168
  15. log.retention.check.interval.ms=300000
  16. zookeeper.connect=hadoop102:2181/kafka
  17. zookeeper.connection.timeout.ms=18000
  18. group.initial.rebalance.delay.ms=0
复制代码
在bin目录下执行
  1. ./kafka-server-start.sh /opt/module/kafka/config/server.properties 
复制代码
4.4 下载安装Debezium

下载到/opt/software
  1. wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.5.Final/debezium-connector-mysql-1.9.5.Final-plugin.tar.gz
复制代码
解压到:/opt/module/kafka/libs 下
  1. tar -xzf debezium-connector-mysql-1.9.5.Final-plugin.tar.gz -C /opt/module/kafka/libs/
复制代码
把 debezium-connector-mysql 内里的jar包移动到libs目录下。
  1. cp debezium-connector-mysql/*.jar /opt/module/kafka/libs/
复制代码
在/opt/module/kafka/config目录下的connect-distributed.properties 配置文件中添加
  1. plugin.path=/opt/module/kafka/libs
复制代码
在config目录下创建配置文件mysql-connector.json,并且添加如下信息
  1. {"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"192.168.10.1","database.port":"3306","database.user":"debezium","database.password":"123456","database.server.id":"184054","database.server.name":"mydb","database.include.list":"test","database.history.kafka.bootstrap.servers":"hadoop102:9092","schema.history.internal.kafka.bootstrap.servers":"hadoop102:9092","schema.history.internal.kafka.topic":"schema-changes.test","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"false","value.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","database.history.kafka.topic":"dbhistory.test","database.ssl.mode":"REQUIRED","database.ssl.truststore.location":"/path/to/truststore.jks","database.ssl.truststore.password":"truststore-password","database.connectionTimeZone":"UTC"}}
复制代码
 在config目录下创建配置文件debezium.properties,如下信息:
  1. bootstrap.servers=hadoop102:9092
  2. key.converter=org.apache.kafka.connect.json.JsonConverter
  3. value.converter=org.apache.kafka.connect.json.JsonConverter
  4. key.converter.schemas.enable=true
  5. value.converter.schemas.enable=true
  6. internal.key.converter=org.apache.kafka.connect.json.JsonConverter
  7. internal.value.converter=org.apache.kafka.connect.json.JsonConverter
  8. internal.key.converter.schemas.enable=false
  9. internal.value.converter.schemas.enable=false
  10. offset.storage.topic=connect-offsets
  11. offset.flush.interval.ms=10000
  12. # 设置 offset topic 的复制因子为 1
  13. config.storage.topic=connect-configs
  14. config.storage.replication.factor=1
  15. offset.storage.replication.factor=1
  16. status.storage.topic=connect-status
  17. status.storage.replication.factor=1
  18. status.flush.interval.ms=5000
  19. group.id=connect-cluster
  20. name=mysql-connector
  21. connector.class=io.debezium.connector.mysql.MySqlConnector
  22. tasks.max=1
  23. database.hostname=192.168.10.1
  24. database.port=3306
  25. database.user=debezium
  26. database.password=123456
  27. database.server.id=184054
  28. database.server.name=mydb
  29. database.history.kafka.bootstrap.servers=hadoop102:9092
  30. database.history.kafka.topic=dbhistory.mydb
  31. database.history.store.only.monitored.tables.ddl=true
  32. table.include.list=test.stu,test.t_user
复制代码
将zookeeper和kafka启动起来,再启动kafka-connect。
kafka的bin目录下
  1. ./connect-distributed.sh /opt/module/kafka/config/debezium.properties
复制代码
执行注册命令,kafka的config目录下
  1. curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://hadoop102:8083/connectors/ -d @mysql-connector.json
复制代码
查看状态
  1. curl -s http://hadoop102:8083/connectors/mysql-connector/status
复制代码
删除
  1. url -X DELETE localhost:8083/connectors/mysql-connector
复制代码
 5、MySQL到Kafka的及时数据流和变更数据捕获(多节点)

5.1 Zookeeper配置



  • hadoop102节点上
  1. tickTime=2000
  2. initLimit=10
  3. syncLimit=5
  4. dataDir=/opt/module/zookeeper/zkData
  5. clientPort=2181server.2=hadoop102:2888:3888server.3=hadoop103:2888:3888server.4=hadoop104:2888:3888
复制代码
/opt/module/zookeeper/zkData目录下有一个文件myid,内里的内容是:2


  •  hadoop103节点上
  1. tickTime=2000
  2. initLimit=10
  3. syncLimit=5
  4. dataDir=/opt/module/zookeeper/zkData
  5. clientPort=2181server.2=hadoop102:2888:3888server.3=hadoop103:2888:3888server.4=hadoop104:2888:3888
复制代码
/opt/module/zookeeper/zkData目录下有一个文件myid,内里的内容是:3


  •  hadoop104节点上
  1. tickTime=2000
  2. initLimit=10
  3. syncLimit=5
  4. dataDir=/opt/module/zookeeper/zkData
  5. clientPort=2181server.2=hadoop102:2888:3888server.3=hadoop103:2888:3888server.4=hadoop104:2888:3888
复制代码
/opt/module/zookeeper/zkData目录下有一个文件myid,内里的内容是:4
5.2 Kafka配置



  • hadoop102节点上
  1. broker.id=0
  2. advertised.listeners=PLAINTEXT://hadoop102:9092
  3. num.network.threads=3
  4. num.io.threads=8
  5. socket.send.buffer.bytes=102400
  6. socket.receive.buffer.bytes=102400
  7. socket.request.max.bytes=104857600
  8. log.dirs=/opt/module/kafka/datas
  9. num.partitions=1
  10. num.recovery.threads.per.data.dir=1
  11. offsets.topic.replication.factor=1
  12. transaction.state.log.replication.factor=1
  13. transaction.state.log.min.isr=1
  14. log.retention.hours=168
  15. log.retention.check.interval.ms=300000
  16. zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
  17. zookeeper.connection.timeout.ms=18000
  18. group.initial.rebalance.delay.ms=0
复制代码


  • hadoop103节点上
只需要修改kafka/config/server.properties
  1. broker.id=1
  2. advertised.listeners=PLAINTEXT://hadoop103:9092
复制代码


  • hadoop104节点上
只需要修改kafka/config/server.properties
  1. broker.id=2
  2. advertised.listeners=PLAINTEXT://hadoop104:9092
复制代码
5.3 Dezebium下载安装

hadoop102、hadoop103、hadoop104下载了debezium,并且jar包放到了/opt/module/kafka/libs目录。每个节点的/opt/module/kafka/config/connect-distributed.properties配置文件内容都是
  1. bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
  2. group.id=connect-cluster
  3. key.converter=org.apache.kafka.connect.json.JsonConverter
  4. value.converter=org.apache.kafka.connect.json.JsonConverter
  5. key.converter.schemas.enable=true
  6. value.converter.schemas.enable=true
  7. offset.storage.topic=connect-offsets
  8. offset.storage.replication.factor=3
  9. config.storage.topic=connect-configs
  10. config.storage.replication.factor=3
  11. status.storage.topic=connect-status
  12. status.storage.replication.factor=3
  13. offset.flush.interval.ms=10000
  14. plugin.path=/opt/module/kafka/libs
  15. internal.key.converter=org.apache.kafka.connect.json.JsonConverter
  16. internal.value.converter=org.apache.kafka.connect.json.JsonConverter
  17. internal.key.converter.schemas.enable=false
  18. internal.value.converter.schemas.enable=false
  19. status.flush.interval.ms=5000
复制代码
每个节点的/opt/module/kafka/config下创建了一个配置文件:mysql-connector.json,内容如下:
  1. {
  2.     "name": "mysql-connector",
  3.     "config": {
  4.         "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  5.         "tasks.max": "1",
  6.         "database.hostname": "192.168.10.1",
  7.         "database.port": "3306",
  8.         "database.user": "debezium",
  9.         "database.password": "123456",
  10.         "database.server.id": "184054",
  11.         "database.server.name": "mydb",
  12.         "database.include.list": "news_data",
  13.         "database.history.kafka.bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092",
  14.         "schema.history.internal.kafka.bootstrap.servers": "hadoop102:9092,hadoop103:9092,hadoop104:9092",
  15.         "schema.history.internal.kafka.topic": "schema-changes.news_data",
  16.         "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  17.         "key.converter.schemas.enable": "false",
  18.         "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  19.         "value.converter.schemas.enable": "false",
  20.         "database.history.kafka.topic": "dbhistory.news_data",
  21.         "database.ssl.mode": "REQUIRED",
  22.         "database.ssl.truststore.location": "/path/to/truststore.jks",
  23.         "database.ssl.truststore.password": "truststore-password",
  24.         "database.connectionTimeZone": "UTC"
  25.     }
  26. }
复制代码
启动Zookeeper、Kafka,在hadoop102、hadoop103、hadoop104节点上依次启动Kafka-connect,并且在其中一台(hadoop102)注册即可。


  • hadoop102上执行如下语句
  1. curl -s http://hadoop102:8083/connectors/mysql-connector/status
复制代码


  • hadoop103上执行如下语句
  1. curl -s http://hadoop103:8083/connectors/mysql-connector/status
复制代码


  •  hadoop104上执行如下语句
  1. curl -s http://hadoop104:8083/connectors/mysql-connector/status
复制代码
 都得到如下结果:
  1. {
  2.     "name": "mysql-connector",
  3.     "connector": {
  4.         "state": "RUNNING",
  5.         "worker_id": "192.168.10.102:8083"
  6.     },
  7.     "tasks": [
  8.         {
  9.             "id": 0,
  10.             "state": "RUNNING",
  11.             "worker_id": "192.168.10.102:8083"
  12.         }
  13.     ],
  14.     "type": "source"
  15. }
复制代码
当hadoop102的Kafka-connect关闭时


  • hadoop103上执行如下语句
  1. curl -s http://hadoop103:8083/connectors/mysql-connector/status
复制代码


  •  hadoop104上执行如下语句
  1. curl -s http://hadoop104:8083/connectors/mysql-connector/status
复制代码
 都得到如下结果:
  1. {
  2.     "name": "mysql-connector",
  3.     "connector": {
  4.         "state": "RUNNING",
  5.         "worker_id": "192.168.10.103:8083"
  6.     },
  7.     "tasks": [
  8.         {
  9.             "id": 0,
  10.             "state": "RUNNING",
  11.             "worker_id": "192.168.10.103:8083"
  12.         }
  13.     ],
  14.     "type": "source"
  15. }
复制代码
 乐成地在多节点上配置了Zookeeper、Kafka和Debezium连接器,并且能够在Kafka Connect集群中注册和运行Debezium连接器
6、kafka写入HDFS

在hadoop102节点上的flume下编写一个配置文件kafka_to_hdfs.conf,内容如下:
  1. # Define the sources, sinks, and channels
  2. a1.sources = r1
  3. a1.sinks = k1
  4. a1.channels = c1
  5. # # Configure the source
  6. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
  7. a1.sources.r1.batchSize = 1000
  8. a1.sources.r1.batchDurationMillis = 2000
  9. a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
  10. a1.sources.r1.kafka.topics = mydb.news_data.comment_information,mydb.news_data.media_information,mydb.news_data.news_information,mydb.news_data.user_information,mydb.news_data.user_auth
  11. # # Interceptor configuration
  12. a1.sources.r1.interceptors = i1
  13. a1.sources.r1.interceptors.i1.type = org.atguigu.news.flume.interceptor.TimestampInterceptor$Builder
  14. # # Configure the channel
  15. a1.channels.c1.type = memory
  16. a1.channels.c1.capacity = 10000
  17. a1.channels.c1.transactionCapacity = 1000
  18. # # Configure the sink
  19. a1.sinks.k1.type = hdfs
  20. a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/news_data/%{table}_inc/%Y-%m-%d
  21. a1.sinks.k1.hdfs.filePrefix = events
  22. a1.sinks.k1.hdfs.fileType = DataStream
  23. a1.sinks.k1.hdfs.writeFormat = Text
  24. a1.sinks.k1.hdfs.rollInterval = 0
  25. a1.sinks.k1.hdfs.rollSize = 134217728
  26. a1.sinks.k1.hdfs.rollCount = 0
  27. a1.sinks.k1.hdfs.useLocalTimeStamp = true
  28. # # Bind the source and sink to the channel
  29. a1.sources.r1.channels = c1
  30. a1.sinks.k1.channel = c1
复制代码
编写拦截器代码打jar包放入hadoop102的flume的lib目录下,jar包代码如下:
  1. package org.atguigu.news.flume.interceptor;
  2. import com.alibaba.fastjson.JSONObject;
  3. import org.apache.flume.Context;
  4. import org.apache.flume.Event;
  5. import org.apache.flume.interceptor.Interceptor;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.List;
  8. import java.util.Map;
  9. public class TimestampInterceptor implements Interceptor {
  10.     @Override
  11.     public void initialize() {
  12.         // Initialization logic, if needed
  13.     }
  14.     @Override
  15.     public Event intercept(Event event) {
  16.         // 1. Get header and body data
  17.         Map<String, String> headers = event.getHeaders();
  18.         String body = new String(event.getBody(), StandardCharsets.UTF_8);
  19.         try {
  20.             // 2. Parse body data as JSON
  21.             JSONObject jsonObject = JSONObject.parseObject(body);
  22.             // 3. Extract timestamp and table name
  23.             String ts = jsonObject.getString("ts_ms");
  24.             JSONObject source = jsonObject.getJSONObject("source");
  25.             String tableName = source.getString("table");
  26.             // 4. Put timestamp and table name into headers
  27.             headers.put("timestamp", ts);
  28.             headers.put("table", tableName);
  29.             return event;
  30.         } catch (Exception e) {
  31.             e.printStackTrace();
  32.             return null;
  33.         }
  34.     }
  35.     @Override
  36.     public List<Event> intercept(List<Event> events) {
  37.         for (Event event : events) {
  38.             if (intercept(event) == null) {
  39.                 events.remove(event);
  40.             }
  41.         }
  42.         return events;
  43.     }
  44.     @Override
  45.     public void close() {
  46.         // Clean up resources, if needed
  47.     }
  48.     public static class Builder implements Interceptor.Builder {
  49.         @Override
  50.         public Interceptor build() {
  51.             return new TimestampInterceptor();
  52.         }
  53.         @Override
  54.         public void configure(Context context) {
  55.             // Configuration logic, if needed
  56.         }
  57.     }
  58. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

锦通

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表