DataX二次开发——新增HiveReader插件

打印 上一主题 下一主题

主题 905|帖子 905|积分 2715

一、研发背景

    DataX官方开源的版本支持HDFS文件的读写,并没有支持基于JDBC的Hive数据读写,很多时候一些数据同步不太方便,比如在读取Hive之前先执行一些sql、读取一些Hive的视图数据、或者在数据同步时执行一段固定的SQL,将SQL执行结果写入下游等各种场景,实际上还是需要Hive插件来支持。而在实际工作中,我们也遇到了类似的一些情况需要二次开发DataX以支持此类场景。本插件已在生产环境稳定运行一年有余,现分享给大家,如有问题也可联系我(qq:1821088755)。
二、HiveReader插件介绍

    hivereader插件比较简单,共有三个类,两个配置文件。其中:

  • HiveReader:实现DataX框架核心方法,是具体逻辑。
  • HiveReaderErrorCode:继承了DataX框架的ErrorCode类,是用于统一异常处理DataXException类中调用,具体是新增了一个枚举值。
  • HiveConnByKerberos:是在检测到Hive具备Kerberos认证要求时,进行认证的工具类。
  • plugin.json:DataX插件固定的配置文件,用于指定插件的入口类。
  • plugin_job_template.json:二次开发插件,一般需要提供一下具体的使用方式,此json文件即为HiveReader插件的配置方式说明。
 
   2.1 HiveReader类

    首先是HiveReader类,需要注意的是一些常量或枚举值,需要自行添加,其中DataBaseType枚举类中,需要新增Hive枚举项并添加Hive的驱动类全路径,具体见注释,另外就是Kerberos认证相关的几个配置,一个是keytab的路径,一个是krb5.conf的路径,另外一个是principle的值。
  1. package com.alibaba.datax.plugin.reader.hivereader;
  2. import com.alibaba.datax.common.base.Key;
  3. import com.alibaba.datax.common.plugin.RecordSender;
  4. import com.alibaba.datax.common.spi.Reader;
  5. import com.alibaba.datax.common.util.Configuration;
  6. import com.alibaba.datax.rdbms.reader.CommonRdbmsReader;
  7. import com.alibaba.datax.rdbms.util.DataBaseType;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.apache.hadoop.security.authentication.util.KerberosName;
  10. import java.lang.reflect.Field;
  11. import java.util.List;
  12. import static com.alibaba.datax.common.base.Constant.DEFAULT_FETCH_SIZE;//2048,可根据条件自己取值
  13. import static com.alibaba.datax.common.base.Key.FETCH_SIZE; // 参数名:"fetchSize"
  14. @Slf4j
  15. public class HiveReader
  16.         extends Reader
  17. {
  18.     //此处需现在com.sinosig.plumber.rdbms.util.DataBaseType枚举类中添加Hive类型,内容为:Hive("hive2", "org.apache.hive.jdbc.HiveDriver"),
  19.     private static final DataBaseType DATABASE_TYPE = DataBaseType.Hive;
  20.     public static class Job
  21.             extends Reader.Job
  22.     {
  23.         private Configuration originalConfig = null;
  24.         private CommonRdbmsReader.Job commonRdbmsReaderJob;
  25.         @Override
  26.         public void init()
  27.         {
  28.             this.originalConfig = getPluginJobConf();
  29.             Boolean haveKerberos = this.originalConfig.getBool(Key.HAVE_KERBEROS, false);
  30.             if (haveKerberos) {
  31.                 log.info("检测到kerberos认证,正在进行认证");
  32.                 org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
  33.                 String kerberosKeytabFilePath =  this.originalConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
  34.                 String kerberosPrincipal =  this.originalConfig.getString(Key.KERBEROS_PRINCIPAL);
  35.                 String krb5Path =  this.originalConfig.getString(Key.KRB5_CONF_FILE_PATH);
  36.                 hadoopConf.set("hadoop.security.authentication", "kerberos");
  37.                 hadoopConf.set("hive.security.authentication", "kerberos");
  38.                 hadoopConf.set("hadoop.security.authorization", "true");
  39.                 System.setProperty("java.security.krb5.conf",krb5Path);
  40.                 refreshConfig();
  41.                 HiveConnByKerberos.kerberosAuthentication(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf,krb5Path);
  42.             }
  43.             this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
  44.             this.originalConfig = commonRdbmsReaderJob.init(originalConfig);
  45.         }
  46.         @Override
  47.         public void preCheck()
  48.         {
  49.             this.commonRdbmsReaderJob.preCheck(originalConfig, DATABASE_TYPE);
  50.         }
  51.         @Override
  52.         public List<Configuration> split(int adviceNumber)
  53.         {
  54.             return this.commonRdbmsReaderJob.split(originalConfig, adviceNumber);
  55.         }
  56.         @Override
  57.         public void post()
  58.         {
  59.             this.commonRdbmsReaderJob.post(originalConfig);
  60.         }
  61.         @Override
  62.         public void destroy()
  63.         {
  64.             this.commonRdbmsReaderJob.destroy(originalConfig);
  65.         }
  66.     }
  67.     public static class Task
  68.             extends Reader.Task
  69.     {
  70.         private Configuration readerSliceConfig;
  71.         private CommonRdbmsReader.Task commonRdbmsReaderTask;
  72.         @Override
  73.         public void init()
  74.         {
  75.             this.readerSliceConfig = getPluginJobConf();
  76.             this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, getTaskGroupId(), getTaskId());
  77.             this.commonRdbmsReaderTask.init(this.readerSliceConfig);
  78.         }
  79.         @Override
  80.         public void startRead(RecordSender recordSender)
  81.         {
  82.             int fetchSize = this.readerSliceConfig.getInt(FETCH_SIZE, DEFAULT_FETCH_SIZE);
  83.             this.commonRdbmsReaderTask.startRead(readerSliceConfig, recordSender, getTaskPluginCollector(), fetchSize);
  84.         }
  85.         @Override
  86.         public void post()
  87.         {
  88.             this.commonRdbmsReaderTask.post(readerSliceConfig);
  89.         }
  90.         @Override
  91.         public void destroy()
  92.         {
  93.             this.commonRdbmsReaderTask.destroy(readerSliceConfig);
  94.         }
  95.     }
  96.     /** 刷新krb内容信息 */
  97.     public static void refreshConfig() {
  98.         try {
  99.             sun.security.krb5.Config.refresh();
  100.             Field defaultRealmField = KerberosName.class.getDeclaredField("defaultRealm");
  101.             defaultRealmField.setAccessible(true);
  102.             defaultRealmField.set(
  103.                     null,
  104.                     org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm());
  105.             // reload java.security.auth.login.config
  106.             javax.security.auth.login.Configuration.setConfiguration(null);
  107.         } catch (Exception e) {
  108.             log.warn(
  109.                     "resetting default realm failed, current default realm will still be used.", e);
  110.         }
  111.     }
  112. }
