ToB企服应用市场:ToB评测及商务社交产业平台

标题: DataX二次开发——HdfsReader和HdfsWriter插件增加parquet文件读写 [打印本页]

作者: 东湖之滨    时间: 2023-2-13 22:08
标题: DataX二次开发——HdfsReader和HdfsWriter插件增加parquet文件读写
一、研发背景

    DataX官方开源的版本支持HDFS文件的读写,但是截止目前,并没有支持Parquet文件的读写,得益于DataX出色的数据同步性能,去年公司的项目大部分采用了DataX作为数据同步工具,但是从CDH集群同步Parquet或者将其他数据源的数据以Parquet格式写入HDFS,这两个常用场景没有进行支持。因此只能自己动手,补充HdfsReader和HdfsWriter插件,以支持Parquet文件的读写。
二、HdfsReader插件

    本插件比较简单,一共五个类,具体类名及对应修改项如下:
    按需修改其中四个类即可,具体代码如下:
    DFSUtil
  1. import com.alibaba.fastjson.JSON;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.google.common.primitives.Ints;
  4. import com.google.common.primitives.Longs;
  5. import com.alibaba.datax.common.base.Key;
  6. import com.alibaba.datax.common.element.BoolColumn;
  7. import com.alibaba.datax.common.element.BytesColumn;
  8. import com.alibaba.datax.common.element.Column;
  9. import com.alibaba.datax.common.element.ColumnEntry;
  10. import com.alibaba.datax.common.element.DateColumn;
  11. import com.alibaba.datax.common.element.DoubleColumn;
  12. import com.alibaba.datax.common.element.LongColumn;
  13. import com.alibaba.datax.common.element.Record;
  14. import com.alibaba.datax.common.element.StringColumn;
  15. import com.alibaba.datax.common.exception.DataXException;
  16. import com.alibaba.datax.common.plugin.RecordSender;
  17. import com.alibaba.datax.common.plugin.TaskPluginCollector;
  18. import com.alibaba.datax.common.util.Configuration;
  19. import com.alibaba.datax.storage.reader.StorageReaderErrorCode;
  20. import com.alibaba.datax.storage.reader.StorageReaderUtil;
  21. import org.apache.avro.Conversions;
  22. import org.apache.avro.Schema;
  23. import org.apache.avro.generic.GenericData;
  24. import org.apache.commons.lang3.StringUtils;
  25. import org.apache.hadoop.fs.FSDataInputStream;
  26. import org.apache.hadoop.fs.FileStatus;
  27. import org.apache.hadoop.fs.FileSystem;
  28. import org.apache.hadoop.fs.Path;
  29. import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
  30. import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
  31. import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
  32. import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
  33. import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
  34. import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
  35. import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
  36. import org.apache.hadoop.hive.ql.io.RCFile;
  37. import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
  38. import org.apache.hadoop.hive.ql.io.orc.OrcFile;
  39. import org.apache.hadoop.hive.ql.io.orc.Reader;
  40. import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
  41. import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
  42. import org.apache.hadoop.io.LongWritable;
  43. import org.apache.hadoop.io.SequenceFile;
  44. import org.apache.hadoop.io.Text;
  45. import org.apache.hadoop.io.Writable;
  46. import org.apache.hadoop.mapred.FileSplit;
  47. import org.apache.hadoop.mapred.JobConf;
  48. import org.apache.hadoop.security.UserGroupInformation;
  49. import org.apache.hadoop.util.ReflectionUtils;
  50. import org.apache.orc.TypeDescription;
  51. import org.apache.parquet.avro.AvroParquetReader;
  52. import org.apache.parquet.example.data.Group;
  53. import org.apache.parquet.hadoop.ParquetReader;
  54. import org.apache.parquet.hadoop.example.GroupReadSupport;
  55. import org.apache.parquet.hadoop.util.HadoopInputFile;
  56. import org.apache.parquet.io.api.Binary;
  57. import org.slf4j.Logger;
  58. import org.slf4j.LoggerFactory;
  59. import java.io.IOException;
  60. import java.io.InputStream;
  61. import java.math.BigDecimal;
  62. import java.math.RoundingMode;
  63. import java.nio.ByteBuffer;
  64. import java.text.SimpleDateFormat;
  65. import java.util.ArrayList;
  66. import java.util.Arrays;
  67. import java.util.Date;
  68. import java.util.HashSet;
  69. import java.util.List;
  70. import java.util.Set;
  71. import java.util.concurrent.TimeUnit;
  72. import static com.alibaba.datax.common.base.Key.COLUMN;
  73. import static com.alibaba.datax.common.base.Key.NULL_FORMAT;
  74. public class DFSUtil
  75. {
  76.     private static final Logger LOG = LoggerFactory.getLogger(DFSUtil.class);
  77.     // the offset of julian, 2440588 is 1970/1/1
  78.     private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588;
  79.     private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
  80.     private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
  81.     private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
  82.     private final org.apache.hadoop.conf.Configuration hadoopConf;
  83.     private final boolean haveKerberos;
  84.     private final HashSet<String> sourceHDFSAllFilesList = new HashSet<>();
  85.     private String specifiedFileType = null;
  86.     private String kerberosKeytabFilePath;
  87.     private String kerberosPrincipal;
  88.     public DFSUtil(Configuration taskConfig)
  89.     {
  90.         hadoopConf = new org.apache.hadoop.conf.Configuration();
  91.         //io.file.buffer.size 性能参数
  92.         //http://blog.csdn.net/yangjl38/article/details/7583374
  93.         Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
  94.         JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));
  95.         if (null != hadoopSiteParams) {
  96.             Set<String> paramKeys = hadoopSiteParams.getKeys();
  97.             for (String each : paramKeys) {
  98.                 hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
  99.             }
  100.         }
  101.         hadoopConf.set(HdfsConstant.HDFS_DEFAULT_KEY, taskConfig.getString(Key.DEFAULT_FS));
  102.         //是否有Kerberos认证
  103.         this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);
  104.         if (haveKerberos) {
  105.             this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
  106.             this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
  107.             this.hadoopConf.set(HdfsConstant.HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
  108.         }
  109.         this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);
  110.         LOG.info("hadoopConfig details:{}", JSON.toJSONString(this.hadoopConf));
  111.     }
  112.     private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath)
  113.     {
  114.         if (haveKerberos && StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
  115.             UserGroupInformation.setConfiguration(hadoopConf);
  116.             try {
  117.                 UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
  118.             }
  119.             catch (Exception e) {
  120.                 String message = String.format("kerberos认证失败,请确定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填写正确",
  121.                         kerberosKeytabFilePath, kerberosPrincipal);
  122.                 throw DataXException.asDataXException(HdfsReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);
  123.             }
  124.         }
  125.     }
  126.     /**
  127.      * 获取指定路径列表下符合条件的所有文件的绝对路径
  128.      *
  129.      * @param srcPaths 路径列表
  130.      * @param specifiedFileType 指定文件类型
  131.      * @return set of string
  132.      */
  133.     public Set<String> getAllFiles(List<String> srcPaths, String specifiedFileType)
  134.     {
  135.         this.specifiedFileType = specifiedFileType;
  136.         if (!srcPaths.isEmpty()) {
  137.             for (String eachPath : srcPaths) {
  138.                 LOG.info("get HDFS all files in path = [{}]", eachPath);
  139.                 getHDFSAllFiles(eachPath);
  140.             }
  141.         }
  142.         return sourceHDFSAllFilesList;
  143.     }
  144.     private void addSourceFileIfNotEmpty(FileStatus f)
  145.     {
  146.         if (f.isFile()) {
  147.             String filePath = f.getPath().toString();
  148.             if (f.getLen() > 0) {
  149.                 addSourceFileByType(filePath);
  150.             }
  151.             else {
  152.                 LOG.warn("文件[{}]长度为0,将会跳过不作处理!", filePath);
  153.             }
  154.         }
  155.     }
  156.     public void getHDFSAllFiles(String hdfsPath)
  157.     {
  158.         try {
  159.             FileSystem hdfs = FileSystem.get(hadoopConf);
  160.             //判断hdfsPath是否包含正则符号
  161.             if (hdfsPath.contains("*") || hdfsPath.contains("?")) {
  162.                 Path path = new Path(hdfsPath);
  163.                 FileStatus[] stats = hdfs.globStatus(path);
  164.                 for (FileStatus f : stats) {
  165.                     if (f.isFile()) {
  166.                         addSourceFileIfNotEmpty(f);
  167.                     }
  168.                     else if (f.isDirectory()) {
  169.                         getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
  170.                     }
  171.                 }
  172.             }
  173.             else {
  174.                 getHDFSAllFilesNORegex(hdfsPath, hdfs);
  175.             }
  176.         }
  177.         catch (IOException e) {
  178.             String message = String.format("无法读取路径[%s]下的所有文件,请确认您的配置项fs.defaultFS, path的值是否正确," +
  179.                     "是否有读写权限,网络是否已断开!", hdfsPath);
  180.             LOG.error(message);
  181.             throw DataXException.asDataXException(HdfsReaderErrorCode.PATH_CONFIG_ERROR, e);
  182.         }
  183.     }
  184.     private void getHDFSAllFilesNORegex(String path, FileSystem hdfs)
  185.             throws IOException
  186.     {
  187.         // 获取要读取的文件的根目录
  188.         Path listFiles = new Path(path);
  189.         // If the network disconnected, this method will retry 45 times
  190.         // each time the retry interval for 20 seconds
  191.         // 获取要读取的文件的根目录的所有二级子文件目录
  192.         FileStatus[] stats = hdfs.listStatus(listFiles);
  193.         for (FileStatus f : stats) {
  194.             // 判断是不是目录,如果是目录,递归调用
  195.             if (f.isDirectory()) {
  196.                 LOG.info("[{}] 是目录, 递归获取该目录下的文件", f.getPath());
  197.                 getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
  198.             }
  199.             else if (f.isFile()) {
  200.                 addSourceFileIfNotEmpty(f);
  201.             }
  202.             else {
  203.                 String message = String.format("该路径[%s]文件类型既不是目录也不是文件,插件自动忽略。", f.getPath());
  204.                 LOG.info(message);
  205.             }
  206.         }
  207.     }
  208.     // 根据用户指定的文件类型,将指定的文件类型的路径加入sourceHDFSAllFilesList
  209.     private void addSourceFileByType(String filePath)
  210.     {
  211.         // 检查file的类型和用户配置的fileType类型是否一致
  212.         boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType);
  213.         if (isMatchedFileType) {
  214.             String msg = String.format("[%s]是[%s]类型的文件, 将该文件加入source files列表", filePath, this.specifiedFileType);
  215.             LOG.info(msg);
  216.             sourceHDFSAllFilesList.add(filePath);
  217.         }
  218.         else {
  219.             String message = String.format("文件[%s]的类型与用户配置的fileType类型不一致," +
  220.                             "请确认您配置的目录下面所有文件的类型均为[%s]"
  221.                     , filePath, this.specifiedFileType);
  222.             LOG.error(message);
  223.             throw DataXException.asDataXException(
  224.                     HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message);
  225.         }
  226.     }
  227.     public InputStream getInputStream(String filepath)
  228.     {
  229.         InputStream inputStream;
  230.         Path path = new Path(filepath);
  231.         try {
  232.             FileSystem fs = FileSystem.get(hadoopConf);
  233.             //If the network disconnected, this method will retry 45 times
  234.             //each time the retry interval for 20 seconds
  235.             inputStream = fs.open(path);
  236.             return inputStream;
  237.         }
  238.         catch (IOException e) {
  239.             String message = String.format("读取文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filepath, filepath);
  240.             throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
  241.         }
  242.     }
  243.     public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration readerSliceConfig,
  244.             RecordSender recordSender, TaskPluginCollector taskPluginCollector)
  245.     {
  246.         LOG.info("Start Read sequence file [{}].", sourceSequenceFilePath);
  247.         Path seqFilePath = new Path(sourceSequenceFilePath);
  248.         try (SequenceFile.Reader reader = new SequenceFile.Reader(this.hadoopConf,
  249.                 SequenceFile.Reader.file(seqFilePath))) {
  250.             //获取SequenceFile.Reader实例
  251.             //获取key 与 value
  252.             Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), this.hadoopConf);
  253.             Text value = new Text();
  254.             while (reader.next(key, value)) {
  255.                 if (StringUtils.isNotBlank(value.toString())) {
  256.                     StorageReaderUtil.transportOneRecord(recordSender, readerSliceConfig, taskPluginCollector, value.toString());
  257.                 }
  258.             }
  259.         }
  260.         catch (Exception e) {
  261.             String message = String.format("SequenceFile.Reader读取文件[%s]时出错", sourceSequenceFilePath);
  262.             LOG.error(message);
  263.             throw DataXException.asDataXException(HdfsReaderErrorCode.READ_SEQUENCE_FILE_ERROR, message, e);
  264.         }
  265.     }
  266.     public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceConfig,
  267.             RecordSender recordSender, TaskPluginCollector taskPluginCollector)
  268.     {
  269.         LOG.info("Start Read rc-file [{}].", sourceRcFilePath);
  270.         List<ColumnEntry> column = StorageReaderUtil
  271.                 .getListColumnEntry(readerSliceConfig, COLUMN);
  272.         // warn: no default value '\N'
  273.         String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
  274.         Path rcFilePath = new Path(sourceRcFilePath);
  275.         RCFileRecordReader recordReader = null;
  276.         try (FileSystem fs = FileSystem.get(rcFilePath.toUri(), hadoopConf)) {
  277.             long fileLen = fs.getFileStatus(rcFilePath).getLen();
  278.             FileSplit split = new FileSplit(rcFilePath, 0, fileLen, (String[]) null);
  279.             recordReader = new RCFileRecordReader(hadoopConf, split);
  280.             LongWritable key = new LongWritable();
  281.             BytesRefArrayWritable value = new BytesRefArrayWritable();
  282.             Text txt = new Text();
  283.             while (recordReader.next(key, value)) {
  284.                 String[] sourceLine = new String[value.size()];
  285.                 txt.clear();
  286.                 for (int i = 0; i < value.size(); i++) {
  287.                     BytesRefWritable v = value.get(i);
  288.                     txt.set(v.getData(), v.getStart(), v.getLength());
  289.                     sourceLine[i] = txt.toString();
  290.                 }
  291.                 StorageReaderUtil.transportOneRecord(recordSender,
  292.                         column, sourceLine, nullFormat, taskPluginCollector);
  293.             }
  294.         }
  295.         catch (IOException e) {
  296.             String message = String.format("读取文件[%s]时出错", sourceRcFilePath);
  297.             LOG.error(message);
  298.             throw DataXException.asDataXException(HdfsReaderErrorCode.READ_RCFILE_ERROR, message, e);
  299.         }
  300.         finally {
  301.             try {
  302.                 if (recordReader != null) {
  303.                     recordReader.close();
  304.                     LOG.info("Finally, Close RCFileRecordReader.");
  305.                 }
  306.             }
  307.             catch (IOException e) {
  308.                 LOG.warn(String.format("finally: 关闭RCFileRecordReader失败, %s", e.getMessage()));
  309.             }
  310.         }
  311.     }
  312.     public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
  313.             RecordSender recordSender, TaskPluginCollector taskPluginCollector)
  314.     {
  315.         LOG.info("Start Read orc-file [{}].", sourceOrcFilePath);
  316.         List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
  317.         String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
  318.         try {
  319.             Path orcFilePath = new Path(sourceOrcFilePath);
  320.             Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(hadoopConf));
  321.             TypeDescription schema = reader.getSchema();
  322.             assert column != null;
  323.             if (column.isEmpty()) {
  324.                 for (int i = 0; i < schema.getChildren().size(); i++) {
  325.                     ColumnEntry columnEntry = new ColumnEntry();
  326.                     columnEntry.setIndex(i);
  327.                     columnEntry.setType(schema.getChildren().get(i).getCategory().getName());
  328.                     column.add(columnEntry);
  329.                 }
  330.             }
  331.             VectorizedRowBatch rowBatch = schema.createRowBatch(1024);
  332.             org.apache.orc.RecordReader rowIterator = reader.rows(reader.options().schema(schema));
  333.             while (rowIterator.nextBatch(rowBatch)) {
  334.                 transportOrcRecord(rowBatch, column, recordSender, taskPluginCollector, nullFormat);
  335.             }
  336.         }
  337.         catch (Exception e) {
  338.             String message = String.format("从orc-file文件路径[%s]中读取数据发生异常,请联系系统管理员。"
  339.                     , sourceOrcFilePath);
  340.             LOG.error(message);
  341.             throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
  342.         }
  343.     }
  344.     private void transportOrcRecord(VectorizedRowBatch rowBatch, List<ColumnEntry> columns, RecordSender recordSender,
  345.             TaskPluginCollector taskPluginCollector, String nullFormat)
  346.     {
  347.         Record record;
  348.         for (int row = 0; row < rowBatch.size; row++) {
  349.             record = recordSender.createRecord();
  350.             try {
  351.                 for (ColumnEntry column : columns) {
  352.                     Column columnGenerated;
  353.                     if (column.getValue() != null) {
  354.                         if (!"null".equals(column.getValue())) {
  355.                             columnGenerated = new StringColumn(column.getValue());
  356.                         }
  357.                         else {
  358.                             columnGenerated = new StringColumn();
  359.                         }
  360.                         record.addColumn(columnGenerated);
  361.                         continue;
  362.                     }
  363.                     int i = column.getIndex();
  364.                     String columnType = column.getType().toUpperCase();
  365.                     ColumnVector col = rowBatch.cols[i];
  366.                     Type type = Type.valueOf(columnType);
  367.                     if (col.isNull[row]) {
  368.                         record.addColumn(new StringColumn(null));
  369.                         continue;
  370.                     }
  371.                     switch (type) {
  372.                         case INT:
  373.                         case LONG:
  374.                         case BOOLEAN:
  375.                         case BIGINT:
  376.                             columnGenerated = new LongColumn(((LongColumnVector) col).vector[row]);
  377.                             break;
  378.                         case DATE:
  379.                             columnGenerated = new DateColumn(new Date(((LongColumnVector) col).vector[row]));
  380.                             break;
  381.                         case DOUBLE:
  382.                             columnGenerated = new DoubleColumn(((DoubleColumnVector) col).vector[row]);
  383.                             break;
  384.                         case DECIMAL:
  385.                             columnGenerated = new DoubleColumn(((DecimalColumnVector) col).vector[row].doubleValue());
  386.                             break;
  387.                         case BINARY:
  388.                             BytesColumnVector b = (BytesColumnVector) col;
  389.                             byte[] val = Arrays.copyOfRange(b.vector[row], b.start[row], b.start[row] + b.length[row]);
  390.                             columnGenerated = new BytesColumn(val);
  391.                             break;
  392.                         case TIMESTAMP:
  393.                             columnGenerated = new DateColumn(((TimestampColumnVector) col).getTime(row));
  394.                             break;
  395.                         default:
  396.                             // type is string or other
  397.                             String v = ((BytesColumnVector) col).toString(row);
  398.                             columnGenerated = v.equals(nullFormat) ? new StringColumn() : new StringColumn(v);
  399.                             break;
  400.                     }
  401.                     record.addColumn(columnGenerated);
  402.                 }
  403.                 recordSender.sendToWriter(record);
  404.             }
  405.             catch (Exception e) {
  406.                 if (e instanceof DataXException) {
  407.                     throw (DataXException) e;
  408.                 }
  409.                 taskPluginCollector.collectDirtyRecord(record, e.getMessage());
  410.             }
  411.         }
  412.     }
  413.     public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig,
  414.             RecordSender recordSender, TaskPluginCollector taskPluginCollector)
  415.     {
  416.         LOG.info("Start Read parquet-file [{}].", sourceParquetFilePath);
  417.         List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
  418.         String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
  419.         Path parquetFilePath = new Path(sourceParquetFilePath);
  420.         hadoopConf.set("parquet.avro.readInt96AsFixed", "true");
  421.         JobConf conf = new JobConf(hadoopConf);
  422.         GenericData decimalSupport = new GenericData();
  423.         decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
  424.         try (ParquetReader<GenericData.Record> reader = AvroParquetReader
  425.                 .<GenericData.Record>builder(HadoopInputFile.fromPath(parquetFilePath, hadoopConf))
  426.                 .withDataModel(decimalSupport)
  427.                 .withConf(conf)
  428.                 .build()) {
  429.             GenericData.Record gRecord = reader.read();
  430.             Schema schema = gRecord.getSchema();
  431.             if (null == column || column.isEmpty()) {
  432.                 column = new ArrayList<>(schema.getFields().size());
  433.                 String sType;
  434.                 // 用户没有填写具体的字段信息,需要从parquet文件构建
  435.                 for (int i = 0; i < schema.getFields().size(); i++) {
  436.                     ColumnEntry columnEntry = new ColumnEntry();
  437.                     columnEntry.setIndex(i);
  438.                     Schema type;
  439.                     if (schema.getFields().get(i).schema().getType() == Schema.Type.UNION) {
  440.                         type = schema.getFields().get(i).schema().getTypes().get(1);
  441.                     }
  442.                     else {
  443.                         type = schema.getFields().get(i).schema();
  444.                     }
  445.                     sType = type.getProp("logicalType") != null ? type.getProp("logicalType") : type.getType().getName();
  446.                     if (sType.startsWith("timestamp")) {
  447.                         columnEntry.setType("timestamp");
  448.                     }
  449.                     else {
  450.                         columnEntry.setType(sType);
  451.                     }
  452.                     column.add(columnEntry);
  453.                 }
  454.             }
  455.             while (gRecord != null) {
  456.                 transportParquetRecord(column, gRecord, recordSender, taskPluginCollector, nullFormat);
  457.                 gRecord = reader.read();
  458.             }
  459.         }
  460.         catch (IOException e) {
  461.             String message = String.format("从parquet file文件路径[%s]中读取数据发生异常,请联系系统管理员。"
  462.                     , sourceParquetFilePath);
  463.             LOG.error(message);
  464.             throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
  465.         }
  466.     }
  467.     /*
  468.      * create a transport record for Parquet file
  469.      *
  470.      *
  471.      */
  472.     private void transportParquetRecord(List<ColumnEntry> columnConfigs, GenericData.Record gRecord, RecordSender recordSender,
  473.             TaskPluginCollector taskPluginCollector, String nullFormat)
  474.     {
  475.         Record record = recordSender.createRecord();
  476.         Column columnGenerated;
  477.         int scale = 10;
  478.         try {
  479.             for (ColumnEntry columnEntry : columnConfigs) {
  480.                 String columnType = columnEntry.getType();
  481.                 Integer columnIndex = columnEntry.getIndex();
  482.                 String columnConst = columnEntry.getValue();
  483.                 String columnValue = null;
  484.                 if (null != columnIndex) {
  485.                     if (null != gRecord.get(columnIndex)) {
  486.                         columnValue = gRecord.get(columnIndex).toString();
  487.                     }
  488.                     else {
  489.                         record.addColumn(new StringColumn(null));
  490.                         continue;
  491.                     }
  492.                 }
  493.                 else {
  494.                     columnValue = columnConst;
  495.                 }
  496.                 if (columnType.startsWith("decimal(")) {
  497.                     String ps = columnType.replace("decimal(", "").replace(")", "");
  498.                     columnType = "decimal";
  499.                     if (ps.contains(",")) {
  500.                         scale = Integer.parseInt(ps.split(",")[1].trim());
  501.                     }
  502.                     else {
  503.                         scale = 0;
  504.                     }
  505.                 }
  506.                 Type type = Type.valueOf(columnType.toUpperCase());
  507.                 if (StringUtils.equals(columnValue, nullFormat)) {
  508.                     columnValue = null;
  509.                 }
  510.                 try {
  511.                     switch (type) {
  512.                         case STRING:
  513.                             columnGenerated = new StringColumn(columnValue);
  514.                             break;
  515.                         case INT:
  516.                         case LONG:
  517.                             columnGenerated = new LongColumn(columnValue);
  518.                             break;
  519.                         case DOUBLE:
  520.                             columnGenerated = new DoubleColumn(columnValue);
  521.                             break;
  522.                         case DECIMAL:
  523.                             if (null == columnValue) {
  524.                                 columnGenerated = new DoubleColumn((Double) null);
  525.                             }
  526.                             else {
  527.                                 columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, RoundingMode.HALF_UP));
  528.                             }
  529.                             break;
  530.                         case BOOLEAN:
  531.                             columnGenerated = new BoolColumn(columnValue);
  532.                             break;
  533.                         case DATE:
  534.                             if (columnValue == null) {
  535.                                 columnGenerated = new DateColumn((Date) null);
  536.                             }
  537.                             else {
  538.                                 String formatString = columnEntry.getFormat();
  539.                                 if (StringUtils.isNotBlank(formatString)) {
  540.                                     // 用户自己配置的格式转换
  541.                                     SimpleDateFormat format = new SimpleDateFormat(
  542.                                             formatString);
  543.                                     columnGenerated = new DateColumn(
  544.                                             format.parse(columnValue));
  545.                                 }
  546.                                 else {
  547.                                     // 框架尝试转换
  548.                                     columnGenerated = new DateColumn(new StringColumn(columnValue).asDate());
  549.                                 }
  550.                             }
  551.                             break;
  552.                         case TIMESTAMP:
  553.                             if (null == columnValue) {
  554.                                 columnGenerated = new DateColumn();
  555.                             }
  556.                             else if (columnValue.startsWith("[")) {
  557.                                 // INT96 https://github.com/apache/parquet-mr/pull/901
  558.                                 GenericData.Fixed fixed = (GenericData.Fixed) gRecord.get(columnIndex);
  559.                                 Date date = new Date(getTimestampMills(fixed.bytes()));
  560.                                 columnGenerated = new DateColumn(date);
  561.                             }
  562.                             else {
  563.                                 columnGenerated = new DateColumn(Long.parseLong(columnValue) * 1000);
  564.                             }
  565.                             break;
  566.                         case BINARY:
  567.                             columnGenerated = new BytesColumn(((ByteBuffer) gRecord.get(columnIndex)).array());
  568.                             break;
  569.                         default:
  570.                             String errorMessage = String.format("您配置的列类型暂不支持 : [%s]", columnType);
  571.                             LOG.error(errorMessage);
  572.                             throw DataXException.asDataXException(StorageReaderErrorCode.NOT_SUPPORT_TYPE, errorMessage);
  573.                     }
  574.                 }
  575.                 catch (Exception e) {
  576.                     throw new IllegalArgumentException(String.format(
  577.                             "类型转换错误, 无法将[%s] 转换为[%s], %s", columnValue, type, e));
  578.                 }
  579.                 record.addColumn(columnGenerated);
  580.             } // end for
  581.             recordSender.sendToWriter(record);
  582.         }
  583.         catch (IllegalArgumentException | IndexOutOfBoundsException iae) {
  584.             taskPluginCollector.collectDirtyRecord(record, iae.getMessage());
  585.         }
  586.         catch (Exception e) {
  587.             if (e instanceof DataXException) {
  588.                 throw (DataXException) e;
  589.             }
  590.             // 每一种转换失败都是脏数据处理,包括数字格式 & 日期格式
  591.             taskPluginCollector.collectDirtyRecord(record, e.getMessage());
  592.         }
  593.     }
  594.     private TypeDescription getOrcSchema(String filePath)
  595.     {
  596.         Path path = new Path(filePath);
  597.         try {
  598.             Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(hadoopConf));
  599. //            return reader.getTypes().get(0).getSubtypesCount()
  600.             return reader.getSchema();
  601.         }
  602.         catch (IOException e) {
  603.             String message = "读取orc-file column列数失败,请联系系统管理员";
  604.             throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
  605.         }
  606.     }
  607.     public boolean checkHdfsFileType(String filepath, String specifiedFileType)
  608.     {
  609.         Path file = new Path(filepath);
  610.         try (FileSystem fs = FileSystem.get(hadoopConf); FSDataInputStream in = fs.open(file)) {
  611.             if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.ORC)) {
  612.                 return isORCFile(file, fs, in);
  613.             }
  614.             else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.RC)) {
  615.                 return isRCFile(filepath, in);
  616.             }
  617.             else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.SEQ)) {
  618.                 return isSequenceFile(file, in);
  619.             }
  620.             else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.PARQUET)) {
  621.                 return isParquetFile(file);
  622.             }
  623.             else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.CSV)
  624.                     || StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.TEXT)) {
  625.                 return true;
  626.             }
  627.         }
  628.         catch (Exception e) {
  629.             String message = String.format("检查文件[%s]类型失败,目前支持 %s 格式的文件," +
  630.                     "请检查您文件类型和文件是否正确。", filepath, HdfsConstant.SUPPORT_FILE_TYPE);
  631.             LOG.error(message);
  632.             throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
  633.         }
  634.         return false;
  635.     }
  636.     // 判断file是否是ORC File
  637.     private boolean isORCFile(Path file, FileSystem fs, FSDataInputStream in)
  638.     {
  639.         try {
  640.             // figure out the size of the file using the option or filesystem
  641.             long size = fs.getFileStatus(file).getLen();
  642.             //read last bytes into buffer to get PostScript
  643.             int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
  644.             in.seek(size - readSize);
  645.             ByteBuffer buffer = ByteBuffer.allocate(readSize);
  646.             in.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(),
  647.                     buffer.remaining());
  648.             //read the PostScript
  649.             //get length of PostScript
  650.             int psLen = buffer.get(readSize - 1) & 0xff;
  651.             String orcMagic = org.apache.orc.OrcFile.MAGIC;
  652.             int len = orcMagic.length();
  653.             if (psLen < len + 1) {
  654.                 return false;
  655.             }
  656.             int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1
  657.                     - len;
  658.             byte[] array = buffer.array();
  659.             // now look for the magic string at the end of the postscript.
  660.             if (Text.decode(array, offset, len).equals(orcMagic)) {
  661.                 return true;
  662.             }
  663.             else {
  664.                 // If it isn't there, this may be the 0.11.0 version of ORC.
  665.                 // Read the first 3 bytes of the file to check for the header
  666.                 in.seek(0);
  667.                 byte[] header = new byte[len];
  668.                 in.readFully(header, 0, len);
  669.                 // if it isn't there, this isn't an ORC file
  670.                 if (Text.decode(header, 0, len).equals(orcMagic)) {
  671.                     return true;
  672.                 }
  673.             }
  674.         }
  675.         catch (IOException e) {
  676.             LOG.info("检查文件类型: [{}] 不是ORC File.", file);
  677.         }
  678.         return false;
  679.     }
  680.     // 判断file是否是RC file
  681.     private boolean isRCFile(String filepath, FSDataInputStream in)
  682.     {
  683.         // The first version of RCFile used the sequence file header.
  684.         final byte[] originalMagic = {(byte) 'S', (byte) 'E', (byte) 'Q'};
  685.         // The 'magic' bytes at the beginning of the RCFile
  686.         final byte[] rcMagic = {(byte) 'R', (byte) 'C', (byte) 'F'};
  687.         // the version that was included with the original magic, which is mapped
  688.         // into ORIGINAL_VERSION
  689.         final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6;
  690.         // All the versions should be place in this list.
  691.         final int ORIGINAL_VERSION = 0;  // version with SEQ
  692.         // version with RCF
  693.         // final int NEW_MAGIC_VERSION = 1
  694.         // final int CURRENT_VERSION = NEW_MAGIC_VERSION
  695.         final int CURRENT_VERSION = 1;
  696.         byte version;
  697.         byte[] magic = new byte[rcMagic.length];
  698.         try {
  699.             in.seek(0);
  700.             in.readFully(magic);
  701.             if (Arrays.equals(magic, originalMagic)) {
  702.                 if (in.readByte() != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
  703.                     return false;
  704.                 }
  705.                 version = ORIGINAL_VERSION;
  706.             }
  707.             else {
  708.                 if (!Arrays.equals(magic, rcMagic)) {
  709.                     return false;
  710.                 }
  711.                 // Set 'version'
  712.                 version = in.readByte();
  713.                 if (version > CURRENT_VERSION) {
  714.                     return false;
  715.                 }
  716.             }
  717.             if (version == ORIGINAL_VERSION) {
  718.                 try {
  719.                     Class<?> keyCls = hadoopConf.getClassByName(Text.readString(in));
  720.                     Class<?> valCls = hadoopConf.getClassByName(Text.readString(in));
  721.                     if (!keyCls.equals(RCFile.KeyBuffer.class) || !valCls.equals(RCFile.ValueBuffer.class)) {
  722.                         return false;
  723.                     }
  724.                 }
  725.                 catch (ClassNotFoundException e) {
  726.                     return false;
  727.                 }
  728.             }
  729. //            boolean decompress = in.readBoolean(); // is compressed?
  730.             if (version == ORIGINAL_VERSION) {
  731.                 // is block-compressed? it should be always false.
  732.                 boolean blkCompressed = in.readBoolean();
  733.                 return !blkCompressed;
  734.             }
  735.             return true;
  736.         }
  737.         catch (IOException e) {
  738.             LOG.info("检查文件类型: [{}] 不是RC File.", filepath);
  739.         }
  740.         return false;
  741.     }
  742.     // 判断file是否是Sequence file
  743.     private boolean isSequenceFile(Path filepath, FSDataInputStream in)
  744.     {
  745.         final byte[] seqMagic = {(byte) 'S', (byte) 'E', (byte) 'Q'};
  746.         byte[] magic = new byte[seqMagic.length];
  747.         try {
  748.             in.seek(0);
  749.             in.readFully(magic);
  750.             return Arrays.equals(magic, seqMagic);
  751.         }
  752.         catch (IOException e) {
  753.             LOG.info("检查文件类型: [{}] 不是Sequence File.", filepath);
  754.         }
  755.         return false;
  756.     }
  757.     //判断是否为parquet(考虑判断parquet文件的schema是否不为空)
  758.     private boolean isParquetFile(Path file)
  759.     {
  760.         try {
  761.             GroupReadSupport readSupport = new GroupReadSupport();
  762.             ParquetReader.Builder<Group> reader = ParquetReader.builder(readSupport, file);
  763.             ParquetReader<Group> build = reader.build();
  764.             if (build.read() != null) {
  765.                 return true;
  766.             }
  767.         }
  768.         catch (IOException e) {
  769.             LOG.info("检查文件类型: [{}] 不是Parquet File.", file);
  770.         }
  771.         return false;
  772.     }
  773.     /**
  774.      * Returns GMT's timestamp from binary encoded parquet timestamp (12 bytes - julian date + time of day nanos).
  775.      *
  776.      * @param timestampBinary INT96 parquet timestamp
  777.      * @return timestamp in millis, GMT timezone
  778.      */
  779.     public static long getTimestampMillis(Binary timestampBinary)
  780.     {
  781.         if (timestampBinary.length() != 12) {
  782.             return 0;
  783.         }
  784.         byte[] bytes = timestampBinary.getBytes();
  785.         return getTimestampMills(bytes);
  786.     }
  787.     public static long getTimestampMills(byte[] bytes)
  788.     {
  789.         assert bytes.length == 12;
  790.         // little endian encoding - need to invert byte order
  791.         long timeOfDayNanos = Longs.fromBytes(bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]);
  792.         int julianDay = Ints.fromBytes(bytes[11], bytes[10], bytes[9], bytes[8]);
  793.         return julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND);
  794.     }
  795.     private static long julianDayToMillis(int julianDay)
  796.     {
  797.         return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
  798.     }
  799. }
