ToB企服应用市场:ToB评测及商务社交产业平台

标题: 大数据技能——实战项目:广告数仓(第三部门) [打印本页]

作者: 西河刘卡车医    时间: 2024-8-11 18:26
标题: 大数据技能——实战项目:广告数仓(第三部门)
目录
第6章 广告数仓采集通道
6.1 模拟数据预备
6.1.1 广告管理平台数据库
6.1.2 曝光点击监测数据
6.2 广告管理平台数据采集
6.2.1 数据库同步工具之DataX

6.2.2 数据采集通道
6.2.3 DataX配置文件
6.2.4 DataX配置文件天生
6.2.5 测试天生的DataX配置文件
6.2.6 全量数据同步脚本
6.3 曝光点击监测数据采集
6.3.1 采集通道安装
6.3.2 数据采集通道
6.3.3 日记采集Flume
6.3.3.1 配置概述
6.3.3.2 配置实操
6.3.3.3 测试
6.3.3.4 启停脚本
6.3.4 日记消费Flume
6.3.4.1 配置概述
6.3.4.2 配置实操
6.3.4.3 测试
6.3.4.4 启停脚本



第6章 广告数仓采集通道

6.1 模拟数据预备

6.1.1 广告管理平台数据库

1)数据库概述
广告管理平台中核心的几张表结构如下:
1ads(广告表)
id
(广告编号)
product_id
(商品id)
material_id
(素材id)
group_id
(广告组id)
ad_name
(广告名称)
material_url
(素材地址)
0
329375909941
712337489641
8
XXX
XXX
1
130171159227
519572879265
7
XXX
XXX
2
251005109937
294923573889
10
XXX
XXX
2platform_info(推广平台表)
id
(平台id)
platform
(平台标识)
platform_alias_zh
(平台中文名称)
1
tencent
腾讯广告
2
baidu
百度推广
3
juliang
巨量
3product(商品表)
id
(商品id)
name
(商品名称)
price
(商品价格)
659417
【精选】葡萄柚台湾葡萄蜜柚皮薄平和甜蜜柚当季新鲜水果批发包邮
9.9
1214894
【佳构】正宗赣南脐橙新鲜橙子江西甜橙孕妇水果冰糖果冻香橙包邮
14.8
5307635
特辣朝天椒小米椒微辣中辣干辣椒超辣特香散装干货500克
17.5
4ads_platform(广告投放表)
id
(编号)
ad_id
(广告id)
platform_id
(平台id)
create_time
(开始投放时间)
cancel_time
(取消投放时间)
1
0
3
XXX
XXX
2
0
2
XXX
XXX
3
1
3
XXX
XXX
5server_host(日记服务器列表)
id
(编号)
ipv4
(服务器ip)
1
203.195.136.146
2
103.250.32.51
3
203.90.0.54
2)模拟数据预备
1)安装MySQL数据库
1.  安装包预备
                1)上传mysql文件夹及里面所有内容上传到/opt/software/mysql目录下

2.  安装MySQL
1)如果是阿里云服务器按照如下步骤执行
说明:由于阿里云服务器安装的是Linux最小体系版,没有如下工具,所以必要安装。
(1)卸载MySQL依靠,固然呆板上没有装MySQL,但是这一步不可少
[atguigu@hadoop102 mysql]# sudo yum remove mysql-libs
(2)下载依靠并安装
[atguigu@hadoop102 mysql]# sudo yum install libaio
[atguigu@hadoop102 mysql]# sudo yum -y install autoconf
2)切换到hadoop102的root用户
[atguigu@hadoop102 mysql]$ su root
3)执行/opt/software/mysql/目录下install_mysql.sh
内容如下
  1. [root@hadoop102 mysql]# vim install_mysql.sh
  2. #!/bin/bash
  3. set -x
  4. [ "$(whoami)" = "root" ] || exit 1
  5. [ "$(ls *.rpm | wc -l)" = "7" ] || exit 1
  6. test -f mysql-community-client-8.0.31-1.el7.x86_64.rpm && \
  7. test -f mysql-community-client-plugins-8.0.31-1.el7.x86_64.rpm && \
  8. test -f mysql-community-common-8.0.31-1.el7.x86_64.rpm && \
  9. test -f mysql-community-icu-data-files-8.0.31-1.el7.x86_64.rpm && \
  10. test -f mysql-community-libs-8.0.31-1.el7.x86_64.rpm && \
  11. test -f mysql-community-libs-compat-8.0.31-1.el7.x86_64.rpm && \
  12. test -f mysql-community-server-8.0.31-1.el7.x86_64.rpm || exit 1
  13. # 卸载MySQL
  14. systemctl stop mysql mysqld 2>/dev/null
  15. rpm -qa | grep -i 'mysql\|mariadb' | xargs -n1 rpm -e --nodeps 2>/dev/null
  16. rm -rf /var/lib/mysql /var/log/mysqld.log /usr/lib64/mysql /etc/my.cnf /usr/my.cnf
  17. set -e
  18. # 安装并启动MySQL
  19. yum install -y *.rpm >/dev/null 2>&1
  20. systemctl start mysqld
  21. #更改密码级别并重启MySQL
  22. sed -i '/\[mysqld\]/avalidate_password.length=4\nvalidate_password.policy=0' /etc/my.cnf
  23. systemctl restart mysqld
  24. # 更改MySQL配置
  25. tpass=$(cat /var/log/mysqld.log | grep "temporary password" | awk '{print $NF}')
  26. cat << EOF | mysql -uroot -p"${tpass}" --connect-expired-password >/dev/null 2>&1
  27. set password='000000';
  28. update mysql.user set host='%' where user='root';
  29. alter user 'root'@'%' identified with mysql_native_password by '000000';
  30. flush privileges;
  31. EOF
复制代码
执行脚本。
[root@hadoop102 mysql]# sh install_mysql.sh
4)退出root用户到atguigu用户
[root@hadoop102 mysql]# exit
2)利用MySQL可视化客户端毗连数据库

3)通过DBeaver创建数据库

设置数据库名称为ad,编码为utf8mb4,排序规则为utf8mb4_general_ci

3)导入数据库结构脚本(init.sql




6.1.2 曝光点击监测数据

1)日记格式概述
        前文提到过,监测数据由媒体方通过HTTP请求发送到我们的日记服务器。日记服务器担当到请求后会将数据落盘到日记文件中。文件中的每行日记格式如下:
time_local\u0001 request_method\u0001request_uri\u0001status\u0001server_addr
日记中每部门的含义如下:
字段
说明
示例数据
\u0001分隔符,Unicode中的一个不可见字符,用作分隔符,可避免与文本中的字符冲突,同时也是Hive中列分隔符的默认值
time_local
日记服务器收到监测数据上报请求的时间2023-01-07 00:00:10
request_methodHTTP请求方法,常用的方法有GET、POST,GET请求通常用于向Web服务器请求数据,POST通常用于向Web服务器提交数据处理请求GET
request_uri请求路径,这部门内容包括了媒体上报的监测数据的核心内容。122.189.79.23&ua=Mozilla/5.0%20 (Windows%20NT%2010.0)%20Appl eWebKit/537.36%20(KHTML,%20lik e%20Gecko)%20Chrome/40.0.2214 .93%20Safari/537.36&device_id= d4b8f3898515056278ccf78a7a2cca2 d&os_type=Android"
status日记服务器的响应状态码,200表现响应乐成200
server_addr日记服务器自身的IP地址203.195.136.146
        其中,request_uri请求路径的内容最为重要,下面临该部门内容做重点先容:
        请求的路径,格式如下
/ad/<platform>/<event_type>?id=<id>&t=<ts>&ip=<ip>&ua=<ua>&device_id=<device_id>&os_type=<os_type>

路径参数
参数名
含义
platform
有tencent、baidu、julang表现不同的广告投放平台
event_type
impression表现曝光事件,click表现点击事件
查询参数
参数
含义
id
广告id
t
毫秒时间戳,曝光或者点击时间的发生时间
ip
访问广告的用户客户端ipv4地址
ua
访问广告的用户客户端的user-agent
device_id
访问广告的用户客户端的装备id
os_type
访问广告的用户客户端的操作体系类型

2)数据模拟器利用说明
(1)上传数据模拟器jar包
        我们模拟hadoop102和hadoop103为担当监测数据的服务器,因此我们需在这两台节点上传日记模拟器。
        在hadoop102、hadoop103分别创建/opt/module/ad_mock路径,并上传资料中的NginxDataGenerator-1.0-SNAPSHOT-jar-with-dependencies.jar 以及nginxLogGen.setting文件到该路径中。
