西河刘卡车医 发表于 2024-8-11 18:26:03

大数据技能——实战项目:广告数仓(第三部门)

目录
第6章 广告数仓采集通道
6.1 模拟数据预备
6.1.1 广告管理平台数据库
6.1.2 曝光点击监测数据
6.2 广告管理平台数据采集
6.2.1 数据库同步工具之DataX

6.2.2 数据采集通道
6.2.3 DataX配置文件
6.2.4 DataX配置文件天生
6.2.5 测试天生的DataX配置文件
6.2.6 全量数据同步脚本
6.3 曝光点击监测数据采集
6.3.1 采集通道安装
6.3.2 数据采集通道
6.3.3 日记采集Flume
6.3.3.1 配置概述
6.3.3.2 配置实操
6.3.3.3 测试
6.3.3.4 启停脚本
6.3.4 日记消费Flume
6.3.4.1 配置概述
6.3.4.2 配置实操
6.3.4.3 测试
6.3.4.4 启停脚本


第6章 广告数仓采集通道

6.1 模拟数据预备

6.1.1 广告管理平台数据库

1)数据库概述
广告管理平台中核心的几张表结构如下:
(1)ads(广告表)
id
(广告编号)
product_id
(商品id)
material_id
(素材id)
group_id
(广告组id)
ad_name
(广告名称)
material_url
(素材地址)
0
329375909941
712337489641
8
XXX
XXX
1
130171159227
519572879265
7
XXX
XXX
2
251005109937
294923573889
10
XXX
XXX
(2)platform_info(推广平台表)
id
(平台id)
platform
(平台标识)
platform_alias_zh
(平台中文名称)
1
tencent
腾讯广告
2
baidu
百度推广
3
juliang
巨量
(3)product(商品表)
id
(商品id)
name
(商品名称)
price
(商品价格)
659417
【精选】葡萄柚台湾葡萄蜜柚皮薄平和甜蜜柚当季新鲜水果批发包邮
9.9
1214894
【佳构】正宗赣南脐橙新鲜橙子江西甜橙孕妇水果冰糖果冻香橙包邮
14.8
5307635
特辣朝天椒小米椒微辣中辣干辣椒超辣特香散装干货500克
17.5
(4)ads_platform(广告投放表)
id
(编号)
ad_id
(广告id)
platform_id
(平台id)
create_time
(开始投放时间)
cancel_time
(取消投放时间)
1
0
3
XXX
XXX
2
0
2
XXX
XXX
3
1
3
XXX
XXX
(5)server_host(日记服务器列表)
id
(编号)
ipv4
(服务器ip)
1
203.195.136.146
2
103.250.32.51
3
203.90.0.54
2)模拟数据预备
(1)安装MySQL数据库
1.  安装包预备
                1)上传mysql文件夹及里面所有内容上传到/opt/software/mysql目录下
https://i-blog.csdnimg.cn/direct/9f82d340f13349afbb95f95851bb6d9f.png
2.  安装MySQL
1)如果是阿里云服务器按照如下步骤执行
说明:由于阿里云服务器安装的是Linux最小体系版,没有如下工具,所以必要安装。
(1)卸载MySQL依靠,固然呆板上没有装MySQL,但是这一步不可少
# sudo yum remove mysql-libs
(2)下载依靠并安装
# sudo yum install libaio
# sudo yum -y install autoconf
2)切换到hadoop102的root用户
$ su root
3)执行/opt/software/mysql/目录下install_mysql.sh
内容如下
# vim install_mysql.sh

#!/bin/bash
set -x
[ "$(whoami)" = "root" ] || exit 1
[ "$(ls *.rpm | wc -l)" = "7" ] || exit 1
test -f mysql-community-client-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-client-plugins-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-common-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-icu-data-files-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-libs-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-libs-compat-8.0.31-1.el7.x86_64.rpm && \
test -f mysql-community-server-8.0.31-1.el7.x86_64.rpm || exit 1

# 卸载MySQL
systemctl stop mysql mysqld 2>/dev/null
rpm -qa | grep -i 'mysql\|mariadb' | xargs -n1 rpm -e --nodeps 2>/dev/null
rm -rf /var/lib/mysql /var/log/mysqld.log /usr/lib64/mysql /etc/my.cnf /usr/my.cnf