复制代码
HdfsConstant
  1. package com.alibaba.datax.plugin.reader.hdfsreader;
  2. import com.alibaba.datax.common.base.Constant;
  3. import java.util.Arrays;
  4. import java.util.List;
  5. public class HdfsConstant
  6.         extends Constant
  7. {
  8.     public static final String SOURCE_FILES = "sourceFiles";
  9.     public static final String TEXT = "TEXT";
  10.     public static final String ORC = "ORC";
  11.     public static final String CSV = "CSV";
  12.     public static final String SEQ = "SEQ";
  13.     public static final String RC = "RC";
  14.     public static final String PARQUET = "PARQUET"; //新增parquet文件类型
  15.     public static final String HDFS_DEFAULT_KEY = "fs.defaultFS";
  16.     public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
  17.     protected static final List<String> SUPPORT_FILE_TYPE =
  18.             Arrays.asList(HdfsConstant.CSV, HdfsConstant.ORC, HdfsConstant.RC, HdfsConstant.SEQ, HdfsConstant.TEXT, HdfsConstant.PARQUET);
  19.     private HdfsConstant() {}
  20. }
复制代码
HdfsReader
  1. package com.alibaba.datax.plugin.reader.hdfsreader;
  2. import com.alibaba.datax.common.base.Key;
  3. import com.alibaba.datax.common.exception.DataXException;
  4. import com.alibaba.datax.common.plugin.RecordSender;
  5. import com.alibaba.datax.common.spi.Reader;
  6. import com.alibaba.datax.common.util.Configuration;
  7. import com.alibaba.datax.storage.reader.StorageReaderUtil;
  8. import com.alibaba.datax.storage.util.FileHelper;
  9. import org.apache.commons.io.Charsets;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import java.io.InputStream;
  13. import java.nio.charset.UnsupportedCharsetException;
  14. import java.util.ArrayList;
  15. import java.util.Collections;
  16. import java.util.HashSet;
  17. import java.util.List;
  18. import static com.alibaba.datax.common.base.Key.COLUMN;
  19. import static com.alibaba.datax.common.base.Key.ENCODING;
  20. import static com.alibaba.datax.common.base.Key.INDEX;
  21. import static com.alibaba.datax.common.base.Key.TYPE;
  22. import static com.alibaba.datax.common.base.Key.VALUE;
  23. public class HdfsReader
  24.         extends Reader
  25. {
  26.     /**
  27.      * Job 中的方法仅执行一次,Task 中方法会由框架启动多个 Task 线程并行执行。
  28.      * <p>
  29.      * 整个 Reader 执行流程是:
  30.      * <pre>
  31.      * Job类init-->prepare-->split
  32.      * Task类init-->prepare-->startRead-->post-->destroy
  33.      * Task类init-->prepare-->startRead-->post-->destroy
  34.      * Job类post-->destroy
  35.      * </pre>
  36.      */
  37.     public static class Job
  38.             extends Reader.Job
  39.     {
  40.         private static final Logger LOG = LoggerFactory.getLogger(Job.class);
  41.         private Configuration readerOriginConfig = null;
  42.         private HashSet<String> sourceFiles;
  43.         private String specifiedFileType = null;
  44.         private DFSUtil dfsUtil = null;
  45.         private List<String> path = null;
  46.         @Override
  47.         public void init()
  48.         {
  49.             LOG.info("init() begin...");
  50.             this.readerOriginConfig = getPluginJobConf();
  51.             validate();
  52.             dfsUtil = new DFSUtil(readerOriginConfig);
  53.             LOG.info("init() ok and end...");
  54.         }
  55.         public void validate()
  56.         {
  57.             readerOriginConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsReaderErrorCode.DEFAULT_FS_NOT_FIND_ERROR);
  58.             // path check
  59.             String pathInString = readerOriginConfig.getNecessaryValue(Key.PATH, HdfsReaderErrorCode.REQUIRED_VALUE);
  60.             if (!pathInString.startsWith("[") && !pathInString.endsWith("]")) {
  61.                 path = Collections.singletonList(pathInString);
  62.             }
  63.             else {
  64.                 path = readerOriginConfig.getList(Key.PATH, String.class);
  65.                 if (null == path || path.isEmpty()) {
  66.                     throw DataXException.asDataXException(HdfsReaderErrorCode.REQUIRED_VALUE, "您需要指定待读取的源目录或文件");
  67.                 }
  68.                 for (String eachPath : path) {
  69.                     if (!eachPath.startsWith("/")) {
  70.                         String message = String.format("请检查参数path:[%s],需要配置为绝对路径", eachPath);
  71.                         LOG.error(message);
  72.                         throw DataXException.asDataXException(HdfsReaderErrorCode.ILLEGAL_VALUE, message);
  73.                     }
  74.                 }
  75.             }
  76.             specifiedFileType = readerOriginConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE).toUpperCase();
  77.             if (!HdfsConstant.SUPPORT_FILE_TYPE.contains(specifiedFileType)) {
  78.                 String message = "HdfsReader插件目前支持 " + HdfsConstant.SUPPORT_FILE_TYPE + " 几种格式的文件";
  79.                 throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_ERROR, message);
  80.             }
  81.             String encoding = this.readerOriginConfig.getString(ENCODING, "UTF-8");
  82.             try {
  83.                 Charsets.toCharset(encoding);
  84.             }
  85.             catch (UnsupportedCharsetException uce) {
  86.                 throw DataXException.asDataXException(
  87.                         HdfsReaderErrorCode.ILLEGAL_VALUE,
  88.                         String.format("不支持的编码格式 : [%s]", encoding), uce);
  89.             }
  90.             catch (Exception e) {
  91.                 throw DataXException.asDataXException(
  92.                         HdfsReaderErrorCode.ILLEGAL_VALUE,
  93.                         String.format("运行配置异常 : %s", e.getMessage()), e);
  94.             }
  95.             //check Kerberos
  96.             boolean haveKerberos = readerOriginConfig.getBool(Key.HAVE_KERBEROS, false);
  97.             if (haveKerberos) {
  98.                 readerOriginConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, HdfsReaderErrorCode.REQUIRED_VALUE);
  99.                 readerOriginConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HdfsReaderErrorCode.REQUIRED_VALUE);
  100.             }
  101.             // validate the Columns
  102.             validateColumns();
  103.             // validate compress
  104.             String compress = readerOriginConfig.getString(Key.COMPRESS, "NONE");
  105.             if ("gzip".equalsIgnoreCase(compress)) {
  106.                 // correct to gz
  107.                 readerOriginConfig.set(Key.COMPRESS, "gz");
  108.             }
  109.         }
  110.         private void validateColumns()
  111.         {
  112.             // 检测是column 是否为 ["*"] 若是则填为空
  113.             List<Configuration> column = this.readerOriginConfig.getListConfiguration(COLUMN);
  114.             if (null != column && 1 == column.size()
  115.                     && (""*"".equals(column.get(0).toString()) || "'*'".equals(column.get(0).toString()))) {
  116.                 readerOriginConfig.set(COLUMN, new ArrayList<String>());
  117.             }
  118.             else {
  119.                 // column: 1. index type 2.value type 3.when type is Data, may be has dateFormat value
  120.                 List<Configuration> columns = readerOriginConfig.getListConfiguration(COLUMN);
  121.                 if (null == columns || columns.isEmpty()) {
  122.                     throw DataXException.asDataXException(HdfsReaderErrorCode.CONFIG_INVALID_EXCEPTION, "您需要指定 columns");
  123.                 }
  124.                 for (Configuration eachColumnConf : columns) {
  125.                     eachColumnConf.getNecessaryValue(TYPE, HdfsReaderErrorCode.REQUIRED_VALUE);
  126.                     Integer columnIndex = eachColumnConf.getInt(INDEX);
  127.                     String columnValue = eachColumnConf.getString(VALUE);
  128.                     if (null == columnIndex && null == columnValue) {
  129.                         throw DataXException.asDataXException(
  130.                                 HdfsReaderErrorCode.NO_INDEX_VALUE,
  131.                                 "由于您配置了type, 则至少需要配置 index 或 value, 当前配置为:" + eachColumnConf);
  132.                     }
  133.                     if (null != columnIndex && null != columnValue) {
  134.                         throw DataXException.asDataXException(HdfsReaderErrorCode.MIXED_INDEX_VALUE,
  135.                                 "您混合配置了index, value, 每一列同时仅能选择其中一种");
  136.                     }
  137.                 }
  138.             }
  139.         }
  140.         @Override
  141.         public void prepare()
  142.         {
  143.             LOG.info("prepare(), start to getAllFiles...");
  144.             this.sourceFiles = (HashSet<String>) dfsUtil.getAllFiles(path, specifiedFileType);
  145.             LOG.info("您即将读取的文件数为: [{}], 列表为: [{}]", sourceFiles.size(), sourceFiles);
  146.         }
  147.         @Override
  148.         public List<Configuration> split(int adviceNumber)
  149.         {
  150.             LOG.info("split() begin...");
  151.             List<Configuration> readerSplitConfigs = new ArrayList<>();
  152.             // warn:每个slice拖且仅拖一个文件,
  153.             int splitNumber = sourceFiles.size();
  154.             if (0 == splitNumber) {
  155.                 throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,
  156.                         String.format("未能找到待读取的文件,请确认您的配置项path: %s", readerOriginConfig.getString(Key.PATH)));
  157.             }
  158.             List<List<String>> splitSourceFiles = FileHelper.splitSourceFiles(new ArrayList<>(sourceFiles), splitNumber);
  159.             for (List<String> files : splitSourceFiles) {
  160.                 Configuration splitConfig = readerOriginConfig.clone();
  161.                 splitConfig.set(HdfsConstant.SOURCE_FILES, files);
  162.                 readerSplitConfigs.add(splitConfig);
  163.             }
  164.             return readerSplitConfigs;
  165.         }
  166.         @Override
  167.         public void post()
  168.         {
  169.             //
  170.         }
  171.         @Override
  172.         public void destroy()
  173.         {
  174.             //
  175.         }
  176.     }
  177.     public static class Task
  178.             extends Reader.Task
  179.     {
  180.         private static final Logger LOG = LoggerFactory.getLogger(Task.class);
  181.         private Configuration taskConfig;
  182.         private List<String> sourceFiles;
  183.         private String specifiedFileType;
  184.         private DFSUtil dfsUtil = null;
  185.         @Override
  186.         public void init()
  187.         {
  188.             this.taskConfig = getPluginJobConf();
  189.             this.sourceFiles = taskConfig.getList(HdfsConstant.SOURCE_FILES, String.class);
  190.             this.specifiedFileType = taskConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE);
  191.             this.dfsUtil = new DFSUtil(taskConfig);
  192.         }
  193.         @Override
  194.         public void prepare()
  195.         {
  196.             //
  197.         }
  198.         @Override
  199.         public void startRead(RecordSender recordSender)
  200.         {
  201.             LOG.info("read start");
  202.             for (String sourceFile : this.sourceFiles) {
  203.                 LOG.info("reading file : [{}]", sourceFile);
  204.                 if (specifiedFileType.equalsIgnoreCase(HdfsConstant.TEXT) || specifiedFileType.equalsIgnoreCase(HdfsConstant.CSV)) {
  205.                     InputStream inputStream = dfsUtil.getInputStream(sourceFile);
  206.                     StorageReaderUtil.readFromStream(inputStream, sourceFile, taskConfig, recordSender, getTaskPluginCollector());
  207.                 }
  208.                 else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.ORC)) {
  209.                     dfsUtil.orcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
  210.                 }
  211.                 else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.SEQ)) {
  212.                     dfsUtil.sequenceFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
  213.                 }
  214.                 else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.RC)) {
  215.                     dfsUtil.rcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
  216.                 }
  217.                 else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.PARQUET)) {
  218.                     dfsUtil.parquetFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
  219.                 }
  220.                 else {
  221.                     String message = "HdfsReader插件目前支持ORC, TEXT, CSV, SEQUENCE, RC、PARQUET等六种格式的文件," +
  222.                             "请将fileType选项的值配置为ORC, TEXT, CSV, SEQUENCE, RC, PARQUET";
  223.                     throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message);
  224.                 }
  225.                 if (recordSender != null) {
  226.                     recordSender.flush();
  227.                 }
  228.             }
  229.             LOG.info("end read source files...");
  230.         }
  231.         @Override
  232.         public void post()
  233.         {
  234.             //
  235.         }
  236.         @Override
  237.         public void destroy()
  238.         {
  239.             //
  240.         }
  241.     }
  242. }
复制代码
HdfsWriter插件

    本插件比较简单,一共五个类,具体类名及对应修改项如下:
    按需修改其中四个类即可,具体代码如下:
HdfsHelper

[code]package com.alibaba.datax.plugin.writer.hdfswriter;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.alibaba.datax.common.base.Constant;import com.alibaba.datax.common.base.Key;import com.alibaba.datax.common.element.Column;import com.alibaba.datax.common.element.Record;import com.alibaba.datax.common.exception.DataXException;import com.alibaba.datax.common.plugin.RecordReceiver;import com.alibaba.datax.common.plugin.TaskPluginCollector;import com.alibaba.datax.common.util.Configuration;import com.alibaba.datax.unstructuredstorage.util.ColumnTypeUtil;import com.alibaba.datax.unstructuredstorage.util.HdfsUtil;import org.apache.avro.Conversions;import org.apache.avro.LogicalTypes;import org.apache.avro.Schema;import org.apache.avro.generic.GenericData;import org.apache.avro.generic.GenericRecord;import org.apache.avro.generic.GenericRecordBuilder;import org.apache.commons.lang3.StringUtils;import org.apache.commons.lang3.Validate;import org.apache.commons.lang3.tuple.MutablePair;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.LocatedFileStatus;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.RemoteIterator;import org.apache.hadoop.hive.common.type.HiveDecimal;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.JobContext;import org.apache.hadoop.mapred.RecordWriter;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.security.UserGroupInformation;import org.apache.hadoop.security.authentication.util.KerberosName;import org.apache.orc.CompressionKind;import org.apache.orc.OrcFile;import org.apache.orc.TypeDescription;import org.apache.orc.Writer;import org.apache.parquet.avro.AvroParquetWriter;import org.apache.parquet.column.ParquetProperties;import org.apache.parquet.hadoop.ParquetWriter;import org.apache.parquet.hadoop.metadata.CompressionCodecName;import org.apache.parquet.schema.OriginalType;import org.apache.parquet.schema.PrimitiveType;import org.apache.parquet.schema.Types;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.FileNotFoundException;import java.io.IOException;import java.lang.reflect.Field;import java.math.BigDecimal;import java.nio.charset.StandardCharsets;import java.sql.Timestamp;import java.text.SimpleDateFormat;import java.util.*;public class HdfsHelper{    public static final Logger LOG = LoggerFactory.getLogger(HdfsHelper.class);    public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";    public static final String HDFS_DEFAULT_FS_KEY = "fs.defaultFS";    private FileSystem fileSystem = null;    private JobConf conf = null;    private org.apache.hadoop.conf.Configuration hadoopConf = null;    // Kerberos    private boolean haveKerberos = false;    private String kerberosKeytabFilePath;    private String kerberosPrincipal;    private String krb5ConfPath;    public static MutablePair transportOneRecord(            Record record, char fieldDelimiter, List columnsConfiguration, TaskPluginCollector taskPluginCollector)    {        MutablePair transportResultList = transportOneRecord(record, columnsConfiguration, taskPluginCollector);        //保存        MutablePair transportResult = new MutablePair();        transportResult.setRight(false);        Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter));        transportResult.setRight(transportResultList.getRight());        transportResult.setLeft(recordResult);        return transportResult;    }    public static MutablePair transportOneRecord(            Record record, List columnsConfiguration,            TaskPluginCollector taskPluginCollector)    {        MutablePair transportResult = new MutablePair();        transportResult.setRight(false);        List recordList = new ArrayList();        int recordLength = record.getColumnNumber();        if (0 != recordLength) {            Column column;            for (int i = 0; i < recordLength; i++) {                column = record.getColumn(i);                if (null != column.getRawData()) {                    String rowData = column.getRawData().toString();                    SupportHiveDataType columnType = SupportHiveDataType.valueOf(                            columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase());                    //根据writer端类型配置做类型转换                    try {                        switch (columnType) {                            case TINYINT:                                recordList.add(Byte.valueOf(rowData));                                break;                            case SMALLINT:                                recordList.add(Short.valueOf(rowData));                                break;                            case INT:                            case INTEGER:                                recordList.add(Integer.valueOf(rowData));                                break;                            case BIGINT:                                recordList.add(column.asLong());                                break;                            case FLOAT:                                recordList.add(Float.valueOf(rowData));                                break;                            case DOUBLE:                                recordList.add(column.asDouble());                                break;                            case STRING:                            case VARCHAR:                            case CHAR:                                recordList.add(column.asString());                                break;                            case DECIMAL:                                recordList.add(HiveDecimal.create(column.asBigDecimal()));                                break;                            case BOOLEAN:                                recordList.add(column.asBoolean());                                break;                            case DATE:                                recordList.add(org.apache.hadoop.hive.common.type.Date.valueOf(column.asString()));                                break;                            case TIMESTAMP:                                recordList.add(Timestamp.valueOf(column.asString()));                                break;                            case BINARY:                                recordList.add(column.asBytes());                                break;                            default:                                throw DataXException                                        .asDataXException(                                                HdfsWriterErrorCode.ILLEGAL_VALUE,                                                String.format(                                                        "您的配置文件中的列配置信息有误. 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",                                                        columnsConfiguration.get(i).getString(Key.NAME),                                                        columnsConfiguration.get(i).getString(Key.TYPE)));                        }                    }                    catch (Exception e) {                        // warn: 此处认为脏数据                        e.printStackTrace();                        String message = String.format(                                "字段类型转换错误:你目标字段为[%s]类型,实际字段值为[%s].",                                columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData());                        taskPluginCollector.collectDirtyRecord(record, message);                        transportResult.setRight(true);                        break;                    }                }                else {                    // warn: it's all ok if nullFormat is null                    recordList.add(null);                }            }        }        transportResult.setLeft(recordList);        return transportResult;    }    public static GenericRecord transportParRecord(            Record record, List columnsConfiguration,            TaskPluginCollector taskPluginCollector, GenericRecordBuilder builder)    {        int recordLength = record.getColumnNumber();        if (0 != recordLength) {            Column column;            for (int i = 0; i < recordLength; i++) {                column = record.getColumn(i);                String colName = columnsConfiguration.get(i).getString(Key.NAME);                String typename = columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase();                if (null == column || column.getRawData() == null) {                    builder.set(colName, null);                }                else {                    String rowData = column.getRawData().toString();                    SupportHiveDataType columnType = SupportHiveDataType.valueOf(typename);                    //根据writer端类型配置做类型转换                    try {                        switch (columnType) {                            case INT:                            case INTEGER:                                builder.set(colName, Integer.valueOf(rowData));                                break;                            case LONG:                                builder.set(colName, column.asLong());                                break;                            case FLOAT:                                builder.set(colName, Float.valueOf(rowData));                                break;                            case DOUBLE:                                builder.set(colName, column.asDouble());                                break;                            case STRING:                                builder.set(colName, column.asString());                                break;                            case DECIMAL:                                builder.set(colName, new BigDecimal(column.asString()).setScale(columnsConfiguration.get(i).getInt(Key.SCALE), BigDecimal.ROUND_HALF_UP));                                break;                            case BOOLEAN:                                builder.set(colName, column.asBoolean());                                break;                            case BINARY:                                builder.set(colName, column.asBytes());                                break;                            case TIMESTAMP:                                builder.set(colName, column.asLong() / 1000);                                break;                            default:                                throw DataXException                                        .asDataXException(                                                HdfsWriterErrorCode.ILLEGAL_VALUE,                                                String.format(                                                        "您的配置文件中的列配置信息有误. 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",                                                        columnsConfiguration.get(i).getString(Key.NAME),                                                        columnsConfiguration.get(i).getString(Key.TYPE)));                        }                    }                    catch (Exception e) {                        // warn: 此处认为脏数据                        String message = String.format(                                "字段类型转换错误:目标字段为[%s]类型,实际字段值为[%s].",                                columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData());                        taskPluginCollector.collectDirtyRecord(record, message);                        break;                    }                }            }        }        return builder.build();    }    public static String generateParquetSchemaFromColumnAndType(List columns) {        Map decimalColInfo = new HashMap(16);        ColumnTypeUtil.DecimalInfo PARQUET_DEFAULT_DECIMAL_INFO = new ColumnTypeUtil.DecimalInfo(10, 2);        Types.MessageTypeBuilder typeBuilder = Types.buildMessage();        for (Configuration column : columns) {            String name = column.getString("name");            String colType = column.getString("type");            Validate.notNull(name, "column.name can't be null");            Validate.notNull(colType, "column.type can't be null");            switch (colType.toLowerCase()) {                case "tinyint":                case "smallint":                case "int":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(name);                    break;                case "bigint":                case "long":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(name);                    break;                case "float":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.FLOAT).named(name);                    break;                case "double":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).named(name);                    break;                case "binary":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(name);                    break;                case "char":                case "varchar":                case "string":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(name);                    break;                case "boolean":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BOOLEAN).named(name);                    break;                case "timestamp":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT96).named(name);                    break;                case "date":                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).as(OriginalType.DATE).named(name);                    break;                default:                    if (ColumnTypeUtil.isDecimalType(colType)) {                        ColumnTypeUtil.DecimalInfo decimalInfo = ColumnTypeUtil.getDecimalInfo(colType, PARQUET_DEFAULT_DECIMAL_INFO);                        typeBuilder.optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)                                .as(OriginalType.DECIMAL)                                .precision(decimalInfo.getPrecision())                                .scale(decimalInfo.getScale())                                .length(HdfsUtil.computeMinBytesForPrecision(decimalInfo.getPrecision()))                                .named(name);                        decimalColInfo.put(name, decimalInfo);                    } else {                        typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(name);                    }                    break;            }        }        return typeBuilder.named("m").toString();    }    public FileSystem getFileSystem(String defaultFS, Configuration taskConfig)    {        this.hadoopConf = new org.apache.hadoop.conf.Configuration();        Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);        JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));        if (null != hadoopSiteParams) {            Set paramKeys = hadoopSiteParams.getKeys();            for (String each : paramKeys) {                hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));            }        }        this.hadoopConf.set(HDFS_DEFAULT_FS_KEY, defaultFS);        //是否有Kerberos认证        this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);        if (haveKerberos) {            LOG.info("krb5.conf路径:【{}】 \n keytab路径:【{}】 \n principal:【{}】\n",                    taskConfig.getString(Key. KRB5_CONF_FILE_PATH),                    taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH),                    taskConfig.getString(Key.KERBEROS_PRINCIPAL));            this.krb5ConfPath = taskConfig.getString(Key. KRB5_CONF_FILE_PATH);            this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);            this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);            LOG.info("检测到kerberos认证,正在进行认证");        }        System.setProperty("java.security.krb5.conf",krb5ConfPath);        System.setProperty("sun.security.krb5.Config",krb5ConfPath);        refreshConfig();        this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath,hadoopConf,this.krb5ConfPath);        conf = new JobConf(hadoopConf);        try {            fileSystem = FileSystem.get(conf);        }        catch (IOException e) {            String message = String.format("获取FileSystem时发生网络IO异常,请检查您的网络是否正常!HDFS地址:[message:defaultFS = %s]",                    defaultFS);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);        }        catch (Exception e) {            String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [%s]",                    "message:defaultFS =" + defaultFS);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);        }        if (null == fileSystem) {            String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [message:defaultFS = %s]",                    defaultFS);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, message);        }        return fileSystem;    }    /** 刷新krb内容信息 */    public static void refreshConfig() {        try {            sun.security.krb5.Config.refresh();            Field defaultRealmField = KerberosName.class.getDeclaredField("defaultRealm");            defaultRealmField.setAccessible(true);            defaultRealmField.set(                    null,                    org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm());            // reload java.security.auth.login.config            javax.security.auth.login.Configuration.setConfiguration(null);        } catch (Exception e) {            LOG.warn(                    "resetting default realm failed, current default realm will still be used.", e);        }    }    public   void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf,String krb5ConfPath) {        hadoopConf.set("hadoop.security.authentication", "kerberos");        hadoopConf.set("hive.security.authentication", "kerberos");        hadoopConf.set("hadoop.security.authorization", "true");        hadoopConf.set("dfs.permissions","false");        hadoopConf.set("hadoop.security.auth_to_local","RULE:[2:$1@$0](.*@CDHDEV.COM)s/.*/hadoop/ \n" +                "        DEFAULT");        if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {            UserGroupInformation.setConfiguration(hadoopConf);            KerberosName.resetDefaultRealm();            try {                LOG.info("开始认证");                UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);            }            catch (Exception e) {                LOG.info("kerberos认证失败");                String message = String.format("kerberos认证失败,请检查 " +                                "kerberosKeytabFilePath[%s] 和 kerberosPrincipal[%s]",                        kerberosKeytabFilePath, kerberosPrincipal);                e.printStackTrace();                throw DataXException.asDataXException(HdfsWriterErrorCode.KERBEROS_LOGIN_ERROR, message, e);            }        }    }    /**     * 获取指定目录下的文件列表     *     * @param dir 需要搜索的目录     * @return 文件数组,文件是全路径,     * eg:hdfs://10.101.204.12:9000/user/hive/warehouse/writer.db/text/test.textfile     */    public Path[] hdfsDirList(String dir)    {        Path path = new Path(dir);        Path[] files;        try {            FileStatus[] status = fileSystem.listStatus(path);            files = new Path[status.length];            for (int i = 0; i < status.length; i++) {                files = status.getPath();            }        }        catch (IOException e) {            String message = String.format("获取目录[%s]文件列表时发生网络IO异常,请检查您的网络是否正常!", dir);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);        }        return files;    }//    public boolean isPathExists(String filePath) {////        Path path = new Path(filePath);//        boolean exist;//        try {//            exist = fileSystem.exists(path);//        }//        catch (IOException e) {//            String message = String.format("判断文件路径[%s]是否存在时发生网络IO异常,请检查您的网络是否正常!",//                    "message:filePath =" + filePath);//            e.printStackTrace();//            LOG.error(message);//            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);//        }//        return exist;//    }    public boolean isPathDir(String filePath)    {        Path path = new Path(filePath);        boolean isDir;        try {            isDir = fileSystem.getFileStatus(path).isDirectory();        }        catch (IOException e) {            String message = String.format("判断路径[%s]是否是目录时发生网络IO异常,请检查您的网络是否正常!", filePath);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);        }        return isDir;    }    public void deleteFilesFromDir(Path dir)    {        try {            final RemoteIterator files = fileSystem.listFiles(dir, false);            while (files.hasNext()) {                final LocatedFileStatus next = files.next();                fileSystem.deleteOnExit(next.getPath());            }        }        catch (FileNotFoundException fileNotFoundException) {            throw new DataXException(HdfsWriterErrorCode.FILE_NOT_FOUND, fileNotFoundException.getMessage());        }        catch (IOException ioException) {            throw new DataXException(HdfsWriterErrorCode.IO_ERROR, ioException.getMessage());        }    }    public void deleteDir(Path path)    {        LOG.info("start delete tmp dir [{}] .", path);        try {            if (fileSystem.exists(path)) {                fileSystem.delete(path, true);            }        }        catch (Exception e) {            LOG.error("删除临时目录[{}]时发生IO异常,请检查您的网络是否正常!", path);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);        }        LOG.info("finish delete tmp dir [{}] .", path);    }    /**     * move all files in sourceDir to targetDir     *     * @param sourceDir the source directory     * @param targetDir the target directory     */    public void moveFilesToDest(Path sourceDir, Path targetDir)    {        try {            final FileStatus[] fileStatuses = fileSystem.listStatus(sourceDir);            for (FileStatus file : fileStatuses) {                if (file.isFile() && file.getLen() > 0) {                    LOG.info("start move file [{}] to dir [{}].", file.getPath(), targetDir.getName());                    fileSystem.rename(file.getPath(), new Path(targetDir, file.getPath().getName()));                }            }        }        catch (IOException e) {            throw DataXException.asDataXException(HdfsWriterErrorCode.IO_ERROR, e);        }        LOG.info("finish move file(s).");    }    //关闭FileSystem    public void closeFileSystem()    {        try {            fileSystem.close();        }        catch (IOException e) {            LOG.error("关闭FileSystem时发生IO异常,请检查您的网络是否正常!");            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);        }    }    // 写text file类型文件    public void textFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,                                   TaskPluginCollector taskPluginCollector)    {        char fieldDelimiter = config.getChar(Key.FIELD_DELIMITER);        List columns = config.getListConfiguration(Key.COLUMN);        String compress = config.getString(Key.COMPRESS, "NONE").toUpperCase().trim();        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");        String attempt = "attempt_" + dateFormat.format(new Date()) + "_0001_m_000000_0";        conf.set(JobContext.TASK_ATTEMPT_ID, attempt);        if (!"NONE".equals(compress)) {            // fileName must remove suffix, because the FileOutputFormat will add suffix            fileName = fileName.substring(0, fileName.lastIndexOf("."));            Class




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4