(2)修改模拟器配置文件
  1. ##############################
  2. # 数据库配置
  3. ##############################
  4. # jdbc链接,主要用来指定机器和端口
  5. jdbcUrl = jdbc:mysql://hadoop102:3306?useSSL=false
  6. # 用户名
  7. user = root
  8. # 密码
  9. password = 000000
  10. # 数据库
  11. database = ad
  12. # 连接数据库要用的驱动类
  13. driver = com.mysql.jdbc.Driver
  14. ##############################
  15. # 目标地址与文件名
  16. ##############################
  17. # 目标路径
  18. targetPath = /opt/module/ad_mock/log
  19. # 文件名称
  20. fileName = access.log
  21. ##############################
  22. # 数据生成器行为控制
  23. ##############################
  24. # 数据基数[整数](基数越大,单线程的数据量越大,但只能粗略控制)
  25. baseDataNum = 100
  26. # 线程数[整数](运用CPU的多核能力,同时数据量会翻倍)
  27. threadNum = 4
  28. # 数据的开始时间
  29. startTime = 2023-01-07 00:00:00
  30. # 数据的结束时间
  31. endTime = 2023-01-08 00:00:00
  32. ###############################
  33. # 数据比重[浮点数] (0或注释为关闭)
  34. ##############################
  35. # ip固定且周期访问的浏览器
  36. cycleBrowser = 0.1
  37. # 设备id固定且周期访问的移动设备
  38. cycleDevice = 0.1
  39. # ip固定且快速访问的浏览器
  40. fastFixedIpBrowser = 0.1
  41. # 设备id固定且快速访问的移动设备
  42. fastFixedIdDevice = 0.1
  43. # 正常浏览器设备
  44. normalBrowser = 0.1
  45. # 正常移动设备
  46. normalDevice = 0.4
  47. # 携带爬虫ua的浏览器请求
  48. botBrowser = 0.1
  49. ##############################
  50. # 生成器日志级别
  51. ##############################
  52. # DEBUG,INFO,WARN,ERROR
  53. logLevel = INFO
  54. ##############################
  55. # 仅为测试使用,标注每条数据来自哪个类,false不加,true会加
  56. ##############################
  57. # true or false
  58. whereDataFrom = false
复制代码
(3)测试模拟器
进入/opt/module/ad_mock目录,并执行以下命令。
[atguigu@hadoop102 ad_mock]$ java -jar NginxDataGenerator-1.0-SNAPSHOT-jar-with-dependencies.jar

(4)创建一键天生数据脚本
在hadoop102节点的/home/atguigu/bin/目录下创建ad_mock.sh文件。
[atguigu@hadoop102 ~]$ vim /home/atguigu/bin/ad_mock.sh

填入以下内容。
  1. #!/bin/bash
  2. for i in hadoop102 hadoop103
  3. do
  4. echo "========== $i =========="
  5.         ssh $i "cd /opt/module/ad_mock ; java -jar /opt/module/ad_mock/NginxDataGenerator-1.0-SNAPSHOT-jar-with-dependencies.jar >/dev/null  2>&1 &"
  6. done
复制代码
增长执行权限。
[atguigu@hadoop102 ~]$ chmod +x /home/atguigu/bin/ad_mock.sh

(5)测试数据天生脚本
执行ad_mock.sh文件。
[atguigu@hadoop102 ~]$ ad_mock.sh

观察hadoop102,hadoop103两台节点是否有数据天生。
6.2 广告管理平台数据采集

6.2.1 数据库同步工具之DataX

本项目中接纳的数据库同步工具为DataX,DataX的安装和利用可参考以下博客文章。
大数据技能——DataX的利用与优化-CSDN博客


6.2.2 数据采集通道

广告管理平台的数据库中的数据,重要用作广告数仓的维度表,故此处接纳DataX举行每日全量同步。

6.2.3 DataX配置文件

我们必要为每张全量表编写一个DataX的json配置文件,此处以product表为例,配置文件内容如下:
  1. {
  2.     "job": {
  3.         "content": [
  4.             {
  5.                 "reader": {
  6.                     "name": "mysqlreader",
  7.                     "parameter": {
  8.                         "column": [
  9.                             "id",
  10.                             "name",
  11.                             "price"
  12.                         ],
  13.                         "connection": [
  14.                             {
  15.                                 "jdbcUrl": [
  16.                                     "jdbc:mysql://hadoop102:3306/ad"
  17.                                 ],
  18.                                 "table": [
  19.                                     "product"
  20.                                 ]
  21.                             }
  22.                         ],
  23.                         "password": "000000",
  24.                         "splitPk": "",
  25.                         "username": "root"
  26.                     }
  27.                 },
  28.                 "writer": {
  29.                     "name": "hdfswriter",
  30.                     "parameter": {
  31.                         "column": [
  32.                             {
  33.                                 "name": "id",
  34.                                 "type": "bigint"
  35.                             },
  36.                             {
  37.                                 "name": "name",
  38.                                 "type": "string"
  39.                             },
  40.                             {
  41.                                 "name": "price",
  42.                                 "type": "double"
  43.                             }
  44.                         ],
  45.                         "compress": "gzip",
  46.                         "defaultFS": "hdfs://hadoop102:8020",
  47.                         "fieldDelimiter": "\t",
  48.                         "fileName": "product",
  49.                         "fileType": "text",
  50.                         "path": "${targetdir}",
  51.                         "writeMode": "append"
  52.                     }
  53.                 }
  54.             }
  55.         ],
  56.         "setting": {
  57.             "errorLimit": {
  58.                 "percentage": 0.02,
  59.                 "record": 0
  60.             },
  61.             "speed": {
  62.                 "channel": 3
  63.             }
  64.         }
  65.     }
  66. }
复制代码
        注:由于目的路径包罗一层日期,用于对不同天的数据加以区分,故path参数并未写死,需在提交使命时通过参数动态传入,参数名称为targetdir
6.2.4 DataX配置文件天生

1DataX配置文件天生器利用
DataX配置文件天生器可参考推文:大数据技能——DataX配置文件天生器-CSDN博客
2将天生器上传到服务器的/opt/module/gen_datax_config目录
[atguigu@hadoop102 ~]$ mkdir /opt/module/gen_datax_config
[atguigu@hadoop102 ~]$ cd /opt/module/gen_datax_config
3)上传天生器

4修改configuration.properties配置
  1. mysql.username=root
  2. mysql.password=000000
  3. mysql.host=hadoop102
  4. mysql.port=3306
  5. mysql.database.import=ad
  6. # mysql.database.export=ad_report
  7. mysql.tables.import=
  8. # mysql.tables.export=
  9. is.seperated.tables=0
  10. hdfs.uri=hdfs://hadoop102:8020
  11. import_out_dir=/opt/module/datax/job/import
  12. # export_out_dir=
复制代码

5执行
[atguigu@hadoop102 gen_datax_config]$ java -jar datax-config-generator-1.0-SNAPSHOT-jar-with-dependencies.jar
6观察结果

6.2.5 测试天生的DataX配置文件

以Product为例,测试用脚本天生的配置文件是否可用。
1)创建目的路径
由于DataX同步使命要求目的路径提前存在,故需手动创建路径,当前Product表的目的路径应该为/origin_data/ad/db/product_full/2023-01-07
[atguigu@hadoop102 bin]$ hadoop fs -mkdir -p /origin_data/ad/db/product_full/2023-01-07

2)执行DataX同步命令
[atguigu@hadoop102 bin]$ python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/ad/db/product_full/2023-01-07" /opt/module/datax/job/import/ad.product.json

3)观察同步结果
观察HDFS目的路径是否出现数据。

6.2.6 全量数据同步脚本

        为方便利用以及后续的使命调治,此处编写一个全量表数据同步脚本。
1)在~/bin目录创建mysql_to_hdfs_full.sh
[atguigu@hadoop102 bin]$ vim ~/bin/ad_mysql_to_hdfs_full.sh
脚本内容如下。
  1. #!/bin/bash
  2. DATAX_HOME=/opt/module/datax
  3. # 如果传入日期则do_date等于传入的日期,否则等于前一天日期
  4. if [ -n "$2" ] ;then
  5.     do_date=$2
  6. else
  7.     do_date=`date -d "-1 day" +%F`
  8. fi
  9. #处理目标路径,此处的处理逻辑是,如果目标路径不存在,则创建;若存在,则清空,目的是保证同步任务可重复执行
  10. handle_targetdir() {
  11.   hadoop fs -test -e $1
  12.   if [[ $? -eq 1 ]]; then
  13.     echo "路径$1不存在,正在创建......"
  14.     hadoop fs -mkdir -p $1
  15.   else
  16.     echo "路径$1已经存在"
  17.     fs_count=$(hadoop fs -count $1)
  18.     content_size=$(echo $fs_count | awk '{print $3}')
  19.     if [[ $content_size -eq 0 ]]; then
  20.       echo "路径$1为空"
  21.     else
  22.       echo "路径$1不为空,正在清空......"
  23.       hadoop fs -rm -r -f $1/*
  24.     fi
  25.   fi
  26. }
  27. #数据同步
  28. #参数:arg1-datax 配置文件路径;arg2-源数据所在路径
  29. import_data() {
  30.   handle_targetdir $2
  31.   python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$2" $1
  32. }
  33. case $1 in
  34. "product")
  35.   import_data /opt/module/datax/job/import/ad.product.json /origin_data/ad/db/product_full/$do_date
  36.   ;;
  37. "ads")
  38.   import_data /opt/module/datax/job/import/ad.ads.json /origin_data/ad/db/ads_full/$do_date
  39.   ;;
  40. "server_host")
  41.   import_data /opt/module/datax/job/import/ad.server_host.json /origin_data/ad/db/server_host_full/$do_date
  42.   ;;
  43. "ads_platform")
  44.   import_data /opt/module/datax/job/import/ad.ads_platform.json /origin_data/ad/db/ads_platform_full/$do_date
  45.   ;;
  46. "platform_info")
  47.   import_data /opt/module/datax/job/import/ad.platform_info.json /origin_data/ad/db/platform_info_full/$do_date
  48.   ;;
  49. "all")
  50.   import_data /opt/module/datax/job/import/ad.product.json /origin_data/ad/db/product_full/$do_date
  51.   import_data /opt/module/datax/job/import/ad.ads.json /origin_data/ad/db/ads_full/$do_date
  52.   import_data /opt/module/datax/job/import/ad.server_host.json /origin_data/ad/db/server_host_full/$do_date
  53.   import_data /opt/module/datax/job/import/ad.ads_platform.json /origin_data/ad/db/ads_platform_full/$do_date
  54.   import_data /opt/module/datax/job/import/ad.platform_info.json /origin_data/ad/db/platform_info_full/$do_date
  55.   ;;
  56. esac
复制代码
2)为mysql_to_hdfs_full.sh增长执行权限
[atguigu@hadoop102 bin]$ chmod 777 ~/bin/mysql_to_hdfs_full.sh
3)测试同步脚本
[atguigu@hadoop102 bin]$ mysql_to_hdfs_full.sh all 2023-01-07
4)检测同步结果
查看HDFS目的路径是否出现全量表数据,全量表共5张。

6.3 曝光点击监测数据采集

6.3.1 采集通道安装

采集通道涉及到的组件有Flume和Kafka,故必要先完成两者的安装,安装步骤可参考下列文档。
1Zookeeper安装
参考博客:大数据技能——zookeeper的安装摆设与启停脚本-CSDN博客
2Kafka安装
参考博客:大数据技能——Kafka的安装摆设与启停脚本-CSDN博客
3Flume安装
三台节点均必要安装Flume,可先找如下文档在一台节点安装,完成后再举行分发。
参考博客:大数据技能——Flume的安装与摆设-CSDN博客
6.3.2 数据采集通道

        按照规划,必要采集的广告监测日记文件分布在hadoop102,hadoop103两台日记服务器,故必要在两台节点摆设Flume实时采集,Flume采集到的数据会发往Kafka集群,后再由hadoop104节点的Flume实例消费Kafka数据,并将数据写入HDFS相应目录。

6.3.3 日记采集Flume

6.3.3.1 配置概述

        hadoop102和hadoop103的两个Flume实例重要负责采集日记文件内容,并将数据发往Kafka,故Source可选择TailDirSource,Channel可选择KafkaChannel,不利用Sink。
其中关键配置如下:

6.3.3.2 配置实操

1)创建Flume配置文件
在hadoop102节点的Flume的job目录下创建ad_file_to_kafka.conf
[atguigu@hadoop102 flume]$ mkdir job
[atguigu@hadoop102 flume]$ vim job/ad_file_to_kafka.conf
2)配置文件内容如下
  1. #定义组件
  2. a1.sources = r1
  3. a1.channels = c1
  4. #配置source
  5. a1.sources.r1.type = TAILDIR
  6. a1.sources.r1.filegroups = f1
  7. a1.sources.r1.filegroups.f1 = /opt/module/ad_mock/log/.*
  8. a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
  9. #配置channel
  10. a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
  11. a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
  12. a1.channels.c1.kafka.topic = ad_log
  13. #组装
  14. a1.sources.r1.channels = c1
复制代码
6.3.3.3 测试

1)启动ZookeeperKafka集群
2)启动hadoop102的日记采集Flume
[atguigu@hadoop102 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/ad_file_to_kafka.conf -Dflume.root.logger=info,console
3)启动一个KafkaConsole-Consumer
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ad_log
4)天生模拟数据
[atguigu@hadoop102 ~]$ ad_mock.sh
5)观察Kafka消费者是否能消费到数据

6.3.3.4 启停脚本

1分发日记采集Flume配置文件
若上述测试通过,需将hadoop102节点的Flume的配置文件,向另一台日记服务器也发送一份。
[atguigu@hadoop102 flume]$ scp -r job hadoop103:/opt/module/flume/
2)方便起见,此处编写一个日记采集Flume历程的启停脚本
(1)在hadoop102节点的/home/atguigu/bin目录下创建脚本ad_f1.sh
[atguigu@hadoop102 bin]$ vim ad_f1.sh
在脚本中填写如下内容。
  1. #!/bin/bash
  2. case $1 in
  3. "start"){
  4.         for i in hadoop102 hadoop103
  5.         do
  6.                 echo " --------启动 $i 采集flume-------"
  7.                 ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/ad_file_to_kafka.conf >/dev/null 2>&1 &"
  8.         done
  9. };;
  10. "stop"){
  11.         for i in hadoop102 hadoop103
  12.         do
  13.                 echo " --------停止 $i 采集flume-------"
  14.                 ssh $i "ps -ef | grep ad_file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
  15.         done
  16. };;
  17. esac
复制代码
(2)增长脚本执行权限
[atguigu@hadoop102 bin]$ chmod x ad_f1.sh
(3)f1启动
[atguigu@hadoop102 module]$ ad_f1.sh start
(4)f1制止
[atguigu@hadoop102 module]$ ad_f1.sh stop
6.3.4 日记消费Flume

6.3.4.1 配置概述

该Flume实例重要负责消费Kafka中的数据并发往HDFS。并且对天天产生对应用户行为日记举行区分,将不同天的数据发往HDFS不同天的路径。
Source、Channel和Sink可分别选用KafkaSource、FileChannel、HDFSSink。
关键配置如下:

6.3.4.2 配置实操

1)创建Flume配置文件
hadoop104节点Flume的job目录下创建ad_kafka_to_hdfs.conf文件。
[atguigu@hadoop104 flume]$ mkdir job
[atguigu@hadoop104 flume]$ vim job/ad_kafka_to_hdfs.conf
2)配置文件内容如下
  1. #定义组件
  2. a1.sources=r1
  3. a1.channels=c1
  4. a1.sinks=k1
  5. #配置source1
  6. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
  7. a1.sources.r1.batchSize = 1000
  8. a1.sources.r1.batchDurationMillis = 2000
  9. a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
  10. a1.sources.r1.kafka.topics = ad_log
  11. a1.sources.r1.interceptors = i1
  12. a1.sources.r1.interceptors.i1.type = com.atguigu.ad.flume.interceptor.TimestampInterceptor$Builder
  13. #配置channel
  14. a1.channels.c1.type = file
  15. a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
  16. a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
  17. a1.channels.c1.maxFileSize = 2146435071
  18. a1.channels.c1.capacity = 1000000
  19. a1.channels.c1.keep-alive = 6
  20. #配置sink
  21. a1.sinks.k1.type = hdfs
  22. a1.sinks.k1.hdfs.path = /origin_data/ad/log/ad_log/%Y-%m-%d
  23. a1.sinks.k1.hdfs.filePrefix = log
  24. a1.sinks.k1.hdfs.round = false
  25. a1.sinks.k1.hdfs.rollInterval = 10
  26. a1.sinks.k1.hdfs.rollSize = 134217728
  27. a1.sinks.k1.hdfs.rollCount = 0
  28. #控制输出文件类型
  29. a1.sinks.k1.hdfs.fileType = CompressedStream
  30. a1.sinks.k1.hdfs.codeC = gzip
  31. #组装
  32. a1.sources.r1.channels = c1
  33. a1.sinks.k1.channel = c1
复制代码
3编写Flume拦截器
(1)创建一个Maven项目,在pom.xml中添加如下依靠。
  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.apache.flume</groupId>
  4.         <artifactId>flume-ng-core</artifactId>
  5.         <version>1.10.1</version>
  6.         <scope>provided</scope>
  7.     </dependency>
  8. </dependencies>
复制代码
(2)在com.atguigu.ad.flume.interceptor包下创建TimestampInterceptor类。
  1. package com.atguigu.ad.flume.interceptor;
  2. import org.apache.flume.Context;
  3. import org.apache.flume.Event;
  4. import org.apache.flume.interceptor.Interceptor;
  5. import java.nio.charset.StandardCharsets;
  6. import java.util.List;
  7. import java.util.regex.Matcher;
  8. import java.util.regex.Pattern;
  9. public class TimestampInterceptor implements Interceptor {
  10.     private Pattern pattern;
  11.     @Override
  12.     public void initialize() {
  13.         pattern = Pattern.compile(".*t=(\\d{13}).*");
  14.     }
  15.     @Override
  16.     public Event intercept(Event event) {
  17.         String log = new String(event.getBody(), StandardCharsets.UTF_8);
  18.         //1.移除每个字段的双引号
  19.         //去掉前后两个双引号
  20.         String subLog = log.substring(1, log.length() - 1);
  21.         //去掉各分隔符两侧的双引号,\u0001表示分隔符
  22.         String result = subLog.replaceAll(""\u0001"", "\u0001");
  23.         event.setBody(result.getBytes(StandardCharsets.UTF_8));
  24.         //2.提取时间戳(利用正则表达式的分组捕获功能实现)
  25.         Matcher matcher = pattern.matcher(result);
  26.         if (matcher.matches()) {
  27.             String ts = matcher.group(1);
  28.             event.getHeaders().put("timestamp", ts);
  29.         } else {
  30.             return null;
  31.         }
  32.         return event;
  33.     }
  34.     @Override
  35.     public List<Event> intercept(List<Event> events) {
  36.         Iterator<Event> iterator = events.iterator();
  37.         while (iterator.hasNext()) {
  38.             Event next = iterator.next();
  39.             Event intercept = intercept(next);
  40.             if (intercept == null) {
  41.                 iterator.remove();
  42.             }
  43.         }
  44.         return events;
  45.     }
  46.     @Override
  47.     public void close() {
  48.     }
  49.     public static class Builder implements Interceptor.Builder{
  50.         @Override
  51.         public Interceptor build() {
  52.             return new TimestampInterceptor();
  53.         }
  54.         @Override
  55.         public void configure(Context context) {
  56.         }
  57.     }
  58. }
复制代码
(3)打包
(4)将打好的包放入到hadoop104的/opt/module/flume/lib目录下。
6.3.4.3 测试

1)启动zookeeperkafka集群。
2)启动日记采集Flume
[atguigu@hadoop102 ~]$ ad_f1.sh start
3)启动hadoop104的日记消费Flume
[atguigu@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/ad_kafka_to_hdfs.conf -Dflume.root.logger=info,console
4)天生模拟数据
[atguigu@hadoop102 ~]$ ad_mock.sh
5)观察HDFS是否出现数据

6.3.4.4 启停脚本

若上述测试通过,为方便,此处创建一个Flume的启停脚本。
1)在hadoop102节点的/home/atguigu/bin目录下创建脚本ad_f2.sh
[atguigu@hadoop102 bin]$ vim ad_f2.sh
        在脚本中填写如下内容。
  1. #!/bin/bash
  2. case $1 in
  3. "start")
  4.         echo " --------启动 hadoop104 日志数据flume-------"
  5.         ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/ad_kafka_to_hdfs.conf >/dev/null 2>&1 &"
  6. ;;
  7. "stop")
  8.         echo " --------停止 hadoop104 日志数据flume-------"
  9.         ssh hadoop104 "ps -ef | grep ad_kafka_to_hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
  10. ;;
  11. esac
复制代码
2)增长脚本执行权限
[atguigu@hadoop102 bin]$ chmod +x ad_f2.sh
3f2启动
[atguigu@hadoop102 module]$ ad_f2.sh start
4f2制止
[atguigu@hadoop102 module]$ ad_f2.sh stop

前面章节部门:
大数据项目——实战项目:广告数仓(第一部门)-CSDN博客
大数据项目——实战项目:广告数仓(第二部门)-CSDN博客

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4