set -e
# 安装并启动MySQL
yum install -y *.rpm >/dev/null 2>&1
systemctl start mysqld

#更改密码级别并重启MySQL
sed -i '/\/avalidate_password.length=4\nvalidate_password.policy=0' /etc/my.cnf
systemctl restart mysqld

# 更改MySQL配置
tpass=$(cat /var/log/mysqld.log | grep "temporary password" | awk '{print $NF}')
cat << EOF | mysql -uroot -p"${tpass}" --connect-expired-password >/dev/null 2>&1
set password='000000';
update mysql.user set host='%' where user='root';

alter user 'root'@'%' identified with mysql_native_password by '000000';

flush privileges;
EOF
执行脚本。
# sh install_mysql.sh
4)退出root用户到atguigu用户
# exit
(2)利用MySQL可视化客户端毗连数据库
https://i-blog.csdnimg.cn/direct/bc668afca6754a349f5fef660bdc4c8b.png
(3)通过DBeaver创建数据库
https://i-blog.csdnimg.cn/direct/85083ab08d3145bea33d184f8c938cb7.png
设置数据库名称为ad,编码为utf8mb4,排序规则为utf8mb4_general_ci
https://i-blog.csdnimg.cn/direct/20632505d5e54b6e9905fe04f0ff6f76.png
3)导入数据库结构脚本(init.sql)
https://i-blog.csdnimg.cn/direct/126dd00e3a8a4d26a4cb87abc44b64dc.png

https://i-blog.csdnimg.cn/direct/e58f65bb09b744d58229da11194caa80.png

6.1.2 曝光点击监测数据

1)日记格式概述
        前文提到过,监测数据由媒体方通过HTTP请求发送到我们的日记服务器。日记服务器担当到请求后会将数据落盘到日记文件中。文件中的每行日记格式如下:
time_local\u0001 request_method\u0001request_uri\u0001status\u0001server_addr
日记中每部门的含义如下:
字段
说明
示例数据
\u0001分隔符,Unicode中的一个不可见字符,用作分隔符,可避免与文本中的字符冲突,同时也是Hive中列分隔符的默认值 time_local
日记服务器收到监测数据上报请求的时间2023-01-07 00:00:10request_methodHTTP请求方法,常用的方法有GET、POST,GET请求通常用于向Web服务器请求数据,POST通常用于向Web服务器提交数据处理请求GETrequest_uri请求路径,这部门内容包括了媒体上报的监测数据的核心内容。122.189.79.23&ua=Mozilla/5.0%20 (Windows%20NT%2010.0)%20Appl eWebKit/537.36%20(KHTML,%20lik e%20Gecko)%20Chrome/40.0.2214 .93%20Safari/537.36&device_id= d4b8f3898515056278ccf78a7a2cca2 d&os_type=Android"status日记服务器的响应状态码,200表现响应乐成200server_addr日记服务器自身的IP地址203.195.136.146         其中,request_uri请求路径的内容最为重要,下面临该部门内容做重点先容:
        请求的路径,格式如下
/ad/<platform>/<event_type>?id=<id>&t=<ts>&ip=<ip>&ua=<ua>&device_id=<device_id>&os_type=<os_type>
路径参数
参数名
含义
platform
有tencent、baidu、julang表现不同的广告投放平台
event_type
impression表现曝光事件,click表现点击事件
查询参数
参数
含义
id
广告id
t
毫秒时间戳,曝光或者点击时间的发生时间
ip
访问广告的用户客户端ipv4地址
ua
访问广告的用户客户端的user-agent
device_id
访问广告的用户客户端的装备id
os_type
访问广告的用户客户端的操作体系类型

2)数据模拟器利用说明
(1)上传数据模拟器jar包
        我们模拟hadoop102和hadoop103为担当监测数据的服务器,因此我们需在这两台节点上传日记模拟器。
        在hadoop102、hadoop103分别创建/opt/module/ad_mock路径,并上传资料中的NginxDataGenerator-1.0-SNAPSHOT-jar-with-dependencies.jar 以及nginxLogGen.setting文件到该路径中。
(2)修改模拟器配置文件
##############################
# 数据库配置
##############################

# jdbc链接,主要用来指定机器和端口
jdbcUrl = jdbc:mysql://hadoop102:3306?useSSL=false

# 用户名
user = root

# 密码
password = 000000

# 数据库
database = ad

# 连接数据库要用的驱动类
driver = com.mysql.jdbc.Driver

##############################
# 目标地址与文件名
##############################

# 目标路径
targetPath = /opt/module/ad_mock/log

# 文件名称
fileName = access.log

##############################
# 数据生成器行为控制
##############################

# 数据基数[整数](基数越大,单线程的数据量越大,但只能粗略控制)
baseDataNum = 100

# 线程数[整数](运用CPU的多核能力,同时数据量会翻倍)
threadNum = 4

# 数据的开始时间
startTime = 2023-01-07 00:00:00

# 数据的结束时间
endTime = 2023-01-08 00:00:00

###############################
# 数据比重[浮点数] (0或注释为关闭)
##############################

# ip固定且周期访问的浏览器
cycleBrowser = 0.1

# 设备id固定且周期访问的移动设备
cycleDevice = 0.1

# ip固定且快速访问的浏览器
fastFixedIpBrowser = 0.1

# 设备id固定且快速访问的移动设备
fastFixedIdDevice = 0.1

# 正常浏览器设备
normalBrowser = 0.1

# 正常移动设备
normalDevice = 0.4

# 携带爬虫ua的浏览器请求
botBrowser = 0.1

##############################
# 生成器日志级别
##############################

# DEBUG,INFO,WARN,ERROR
logLevel = INFO

##############################
# 仅为测试使用,标注每条数据来自哪个类,false不加,true会加
##############################

# true or false
whereDataFrom = false
(3)测试模拟器
进入/opt/module/ad_mock目录,并执行以下命令。
$ java -jar NginxDataGenerator-1.0-SNAPSHOT-jar-with-dependencies.jar
(4)创建一键天生数据脚本
在hadoop102节点的/home/atguigu/bin/目录下创建ad_mock.sh文件。
$ vim /home/atguigu/bin/ad_mock.sh
填入以下内容。
#!/bin/bash

for i in hadoop102 hadoop103
do
echo "========== $i =========="
      ssh $i "cd /opt/module/ad_mock ; java -jar /opt/module/ad_mock/NginxDataGenerator-1.0-SNAPSHOT-jar-with-dependencies.jar >/dev/null2>&1 &"
done
增长执行权限。
$ chmod +x /home/atguigu/bin/ad_mock.sh
(5)测试数据天生脚本
执行ad_mock.sh文件。
$ ad_mock.sh
观察hadoop102,hadoop103两台节点是否有数据天生。
6.2 广告管理平台数据采集

6.2.1 数据库同步工具之DataX

本项目中接纳的数据库同步工具为DataX,DataX的安装和利用可参考以下博客文章。
大数据技能——DataX的利用与优化-CSDN博客


6.2.2 数据采集通道

广告管理平台的数据库中的数据,重要用作广告数仓的维度表,故此处接纳DataX举行每日全量同步。
https://i-blog.csdnimg.cn/direct/d85ccd26d98349fa849ded934171a05e.png
6.2.3 DataX配置文件

我们必要为每张全量表编写一个DataX的json配置文件,此处以product表为例,配置文件内容如下:
{
    "job": {
      "content": [
            {
                "reader": {
                  "name": "mysqlreader",
                  "parameter": {
                        "column": [
                            "id",
                            "name",
                            "price"
                        ],
                        "connection": [
                            {
                              "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/ad"
                              ],
                              "table": [
                                    "product"
                              ]
                            }
                        ],
                        "password": "000000",
                        "splitPk": "",
                        "username": "root"
                  }
                },
                "writer": {
                  "name": "hdfswriter",
                  "parameter": {
                        "column": [
                            {
                              "name": "id",
                              "type": "bigint"
                            },
                            {
                              "name": "name",
                              "type": "string"
                            },
                            {
                              "name": "price",
                              "type": "double"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "product",
                        "fileType": "text",
                        "path": "${targetdir}",
                        "writeMode": "append"
                  }
                }
            }
      ],
      "setting": {
            "errorLimit": {
                "percentage": 0.02,
                "record": 0
            },
            "speed": {
                "channel": 3
            }
      }
    }
}
        注:由于目的路径包罗一层日期,用于对不同天的数据加以区分,故path参数并未写死,需在提交使命时通过参数动态传入,参数名称为targetdir。
6.2.4 DataX配置文件天生

1)DataX配置文件天生器利用
DataX配置文件天生器可参考推文:大数据技能——DataX配置文件天生器-CSDN博客
2)将天生器上传到服务器的/opt/module/gen_datax_config目录
$ mkdir /opt/module/gen_datax_config
$ cd /opt/module/gen_datax_config
3)上传天生器
https://i-blog.csdnimg.cn/direct/433c412e0cc84b01bb54ea7c76ca392c.png
4)修改configuration.properties配置
mysql.username=root
mysql.password=000000
mysql.host=hadoop102
mysql.port=3306
mysql.database.import=ad
# mysql.database.export=ad_report
mysql.tables.import=
# mysql.tables.export=
is.seperated.tables=0
hdfs.uri=hdfs://hadoop102:8020
import_out_dir=/opt/module/datax/job/import
# export_out_dir=
https://i-blog.csdnimg.cn/direct/bdf74a83ee9b4b53a475629d2f27dc3c.png
5)执行
$ java -jar datax-config-generator-1.0-SNAPSHOT-jar-with-dependencies.jar
6)观察结果
https://i-blog.csdnimg.cn/direct/13001415e0ea45159c1f9678ca5a2b0f.png
6.2.5 测试天生的DataX配置文件

以Product为例,测试用脚本天生的配置文件是否可用。
1)创建目的路径
由于DataX同步使命要求目的路径提前存在,故需手动创建路径,当前Product表的目的路径应该为/origin_data/ad/db/product_full/2023-01-07
$ hadoop fs -mkdir -p /origin_data/ad/db/product_full/2023-01-07
https://i-blog.csdnimg.cn/direct/a1aaf9215add4e98b8bc39eeb49b8da9.png
2)执行DataX同步命令
$ python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/ad/db/product_full/2023-01-07" /opt/module/datax/job/import/ad.product.json
https://i-blog.csdnimg.cn/direct/5c0884cf92f14547b6dd2e24e6276e53.png
3)观察同步结果
观察HDFS目的路径是否出现数据。
https://i-blog.csdnimg.cn/direct/28e26c81518a46c187c16f7a2cec98fc.png
6.2.6 全量数据同步脚本

        为方便利用以及后续的使命调治,此处编写一个全量表数据同步脚本。
1)在~/bin目录创建mysql_to_hdfs_full.sh
$ vim ~/bin/ad_mysql_to_hdfs_full.sh
脚本内容如下。
#!/bin/bash

DATAX_HOME=/opt/module/datax

# 如果传入日期则do_date等于传入的日期,否则等于前一天日期
if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi

#处理目标路径,此处的处理逻辑是,如果目标路径不存在,则创建;若存在,则清空,目的是保证同步任务可重复执行
handle_targetdir() {
hadoop fs -test -e $1
if [[ $? -eq 1 ]]; then
    echo "路径$1不存在,正在创建......"
    hadoop fs -mkdir -p $1
else
    echo "路径$1已经存在"
    fs_count=$(hadoop fs -count $1)
    content_size=$(echo $fs_count | awk '{print $3}')
    if [[ $content_size -eq 0 ]]; then
      echo "路径$1为空"
    else
      echo "路径$1不为空,正在清空......"
      hadoop fs -rm -r -f $1/*
    fi
fi
}

#数据同步
#参数:arg1-datax 配置文件路径;arg2-源数据所在路径
import_data() {
handle_targetdir $2
python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$2" $1
}

case $1 in
"product")
import_data /opt/module/datax/job/import/ad.product.json /origin_data/ad/db/product_full/$do_date
;;
"ads")
import_data /opt/module/datax/job/import/ad.ads.json /origin_data/ad/db/ads_full/$do_date
;;
"server_host")
import_data /opt/module/datax/job/import/ad.server_host.json /origin_data/ad/db/server_host_full/$do_date
;;
"ads_platform")
import_data /opt/module/datax/job/import/ad.ads_platform.json /origin_data/ad/db/ads_platform_full/$do_date
;;
"platform_info")
import_data /opt/module/datax/job/import/ad.platform_info.json /origin_data/ad/db/platform_info_full/$do_date
;;
"all")
import_data /opt/module/datax/job/import/ad.product.json /origin_data/ad/db/product_full/$do_date
import_data /opt/module/datax/job/import/ad.ads.json /origin_data/ad/db/ads_full/$do_date
import_data /opt/module/datax/job/import/ad.server_host.json /origin_data/ad/db/server_host_full/$do_date
import_data /opt/module/datax/job/import/ad.ads_platform.json /origin_data/ad/db/ads_platform_full/$do_date
import_data /opt/module/datax/job/import/ad.platform_info.json /origin_data/ad/db/platform_info_full/$do_date
;;
esac
2)为mysql_to_hdfs_full.sh增长执行权限
$ chmod 777 ~/bin/mysql_to_hdfs_full.sh
3)测试同步脚本
$ mysql_to_hdfs_full.sh all 2023-01-07
4)检测同步结果
查看HDFS目的路径是否出现全量表数据,全量表共5张。
https://i-blog.csdnimg.cn/direct/d783bbea9583499682378fa3c11bdb7b.png
6.3 曝光点击监测数据采集

6.3.1 采集通道安装

采集通道涉及到的组件有Flume和Kafka,故必要先完成两者的安装,安装步骤可参考下列文档。
1)Zookeeper安装
参考博客:大数据技能——zookeeper的安装摆设与启停脚本-CSDN博客
2)Kafka安装
参考博客:大数据技能——Kafka的安装摆设与启停脚本-CSDN博客
3)Flume安装
三台节点均必要安装Flume,可先找如下文档在一台节点安装,完成后再举行分发。
参考博客:大数据技能——Flume的安装与摆设-CSDN博客
6.3.2 数据采集通道

        按照规划,必要采集的广告监测日记文件分布在hadoop102,hadoop103两台日记服务器,故必要在两台节点摆设Flume实时采集,Flume采集到的数据会发往Kafka集群,后再由hadoop104节点的Flume实例消费Kafka数据,并将数据写入HDFS相应目录。
https://i-blog.csdnimg.cn/direct/a9e7e7e4b25f47c1b1fad434f3dfb305.png
6.3.3 日记采集Flume

6.3.3.1 配置概述

        hadoop102和hadoop103的两个Flume实例重要负责采集日记文件内容,并将数据发往Kafka,故Source可选择TailDirSource,Channel可选择KafkaChannel,不利用Sink。
其中关键配置如下:
https://i-blog.csdnimg.cn/direct/5b56e7a5b5d04372a50fe9ad43d2bde3.png
6.3.3.2 配置实操

1)创建Flume配置文件
在hadoop102节点的Flume的job目录下创建ad_file_to_kafka.conf
$ mkdir job
$ vim job/ad_file_to_kafka.conf
2)配置文件内容如下
#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/ad_mock/log/.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = ad_log

#组装
a1.sources.r1.channels = c1
6.3.3.3 测试

1)启动Zookeeper、Kafka集群
2)启动hadoop102的日记采集Flume
$ bin/flume-ng agent -n a1 -c conf/ -f job/ad_file_to_kafka.conf -Dflume.root.logger=info,console
3)启动一个Kafka的Console-Consumer
$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ad_log
4)天生模拟数据
$ ad_mock.sh
5)观察Kafka消费者是否能消费到数据

6.3.3.4 启停脚本

1)分发日记采集Flume配置文件
若上述测试通过,需将hadoop102节点的Flume的配置文件,向另一台日记服务器也发送一份。
$ scp -r job hadoop103:/opt/module/flume/
2)方便起见,此处编写一个日记采集Flume历程的启停脚本
(1)在hadoop102节点的/home/atguigu/bin目录下创建脚本ad_f1.sh
$ vim ad_f1.sh
在脚本中填写如下内容。
#!/bin/bash

case $1 in
"start"){
      for i in hadoop102 hadoop103
      do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/ad_file_to_kafka.conf >/dev/null 2>&1 &"
      done
};;
"stop"){
      for i in hadoop102 hadoop103
      do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep ad_file_to_kafka | grep -v grep |awk'{print \$2}' | xargs -n1 kill -9 "
      done

};;
esac
(2)增长脚本执行权限
$ chmod x ad_f1.sh
(3)f1启动
$ ad_f1.sh start
(4)f1制止
$ ad_f1.sh stop
6.3.4 日记消费Flume

6.3.4.1 配置概述

该Flume实例重要负责消费Kafka中的数据并发往HDFS。并且对天天产生对应用户行为日记举行区分,将不同天的数据发往HDFS不同天的路径。
Source、Channel和Sink可分别选用KafkaSource、FileChannel、HDFSSink。
关键配置如下:
https://i-blog.csdnimg.cn/direct/a0ea1de6f58e4f8c9d1daf1d986e4443.png
6.3.4.2 配置实操

1)创建Flume配置文件
在hadoop104节点Flume的job目录下创建ad_kafka_to_hdfs.conf文件。
$ mkdir job
$ vim job/ad_kafka_to_hdfs.conf
2)配置文件内容如下
#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 1000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = ad_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.ad.flume.interceptor.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/ad/log/ad_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3)编写Flume拦截器
(1)创建一个Maven项目,在pom.xml中添加如下依靠。
<dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.10.1</version>
      <scope>provided</scope>
    </dependency>
</dependencies> (2)在com.atguigu.ad.flume.interceptor包下创建TimestampInterceptor类。
package com.atguigu.ad.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TimestampInterceptor implements Interceptor {

    private Pattern pattern;

    @Override
    public void initialize() {
      pattern = Pattern.compile(".*t=(\\d{13}).*");
    }

    @Override
    public Event intercept(Event event) {

      String log = new String(event.getBody(), StandardCharsets.UTF_8);

      //1.移除每个字段的双引号
      //去掉前后两个双引号
      String subLog = log.substring(1, log.length() - 1);

      //去掉各分隔符两侧的双引号,\u0001表示分隔符
      String result = subLog.replaceAll("\"\u0001\"", "\u0001");
      event.setBody(result.getBytes(StandardCharsets.UTF_8));

      //2.提取时间戳(利用正则表达式的分组捕获功能实现)
      Matcher matcher = pattern.matcher(result);
      if (matcher.matches()) {
            String ts = matcher.group(1);
            event.getHeaders().put("timestamp", ts);
      } else {
            return null;
      }
      return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

      Iterator<Event> iterator = events.iterator();
      while (iterator.hasNext()) {
            Event next = iterator.next();
            Event intercept = intercept(next);
            if (intercept == null) {
                iterator.remove();
            }
      }

      return events;
    }
    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{
      @Override
      public Interceptor build() {
            return new TimestampInterceptor();
      }

      @Override
      public void configure(Context context) {

      }
    }
}
(3)打包
(4)将打好的包放入到hadoop104的/opt/module/flume/lib目录下。
6.3.4.3 测试

1)启动zookeeper、kafka集群。
2)启动日记采集Flume。
$ ad_f1.sh start
3)启动hadoop104的日记消费Flume。
$ bin/flume-ng agent -n a1 -c conf/ -f job/ad_kafka_to_hdfs.conf -Dflume.root.logger=info,console
4)天生模拟数据
$ ad_mock.sh
5)观察HDFS是否出现数据
https://i-blog.csdnimg.cn/direct/32a8188378fa40d79129e3510df58f30.png
6.3.4.4 启停脚本

若上述测试通过,为方便,此处创建一个Flume的启停脚本。
1)在hadoop102节点的/home/atguigu/bin目录下创建脚本ad_f2.sh
$ vim ad_f2.sh
        在脚本中填写如下内容。
#!/bin/bash

case $1 in
"start")
      echo " --------启动 hadoop104 日志数据flume-------"
      ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/ad_kafka_to_hdfs.conf >/dev/null 2>&1 &"
;;
"stop")

      echo " --------停止 hadoop104 日志数据flume-------"
      ssh hadoop104 "ps -ef | grep ad_kafka_to_hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
2)增长脚本执行权限
$ chmod +x ad_f2.sh
3)f2启动
$ ad_f2.sh start
4)f2制止
$ ad_f2.sh stop

前面章节部门:
大数据项目——实战项目:广告数仓(第一部门)-CSDN博客
大数据项目——实战项目:广告数仓(第二部门)-CSDN博客

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 大数据技能——实战项目:广告数仓(第三部门)