Contents
  Requirements process
  Requirements analysis and specification
  Assignment 1
  Assignment 2

Requirements process:
Full extraction vs. incremental extraction - DataX, Kettle, Sqoop ...

Scenario: a colleague on the business side, or one of the client's staff, brings a new requirement to our department manager and to you.

Process: initial contact => meetings and discussion => requirement sign-off => implementation

Deliverables: a requirements document (exactly what is needed) and a prototype document (report mock-ups, done with pen and paper or a drawing tool).

Report 1: executive summary report (a decision-support aggregate table; each run computes a single day's record). Audience: senior management ("big BOSS"). Metrics:
- total members, new members
- active members, active-member ratio
- churned members: members with no consumption records over the trailing year (inclusive)
- net increase in active members
- member counts by consumption level (A >= 2000; B >= 1000 and < 2000; C >= 500 and < 1000; D >= 100 and < 500; E < 100)
- total member consumption amount
- total member consumption transactions
- average consumption per transaction
- churned members
- newly churned members
- 60-day member repurchase rate
- 180-day member repurchase rate
- 365-day member repurchase rate

Report 2: marketing detail report (an ordinary listing report). Audience: marketing staff.
- Filter members with more than 30 transactions, or with total consumption above 1500, as target users for telephone marketing.
- Fields: name, mobile number, total consumption, transaction count, city, store, payment preference (mobile, card, cash, ...), diseases of interest
- The member's monthly order count and amount for the most recent 3 months: m1_total, m1_sale, m2_total, m2_sale, ...

Report 3: marketing report. Audience: marketing manager.
- The top-20 members by consumption for each month from 2022.01 to 2023.12 (24 x 20 = 480 rows), refreshed T+1 by month (yyyy-mm).
- Fields: member name, contact details..., consumption level, order count and total amount over the most recent 30 days
- The member's consumption within the first 30, 60, and 90 days of the month in question (the hard part)
- Sort order: by total consumption descending by default, or by transaction count descending
- The report defaults to the January 2021 data; any month within the last 2 years can be selected.
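The A-E consumption-level rule above is a plain threshold ladder, so it is worth pinning down before any SQL is written. A minimal Python sketch of the rule (the thresholds come from the spec above; the function name and sample totals are illustrative only):

def consumption_level(total):
    """Classify a member by total consumption, per the A-E thresholds above."""
    if total >= 2000:
        return "A"
    if total >= 1000:
        return "B"
    if total >= 500:
        return "C"
    if total >= 100:
        return "D"
    return "E"

# Example: bucket a few totals and count members per level.
from collections import Counter
print(Counter(consumption_level(t) for t in [2500, 1200, 640, 80]))
# Counter({'A': 1, 'B': 1, 'C': 1, 'E': 1})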
Requirements analysis and specification

The manager puts together something like a wide-table document to simplify the later detail queries and metric calculations. It determines which tables we need to extract:

crm.user_base_info_his - customer information
erp.u_memcard_reg - membership card information
erp.u_sale_m - orders, ~19M rows
erp.u_sale_pay - order payments, ~12M rows
erp.c_memcard_class_group - member groups
erp.u_memcard_reg_c - diseases of interest
his.chronic_patient_info_new - screening/test records
erp.c_org_busi - stores
erp.c_code_value - code-value table, loaded separately from a file

# 7 full-load tables; naming convention: system prefix _ table name _ (full|inc)
crm.user_base_info_his full => ods_lijinquan.crm_user_base_info_his_full
erp.u_memcard_reg full => ods_lijinquan.erp_u_memcard_reg_full
erp.c_memcard_class_group full => ods_lijinquan.erp_c_memcard_class_group_full
erp.u_memcard_reg_c full => ods_lijinquan.erp_u_memcard_reg_c_full
his.chronic_patient_info_new full => ods_lijinquan.his_chronic_patient_info_new_full
erp.c_org_busi full => ods_lijinquan.erp_c_org_busi_full
erp.c_code_value full, processed from a file => ods_lijinquan.c_code_value_full

# incremental
erp.u_sale_m: full load once, then incremental daily => ods_lijinquan.erp_u_sale_m_inc
erp.u_sale_pay: same, incremental => ods_lijinquan.erp_u_sale_pay_inc
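The target-table naming rule above is mechanical, so it can be captured in a tiny helper. A sketch (the ods_lijinquan database name comes from the mapping above; note the file-loaded c_code_value is the one exception that drops the system prefix):

def ods_table(db, table, incremental=False):
    """Map a source table to its ODS target name: <db>_<table>_(full|inc)."""
    suffix = "inc" if incremental else "full"
    return f"ods_lijinquan.{db}_{table}_{suffix}"

print(ods_table("crm", "user_base_info_his"))          # ods_lijinquan.crm_user_base_info_his_full
print(ods_table("erp", "u_sale_m", incremental=True))  # ods_lijinquan.erp_u_sale_m_inc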
Assignment 1

Extract the 7 full-load tables and deploy them to the scheduling platform as 7 scheduled jobs. Every table's final row count must be compared against the source table, and the counts must match.

Full-table handling. First, a helper script that automatically reads a table's column metadata and generates both the DataX json file and the Hive DDL file.

full.py:

#!/bin/python3
import pymysql
import sys

# Automatically generate the DataX json config and the Hive DDL.
if len(sys.argv) != 3:
    print("Usage: python3 full.py <database> <table>")
    sys.exit(1)
sys_name = sys.argv[1]
table_name = sys.argv[2]
# datax_json = f"{sys_name}.{table_name}_full.json"

db = pymysql.connect(
    host='zhiyun.pub',
    port=23306,
    user='zhiyun',
    password='zhiyun',
    database='information_schema'
)
cursor = db.cursor()
cursor.execute(f"select column_name,data_type from information_schema.columns "
               f"where table_schema='{sys_name}' and table_name='{table_name}'")
data = cursor.fetchall()

fields = []
for field in data:
    field_name = field[0]
    field_type = field[1]
    # Map the MySQL type to a Hive type.
    field_hive_type = "string"
    if field_type in ("int", "tinyint", "bigint"):
        field_hive_type = "int"
    if field_type in ("float", "double"):
        field_hive_type = "float"
    fields.append([field_name, field_hive_type])
db.close()

print("=============== generating datax config ===============")
file_path = f"/zhiyun/shihaihong/jobs/{sys_name}_{table_name}_full.json"
template_path = "/zhiyun/shihaihong/jobs/template.json"
with open(template_path, "r", encoding="utf-8") as f:
    template_content = f.read()
new_content = template_content.replace("#sys_name#", sys_name)
new_content = new_content.replace("#table_name#", table_name)
# Substitute the column list.
lines = []
for field in fields:
    lines.append('{"name":"' + field[0] + '","type":"' + field[1] + '"},')
columns = "\n".join(lines).strip(",")
new_content = new_content.replace('"#columns#"', columns)
# Write the new config.
with open(file_path, "w", encoding="utf-8") as ff:
    ff.write(new_content)
print("datax config generated")

print("=============== generating hive DDL ===============")
file_path = f"/zhiyun/shihaihong/sql/{sys_name}_{table_name}_full.sql"
template_path = "/zhiyun/shihaihong/sql/template.sql"
with open(template_path, "r", encoding="utf-8") as f:
    template_content = f.read()
new_content = template_content.replace("#sys_name#", sys_name)
new_content = new_content.replace("#table_name#", table_name)
# Substitute the column list.
lines = []
for field in fields:
    lines.append(f"{field[0]} {field[1]},")
columns = "\n".join(lines).strip(",")
new_content = new_content.replace("#columns#", columns)
# Write the new DDL.
with open(file_path, "w", encoding="utf-8") as ff:
    ff.write(new_content)
print("hive DDL generated")
The json template, template.json:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/#sys_name#?useSSL=false"],
                "table": ["#table_name#"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              "#columns#"
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "#sys_name#_#table_name#_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/#sys_name#_#table_name#_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}
The sql template, template.sql:

create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-load table
create external table if not exists ods_shihaihong.#table_name#_full(
#columns#
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/#sys_name#_#table_name#_full";
Test: running

python3 python/full.py crm user_base_info_his

generates the json and sql files. From the generated json and sql, write the shell script that actually pulls the data.

crm_user_base_info_his_full.sh:

#!/bin/bash
echo "generating full-extraction config"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/crm?useSSL=false"],
                "table": ["user_base_info_his"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"user_id","type":"string"},
              {"name":"user_type","type":"string"},
              {"name":"source","type":"string"},
              {"name":"erp_code","type":"string"},
              {"name":"active_time","type":"string"},
              {"name":"name","type":"string"},
              {"name":"sex","type":"string"},
              {"name":"education","type":"string"},
              {"name":"job","type":"string"},
              {"name":"email","type":"string"},
              {"name":"wechat","type":"string"},
              {"name":"webo","type":"string"},
              {"name":"birthday","type":"string"},
              {"name":"age","type":"int"},
              {"name":"id_card_no","type":"string"},
              {"name":"social_insurance_no","type":"string"},
              {"name":"address","type":"string"},
              {"name":"last_subscribe_time","type":"int"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "crm_user_base_info_his_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/crm_user_base_info_his_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/crm_user_base_info_his_full.json

echo "starting extraction"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/crm_user_base_info_his_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/crm_user_base_info_his_full.json

echo "creating hive table"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-load table
create external table if not exists ods_shihaihong.user_base_info_his_full(
    id int,
    user_id string,
    user_type string,
    source string,
    erp_code string,
    active_time string,
    name string,
    sex string,
    education string,
    job string,
    email string,
    wechat string,
    webo string,
    birthday string,
    age int,
    id_card_no string,
    social_insurance_no string,
    address string,
    last_subscribe_time int
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/crm_user_base_info_his_full";
'

# echo "loading data"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
#   load data inpath '/zhiyun/shihaihong/tmp/crm_user_base_info_his_full/*'
#   overwrite into table ods_shihaihong.crm_user_base_info_his_full
#   partition(createtime='$day');
# "

echo "validating data"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.user_base_info_his_full;
"
echo "extraction finished"
Run the test, then check the table in your own database. Set up the job in the production scheduling center, paste the sh file into the GLUE IDE, and execute it once; it runs successfully. The other six tables are handled the same way. Their shell scripts follow.

erp_u_memcard_reg_full.sh:

#!/bin/bash
echo "generating full-extraction config"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "table": ["u_memcard_reg"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"memcardno","type":"string"},
              {"name":"busno","type":"string"},
              {"name":"introducer","type":"string"},
              {"name":"cardtype","type":"int"},
              {"name":"cardlevel","type":"int"},
              {"name":"cardpass","type":"string"},
              {"name":"cardstatus","type":"int"},
              {"name":"saleamount","type":"string"},
              {"name":"realamount","type":"string"},
              {"name":"puramount","type":"string"},
              {"name":"integral","type":"string"},
              {"name":"integrala","type":"string"},
              {"name":"integralflag","type":"int"},
              {"name":"cardholder","type":"string"},
              {"name":"cardaddress","type":"string"},
              {"name":"sex","type":"string"},
              {"name":"tel","type":"string"},
              {"name":"handset","type":"string"},
              {"name":"fax","type":"string"},
              {"name":"createuser","type":"string"},
              {"name":"createtime","type":"string"},
              {"name":"tstatus","type":"int"},
              {"name":"notes","type":"string"},
              {"name":"stamp","type":"string"},
              {"name":"idcard","type":"string"},
              {"name":"birthday","type":"string"},
              {"name":"allowintegral","type":"int"},
              {"name":"apptype","type":"string"},
              {"name":"applytime","type":"string"},
              {"name":"invalidate","type":"string"},
              {"name":"lastdate","type":"string"},
              {"name":"bak1","type":"string"},
              {"name":"scrm_userid","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_u_memcard_reg_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/erp_u_memcard_reg_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_u_memcard_reg_full.json

echo "starting extraction"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_memcard_reg_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_memcard_reg_full.json

echo "creating hive table"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-load table
create external table if not exists ods_shihaihong.u_memcard_reg_full(
    id int,
    memcardno string,
    busno string,
    introducer string,
    cardtype int,
    cardlevel int,
    cardpass string,
    cardstatus int,
    saleamount string,
    realamount string,
    puramount string,
    integral string,
    integrala string,
    integralflag int,
    cardholder string,
    cardaddress string,
    sex string,
    tel string,
    handset string,
    fax string,
    createuser string,
    createtime string,
    tstatus int,
    notes string,
    stamp string,
    idcard string,
    birthday string,
    allowintegral int,
    apptype string,
    applytime string,
    invalidate string,
    lastdate string,
    bak1 string,
    scrm_userid string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_memcard_reg_full";
'

# echo "loading data"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
#   load data inpath '/zhiyun/shihaihong/tmp/erp_u_memcard_reg_full/*'
#   overwrite into table ods_shihaihong.erp_u_memcard_reg_full
#   partition(createtime='$day');
# "

echo "validating data"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.u_memcard_reg_full;
"
echo "extraction finished"
erp_c_memcard_class_group_full.sh:

#!/bin/bash
echo "generating full-extraction config"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "table": ["c_memcard_class_group"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"createtime","type":"string"},
              {"name":"createuser","type":"string"},
              {"name":"groupid","type":"int"},
              {"name":"groupname","type":"string"},
              {"name":"notes","type":"string"},
              {"name":"stamp","type":"int"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_c_memcard_class_group_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/erp_c_memcard_class_group_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_c_memcard_class_group_full.json

echo "starting extraction"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_c_memcard_class_group_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_c_memcard_class_group_full.json

echo "creating hive table"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-load table
create external table if not exists ods_shihaihong.c_memcard_class_group_full(
    createtime string,
    createuser string,
    groupid int,
    groupname string,
    notes string,
    stamp int
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_c_memcard_class_group_full";
'

# echo "loading data"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
#   load data inpath '/zhiyun/shihaihong/tmp/erp_c_memcard_class_group_full/*'
#   overwrite into table ods_shihaihong.erp_c_memcard_class_group_full
#   partition(createtime='$day');
# "

echo "validating data"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_memcard_class_group_full;
"
echo "extraction finished"
erp_u_memcard_reg_c_full.sh:

#!/bin/bash
echo "generating full-extraction config"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "table": ["u_memcard_reg_c"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"memcardno","type":"string"},
              {"name":"sickness","type":"string"},
              {"name":"status","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_u_memcard_reg_c_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_u_memcard_reg_c_full.json

echo "starting extraction"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_memcard_reg_c_full.json

echo "creating hive table"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-load table
create external table if not exists ods_shihaihong.u_memcard_reg_c_full(
    id int,
    memcardno string,
    sickness string,
    status string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full";
'

# echo "loading data"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
#   load data inpath '/zhiyun/shihaihong/tmp/erp_u_memcard_reg_c_full/*'
#   overwrite into table ods_shihaihong.erp_u_memcard_reg_c_full
#   partition(createtime='$day');
# "

echo "validating data"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.u_memcard_reg_c_full;
"
echo "extraction finished"
his_chronic_patient_info_new_full.sh:

#!/bin/bash
echo "generating full-extraction config"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/his?useSSL=false"],
                "table": ["chronic_patient_info_new"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"member_id","type":"string"},
              {"name":"erp_code","type":"string"},
              {"name":"extend","type":"string"},
              {"name":"detect_time","type":"string"},
              {"name":"bec_chr_mbr_date","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "his_chronic_patient_info_new_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/his_chronic_patient_info_new_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/his_chronic_patient_info_new_full.json

echo "starting extraction"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/his_chronic_patient_info_new_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/his_chronic_patient_info_new_full.json

echo "creating hive table"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-load table
create external table if not exists ods_shihaihong.chronic_patient_info_new_full(
    id int,
    member_id string,
    erp_code string,
    extend string,
    detect_time string,
    bec_chr_mbr_date string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/his_chronic_patient_info_new_full";
'

# echo "loading data"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
#   load data inpath '/zhiyun/shihaihong/tmp/his_chronic_patient_info_new_full/*'
#   overwrite into table ods_shihaihong.his_chronic_patient_info_new_full
#   partition(createtime='$day');
# "

echo "validating data"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.chronic_patient_info_new_full;
"
echo "extraction finished"
erp_c_org_busi_full.sh:

#!/bin/bash
echo "generating full-extraction config"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "table": ["c_org_busi"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"busno","type":"string"},
              {"name":"orgname","type":"string"},
              {"name":"orgsubno","type":"string"},
              {"name":"orgtype","type":"string"},
              {"name":"salegroup","type":"string"},
              {"name":"org_tran_code","type":"string"},
              {"name":"accno","type":"string"},
              {"name":"sendtype","type":"string"},
              {"name":"sendday","type":"string"},
              {"name":"maxday","type":"string"},
              {"name":"minday","type":"string"},
              {"name":"notes","type":"string"},
              {"name":"stamp","type":"string"},
              {"name":"status","type":"string"},
              {"name":"customid","type":"string"},
              {"name":"whl_vendorno","type":"string"},
              {"name":"whlgroup","type":"string"},
              {"name":"rate","type":"string"},
              {"name":"creditamt","type":"string"},
              {"name":"creditday","type":"string"},
              {"name":"peoples","type":"string"},
              {"name":"area","type":"string"},
              {"name":"abc","type":"string"},
              {"name":"address","type":"string"},
              {"name":"tel","type":"string"},
              {"name":"principal","type":"string"},
              {"name":"identity_card","type":"string"},
              {"name":"mobil","type":"string"},
              {"name":"corporation","type":"string"},
              {"name":"saler","type":"string"},
              {"name":"createtime","type":"string"},
              {"name":"bank","type":"string"},
              {"name":"bankno","type":"string"},
              {"name":"bak1","type":"string"},
              {"name":"bak2","type":"string"},
              {"name":"a_bak1","type":"string"},
              {"name":"aa_bak1","type":"string"},
              {"name":"b_bak1","type":"string"},
              {"name":"bb_bak1","type":"string"},
              {"name":"y_bak1","type":"string"},
              {"name":"t_bak1","type":"string"},
              {"name":"ym_bak1","type":"string"},
              {"name":"tm_bak1","type":"string"},
              {"name":"supervise_code","type":"string"},
              {"name":"monthrent","type":"string"},
              {"name":"wms_warehid","type":"string"},
              {"name":"settlement_cycle","type":"string"},
              {"name":"apply_cycle","type":"string"},
              {"name":"applydate","type":"string"},
              {"name":"accounttype","type":"string"},
              {"name":"applydate_last","type":"string"},
              {"name":"paymode","type":"string"},
              {"name":"yaolian_flag","type":"string"},
              {"name":"org_longitude","type":"string"},
              {"name":"org_latitude","type":"string"},
              {"name":"org_province","type":"string"},
              {"name":"org_city","type":"string"},
              {"name":"org_area","type":"string"},
              {"name":"business_time","type":"string"},
              {"name":"yaolian_group","type":"string"},
              {"name":"pacard_storeid","type":"string"},
              {"name":"opening_time","type":"string"},
              {"name":"ret_ent_id","type":"string"},
              {"name":"ent_id","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_c_org_busi_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/erp_c_org_busi_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_c_org_busi_full.json

echo "starting extraction"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_c_org_busi_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_c_org_busi_full.json

echo "creating hive table"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-load table
create external table if not exists ods_shihaihong.c_org_busi_full(
    id int,
    busno string,
    orgname string,
    orgsubno string,
    orgtype string,
    salegroup string,
    org_tran_code string,
    accno string,
    sendtype string,
    sendday string,
    maxday string,
    minday string,
    notes string,
    stamp string,
    status string,
    customid string,
    whl_vendorno string,
    whlgroup string,
    rate string,
    creditamt string,
    creditday string,
    peoples string,
    area string,
    abc string,
    address string,
    tel string,
    principal string,
    identity_card string,
    mobil string,
    corporation string,
    saler string,
    createtime string,
    bank string,
    bankno string,
    bak1 string,
    bak2 string,
    a_bak1 string,
    aa_bak1 string,
    b_bak1 string,
    bb_bak1 string,
    y_bak1 string,
    t_bak1 string,
    ym_bak1 string,
    tm_bak1 string,
    supervise_code string,
    monthrent string,
    wms_warehid string,
    settlement_cycle string,
    apply_cycle string,
    applydate string,
    accounttype string,
    applydate_last string,
    paymode string,
    yaolian_flag string,
    org_longitude string,
    org_latitude string,
    org_province string,
    org_city string,
    org_area string,
    business_time string,
    yaolian_group string,
    pacard_storeid string,
    opening_time string,
    ret_ent_id string,
    ent_id string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_c_org_busi_full";
'

# echo "loading data"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
#   load data inpath '/zhiyun/shihaihong/tmp/erp_c_org_busi_full/*'
#   overwrite into table ods_shihaihong.erp_c_org_busi_full
#   partition(createtime='$day');
# "

echo "validating data"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_org_busi_full;
"
echo "extraction finished"
erp_c_code_value_full.sh:

#!/bin/bash
echo "generating full-extraction config"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "table": ["c_code_value"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"cat_name","type":"string"},
              {"name":"cat_code","type":"string"},
              {"name":"val_name","type":"string"},
              {"name":"var_desc","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_c_code_value_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/erp_c_code_value_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_c_code_value_full.json

echo "starting extraction"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_c_code_value_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_c_code_value_full.json

echo "creating hive table"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-load table
create external table if not exists ods_shihaihong.c_code_value_full(
    id int,
    cat_name string,
    cat_code string,
    val_name string,
    var_desc string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_c_code_value_full";
'

# echo "loading data"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
#   load data inpath '/zhiyun/shihaihong/tmp/erp_c_code_value_full/*'
#   overwrite into table ods_shihaihong.erp_c_code_value_full
#   partition(createtime='$day');
# "

echo "validating data"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_code_value_full;
"
echo "extraction finished"
Job scheduling: configure scheduled jobs for erp.u_memcard_reg, erp.c_memcard_class_group, erp.u_memcard_reg_c, his.chronic_patient_info_new, erp.c_org_busi, and erp.c_code_value in the same way.

For the code-value table, upload the xlsx file into the data directory and write a Python data-cleaning script:

#!/bin/python3
import os
import pandas as pd
from openpyxl import load_workbook

lst = []
# read_excel with sheet_name=None reads every worksheet in the workbook;
# header=2 means the data header sits on row 3 (0-based index).
dfs = pd.read_excel('/zhiyun/shihaihong/data/12.码值表.xlsx', sheet_name=None, header=2)
sheets = list(dfs.keys())
# Cell A2 of each sheet holds the "category name - category code" header.
wb = load_workbook(filename='/zhiyun/shihaihong/data/12.码值表.xlsx')
for i in range(len(sheets)):
    if i > 1:  # the first two sheets are not code-value sheets
        str_head = wb[sheets[i]]['A2'].value
        data = dfs[sheets[i]]
        # Flatten the remaining cells, column by column, skipping blanks.
        values = []
        for col in data.columns:
            for j in range(len(data)):
                if not pd.isna(data[col][j]):
                    values.append(str(data[col][j]))
        # The first half of the flattened list holds the codes,
        # the second half the matching descriptions.
        n = int(len(values) / 2)
        cat_name = str_head.split('-')[0]
        cat_code = str_head.split('-')[1]
        for k in range(n):
            lst.append(f"{cat_name}|{cat_code}|{values[k]}|{values[k + n]}")

print("writing data file")
template_path = "/zhiyun/shihaihong/data/code_value.txt"
with open(template_path, "w", encoding="utf-8") as f:
    f.write("\n".join(lst))

print("uploading data file to hdfs")
os.system("hdfs dfs -mkdir -p /zhiyun/shihaihong/filetxt/")
os.system(f"hdfs dfs -put {template_path} /zhiyun/shihaihong/filetxt/")
After the Python script has run, a shell script creates the Hive table over the uploaded file and validates it:

#!/bin/bash
# Purpose: run the whole flow from writing the config to validating the data.
# Must be runnable from any node.
# Create personal directories.
mkdir -p /zhiyun/shihaihong/data /zhiyun/shihaihong/jobs /zhiyun/shihaihong/python /zhiyun/shihaihong/shell /zhiyun/shihaihong/sql

echo "creating hive table"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
create external table if not exists ods_shihaihong.c_code_value_full(
    cat_name string,
    cat_code string,
    val_name string,
    var_desc string
)
row format delimited fields terminated by "|"
lines terminated by "\n"
stored as textfile
location "/zhiyun/shihaihong/filetxt";'
echo "hive table created"

echo "validating data"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_code_value_full;
"
echo "validation finished"
Set up its job on the scheduling platform in the same way.

Assignment 2
Handle the 2 incremental tables: a historical-data (backfill) job plus a daily incremental job for each, 4 scheduled jobs in total. Every table's final row count must be compared against the source table, and the counts must match.

Incremental tables to extract:
erp.u_sale_m - full load once, then incremental daily => ods_lijinquan.erp_u_sale_m_inc
erp.u_sale_pay - same, incremental => ods_lijinquan.erp_u_sale_pay_inc

The first extraction is a full load.

erp_u_sale_m_full.sh:

#!/bin/bash
echo "generating full-extraction config"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "table": ["u_sale_m"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"saleno","type":"string"},
              {"name":"busno","type":"string"},
              {"name":"posno","type":"string"},
              {"name":"extno","type":"string"},
              {"name":"extsource","type":"string"},
              {"name":"o2o_trade_from","type":"string"},
              {"name":"channel","type":"int"},
              {"name":"starttime","type":"string"},
              {"name":"finaltime","type":"string"},
              {"name":"payee","type":"string"},
              {"name":"discounter","type":"string"},
              {"name":"crediter","type":"string"},
              {"name":"returner","type":"string"},
              {"name":"warranter1","type":"string"},
              {"name":"warranter2","type":"string"},
              {"name":"stdsum","type":"string"},
              {"name":"netsum","type":"string"},
              {"name":"loss","type":"string"},
              {"name":"discount","type":"float"},
              {"name":"member","type":"string"},
              {"name":"precash","type":"string"},
              {"name":"stamp","type":"string"},
              {"name":"shiftid","type":"string"},
              {"name":"shiftdate","type":"string"},
              {"name":"yb_saleno","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_u_sale_m_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/erp_u_sale_m_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_u_sale_m_full.json

echo "starting extraction"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_sale_m_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_sale_m_full.json

echo "creating hive table"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-load table
create external table if not exists ods_shihaihong.u_sale_m_full(
    id int,
    saleno string,
    busno string,
    posno string,
    extno string,
    extsource string,
    o2o_trade_from string,
    channel int,
    starttime string,
    finaltime string,
    payee string,
    discounter string,
    crediter string,
    returner string,
    warranter1 string,
    warranter2 string,
    stdsum string,
    netsum string,
    loss string,
    discount float,
    member string,
    precash string,
    stamp string,
    shiftid string,
    shiftdate string,
    yb_saleno string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_m_full";
'

# echo "loading data"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
#   load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_m_full/*'
#   overwrite into table ods_shihaihong.erp_u_sale_m_full
#   partition(createtime='$day');
# "

echo "validating data"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.u_sale_m_full;
"
echo "extraction finished"
Register it on the scheduling platform, paste the sh file into the GLUE IDE, and run it once. Subsequent runs use incremental extraction.

erp_u_sale_m_inc.sh:

#!/bin/bash
# Extract yesterday's data by default; an optional first argument
# (a date) shifts the window to the day before that date.
day=$(date -d "yesterday" +%Y-%m-%d)
if [ -n "$1" ]; then
    day=$(date -d "$1 -1 day" +%Y-%m-%d)
fi
echo "extracting data for $day"

echo "generating incremental config"
mkdir -p /zhiyun/shihaihong/jobs
# stamp is used only as the extraction filter and as the Hive partition key,
# so it is not selected as a data column; the remaining columns are listed
# explicitly to keep the file layout aligned with the table schema below.
echo '
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "querySql": [
                  "select id,saleno,busno,posno,extno,extsource,o2o_trade_from,channel,starttime,finaltime,payee,discounter,crediter,returner,warranter1,warranter2,stdsum,netsum,loss,discount,member,precash,shiftid,shiftdate,yb_saleno from u_sale_m where stamp between '\'''"$day"' 00:00:00'\'' and '\'''"$day"' 23:59:59'\'' and id>0"
                ]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"saleno","type":"string"},
              {"name":"busno","type":"string"},
              {"name":"posno","type":"string"},
              {"name":"extno","type":"string"},
              {"name":"extsource","type":"string"},
              {"name":"o2o_trade_from","type":"string"},
              {"name":"channel","type":"int"},
              {"name":"starttime","type":"string"},
              {"name":"finaltime","type":"string"},
              {"name":"payee","type":"string"},
              {"name":"discounter","type":"string"},
              {"name":"crediter","type":"string"},
              {"name":"returner","type":"string"},
              {"name":"warranter1","type":"string"},
              {"name":"warranter2","type":"string"},
              {"name":"stdsum","type":"string"},
              {"name":"netsum","type":"string"},
              {"name":"loss","type":"string"},
              {"name":"discount","type":"float"},
              {"name":"member","type":"string"},
              {"name":"precash","type":"string"},
              {"name":"shiftid","type":"string"},
              {"name":"shiftdate","type":"string"},
              {"name":"yb_saleno","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_u_sale_m_inc.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/tmp/erp_u_sale_m_inc",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 2
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_u_sale_m_inc.json

echo "starting extraction"
hadoop fs -mkdir -p /zhiyun/shihaihong/tmp/erp_u_sale_m_inc
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_sale_m_inc.json

echo "creating hive table"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
create external table if not exists ods_shihaihong.erp_u_sale_m_inc(
    id int,
    saleno string,
    busno string,
    posno string,
    extno string,
    extsource string,
    o2o_trade_from string,
    channel int,
    starttime string,
    finaltime string,
    payee string,
    discounter string,
    crediter string,
    returner string,
    warranter1 string,
    warranter2 string,
    stdsum string,
    netsum string,
    loss string,
    discount float,
    member string,
    precash string,
    shiftid string,
    shiftdate string,
    yb_saleno string
) partitioned by (stamp string)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_m_inc";
'

echo "loading data"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_m_inc/*'
overwrite into table ods_shihaihong.erp_u_sale_m_inc
partition(stamp='$day');
"

echo "validating data"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
show partitions ods_shihaihong.erp_u_sale_m_inc;
select count(1) from ods_shihaihong.erp_u_sale_m_inc where stamp = '$day';
select * from ods_shihaihong.erp_u_sale_m_inc where stamp = '$day' limit 5;
"
echo "extraction finished"
Schedule the job and run it once, passing the date parameter.

erp_u_sale_pay: the first extraction is again a full load.

erp_u_sale_pay_full.sh:

#!/bin/bash
echo "generating full-extraction config"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "table": ["u_sale_pay"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"saleno","type":"string"},
              {"name":"cardno","type":"string"},
              {"name":"netsum","type":"string"},
              {"name":"paytype","type":"string"},
              {"name":"bak1","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_u_sale_pay_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/erp_u_sale_pay_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_u_sale_pay_full.json

echo "starting extraction"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_sale_pay_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_sale_pay_full.json

echo "creating hive table"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-load table
create external table if not exists ods_shihaihong.u_sale_pay_full(
    id int,
    saleno string,
    cardno string,
    netsum string,
    paytype string,
    bak1 string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_pay_full";
'

# echo "loading data"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
#   load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_pay_full/*'
#   overwrite into table ods_shihaihong.erp_u_sale_pay_full
#   partition(createtime='$day');
# "

echo "validating data"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.u_sale_pay_full;
"
echo "extraction finished"
Schedule the job, paste it into the GLUE IDE, and run it once. Then the incremental extraction: u_sale_pay has no timestamp column of its own, so the incremental query joins u_sale_m on saleno and filters on u_sale_m.stamp.

erp_u_sale_pay_inc.sh:

#!/bin/bash
day=$(date -d "yesterday" +%Y-%m-%d)
if [ -n "$1" ]; then
    day=$(date -d "$1 -1 day" +%Y-%m-%d)
fi
echo "extracting data for $day"

echo "generating incremental config"
mkdir -p /zhiyun/shihaihong/jobs
# The joined u_sale_m.stamp is written as the last column of the file; the
# Hive schema below lists only the six u_sale_pay columns and carries the
# date in the partition key instead.
echo '
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "querySql": [
                  "select u_sale_pay.*,u_sale_m.stamp from u_sale_pay left join u_sale_m on u_sale_pay.saleno=u_sale_m.saleno where u_sale_m.stamp between '\'''"$day"' 00:00:00'\'' and '\'''"$day"' 23:59:59'\'' and u_sale_pay.id>0"
                ]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"saleno","type":"string"},
              {"name":"cardno","type":"string"},
              {"name":"netsum","type":"string"},
              {"name":"paytype","type":"string"},
              {"name":"bak1","type":"string"},
              {"name":"stamp","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_u_sale_pay_inc.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/tmp/erp_u_sale_pay_inc",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}
' > /zhiyun/shihaihong/jobs/erp_u_sale_pay_inc.json

echo "starting extraction"
hadoop fs -mkdir -p /zhiyun/shihaihong/tmp/erp_u_sale_pay_inc
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_sale_pay_inc.json

echo "creating hive table"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
create external table if not exists ods_shihaihong.erp_u_sale_pay_inc(
    id int,
    saleno string,
    cardno string,
    netsum string,
    paytype string,
    bak1 string
) partitioned by (stamp string)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_pay_inc";
'

echo "loading data"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_pay_inc/*'
overwrite into table ods_shihaihong.erp_u_sale_pay_inc
partition(stamp='$day');
"

echo "validating data"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
show partitions ods_shihaihong.erp_u_sale_pay_inc;
select count(1) from ods_shihaihong.erp_u_sale_pay_inc where stamp = '$day';
select * from ods_shihaihong.erp_u_sale_pay_inc where stamp = '$day' limit 5;
"
echo "extraction finished"
Schedule the job and run it once.