复制代码
 2.2 HiveConnByKerberos类

    HiveConnByKerberos类比较简单,是一个通用的Kerberos认证的接口。
  1. package com.alibaba.datax.plugin.reader.hivereader;
  2. import com.alibaba.datax.common.exception.PlumberException;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.commons.lang3.StringUtils;
  5. import org.apache.hadoop.security.UserGroupInformation;
  6. @Slf4j
  7. public class HiveConnByKerberos {
  8.     public static void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf,String krb5conf) {
  9.         System.setProperty("java.security.krb5.conf",krb5conf);
  10.         if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
  11.             UserGroupInformation.setConfiguration(hadoopConf);
  12.             try {
  13.                 UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
  14.             }
  15.             catch (Exception e) {
  16.                 log.error("kerberos认证失败");
  17.                 String message = String.format("kerberos认证失败,请检查 " +
  18.                                 "kerberosKeytabFilePath[%s] 和 kerberosPrincipal[%s]",
  19.                         kerberosKeytabFilePath, kerberosPrincipal);
  20.                 e.printStackTrace();
  21.                 throw DataXException.asDataXException(HiveReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);
  22.             }
  23.         }
  24.     }
  25. }
复制代码
 
2.3 HiveReaderErrorCode类

    HiveReaderErrorCode类,主要就是集成ErrorCode类,并添加一个枚举项,这块可直接在ErrorCode类添加,也可使用此类,为固定写法。
  1. package com.alibaba.datax.plugin.reader.hivereader;
  2. import com.alibaba.datax.common.spi.ErrorCode;
  3. public enum HiveReaderErrorCode
  4.         implements ErrorCode
  5. {
  6.     KERBEROS_LOGIN_ERROR("HiveReader-13", "KERBEROS认证失败");
  7.     private final String code;
  8.     private final String description;
  9.     HiveReaderErrorCode(String code, String description)
  10.     {
  11.         this.code = code;
  12.         this.description = description;
  13.     }
  14.     @Override
  15.     public String getCode()
  16.     {
  17.         return this.code;
  18.     }
  19.     @Override
  20.     public String getDescription()
  21.     {
  22.         return this.description;
  23.     }
  24.     @Override
  25.     public String toString()
  26.     {
  27.         return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
  28.     }
  29. }
复制代码
2.4 plugin.json文件
  1. {
  2.   "name": "hivereader",
  3.   "class": "com.alibaba.datax.plugin.reader.hivereader.HiveReader",
  4.   "description": "Retrieve data from Hive via jdbc",
  5.   "developer": "wxm"
  6. }
复制代码
2.5 plugin_job_template.json文件

    这块需要注意的一个问题是,如果Kerberos认证的Hive连接URL有两种方式,如果是基于zookeeper的方式,则需保证运行DataX服务的节点与zookeeper节点网络是打通的,并且一定不要忘记写上具体的Hive库名。
  1. {
  2.   "name": "hivereader",
  3.   "parameter": {
  4.     "column": [
  5.       "*"
  6.     ],
  7.     "username": "hive",
  8.     "password": "",<br>    "preSql":"show databases;",
  9.     "connection": [
  10.       {
  11.         "jdbcUrl": [
  12.           "jdbc:hive2://localhost:10000/default;principal=hive/_HOST@EXAMPLE.COM"
  13.         ],
  14.         "table": [
  15.           "hive_reader"
  16.         ]
  17.       }
  18.     ],
  19.     "where": "logdate='20211013'" ,
  20.     "haveKerberos": true,
  21.     "kerberosKeytabFilePath": "/etc/security/keytabs/hive.headless.keytab",
  22.     "kerberosPrincipal": "hive@EXAMPLE.COM"
  23.   }
  24. }
复制代码
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

北冰洋以北

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表