Learning Big Data, Day 59: Full Extraction and Incremental Extraction in Practice


Contents

Requirements workflow
Requirements analysis and specification
Assignment
Assignment 2


Requirements workflow:

Full extraction and incremental extraction - DataX, Kettle, Sqoop, ...

Scenario: colleagues from the business department, or the client's staff, bring a new requirement to our department manager and to you.
Workflow: contact => meeting and discussion => requirement confirmation => implementation
Deliverables: a requirements document (exactly what is needed) and a prototype document (report mock-ups, done with pen and paper or a drawing tool).

Report 1 - metrics for the summary report (a decision-support summary table; each run produces only one day's record) - for the big boss:
- total members, new members
- active members, share of active members
- churned members: members with no consumption record in the past year, counting back from today
- net increase in active members
- member count per consumption level (A >= 2000, B >= 1000 and < 2000, C >= 500 and < 1000, D >= 100 and < 500, E < 100; see the sketch after this list)
- total member consumption
- total number of member purchases
- average spend per purchase
- churned members
- newly churned members
- 60-day member repurchase rate
- 180-day member repurchase rate
- 365-day member repurchase rate

Report 2 - for marketing (detail report, an ordinary report) - marketing staff:
- select members with more than 30 purchases, or with total consumption above 1500, as target users for telemarketing
- fields: name, mobile number, total consumption, purchase count, city, store, payment preference (mobile, card, cash, ...), diseases of interest
- the member's monthly order count and amount for the most recent 3 months: m1_total m1_sale m2_total m2_sale

Report 3 - for marketing: the top 20 members by consumption for each month from 2022.01 to 2023.12, 24 x 20 = 480 rows - marketing manager:
- T+1 (monthly), month as yyyy-mm
- member name, contact details, ..., consumption level, order count and total consumption for the most recent 30 days
- the member's consumption in the 30 days, 60 days and 90 days before that month (the hard part)
- report ordering: by total consumption descending by default, or by purchase count descending
- the report shows January 2021 by default; any month within the past 2 years can be selected
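A minimal SQL sketch (not part of the original notes) of the consumption-level grading used in the first report; member_total_sale and total_sale are hypothetical names for a per-member spend aggregate built from the order table:

  select memcardno,
         case when total_sale >= 2000 then 'A'
              when total_sale >= 1000 then 'B'
              when total_sale >=  500 then 'C'
              when total_sale >=  100 then 'D'
              else 'E'
         end as consume_level
  from member_total_sale;   -- hypothetical aggregate table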

Requirements analysis and specification

The manager puts together something like a wide-table document, which makes the later detail queries and metric calculations easier; it determines which tables we need to extract:

crm.user_base_info_his        customer information table
erp.u_memcard_reg             membership card table
erp.u_sale_m                  order table, ~19,000,000 rows
erp.u_sale_pay                order payment table, ~12,000,000 rows
erp.c_memcard_class_group     member group table
erp.u_memcard_reg_c           disease-of-interest table
his.chronic_patient_info_new  chronic-disease screening table
erp.c_org_busi                store table
# plus the code-value table, processed from a file
erp.c_code_value

# 7 full-extraction tables; naming convention: system prefix _ table name _ (full|inc)
crm.user_base_info_his        full => ods_lijinquan.crm_user_base_info_his_full
erp.u_memcard_reg             full => ods_lijinquan.erp_u_memcard_reg_full
erp.c_memcard_class_group     full => ods_lijinquan.erp_c_memcard_class_group_full
erp.u_memcard_reg_c           full => ods_lijinquan.erp_u_memcard_reg_c_full
his.chronic_patient_info_new  full => ods_lijinquan.his_chronic_patient_info_new_full
erp.c_org_busi                full => ods_lijinquan.erp_c_org_busi_full
erp.c_code_value              full, processed from the file => ods_lijinquan.c_code_value_full

# incremental
erp.u_sale_m    full first (one-off), then incremental (run daily) => ods_lijinquan.erp_u_sale_m_inc
erp.u_sale_pay  same as above, incremental => ods_lijinquan.erp_u_sale_pay_inc

Assignment

Extract the 7 full tables and deploy them as 7 jobs on the scheduling platform. Every table must be compared against the source table's total row count at the end, and the counts must match.

Full-table processing: upgrade the helper script so that it reads a table's column metadata automatically and generates the DataX json file and the Hive DDL file.

full.py:
#!/bin/python3
# Generate the DataX json file and the Hive DDL file for a full-extraction table
import pymysql
import sys

if len(sys.argv) != 3:
    print("使用方法为:python3 full.py 数据库名 表名")
    sys.exit()

sys_name = sys.argv[1]
table_name = sys.argv[2]

# Read the column names and types from information_schema
db = pymysql.connect(
    host='zhiyun.pub',
    port=23306,
    user='zhiyun',
    password='zhiyun',
    database='information_schema'
)
cursor = db.cursor()
cursor.execute(f"select column_name,data_type from information_schema.columns "
               f"where table_schema='{sys_name}' and table_name='{table_name}'")
data = cursor.fetchall()
fields = []
for field in data:
    field_name = field[0]
    field_type = field[1]
    # Map the MySQL type to a Hive type
    field_hive_type = "string"
    if field_type in ("int", "tinyint", "bigint"):
        field_hive_type = "int"
    if field_type in ("float", "double"):
        field_hive_type = "float"
    fields.append([field_name, field_hive_type])
db.close()

print("=============== 配置 datax ===============")
file_path = f"/zhiyun/shihaihong/jobs/{sys_name}_{table_name}_full.json"
template_path = "/zhiyun/shihaihong/jobs/template.json"
with open(template_path, "r", encoding="utf-8") as f:
    template_content = f.read()
new_content = template_content.replace("#sys_name#", sys_name)
new_content = new_content.replace("#table_name#", table_name)
# Replace the column placeholder (must match the placeholder in template.json)
lines = []
for field in fields:
    lines.append('{"name":"' + field[0] + '","type":"' + field[1] + '"},')
columns = "\n".join(lines).strip(",")
new_content = new_content.replace('"#column#"', columns)
# Write the new config
with open(file_path, "w", encoding="utf-8") as ff:
    ff.write(new_content)
print("datax 文件配置成功")

print("=============== 配置 hive ===============")
file_path = f"/zhiyun/shihaihong/sql/{sys_name}_{table_name}_full.sql"
template_path = "/zhiyun/shihaihong/sql/template.sql"
with open(template_path, "r", encoding="utf-8") as f:
    template_content = f.read()
new_content = template_content.replace("#sys_name#", sys_name)
new_content = new_content.replace("#table_name#", table_name)
# Replace the column placeholder (must match the placeholder in template.sql)
lines = []
for field in fields:
    lines.append(f"    {field[0]} {field[1]},")
columns = "\n".join(lines).strip(",")
new_content = new_content.replace("#columns#", columns)
# Write the new DDL
with open(file_path, "w", encoding="utf-8") as ff:
    ff.write(new_content)
print("hive 建表文件生成成功")
JSON template, template.json (full.py fills in the #sys_name#, #table_name# and #column# placeholders):
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/#sys_name#?useSSL=false"],
                "table": ["#table_name#"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              "#column#"
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "#sys_name#_#table_name#_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/#sys_name#_#table_name#_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}
SQL template file, template.sql:
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-extraction table
create external table if not exists ods_shihaihong.#table_name#_full(
#columns#
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/#sys_name#_#table_name#_full";
Test: python3 python/full.py crm user_base_info_his, which generates the json and sql files:
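Not part of the original notes: a hedged sketch for generating the configs of all seven full tables in one pass, assuming full.py takes the database and table name as shown above:

#!/bin/bash
# Generate the DataX json and the Hive DDL for every full-extraction table.
# Each entry is "database table"; the unquoted $t deliberately relies on word splitting.
for t in "crm user_base_info_his" "erp u_memcard_reg" "erp c_memcard_class_group" \
         "erp u_memcard_reg_c" "his chronic_patient_info_new" "erp c_org_busi" \
         "erp c_code_value"; do
    python3 python/full.py $t
done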
Based on the generated json and sql files, write the shell script that actually pulls the data. crm_user_base_info_his_full.sh:
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/crm?useSSL=false"],
                "table": ["user_base_info_his"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"}, {"name":"user_id","type":"string"},
              {"name":"user_type","type":"string"}, {"name":"source","type":"string"},
              {"name":"erp_code","type":"string"}, {"name":"active_time","type":"string"},
              {"name":"name","type":"string"}, {"name":"sex","type":"string"},
              {"name":"education","type":"string"}, {"name":"job","type":"string"},
              {"name":"email","type":"string"}, {"name":"wechat","type":"string"},
              {"name":"webo","type":"string"}, {"name":"birthday","type":"string"},
              {"name":"age","type":"int"}, {"name":"id_card_no","type":"string"},
              {"name":"social_insurance_no","type":"string"}, {"name":"address","type":"string"},
              {"name":"last_subscribe_time","type":"int"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "crm_user_base_info_his_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/crm_user_base_info_his_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/crm_user_base_info_his_full.json

echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/crm_user_base_info_his_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/crm_user_base_info_his_full.json

echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-extraction table
create external table if not exists ods_shihaihong.user_base_info_his_full(
  id int, user_id string, user_type string, source string,
  erp_code string, active_time string, name string, sex string,
  education string, job string, email string, wechat string,
  webo string, birthday string, age int, id_card_no string,
  social_insurance_no string, address string, last_subscribe_time int
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/crm_user_base_info_his_full";
'

# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath '/zhiyun/shihaihong/tmp/crm_user_base_info_his_full/*' overwrite into table ods_shihaihong.crm_user_base_info_his_full partition(createtime='$day');
# "

echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.user_base_info_his_full;
"
echo "抽取完成"
Run a test:

Check against my own database:
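A hedged sketch of the required count comparison, assuming the mysql client is available on the node; the connection details and table names are taken from the scripts above:

#!/bin/bash
# Compare the source row count in MySQL with the ODS row count in Hive.
mysql -h zhiyun.pub -P 23306 -u zhiyun -pzhiyun -N \
  -e "select count(1) from crm.user_base_info_his;"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 \
  -e "select count(1) from ods_shihaihong.user_base_info_his_full;"
# The two numbers must be identical.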
Set up the job in the production scheduling center:

After pasting the sh file into the GLUE IDE, run it once:

It runs successfully. The other six tables are handled in exactly the same way. Shell scripts for the other six tables - erp_u_memcard_reg_full.sh:
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "table": ["u_memcard_reg"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"}, {"name":"memcardno","type":"string"},
              {"name":"busno","type":"string"}, {"name":"introducer","type":"string"},
              {"name":"cardtype","type":"int"}, {"name":"cardlevel","type":"int"},
              {"name":"cardpass","type":"string"}, {"name":"cardstatus","type":"int"},
              {"name":"saleamount","type":"string"}, {"name":"realamount","type":"string"},
              {"name":"puramount","type":"string"}, {"name":"integral","type":"string"},
              {"name":"integrala","type":"string"}, {"name":"integralflag","type":"int"},
              {"name":"cardholder","type":"string"}, {"name":"cardaddress","type":"string"},
              {"name":"sex","type":"string"}, {"name":"tel","type":"string"},
              {"name":"handset","type":"string"}, {"name":"fax","type":"string"},
              {"name":"createuser","type":"string"}, {"name":"createtime","type":"string"},
              {"name":"tstatus","type":"int"}, {"name":"notes","type":"string"},
              {"name":"stamp","type":"string"}, {"name":"idcard","type":"string"},
              {"name":"birthday","type":"string"}, {"name":"allowintegral","type":"int"},
              {"name":"apptype","type":"string"}, {"name":"applytime","type":"string"},
              {"name":"invalidate","type":"string"}, {"name":"lastdate","type":"string"},
              {"name":"bak1","type":"string"}, {"name":"scrm_userid","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_u_memcard_reg_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/erp_u_memcard_reg_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_u_memcard_reg_full.json

echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_memcard_reg_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_memcard_reg_full.json

echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-extraction table
create external table if not exists ods_shihaihong.u_memcard_reg_full(
  id int, memcardno string, busno string, introducer string,
  cardtype int, cardlevel int, cardpass string, cardstatus int,
  saleamount string, realamount string, puramount string, integral string,
  integrala string, integralflag int, cardholder string, cardaddress string,
  sex string, tel string, handset string, fax string,
  createuser string, createtime string, tstatus int, notes string,
  stamp string, idcard string, birthday string, allowintegral int,
  apptype string, applytime string, invalidate string, lastdate string,
  bak1 string, scrm_userid string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_memcard_reg_full";
'

# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath '/zhiyun/shihaihong/tmp/erp_u_memcard_reg_full/*' overwrite into table ods_shihaihong.erp_u_memcard_reg_full partition(createtime='$day');
# "

echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.u_memcard_reg_full;
"
echo "抽取完成"
   erp.c_memcard_class_group:   
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "table": ["c_memcard_class_group"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"createtime","type":"string"},
              {"name":"createuser","type":"string"},
              {"name":"groupid","type":"int"},
              {"name":"groupname","type":"string"},
              {"name":"notes","type":"string"},
              {"name":"stamp","type":"int"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_c_memcard_class_group_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/erp_c_memcard_class_group_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_c_memcard_class_group_full.json

echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_c_memcard_class_group_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_c_memcard_class_group_full.json

echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-extraction table
create external table if not exists ods_shihaihong.c_memcard_class_group_full(
  createtime string,
  createuser string,
  groupid int,
  groupname string,
  notes string,
  stamp int
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_c_memcard_class_group_full";
'

# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath '/zhiyun/shihaihong/tmp/erp_c_memcard_class_group_full/*' overwrite into table ods_shihaihong.erp_c_memcard_class_group_full partition(createtime='$day');
# "

echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_memcard_class_group_full;
"
echo "抽取完成"

erp.u_memcard_reg_c:

#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "table": ["u_memcard_reg_c"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"memcardno","type":"string"},
              {"name":"sickness","type":"string"},
              {"name":"status","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_u_memcard_reg_c_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_u_memcard_reg_c_full.json

echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_memcard_reg_c_full.json

echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-extraction table
create external table if not exists ods_shihaihong.u_memcard_reg_c_full(
  id int,
  memcardno string,
  sickness string,
  status string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full";
'

# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath '/zhiyun/shihaihong/tmp/erp_u_memcard_reg_c_full/*' overwrite into table ods_shihaihong.erp_u_memcard_reg_c_full partition(createtime='$day');
# "

echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.u_memcard_reg_c_full;
"
echo "抽取完成"
   his.chronic_patient_info_new:   
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/his?useSSL=false"],
                "table": ["chronic_patient_info_new"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"member_id","type":"string"},
              {"name":"erp_code","type":"string"},
              {"name":"extend","type":"string"},
              {"name":"detect_time","type":"string"},
              {"name":"bec_chr_mbr_date","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "his_chronic_patient_info_new_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/his_chronic_patient_info_new_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/his_chronic_patient_info_new_full.json

echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/his_chronic_patient_info_new_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/his_chronic_patient_info_new_full.json

echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-extraction table
create external table if not exists ods_shihaihong.chronic_patient_info_new_full(
  id int,
  member_id string,
  erp_code string,
  extend string,
  detect_time string,
  bec_chr_mbr_date string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/his_chronic_patient_info_new_full";
'

# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath '/zhiyun/shihaihong/tmp/his_chronic_patient_info_new_full/*' overwrite into table ods_shihaihong.his_chronic_patient_info_new_full partition(createtime='$day');
# "

echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.chronic_patient_info_new_full;
"
echo "抽取完成"
   erp.c_org_busi :   
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "table": ["c_org_busi"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"}, {"name":"busno","type":"string"},
              {"name":"orgname","type":"string"}, {"name":"orgsubno","type":"string"},
              {"name":"orgtype","type":"string"}, {"name":"salegroup","type":"string"},
              {"name":"org_tran_code","type":"string"}, {"name":"accno","type":"string"},
              {"name":"sendtype","type":"string"}, {"name":"sendday","type":"string"},
              {"name":"maxday","type":"string"}, {"name":"minday","type":"string"},
              {"name":"notes","type":"string"}, {"name":"stamp","type":"string"},
              {"name":"status","type":"string"}, {"name":"customid","type":"string"},
              {"name":"whl_vendorno","type":"string"}, {"name":"whlgroup","type":"string"},
              {"name":"rate","type":"string"}, {"name":"creditamt","type":"string"},
              {"name":"creditday","type":"string"}, {"name":"peoples","type":"string"},
              {"name":"area","type":"string"}, {"name":"abc","type":"string"},
              {"name":"address","type":"string"}, {"name":"tel","type":"string"},
              {"name":"principal","type":"string"}, {"name":"identity_card","type":"string"},
              {"name":"mobil","type":"string"}, {"name":"corporation","type":"string"},
              {"name":"saler","type":"string"}, {"name":"createtime","type":"string"},
              {"name":"bank","type":"string"}, {"name":"bankno","type":"string"},
              {"name":"bak1","type":"string"}, {"name":"bak2","type":"string"},
              {"name":"a_bak1","type":"string"}, {"name":"aa_bak1","type":"string"},
              {"name":"b_bak1","type":"string"}, {"name":"bb_bak1","type":"string"},
              {"name":"y_bak1","type":"string"}, {"name":"t_bak1","type":"string"},
              {"name":"ym_bak1","type":"string"}, {"name":"tm_bak1","type":"string"},
              {"name":"supervise_code","type":"string"}, {"name":"monthrent","type":"string"},
              {"name":"wms_warehid","type":"string"}, {"name":"settlement_cycle","type":"string"},
              {"name":"apply_cycle","type":"string"}, {"name":"applydate","type":"string"},
              {"name":"accounttype","type":"string"}, {"name":"applydate_last","type":"string"},
              {"name":"paymode","type":"string"}, {"name":"yaolian_flag","type":"string"},
              {"name":"org_longitude","type":"string"}, {"name":"org_latitude","type":"string"},
              {"name":"org_province","type":"string"}, {"name":"org_city","type":"string"},
              {"name":"org_area","type":"string"}, {"name":"business_time","type":"string"},
              {"name":"yaolian_group","type":"string"}, {"name":"pacard_storeid","type":"string"},
              {"name":"opening_time","type":"string"}, {"name":"ret_ent_id","type":"string"},
              {"name":"ent_id","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_c_org_busi_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/erp_c_org_busi_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_c_org_busi_full.json

echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_c_org_busi_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_c_org_busi_full.json

echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-extraction table
create external table if not exists ods_shihaihong.c_org_busi_full(
  id int, busno string, orgname string, orgsubno string,
  orgtype string, salegroup string, org_tran_code string, accno string,
  sendtype string, sendday string, maxday string, minday string,
  notes string, stamp string, status string, customid string,
  whl_vendorno string, whlgroup string, rate string, creditamt string,
  creditday string, peoples string, area string, abc string,
  address string, tel string, principal string, identity_card string,
  mobil string, corporation string, saler string, createtime string,
  bank string, bankno string, bak1 string, bak2 string,
  a_bak1 string, aa_bak1 string, b_bak1 string, bb_bak1 string,
  y_bak1 string, t_bak1 string, ym_bak1 string, tm_bak1 string,
  supervise_code string, monthrent string, wms_warehid string, settlement_cycle string,
  apply_cycle string, applydate string, accounttype string, applydate_last string,
  paymode string, yaolian_flag string, org_longitude string, org_latitude string,
  org_province string, org_city string, org_area string, business_time string,
  yaolian_group string, pacard_storeid string, opening_time string, ret_ent_id string,
  ent_id string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_c_org_busi_full";
'

# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath '/zhiyun/shihaihong/tmp/erp_c_org_busi_full/*' overwrite into table ods_shihaihong.erp_c_org_busi_full partition(createtime='$day');
# "

echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_org_busi_full;
"
echo "抽取完成"
   erp.c_code_value:   
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "table": ["c_code_value"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"cat_name","type":"string"},
              {"name":"cat_code","type":"string"},
              {"name":"val_name","type":"string"},
              {"name":"var_desc","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_c_code_value_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/erp_c_code_value_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_c_code_value_full.json

echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_c_code_value_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_c_code_value_full.json

echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-extraction table
create external table if not exists ods_shihaihong.c_code_value_full(
  id int,
  cat_name string,
  cat_code string,
  val_name string,
  var_desc string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_c_code_value_full";
'

# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath '/zhiyun/shihaihong/tmp/erp_c_code_value_full/*' overwrite into table ods_shihaihong.erp_c_code_value_full partition(createtime='$day');
# "

echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_code_value_full;
"
echo "抽取完成"
Job scheduling - erp.u_memcard_reg:
   
    erp.c_memcard_class_group:     
   
    erp.u_memcard_reg_c:   
   
    his.chronic_patient_info_new:   
   
    erp.c_org_busi :   
   
erp.c_code_value: upload the code-value spreadsheet into the data directory and write a Python data-cleaning script:
#!/bin/python3
import os
import pandas as pd
from openpyxl import load_workbook

lst = []
# Read the Excel file with pandas. sheet_name=None reads every worksheet in the
# file, and header=2 means the header row is the 3rd row (0-based index).
xlsx_path = '/zhiyun/shihaihong/data/12.码值表.xlsx'
dfs = pd.read_excel(xlsx_path, sheet_name=None, header=2)
sheets = list(dfs.keys())
wb = load_workbook(filename=xlsx_path)
# Walk through the worksheets of the xlsx file, skipping the first two sheets
for i in range(len(sheets)):
    if i > 1:
        # Cell A2 holds the category header; the parts before and after '-'
        # become cat_name and cat_code
        str_head = wb[sheets[i]]['A2'].value
        data = dfs[sheets[i]]
        # Collect the remaining non-empty cells; the first half are the values,
        # the second half the descriptions
        lst1 = []
        for col in data.columns:
            for j in range(len(data)):
                if pd.notna(data[col][j]):
                    lst1.append(str(data[col][j]))
        n = int(len(lst1) / 2)
        for k in range(n):
            ss = f"{str_head.split('-')[0]}|{str_head.split('-')[1]}|{lst1[k]}|{lst1[k + n]}"
            lst.append(ss)

print("写入数据到 data")
template_path = "/zhiyun/shihaihong/data/code_value.txt"
with open(template_path, "w", encoding="utf-8") as f:
    f.write("\n".join(lst))

print("上传 data 文件 到 hdfs")
os.system("hdfs dfs -mkdir -p /zhiyun/shihaihong/filetxt/")
os.system(f"hdfs dfs -put {template_path} /zhiyun/shihaihong/filetxt/")
#!/bin/bash
# Purpose: run the whole flow from writing the config to verifying the data.
# Must be runnable on any node.
# Create my working directories
mkdir -p /zhiyun/shihaihong/data /zhiyun/shihaihong/jobs /zhiyun/shihaihong/python /zhiyun/shihaihong/shell /zhiyun/shihaihong/sql

echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
create external table if not exists ods_shihaihong.c_code_value_full(
  cat_name string,
  cat_code string,
  val_name string,
  var_desc string
)
row format delimited fields terminated by "|"
lines terminated by "\n"
stored as textfile
location "/zhiyun/shihaihong/filetxt";
'
echo "hive 建表完成"

echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_code_value_full;
"
echo "验证完成"
After the Python script has run, do the extraction with the shell script above.
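A quick sanity check that is not in the original notes: before counting the rows in Hive, peek at the first few cleaned lines on HDFS (the path is the one the Python script uploads to):

hdfs dfs -cat /zhiyun/shihaihong/filetxt/code_value.txt | head -n 5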
 
Job scheduling:
   
Assignment 2

Handle the 2 incremental tables: a historical-data (one-off full) job plus an incremental job for each, 4 scheduled jobs in total. Every table must be compared against the source table's total row count at the end, and the counts must match.

Incremental tables to extract:
erp.u_sale_m    full first (one-off), then incremental (run daily) => ods_lijinquan.erp_u_sale_m_inc
erp.u_sale_pay  same as above, incremental => ods_lijinquan.erp_u_sale_pay_inc

The first extraction is a full extraction. erp_u_sale_m_full.sh:
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "table": ["u_sale_m"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"}, {"name":"saleno","type":"string"},
              {"name":"busno","type":"string"}, {"name":"posno","type":"string"},
              {"name":"extno","type":"string"}, {"name":"extsource","type":"string"},
              {"name":"o2o_trade_from","type":"string"}, {"name":"channel","type":"int"},
              {"name":"starttime","type":"string"}, {"name":"finaltime","type":"string"},
              {"name":"payee","type":"string"}, {"name":"discounter","type":"string"},
              {"name":"crediter","type":"string"}, {"name":"returner","type":"string"},
              {"name":"warranter1","type":"string"}, {"name":"warranter2","type":"string"},
              {"name":"stdsum","type":"string"}, {"name":"netsum","type":"string"},
              {"name":"loss","type":"string"}, {"name":"discount","type":"float"},
              {"name":"member","type":"string"}, {"name":"precash","type":"string"},
              {"name":"stamp","type":"string"}, {"name":"shiftid","type":"string"},
              {"name":"shiftdate","type":"string"}, {"name":"yb_saleno","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_u_sale_m_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/erp_u_sale_m_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_u_sale_m_full.json

echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_sale_m_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_sale_m_full.json

echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-extraction table
create external table if not exists ods_shihaihong.u_sale_m_full(
  id int, saleno string, busno string, posno string,
  extno string, extsource string, o2o_trade_from string, channel int,
  starttime string, finaltime string, payee string, discounter string,
  crediter string, returner string, warranter1 string, warranter2 string,
  stdsum string, netsum string, loss string, discount float,
  member string, precash string, stamp string, shiftid string,
  shiftdate string, yb_saleno string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_m_full";
'

# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_m_full/*' overwrite into table ods_shihaihong.erp_u_sale_m_full partition(createtime='$day');
# "

echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.u_sale_m_full;
"
echo "抽取完成"
Register it on the job scheduling platform:
   

Copy the contents of the sh file into the GLUE IDE and run it once:

Afterwards, use incremental extraction:
#!/bin/bash
# Incremental extraction for erp.u_sale_m: by default extract yesterday's data;
# if a run date is passed in, extract the day before that date.
day=$(date -d "yesterday" +%Y-%m-%d)
if [ "$1" != "" ]; then
    day=$(date -d "$1 -1 day" +%Y-%m-%d)
fi
echo "抽取的日期为 $day"

echo "生成增量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "querySql": [
                  "select * from u_sale_m where stamp between '\'"$day" 00:00:00\'' and '\'"$day" 23:59:59\'' and id>0"
                ]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"}, {"name":"saleno","type":"string"},
              {"name":"busno","type":"string"}, {"name":"posno","type":"string"},
              {"name":"extno","type":"string"}, {"name":"extsource","type":"string"},
              {"name":"o2o_trade_from","type":"string"}, {"name":"channel","type":"int"},
              {"name":"starttime","type":"string"}, {"name":"finaltime","type":"string"},
              {"name":"payee","type":"string"}, {"name":"discounter","type":"string"},
              {"name":"crediter","type":"string"}, {"name":"returner","type":"string"},
              {"name":"warranter1","type":"string"}, {"name":"warranter2","type":"string"},
              {"name":"stdsum","type":"string"}, {"name":"netsum","type":"string"},
              {"name":"loss","type":"string"}, {"name":"discount","type":"float"},
              {"name":"member","type":"string"}, {"name":"precash","type":"string"},
              {"name":"stamp","type":"string"}, {"name":"shiftid","type":"string"},
              {"name":"shiftdate","type":"string"}, {"name":"yb_saleno","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_u_sale_m_inc.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/tmp/erp_u_sale_m_inc",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 2
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_u_sale_m_inc.json

echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/tmp/erp_u_sale_m_inc
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_sale_m_inc.json

echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
create external table if not exists ods_shihaihong.erp_u_sale_m_inc(
  id int, saleno string, busno string, posno string,
  extno string, extsource string, o2o_trade_from string, channel int,
  starttime string, finaltime string, payee string, discounter string,
  crediter string, returner string, warranter1 string, warranter2 string,
  stdsum string, netsum string, loss string, discount float,
  member string, precash string, shiftid string, shiftdate string,
  yb_saleno string
) partitioned by (stamp string)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_m_inc";
'

echo "加载数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_m_inc/*' overwrite into table ods_shihaihong.erp_u_sale_m_inc partition(stamp='"$day"');
"

echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
show partitions ods_shihaihong.erp_u_sale_m_inc;
select count(1) from ods_shihaihong.erp_u_sale_m_inc where stamp = '"$day"';
select * from ods_shihaihong.erp_u_sale_m_inc where stamp = '"$day"' limit 5;
"
echo "抽取完成"
Job scheduling:
Run it once, passing in the date parameter:
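How the date parameter behaves (from the script above): with no argument the script extracts yesterday's data; with a run date it extracts the day before that date, which is what a T+1 schedule passes in. A hedged manual backfill example, assuming the incremental script is saved as erp_u_sale_m_inc.sh (the file name is not given in the notes):

bash erp_u_sale_m_inc.sh 2023-07-15    # extracts the records stamped 2023-07-14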
      
erp_u_sale_pay_inc: first, a full extraction:
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "table": ["u_sale_pay"]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"saleno","type":"string"},
              {"name":"cardno","type":"string"},
              {"name":"netsum","type":"string"},
              {"name":"paytype","type":"string"},
              {"name":"bak1","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_u_sale_pay_full.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/ods/erp_u_sale_pay_full",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}' > /zhiyun/shihaihong/jobs/erp_u_sale_pay_full.json

echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_sale_pay_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_sale_pay_full.json

echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- full-extraction table
create external table if not exists ods_shihaihong.u_sale_pay_full(
  id int,
  saleno string,
  cardno string,
  netsum string,
  paytype string,
  bak1 string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_pay_full";
'

# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_pay_full/*' overwrite into table ods_shihaihong.erp_u_sale_pay_full partition(createtime='$day');
# "

echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.u_sale_pay_full;
"
echo "抽取完成"
Job scheduling:
Edit it in the GLUE IDE and run it once:
Incremental extraction:
#!/bin/bash
# Incremental extraction for erp.u_sale_pay: u_sale_pay has no timestamp of its own,
# so the daily window is taken from u_sale_m.stamp via a join on saleno.
day=$(date -d "yesterday" +%Y-%m-%d)
if [ "$1" != "" ]; then
    day=$(date -d "$1 -1 day" +%Y-%m-%d)
fi
echo "抽取的日期为 $day"

echo "生成增量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://zhiyun.pub:23306/erp?useSSL=false"],
                "querySql": [
                  "select u_sale_pay.*,stamp from u_sale_pay left join u_sale_m on u_sale_pay.saleno=u_sale_m.saleno where stamp between '\'"$day" 00:00:00\'' and '\'"$day" 23:59:59\'' and u_sale_pay.id>0"
                ]
              }
            ],
            "password": "zhiyun",
            "username": "zhiyun"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name":"id","type":"int"},
              {"name":"saleno","type":"string"},
              {"name":"cardno","type":"string"},
              {"name":"netsum","type":"string"},
              {"name":"paytype","type":"string"},
              {"name":"bak1","type":"string"},
              {"name":"stamp","type":"string"}
            ],
            "defaultFS": "hdfs://cdh02:8020",
            "fieldDelimiter": "\t",
            "fileName": "erp_u_sale_pay_inc.data",
            "fileType": "orc",
            "path": "/zhiyun/shihaihong/tmp/erp_u_sale_pay_inc",
            "writeMode": "truncate"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "3"
      }
    }
  }
}
' > /zhiyun/shihaihong/jobs/erp_u_sale_pay_inc.json

echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/tmp/erp_u_sale_pay_inc
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/erp_u_sale_pay_inc.json

echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
create external table if not exists ods_shihaihong.erp_u_sale_pay_inc(
  id int,
  saleno string,
  cardno string,
  netsum string,
  paytype string,
  bak1 string
) partitioned by (stamp string)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_pay_inc";
'

echo "加载数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_pay_inc/*' overwrite into table ods_shihaihong.erp_u_sale_pay_inc partition(stamp='"$day"');
"

echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
show partitions ods_shihaihong.erp_u_sale_pay_inc;
select count(1) from ods_shihaihong.erp_u_sale_pay_inc where stamp = '"$day"';
select * from ods_shihaihong.erp_u_sale_pay_inc where stamp = '"$day"' limit 5;
"
echo "抽取完成"
Job scheduling:
Run it once:
      
