点一下关注吧!!!非常感谢!!连续更新!!!
Java篇开始了!
目前开始更新 MyBatis,一起深入浅出!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(已更完)
- ClickHouse(已更完)
- Kudu(已更完)
- Druid(已更完)
- Kylin(已更完)
- Elasticsearch(已更完)
- DataX(已更完)
- Tez(已更完)
- 数据挖掘(已更完)
- Prometheus(已更完)
- Grafana(已更完)
- 离线数仓(正在更新…)
章节内容
上节我们完成了如下的内容(留存会员模块):
基本架构
之前已经完成了Flume的数据收罗到HDFS中,现在我们将依次走通流程:
- ODS
- DWD
- DWS
- ADS
- DataX数据导出到MySQL
ADS有4张表需要从数据堆栈的ADS层导入MySQL,即:Hive => MySQL
- ads.ads_member_active_count
- ads.ads_member_retention_count
- ads.ads_member_retention_rate
- ads.ads_new_member_cnt
复制代码 在Hive中可以看到这几张表:
创建库表
- -- MySQL 建表
- -- 活跃会员数
- create database dwads;
- drop table if exists dwads.ads_member_active_count;
- create table dwads.ads_member_active_count(
- `dt` varchar(10) COMMENT '统计日期',
- `day_count` int COMMENT '当日会员数量',
- `week_count` int COMMENT '当周会员数量',
- `month_count` int COMMENT '当月会员数量',
- primary key (dt)
- );
- -- 新增会员数
- drop table if exists dwads.ads_new_member_cnt;
- create table dwads.ads_new_member_cnt
- (
- `dt` varchar(10) COMMENT '统计日期',
- `cnt` int,
- primary key (dt)
- );
- -- 会员留存数
- drop table if exists dwads.ads_member_retention_count;
- create table dwads.ads_member_retention_count
- (
- `dt` varchar(10) COMMENT '统计日期',
- `add_date` varchar(10) comment '新增日期',
- `retention_day` int comment '截止当前日期留存天数',
- `retention_count` bigint comment '留存数',
- primary key (dt)
- ) COMMENT '会员留存情况';
- -- 会员留存率
- drop table if exists dwads.ads_member_retention_rate;
- create table dwads.ads_member_retention_rate
- (
- `dt` varchar(10) COMMENT '统计日期',
- `add_date` varchar(10) comment '新增日期',
- `retention_day` int comment '截止当前日期留存天数',
- `retention_count` bigint comment '留存数',
- `new_mid_count` bigint comment '当日会员新增数',
- `retention_ratio` decimal(10,2) comment '留存率',
- primary key (dt)
- ) COMMENT '会员留存率';
复制代码 执行结果如下图:
DataX
DataX 之前章节已经先容过了 这里就简单一说 详细教程看之前的
基本先容
DataX 是阿里巴巴开源的一款分布式数据同步工具,用于实现各种异构数据源之间高效、稳定的数据同步。其重要功能包括数据的批量导入、导出和实时传输,支持多种主流数据源,比方关系型数据库、NoSQL 数据库、大数据存储系统等。
DataX 的核心头脑是“插件化架构”,通过灵活的 Reader 和 Writer 插件实现不同数据源之间的数据互换。
DataX 的特点
插件化架构
- Reader:用于从数据源读取数据。
- Writer:用于将数据写入目标存储。
- 插件开发简单,可以根据需要扩展支持新的数据源。
高性能与高扩展性
- 支持大规模数据同步,处置处罚速率快。
- 支持多线程并发传输,利用 CPU 和 IO 性能。
- 可设置分片任务(Shard),实现分布式同步。
兼容性强
- 支持丰富的异构数据源,包括 MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、HDFS、Hive、ODPS、ElasticSearch 等。
- 可在不同系统之间传输数据,比如从传统 RDBMS 数据库迁移到大数据系统。
易用性
- 设置简单,基于 JSON 文件定义任务,易于上手。
- 提供详尽的运行日记,便于定位和解决题目。
- 开源代码,支持二次开发。
可监控性
- 提供详细的任务运行指标,比如吞吐量、数据量等。
- 支持失败任务自动重试,确保数据同步过程的可靠性。
设置文件
导出活跃会员数(ads_member_active_count),编写一个JSON出来:
- vim /opt/wzk/datax/export_member_active_count.json
复制代码 hdfsreader => mysqlwriter
- {
- "job": {
- "setting": {
- "speed": {
- "channel": 1
- }
- },
- "content": [{
- "reader": {
- "name": "hdfsreader",
- "parameter": {
- "path":
- "/user/hive/warehouse/ads.db/ads_member_active_count/dt=$do_date/*",
- "defaultFS": "hdfs://h121.wzk.icu:9000",
- "column": [{
- "type": "string",
- "value": "$do_date"
- }, {
- "index": 0,
- "type": "string"
- },
- {
- "index": 1,
- "type": "string"
- },
- {
- "index": 2,
- "type": "string"
- }
- ],
- "fileType": "text",
- "encoding": "UTF-8",
- "fieldDelimiter": ","
- }
- },
- "writer": {
- "name": "mysqlwriter",
- "parameter": {
- "writeMode": "replace",
- "username": "hive",
- "password": "hive@wzk.icu",
- "column": ["dt","day_count","week_count","month_count"],
- "preSql": [
- ""
- ],
- "connection": [{
- "jdbcUrl":
- "jdbc:mysql://h122.wzk.icu:3306/dwads?
- useUnicode=true&characterEncoding=utf-8",
- "table": [
- "ads_member_active_count"
- ]
- }]
- }
- }
- }]
- }
- }
复制代码 写入的内容如下所示:
编写命令
DataX的运行的方式如下所示:
- python datax.py -p "-Ddo_date=2020-07-21" /opt/wzk/datax/export_member_active_count.json
复制代码 编写脚本
编写一个脚本用来完成这个流程:
- vim /opt/wzk/hive/export_member_active_count.sh
复制代码 写入的内容如下所示:
- #!/bin/bash
- JSON= /opt/wzk/datax
- source /etc/profile
- if [ -n "$1" ] ;then
- do_date=$1
- else
- do_date=`date -d "-1 day" +%F`
- fi
- python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" $JSON/export_member_active_count.json
复制代码 写入的内容如下所示:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |