东湖之滨 发表于 2023-2-13 22:08:51

DataX二次开发——HdfsReader和HdfsWriter插件增加parquet文件读写

一、研发背景

    DataX官方开源的版本支持HDFS文件的读写,但是截止目前,并没有支持Parquet文件的读写,得益于DataX出色的数据同步性能,去年公司的项目大部分采用了DataX作为数据同步工具,但是从CDH集群同步Parquet或者将其他数据源的数据以Parquet格式写入HDFS,这两个常用场景没有进行支持。因此只能自己动手,补充HdfsReader和HdfsWriter插件,以支持Parquet文件的读写。
二、HdfsReader插件

    本插件比较简单,一共五个类,具体类名及对应修改项如下:

[*]DFSUtil:增加是否Parquet文件类型判断方法、增加Parquet文件读取转换方法。
[*]HdfsConstant:增加Parquet文件类的枚举项。
[*]HdfsReader:增加判断是否配置为Parquet文件类型的判断条件分支。
[*]HdfsReaderErrorCode:无需更改。
[*]Type:无需更改。
    按需修改其中四个类即可,具体代码如下:
    DFSUtil

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.alibaba.datax.common.base.Key;
import com.alibaba.datax.common.element.BoolColumn;
import com.alibaba.datax.common.element.BytesColumn;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.ColumnEntry;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.DoubleColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.storage.reader.StorageReaderErrorCode;
import com.alibaba.datax.storage.reader.StorageReaderUtil;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.orc.TypeDescription;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.alibaba.datax.common.base.Key.COLUMN;
import static com.alibaba.datax.common.base.Key.NULL_FORMAT;


public class DFSUtil
{
    private static final Logger LOG = LoggerFactory.getLogger(DFSUtil.class);

    // the offset of julian, 2440588 is 1970/1/1
    private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588;
    private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
    private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);

    private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
    private final org.apache.hadoop.conf.Configuration hadoopConf;
    private final boolean haveKerberos;
    private final HashSet<String> sourceHDFSAllFilesList = new HashSet<>();
    private String specifiedFileType = null;
    private String kerberosKeytabFilePath;
    private String kerberosPrincipal;

    public DFSUtil(Configuration taskConfig)
    {
      hadoopConf = new org.apache.hadoop.conf.Configuration();
      //io.file.buffer.size 性能参数
      //http://blog.csdn.net/yangjl38/article/details/7583374
      Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
      JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));
      if (null != hadoopSiteParams) {
            Set<String> paramKeys = hadoopSiteParams.getKeys();
            for (String each : paramKeys) {
                hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
            }
      }
      hadoopConf.set(HdfsConstant.HDFS_DEFAULT_KEY, taskConfig.getString(Key.DEFAULT_FS));

      //是否有Kerberos认证
      this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);
      if (haveKerberos) {
            this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
            this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
            this.hadoopConf.set(HdfsConstant.HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
      }
      this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);

      LOG.info("hadoopConfig details:{}", JSON.toJSONString(this.hadoopConf));
    }

    private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath)
    {
      if (haveKerberos && StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
            UserGroupInformation.setConfiguration(hadoopConf);
            try {
                UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
            }
            catch (Exception e) {
                String message = String.format("kerberos认证失败,请确定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填写正确",
                        kerberosKeytabFilePath, kerberosPrincipal);
                throw DataXException.asDataXException(HdfsReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);
            }
      }
    }

    /**
   * 获取指定路径列表下符合条件的所有文件的绝对路径
   *
   * @param srcPaths 路径列表
   * @param specifiedFileType 指定文件类型
   * @return set of string
   */
    public Set<String> getAllFiles(List<String> srcPaths, String specifiedFileType)
    {

      this.specifiedFileType = specifiedFileType;

      if (!srcPaths.isEmpty()) {
            for (String eachPath : srcPaths) {
                LOG.info("get HDFS all files in path = [{}]", eachPath);
                getHDFSAllFiles(eachPath);
            }
      }
      return sourceHDFSAllFilesList;
    }

    private void addSourceFileIfNotEmpty(FileStatus f)
    {
      if (f.isFile()) {
            String filePath = f.getPath().toString();
            if (f.getLen() > 0) {
                addSourceFileByType(filePath);
            }
            else {
                LOG.warn("文件[{}]长度为0,将会跳过不作处理!", filePath);
            }
      }
    }

    public void getHDFSAllFiles(String hdfsPath)
    {

      try {
            FileSystem hdfs = FileSystem.get(hadoopConf);
            //判断hdfsPath是否包含正则符号
            if (hdfsPath.contains("*") || hdfsPath.contains("?")) {
                Path path = new Path(hdfsPath);
                FileStatus[] stats = hdfs.globStatus(path);
                for (FileStatus f : stats) {
                  if (f.isFile()) {
                        addSourceFileIfNotEmpty(f);
                  }
                  else if (f.isDirectory()) {
                        getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
                  }
                }
            }
            else {
                getHDFSAllFilesNORegex(hdfsPath, hdfs);
            }
      }
      catch (IOException e) {
            String message = String.format("无法读取路径[%s]下的所有文件,请确认您的配置项fs.defaultFS, path的值是否正确," +
                  "是否有读写权限,网络是否已断开!", hdfsPath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.PATH_CONFIG_ERROR, e);
      }
    }

    private void getHDFSAllFilesNORegex(String path, FileSystem hdfs)
            throws IOException
    {

      // 获取要读取的文件的根目录
      Path listFiles = new Path(path);

      // If the network disconnected, this method will retry 45 times
      // each time the retry interval for 20 seconds
      // 获取要读取的文件的根目录的所有二级子文件目录
      FileStatus[] stats = hdfs.listStatus(listFiles);

      for (FileStatus f : stats) {
            // 判断是不是目录,如果是目录,递归调用
            if (f.isDirectory()) {
                LOG.info("[{}] 是目录, 递归获取该目录下的文件", f.getPath());
                getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
            }
            else if (f.isFile()) {
                addSourceFileIfNotEmpty(f);
            }
            else {
                String message = String.format("该路径[%s]文件类型既不是目录也不是文件,插件自动忽略。", f.getPath());
                LOG.info(message);
            }
      }
    }

    // 根据用户指定的文件类型,将指定的文件类型的路径加入sourceHDFSAllFilesList
    private void addSourceFileByType(String filePath)
    {
      // 检查file的类型和用户配置的fileType类型是否一致
      boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType);

      if (isMatchedFileType) {
            String msg = String.format("[%s]是[%s]类型的文件, 将该文件加入source files列表", filePath, this.specifiedFileType);
            LOG.info(msg);
            sourceHDFSAllFilesList.add(filePath);
      }
      else {
            String message = String.format("文件[%s]的类型与用户配置的fileType类型不一致," +
                            "请确认您配置的目录下面所有文件的类型均为[%s]"
                  , filePath, this.specifiedFileType);
            LOG.error(message);
            throw DataXException.asDataXException(
                  HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message);
      }
    }

    public InputStream getInputStream(String filepath)
    {
      InputStream inputStream;
      Path path = new Path(filepath);
      try {
            FileSystem fs = FileSystem.get(hadoopConf);
            //If the network disconnected, this method will retry 45 times
            //each time the retry interval for 20 seconds
            inputStream = fs.open(path);
            return inputStream;
      }
      catch (IOException e) {
            String message = String.format("读取文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filepath, filepath);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
      }
    }

    public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector)
    {
      LOG.info("Start Read sequence file [{}].", sourceSequenceFilePath);

      Path seqFilePath = new Path(sourceSequenceFilePath);
      try (SequenceFile.Reader reader = new SequenceFile.Reader(this.hadoopConf,
                SequenceFile.Reader.file(seqFilePath))) {
            //获取SequenceFile.Reader实例
            //获取key 与 value
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), this.hadoopConf);
            Text value = new Text();
            while (reader.next(key, value)) {
                if (StringUtils.isNotBlank(value.toString())) {
                  StorageReaderUtil.transportOneRecord(recordSender, readerSliceConfig, taskPluginCollector, value.toString());
                }
            }
      }
      catch (Exception e) {
            String message = String.format("SequenceFile.Reader读取文件[%s]时出错", sourceSequenceFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_SEQUENCE_FILE_ERROR, message, e);
      }
    }

    public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector)
    {
      LOG.info("Start Read rc-file [{}].", sourceRcFilePath);
      List<ColumnEntry> column = StorageReaderUtil
                .getListColumnEntry(readerSliceConfig, COLUMN);
      // warn: no default value '\N'
      String nullFormat = readerSliceConfig.getString(NULL_FORMAT);

      Path rcFilePath = new Path(sourceRcFilePath);
      RCFileRecordReader recordReader = null;
      try (FileSystem fs = FileSystem.get(rcFilePath.toUri(), hadoopConf)) {
            long fileLen = fs.getFileStatus(rcFilePath).getLen();
            FileSplit split = new FileSplit(rcFilePath, 0, fileLen, (String[]) null);
            recordReader = new RCFileRecordReader(hadoopConf, split);
            LongWritable key = new LongWritable();
            BytesRefArrayWritable value = new BytesRefArrayWritable();
            Text txt = new Text();
            while (recordReader.next(key, value)) {
                String[] sourceLine = new String;
                txt.clear();
                for (int i = 0; i < value.size(); i++) {
                  BytesRefWritable v = value.get(i);
                  txt.set(v.getData(), v.getStart(), v.getLength());
                  sourceLine = txt.toString();
                }
                StorageReaderUtil.transportOneRecord(recordSender,
                        column, sourceLine, nullFormat, taskPluginCollector);
            }
      }
      catch (IOException e) {
            String message = String.format("读取文件[%s]时出错", sourceRcFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_RCFILE_ERROR, message, e);
      }
      finally {
            try {
                if (recordReader != null) {
                  recordReader.close();
                  LOG.info("Finally, Close RCFileRecordReader.");
                }
            }
            catch (IOException e) {
                LOG.warn(String.format("finally: 关闭RCFileRecordReader失败, %s", e.getMessage()));
            }
      }
    }

    public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector)
    {
      LOG.info("Start Read orc-file [{}].", sourceOrcFilePath);
      List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
      String nullFormat = readerSliceConfig.getString(NULL_FORMAT);

      try {
            Path orcFilePath = new Path(sourceOrcFilePath);
            Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(hadoopConf));
            TypeDescription schema = reader.getSchema();
            assert column != null;
            if (column.isEmpty()) {
                for (int i = 0; i < schema.getChildren().size(); i++) {
                  ColumnEntry columnEntry = new ColumnEntry();
                  columnEntry.setIndex(i);
                  columnEntry.setType(schema.getChildren().get(i).getCategory().getName());
                  column.add(columnEntry);
                }
            }

            VectorizedRowBatch rowBatch = schema.createRowBatch(1024);
            org.apache.orc.RecordReader rowIterator = reader.rows(reader.options().schema(schema));
            while (rowIterator.nextBatch(rowBatch)) {
                transportOrcRecord(rowBatch, column, recordSender, taskPluginCollector, nullFormat);
            }
      }
      catch (Exception e) {
            String message = String.format("从orc-file文件路径[%s]中读取数据发生异常,请联系系统管理员。"
                  , sourceOrcFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
      }
    }

    private void transportOrcRecord(VectorizedRowBatch rowBatch, List<ColumnEntry> columns, RecordSender recordSender,
            TaskPluginCollector taskPluginCollector, String nullFormat)
    {
      Record record;
      for (int row = 0; row < rowBatch.size; row++) {
            record = recordSender.createRecord();
            try {
                for (ColumnEntry column : columns) {

                  Column columnGenerated;
                  if (column.getValue() != null) {
                        if (!"null".equals(column.getValue())) {
                            columnGenerated = new StringColumn(column.getValue());
                        }
                        else {
                            columnGenerated = new StringColumn();
                        }
                        record.addColumn(columnGenerated);
                        continue;
                  }
                  int i = column.getIndex();
                  String columnType = column.getType().toUpperCase();
                  ColumnVector col = rowBatch.cols;
                  Type type = Type.valueOf(columnType);
                  if (col.isNull) {
                        record.addColumn(new StringColumn(null));
                        continue;
                  }
                  switch (type) {
                        case INT:
                        case LONG:
                        case BOOLEAN:
                        case BIGINT:
                            columnGenerated = new LongColumn(((LongColumnVector) col).vector);
                            break;
                        case DATE:
                            columnGenerated = new DateColumn(new Date(((LongColumnVector) col).vector));
                            break;
                        case DOUBLE:
                            columnGenerated = new DoubleColumn(((DoubleColumnVector) col).vector);
                            break;
                        case DECIMAL:
                            columnGenerated = new DoubleColumn(((DecimalColumnVector) col).vector.doubleValue());
                            break;
                        case BINARY:
                            BytesColumnVector b = (BytesColumnVector) col;
                            byte[] val = Arrays.copyOfRange(b.vector, b.start, b.start + b.length);
                            columnGenerated = new BytesColumn(val);
                            break;
                        case TIMESTAMP:
                            columnGenerated = new DateColumn(((TimestampColumnVector) col).getTime(row));
                            break;
                        default:
                            // type is string or other
                            String v = ((BytesColumnVector) col).toString(row);
                            columnGenerated = v.equals(nullFormat) ? new StringColumn() : new StringColumn(v);
                            break;
                  }
                  record.addColumn(columnGenerated);
                }
                recordSender.sendToWriter(record);
            }
            catch (Exception e) {
                if (e instanceof DataXException) {
                  throw (DataXException) e;
                }
                taskPluginCollector.collectDirtyRecord(record, e.getMessage());
            }
      }
    }

    public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector)
    {
      LOG.info("Start Read parquet-file [{}].", sourceParquetFilePath);
      List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
      String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
      Path parquetFilePath = new Path(sourceParquetFilePath);

      hadoopConf.set("parquet.avro.readInt96AsFixed", "true");
      JobConf conf = new JobConf(hadoopConf);

      GenericData decimalSupport = new GenericData();
      decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
      try (ParquetReader<GenericData.Record> reader = AvroParquetReader
                .<GenericData.Record>builder(HadoopInputFile.fromPath(parquetFilePath, hadoopConf))
                .withDataModel(decimalSupport)
                .withConf(conf)
                .build()) {
            GenericData.Record gRecord = reader.read();
            Schema schema = gRecord.getSchema();

            if (null == column || column.isEmpty()) {
                column = new ArrayList<>(schema.getFields().size());
                String sType;
                // 用户没有填写具体的字段信息,需要从parquet文件构建
                for (int i = 0; i < schema.getFields().size(); i++) {
                  ColumnEntry columnEntry = new ColumnEntry();
                  columnEntry.setIndex(i);
                  Schema type;
                  if (schema.getFields().get(i).schema().getType() == Schema.Type.UNION) {
                        type = schema.getFields().get(i).schema().getTypes().get(1);
                  }
                  else {
                        type = schema.getFields().get(i).schema();
                  }
                  sType = type.getProp("logicalType") != null ? type.getProp("logicalType") : type.getType().getName();
                  if (sType.startsWith("timestamp")) {
                        columnEntry.setType("timestamp");
                  }
                  else {
                        columnEntry.setType(sType);
                  }
                  column.add(columnEntry);
                }
            }
            while (gRecord != null) {
                transportParquetRecord(column, gRecord, recordSender, taskPluginCollector, nullFormat);
                gRecord = reader.read();
            }
      }
      catch (IOException e) {
            String message = String.format("从parquet file文件路径[%s]中读取数据发生异常,请联系系统管理员。"
                  , sourceParquetFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
      }
    }

    /*
   * create a transport record for Parquet file
   *
   *
   */
    private void transportParquetRecord(List<ColumnEntry> columnConfigs, GenericData.Record gRecord, RecordSender recordSender,
            TaskPluginCollector taskPluginCollector, String nullFormat)
    {
      Record record = recordSender.createRecord();
      Column columnGenerated;
      int scale = 10;
      try {
            for (ColumnEntry columnEntry : columnConfigs) {
                String columnType = columnEntry.getType();
                Integer columnIndex = columnEntry.getIndex();
                String columnConst = columnEntry.getValue();
                String columnValue = null;
                if (null != columnIndex) {
                  if (null != gRecord.get(columnIndex)) {
                        columnValue = gRecord.get(columnIndex).toString();
                  }
                  else {
                        record.addColumn(new StringColumn(null));
                        continue;
                  }
                }
                else {
                  columnValue = columnConst;
                }
                if (columnType.startsWith("decimal(")) {
                  String ps = columnType.replace("decimal(", "").replace(")", "");
                  columnType = "decimal";
                  if (ps.contains(",")) {
                        scale = Integer.parseInt(ps.split(",").trim());
                  }
                  else {
                        scale = 0;
                  }
                }
                Type type = Type.valueOf(columnType.toUpperCase());
                if (StringUtils.equals(columnValue, nullFormat)) {
                  columnValue = null;
                }
                try {
                  switch (type) {
                        case STRING:
                            columnGenerated = new StringColumn(columnValue);
                            break;
                        case INT:
                        case LONG:
                            columnGenerated = new LongColumn(columnValue);
                            break;
                        case DOUBLE:
                            columnGenerated = new DoubleColumn(columnValue);
                            break;
                        case DECIMAL:
                            if (null == columnValue) {
                              columnGenerated = new DoubleColumn((Double) null);
                            }
                            else {
                              columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, RoundingMode.HALF_UP));
                            }
                            break;
                        case BOOLEAN:
                            columnGenerated = new BoolColumn(columnValue);
                            break;
                        case DATE:
                            if (columnValue == null) {
                              columnGenerated = new DateColumn((Date) null);
                            }
                            else {
                              String formatString = columnEntry.getFormat();
                              if (StringUtils.isNotBlank(formatString)) {
                                    // 用户自己配置的格式转换
                                    SimpleDateFormat format = new SimpleDateFormat(
                                          formatString);
                                    columnGenerated = new DateColumn(
                                          format.parse(columnValue));
                              }
                              else {
                                    // 框架尝试转换
                                    columnGenerated = new DateColumn(new StringColumn(columnValue).asDate());
                              }
                            }
                            break;
                        case TIMESTAMP:
                            if (null == columnValue) {
                              columnGenerated = new DateColumn();
                            }
                            else if (columnValue.startsWith("[")) {
                              // INT96 https://github.com/apache/parquet-mr/pull/901
                              GenericData.Fixed fixed = (GenericData.Fixed) gRecord.get(columnIndex);
                              Date date = new Date(getTimestampMills(fixed.bytes()));
                              columnGenerated = new DateColumn(date);
                            }
                            else {
                              columnGenerated = new DateColumn(Long.parseLong(columnValue) * 1000);
                            }
                            break;
                        case BINARY:
                            columnGenerated = new BytesColumn(((ByteBuffer) gRecord.get(columnIndex)).array());
                            break;
                        default:
                            String errorMessage = String.format("您配置的列类型暂不支持 : [%s]", columnType);
                            LOG.error(errorMessage);
                            throw DataXException.asDataXException(StorageReaderErrorCode.NOT_SUPPORT_TYPE, errorMessage);
                  }
                }
                catch (Exception e) {
                  throw new IllegalArgumentException(String.format(
                            "类型转换错误, 无法将[%s] 转换为[%s], %s", columnValue, type, e));
                }
                record.addColumn(columnGenerated);
            } // end for

            recordSender.sendToWriter(record);
      }
      catch (IllegalArgumentException | IndexOutOfBoundsException iae) {
            taskPluginCollector.collectDirtyRecord(record, iae.getMessage());
      }
      catch (Exception e) {
            if (e instanceof DataXException) {
                throw (DataXException) e;
            }
            // 每一种转换失败都是脏数据处理,包括数字格式 & 日期格式
            taskPluginCollector.collectDirtyRecord(record, e.getMessage());
      }
    }

    private TypeDescription getOrcSchema(String filePath)
    {
      Path path = new Path(filePath);
      try {
            Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(hadoopConf));
//            return reader.getTypes().get(0).getSubtypesCount()
            return reader.getSchema();
      }
      catch (IOException e) {
            String message = "读取orc-file column列数失败,请联系系统管理员";
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
      }
    }

    public boolean checkHdfsFileType(String filepath, String specifiedFileType)
    {

      Path file = new Path(filepath);

      try (FileSystem fs = FileSystem.get(hadoopConf); FSDataInputStream in = fs.open(file)) {
            if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.ORC)) {
                return isORCFile(file, fs, in);
            }
            else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.RC)) {
                return isRCFile(filepath, in);
            }
            else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.SEQ)) {

                return isSequenceFile(file, in);
            }
            else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.PARQUET)) {
                return isParquetFile(file);
            }
            else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.CSV)
                  || StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.TEXT)) {
                return true;
            }
      }
      catch (Exception e) {
            String message = String.format("检查文件[%s]类型失败,目前支持 %s 格式的文件," +
                  "请检查您文件类型和文件是否正确。", filepath, HdfsConstant.SUPPORT_FILE_TYPE);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
      }
      return false;
    }

    // 判断file是否是ORC File
    private boolean isORCFile(Path file, FileSystem fs, FSDataInputStream in)
    {
      try {
            // figure out the size of the file using the option or filesystem
            long size = fs.getFileStatus(file).getLen();

            //read last bytes into buffer to get PostScript
            int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
            in.seek(size - readSize);
            ByteBuffer buffer = ByteBuffer.allocate(readSize);
            in.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(),
                  buffer.remaining());

            //read the PostScript
            //get length of PostScript
            int psLen = buffer.get(readSize - 1) & 0xff;
            String orcMagic = org.apache.orc.OrcFile.MAGIC;
            int len = orcMagic.length();
            if (psLen < len + 1) {
                return false;
            }
            int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1
                  - len;
            byte[] array = buffer.array();
            // now look for the magic string at the end of the postscript.
            if (Text.decode(array, offset, len).equals(orcMagic)) {
                return true;
            }
            else {
                // If it isn't there, this may be the 0.11.0 version of ORC.
                // Read the first 3 bytes of the file to check for the header
                in.seek(0);
                byte[] header = new byte;
                in.readFully(header, 0, len);
                // if it isn't there, this isn't an ORC file
                if (Text.decode(header, 0, len).equals(orcMagic)) {
                  return true;
                }
            }
      }
      catch (IOException e) {
            LOG.info("检查文件类型: [{}] 不是ORC File.", file);
      }
      return false;
    }

    // 判断file是否是RC file
    private boolean isRCFile(String filepath, FSDataInputStream in)
    {

      // The first version of RCFile used the sequence file header.
      final byte[] originalMagic = {(byte) 'S', (byte) 'E', (byte) 'Q'};
      // The 'magic' bytes at the beginning of the RCFile
      final byte[] rcMagic = {(byte) 'R', (byte) 'C', (byte) 'F'};
      // the version that was included with the original magic, which is mapped
      // into ORIGINAL_VERSION
      final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6;
      // All the versions should be place in this list.
      final int ORIGINAL_VERSION = 0;// version with SEQ
      // version with RCF
      // final int NEW_MAGIC_VERSION = 1
      // final int CURRENT_VERSION = NEW_MAGIC_VERSION
      final int CURRENT_VERSION = 1;
      byte version;

      byte[] magic = new byte;
      try {
            in.seek(0);
            in.readFully(magic);

            if (Arrays.equals(magic, originalMagic)) {
                if (in.readByte() != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
                  return false;
                }
                version = ORIGINAL_VERSION;
            }
            else {
                if (!Arrays.equals(magic, rcMagic)) {
                  return false;
                }

                // Set 'version'
                version = in.readByte();
                if (version > CURRENT_VERSION) {
                  return false;
                }
            }

            if (version == ORIGINAL_VERSION) {
                try {
                  Class<?> keyCls = hadoopConf.getClassByName(Text.readString(in));
                  Class<?> valCls = hadoopConf.getClassByName(Text.readString(in));
                  if (!keyCls.equals(RCFile.KeyBuffer.class) || !valCls.equals(RCFile.ValueBuffer.class)) {
                        return false;
                  }
                }
                catch (ClassNotFoundException e) {
                  return false;
                }
            }
//            boolean decompress = in.readBoolean(); // is compressed?
            if (version == ORIGINAL_VERSION) {
                // is block-compressed? it should be always false.
                boolean blkCompressed = in.readBoolean();
                return !blkCompressed;
            }
            return true;
      }
      catch (IOException e) {
            LOG.info("检查文件类型: [{}] 不是RC File.", filepath);
      }
      return false;
    }

    // 判断file是否是Sequence file
    private boolean isSequenceFile(Path filepath, FSDataInputStream in)
    {
      final byte[] seqMagic = {(byte) 'S', (byte) 'E', (byte) 'Q'};
      byte[] magic = new byte;
      try {
            in.seek(0);
            in.readFully(magic);
            return Arrays.equals(magic, seqMagic);
      }
      catch (IOException e) {
            LOG.info("检查文件类型: [{}] 不是Sequence File.", filepath);
      }
      return false;
    }

    //判断是否为parquet(考虑判断parquet文件的schema是否不为空)
    private boolean isParquetFile(Path file)
    {
      try {
            GroupReadSupport readSupport = new GroupReadSupport();
            ParquetReader.Builder<Group> reader = ParquetReader.builder(readSupport, file);
            ParquetReader<Group> build = reader.build();
            if (build.read() != null) {
                return true;
            }
      }
      catch (IOException e) {
            LOG.info("检查文件类型: [{}] 不是Parquet File.", file);
      }
      return false;
    }

    /**
   * Returns GMT's timestamp from binary encoded parquet timestamp (12 bytes - julian date + time of day nanos).
   *
   * @param timestampBinary INT96 parquet timestamp
   * @return timestamp in millis, GMT timezone
   */
    public static long getTimestampMillis(Binary timestampBinary)
    {
      if (timestampBinary.length() != 12) {
            return 0;
      }
      byte[] bytes = timestampBinary.getBytes();

      return getTimestampMills(bytes);
    }

    public static long getTimestampMills(byte[] bytes)
    {
      assert bytes.length == 12;
      // little endian encoding - need to invert byte order
      long timeOfDayNanos = Longs.fromBytes(bytes, bytes, bytes, bytes, bytes, bytes, bytes, bytes);
      int julianDay = Ints.fromBytes(bytes, bytes, bytes, bytes);

      return julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND);
    }

    private static long julianDayToMillis(int julianDay)
    {
      return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
    }
}HdfsConstant

package com.alibaba.datax.plugin.reader.hdfsreader;

import com.alibaba.datax.common.base.Constant;
import java.util.Arrays;
import java.util.List;

public class HdfsConstant
      extends Constant
{

    public static final String SOURCE_FILES = "sourceFiles";
    public static final String TEXT = "TEXT";
    public static final String ORC = "ORC";
    public static final String CSV = "CSV";
    public static final String SEQ = "SEQ";
    public static final String RC = "RC";
    public static final String PARQUET = "PARQUET"; //新增parquet文件类型
    public static final String HDFS_DEFAULT_KEY = "fs.defaultFS";
    public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
    protected static final List<String> SUPPORT_FILE_TYPE =
            Arrays.asList(HdfsConstant.CSV, HdfsConstant.ORC, HdfsConstant.RC, HdfsConstant.SEQ, HdfsConstant.TEXT, HdfsConstant.PARQUET);

    private HdfsConstant() {}
}HdfsReader

package com.alibaba.datax.plugin.reader.hdfsreader;

import com.alibaba.datax.common.base.Key;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.storage.reader.StorageReaderUtil;
import com.alibaba.datax.storage.util.FileHelper;
import org.apache.commons.io.Charsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

import static com.alibaba.datax.common.base.Key.COLUMN;
import static com.alibaba.datax.common.base.Key.ENCODING;
import static com.alibaba.datax.common.base.Key.INDEX;
import static com.alibaba.datax.common.base.Key.TYPE;
import static com.alibaba.datax.common.base.Key.VALUE;

public class HdfsReader
      extends Reader
{

    /**
   * Job 中的方法仅执行一次,Task 中方法会由框架启动多个 Task 线程并行执行。
   * <p>
   * 整个 Reader 执行流程是:
   * <pre>
   * Job类init-->prepare-->split
   * Task类init-->prepare-->startRead-->post-->destroy
   * Task类init-->prepare-->startRead-->post-->destroy
   * Job类post-->destroy
   * </pre>
   */
    public static class Job
            extends Reader.Job
    {
      private static final Logger LOG = LoggerFactory.getLogger(Job.class);

      private Configuration readerOriginConfig = null;
      private HashSet<String> sourceFiles;
      private String specifiedFileType = null;
      private DFSUtil dfsUtil = null;
      private List<String> path = null;

      @Override
      public void init()
      {

            LOG.info("init() begin...");
            this.readerOriginConfig = getPluginJobConf();
            validate();
            dfsUtil = new DFSUtil(readerOriginConfig);
            LOG.info("init() ok and end...");
      }

      public void validate()
      {
            readerOriginConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsReaderErrorCode.DEFAULT_FS_NOT_FIND_ERROR);

            // path check
            String pathInString = readerOriginConfig.getNecessaryValue(Key.PATH, HdfsReaderErrorCode.REQUIRED_VALUE);
            if (!pathInString.startsWith("[") && !pathInString.endsWith("]")) {
                path = Collections.singletonList(pathInString);
            }
            else {
                path = readerOriginConfig.getList(Key.PATH, String.class);
                if (null == path || path.isEmpty()) {
                  throw DataXException.asDataXException(HdfsReaderErrorCode.REQUIRED_VALUE, "您需要指定待读取的源目录或文件");
                }
                for (String eachPath : path) {
                  if (!eachPath.startsWith("/")) {
                        String message = String.format("请检查参数path:[%s],需要配置为绝对路径", eachPath);
                        LOG.error(message);
                        throw DataXException.asDataXException(HdfsReaderErrorCode.ILLEGAL_VALUE, message);
                  }
                }
            }

            specifiedFileType = readerOriginConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE).toUpperCase();
            if (!HdfsConstant.SUPPORT_FILE_TYPE.contains(specifiedFileType)) {
                String message = "HdfsReader插件目前支持 " + HdfsConstant.SUPPORT_FILE_TYPE + " 几种格式的文件";
                throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_ERROR, message);
            }

            String encoding = this.readerOriginConfig.getString(ENCODING, "UTF-8");

            try {
                Charsets.toCharset(encoding);
            }
            catch (UnsupportedCharsetException uce) {
                throw DataXException.asDataXException(
                        HdfsReaderErrorCode.ILLEGAL_VALUE,
                        String.format("不支持的编码格式 : [%s]", encoding), uce);
            }
            catch (Exception e) {
                throw DataXException.asDataXException(
                        HdfsReaderErrorCode.ILLEGAL_VALUE,
                        String.format("运行配置异常 : %s", e.getMessage()), e);
            }
            //check Kerberos
            boolean haveKerberos = readerOriginConfig.getBool(Key.HAVE_KERBEROS, false);
            if (haveKerberos) {
                readerOriginConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, HdfsReaderErrorCode.REQUIRED_VALUE);
                readerOriginConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HdfsReaderErrorCode.REQUIRED_VALUE);
            }

            // validate the Columns
            validateColumns();

            // validate compress
            String compress = readerOriginConfig.getString(Key.COMPRESS, "NONE");
            if ("gzip".equalsIgnoreCase(compress)) {
                // correct to gz
                readerOriginConfig.set(Key.COMPRESS, "gz");
            }
      }

      private void validateColumns()
      {

            // 检测是column 是否为 ["*"] 若是则填为空
            List<Configuration> column = this.readerOriginConfig.getListConfiguration(COLUMN);
            if (null != column && 1 == column.size()
                  && ("\"*\"".equals(column.get(0).toString()) || "'*'".equals(column.get(0).toString()))) {
                readerOriginConfig.set(COLUMN, new ArrayList<String>());
            }
            else {
                // column: 1. index type 2.value type 3.when type is Data, may be has dateFormat value
                List<Configuration> columns = readerOriginConfig.getListConfiguration(COLUMN);

                if (null == columns || columns.isEmpty()) {
                  throw DataXException.asDataXException(HdfsReaderErrorCode.CONFIG_INVALID_EXCEPTION, "您需要指定 columns");
                }

                for (Configuration eachColumnConf : columns) {
                  eachColumnConf.getNecessaryValue(TYPE, HdfsReaderErrorCode.REQUIRED_VALUE);
                  Integer columnIndex = eachColumnConf.getInt(INDEX);
                  String columnValue = eachColumnConf.getString(VALUE);

                  if (null == columnIndex && null == columnValue) {
                        throw DataXException.asDataXException(
                              HdfsReaderErrorCode.NO_INDEX_VALUE,
                              "由于您配置了type, 则至少需要配置 index 或 value, 当前配置为:" + eachColumnConf);
                  }

                  if (null != columnIndex && null != columnValue) {
                        throw DataXException.asDataXException(HdfsReaderErrorCode.MIXED_INDEX_VALUE,
                              "您混合配置了index, value, 每一列同时仅能选择其中一种");
                  }
                }
            }
      }

      @Override
      public void prepare()
      {
            LOG.info("prepare(), start to getAllFiles...");
            this.sourceFiles = (HashSet<String>) dfsUtil.getAllFiles(path, specifiedFileType);
            LOG.info("您即将读取的文件数为: [{}], 列表为: [{}]", sourceFiles.size(), sourceFiles);
      }

      @Override
      public List<Configuration> split(int adviceNumber)
      {

            LOG.info("split() begin...");
            List<Configuration> readerSplitConfigs = new ArrayList<>();
            // warn:每个slice拖且仅拖一个文件,
            int splitNumber = sourceFiles.size();
            if (0 == splitNumber) {
                throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,
                        String.format("未能找到待读取的文件,请确认您的配置项path: %s", readerOriginConfig.getString(Key.PATH)));
            }

            List<List<String>> splitSourceFiles = FileHelper.splitSourceFiles(new ArrayList<>(sourceFiles), splitNumber);
            for (List<String> files : splitSourceFiles) {
                Configuration splitConfig = readerOriginConfig.clone();
                splitConfig.set(HdfsConstant.SOURCE_FILES, files);
                readerSplitConfigs.add(splitConfig);
            }

            return readerSplitConfigs;
      }

      @Override
      public void post()
      {
            //
      }

      @Override
      public void destroy()
      {
            //
      }
    }

    public static class Task
            extends Reader.Task
    {

      private static final Logger LOG = LoggerFactory.getLogger(Task.class);
      private Configuration taskConfig;
      private List<String> sourceFiles;
      private String specifiedFileType;
      private DFSUtil dfsUtil = null;

      @Override
      public void init()
      {

            this.taskConfig = getPluginJobConf();
            this.sourceFiles = taskConfig.getList(HdfsConstant.SOURCE_FILES, String.class);
            this.specifiedFileType = taskConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE);
            this.dfsUtil = new DFSUtil(taskConfig);
      }

      @Override
      public void prepare()
      {
            //
      }

      @Override
      public void startRead(RecordSender recordSender)
      {

            LOG.info("read start");
            for (String sourceFile : this.sourceFiles) {
                LOG.info("reading file : [{}]", sourceFile);

                if (specifiedFileType.equalsIgnoreCase(HdfsConstant.TEXT) || specifiedFileType.equalsIgnoreCase(HdfsConstant.CSV)) {
                  InputStream inputStream = dfsUtil.getInputStream(sourceFile);
                  StorageReaderUtil.readFromStream(inputStream, sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.ORC)) {

                  dfsUtil.orcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.SEQ)) {

                  dfsUtil.sequenceFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.RC)) {

                  dfsUtil.rcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.PARQUET)) {
                  dfsUtil.parquetFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else {
                  String message = "HdfsReader插件目前支持ORC, TEXT, CSV, SEQUENCE, RC、PARQUET等六种格式的文件," +
                            "请将fileType选项的值配置为ORC, TEXT, CSV, SEQUENCE, RC, PARQUET";
                  throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message);
                }

                if (recordSender != null) {
                  recordSender.flush();
                }
            }

            LOG.info("end read source files...");
      }

      @Override
      public void post()
      {
            //
      }

      @Override
      public void destroy()
      {
            //
      }
    }
}HdfsWriter插件

    本插件比较简单,一共五个类,具体类名及对应修改项如下:

[*]HdfsHelper:增加是否Parquet文件类型判断方法、增加Parquet文件读取转换方法。
[*]HdfsWriter:增加Parquet文件类的枚举项。
[*]SupportHiveDataType:无需更改。
[*]HdfsWriterErrorCode:无需更改。
[*]Type:无需更改。
    按需修改其中四个类即可,具体代码如下:
HdfsHelper

package com.alibaba.datax.plugin.writer.hdfswriter;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.alibaba.datax.common.base.Constant;import com.alibaba.datax.common.base.Key;import com.alibaba.datax.common.element.Column;import com.alibaba.datax.common.element.Record;import com.alibaba.datax.common.exception.DataXException;import com.alibaba.datax.common.plugin.RecordReceiver;import com.alibaba.datax.common.plugin.TaskPluginCollector;import com.alibaba.datax.common.util.Configuration;import com.alibaba.datax.unstructuredstorage.util.ColumnTypeUtil;import com.alibaba.datax.unstructuredstorage.util.HdfsUtil;import org.apache.avro.Conversions;import org.apache.avro.LogicalTypes;import org.apache.avro.Schema;import org.apache.avro.generic.GenericData;import org.apache.avro.generic.GenericRecord;import org.apache.avro.generic.GenericRecordBuilder;import org.apache.commons.lang3.StringUtils;import org.apache.commons.lang3.Validate;import org.apache.commons.lang3.tuple.MutablePair;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.LocatedFileStatus;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.RemoteIterator;import org.apache.hadoop.hive.common.type.HiveDecimal;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.JobContext;import org.apache.hadoop.mapred.RecordWriter;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.security.UserGroupInformation;import org.apache.hadoop.security.authentication.util.KerberosName;import org.apache.orc.CompressionKind;import org.apache.orc.OrcFile;import org.apache.orc.TypeDescription;import org.apache.orc.Writer;import org.apache.parquet.avro.AvroParquetWriter;import org.apache.parquet.column.ParquetProperties;import org.apache.parquet.hadoop.ParquetWriter;import org.apache.parquet.hadoop.metadata.CompressionCodecName;import org.apache.parquet.schema.OriginalType;import org.apache.parquet.schema.PrimitiveType;import org.apache.parquet.schema.Types;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.FileNotFoundException;import java.io.IOException;import java.lang.reflect.Field;import java.math.BigDecimal;import java.nio.charset.StandardCharsets;import java.sql.Timestamp;import java.text.SimpleDateFormat;import java.util.*;public class HdfsHelper{    public static final Logger LOG = LoggerFactory.getLogger(HdfsHelper.class);    public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";    public static final String HDFS_DEFAULT_FS_KEY = "fs.defaultFS";    private FileSystem fileSystem = null;    private JobConf conf = null;    private org.apache.hadoop.conf.Configuration hadoopConf = null;    // Kerberos    private boolean haveKerberos = false;    private String kerberosKeytabFilePath;    private String kerberosPrincipal;    private String krb5ConfPath;    public static MutablePair transportOneRecord(            Record record, char fieldDelimiter, List columnsConfiguration, TaskPluginCollector taskPluginCollector)    {      MutablePair transportResultList = transportOneRecord(record, columnsConfiguration, taskPluginCollector);      //保存      MutablePair transportResult = new MutablePair();      transportResult.setRight(false);      Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter));      transportResult.setRight(transportResultList.getRight());      transportResult.setLeft(recordResult);      return transportResult;    }    public static MutablePair transportOneRecord(            Record record, List columnsConfiguration,            TaskPluginCollector taskPluginCollector)    {      MutablePair transportResult = new MutablePair();      transportResult.setRight(false);      List recordList = new ArrayList();      int recordLength = record.getColumnNumber();      if (0 != recordLength) {            Column column;            for (int i = 0; i < recordLength; i++) {                column = record.getColumn(i);                if (null != column.getRawData()) {                  String rowData = column.getRawData().toString();                  SupportHiveDataType columnType = SupportHiveDataType.valueOf(                            columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase());                  //根据writer端类型配置做类型转换                  try {                        switch (columnType) {                            case TINYINT:                              recordList.add(Byte.valueOf(rowData));                              break;                            case SMALLINT:                              recordList.add(Short.valueOf(rowData));                              break;                            case INT:                            case INTEGER:                              recordList.add(Integer.valueOf(rowData));                              break;                            case BIGINT:                              recordList.add(column.asLong());                              break;                            case FLOAT:                              recordList.add(Float.valueOf(rowData));                              break;                            case DOUBLE:                              recordList.add(column.asDouble());                              break;                            case STRING:                            case VARCHAR:                            case CHAR:                              recordList.add(column.asString());                              break;                            case DECIMAL:                              recordList.add(HiveDecimal.create(column.asBigDecimal()));                              break;                            case BOOLEAN:                              recordList.add(column.asBoolean());                              break;                            case DATE:                              recordList.add(org.apache.hadoop.hive.common.type.Date.valueOf(column.asString()));                              break;                            case TIMESTAMP:                              recordList.add(Timestamp.valueOf(column.asString()));                              break;                            case BINARY:                              recordList.add(column.asBytes());                              break;                            default:                              throw DataXException                                        .asDataXException(                                                HdfsWriterErrorCode.ILLEGAL_VALUE,                                                String.format(                                                      "您的配置文件中的列配置信息有误. 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",                                                      columnsConfiguration.get(i).getString(Key.NAME),                                                      columnsConfiguration.get(i).getString(Key.TYPE)));                        }                  }                  catch (Exception e) {                        // warn: 此处认为脏数据                        e.printStackTrace();                        String message = String.format(                              "字段类型转换错误:你目标字段为[%s]类型,实际字段值为[%s].",                              columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData());                        taskPluginCollector.collectDirtyRecord(record, message);                        transportResult.setRight(true);                        break;                  }                }                else {                  // warn: it's all ok if nullFormat is null                  recordList.add(null);                }            }      }      transportResult.setLeft(recordList);      return transportResult;    }    public static GenericRecord transportParRecord(            Record record, List columnsConfiguration,            TaskPluginCollector taskPluginCollector, GenericRecordBuilder builder)    {      int recordLength = record.getColumnNumber();      if (0 != recordLength) {            Column column;            for (int i = 0; i < recordLength; i++) {                column = record.getColumn(i);                String colName = columnsConfiguration.get(i).getString(Key.NAME);                String typename = columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase();                if (null == column || column.getRawData() == null) {                  builder.set(colName, null);                }                else {                  String rowData = column.getRawData().toString();                  SupportHiveDataType columnType = SupportHiveDataType.valueOf(typename);                  //根据writer端类型配置做类型转换                  try {                        switch (columnType) {                            case INT:                            case INTEGER:                              builder.set(colName, Integer.valueOf(rowData));                              break;                            case LONG:                              builder.set(colName, column.asLong());                              break;                            case FLOAT:                              builder.set(colName, Float.valueOf(rowData));                              break;                            case DOUBLE:                              builder.set(colName, column.asDouble());                              break;                            case STRING:                              builder.set(colName, column.asString());                              break;                            case DECIMAL:                              builder.set(colName, new BigDecimal(column.asString()).setScale(columnsConfiguration.get(i).getInt(Key.SCALE), BigDecimal.ROUND_HALF_UP));                              break;                            case BOOLEAN:                              builder.set(colName, column.asBoolean());                              break;                            case BINARY:                              builder.set(colName, column.asBytes());                              break;                            case TIMESTAMP:                              builder.set(colName, column.asLong() / 1000);                              break;                            default:                              throw DataXException                                        .asDataXException(                                                HdfsWriterErrorCode.ILLEGAL_VALUE,                                                String.format(                                                      "您的配置文件中的列配置信息有误. 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%s]. 请修改表中该字段的类型或者不同步该字段.",                                                      columnsConfiguration.get(i).getString(Key.NAME),                                                      columnsConfiguration.get(i).getString(Key.TYPE)));                        }                  }                  catch (Exception e) {                        // warn: 此处认为脏数据                        String message = String.format(                              "字段类型转换错误:目标字段为[%s]类型,实际字段值为[%s].",                              columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData());                        taskPluginCollector.collectDirtyRecord(record, message);                        break;                  }                }            }      }      return builder.build();    }    public static String generateParquetSchemaFromColumnAndType(List columns) {      Map decimalColInfo = new HashMap(16);      ColumnTypeUtil.DecimalInfo PARQUET_DEFAULT_DECIMAL_INFO = new ColumnTypeUtil.DecimalInfo(10, 2);      Types.MessageTypeBuilder typeBuilder = Types.buildMessage();      for (Configuration column : columns) {            String name = column.getString("name");            String colType = column.getString("type");            Validate.notNull(name, "column.name can't be null");            Validate.notNull(colType, "column.type can't be null");            switch (colType.toLowerCase()) {                case "tinyint":                case "smallint":                case "int":                  typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(name);                  break;                case "bigint":                case "long":                  typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(name);                  break;                case "float":                  typeBuilder.optional(PrimitiveType.PrimitiveTypeName.FLOAT).named(name);                  break;                case "double":                  typeBuilder.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).named(name);                  break;                case "binary":                  typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(name);                  break;                case "char":                case "varchar":                case "string":                  typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(name);                  break;                case "boolean":                  typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BOOLEAN).named(name);                  break;                case "timestamp":                  typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT96).named(name);                  break;                case "date":                  typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).as(OriginalType.DATE).named(name);                  break;                default:                  if (ColumnTypeUtil.isDecimalType(colType)) {                        ColumnTypeUtil.DecimalInfo decimalInfo = ColumnTypeUtil.getDecimalInfo(colType, PARQUET_DEFAULT_DECIMAL_INFO);                        typeBuilder.optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)                              .as(OriginalType.DECIMAL)                              .precision(decimalInfo.getPrecision())                              .scale(decimalInfo.getScale())                              .length(HdfsUtil.computeMinBytesForPrecision(decimalInfo.getPrecision()))                              .named(name);                        decimalColInfo.put(name, decimalInfo);                  } else {                        typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(name);                  }                  break;            }      }      return typeBuilder.named("m").toString();    }    public FileSystem getFileSystem(String defaultFS, Configuration taskConfig)    {      this.hadoopConf = new org.apache.hadoop.conf.Configuration();      Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);      JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));      if (null != hadoopSiteParams) {            Set paramKeys = hadoopSiteParams.getKeys();            for (String each : paramKeys) {                hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));            }      }      this.hadoopConf.set(HDFS_DEFAULT_FS_KEY, defaultFS);      //是否有Kerberos认证      this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);      if (haveKerberos) {            LOG.info("krb5.conf路径:【{}】 \n keytab路径:【{}】 \n principal:【{}】\n",                  taskConfig.getString(Key. KRB5_CONF_FILE_PATH),                  taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH),                  taskConfig.getString(Key.KERBEROS_PRINCIPAL));            this.krb5ConfPath = taskConfig.getString(Key. KRB5_CONF_FILE_PATH);            this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);            this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);            LOG.info("检测到kerberos认证,正在进行认证");      }      System.setProperty("java.security.krb5.conf",krb5ConfPath);      System.setProperty("sun.security.krb5.Config",krb5ConfPath);      refreshConfig();      this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath,hadoopConf,this.krb5ConfPath);      conf = new JobConf(hadoopConf);      try {            fileSystem = FileSystem.get(conf);      }      catch (IOException e) {            String message = String.format("获取FileSystem时发生网络IO异常,请检查您的网络是否正常!HDFS地址:",                  defaultFS);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);      }      catch (Exception e) {            String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [%s]",                  "message:defaultFS =" + defaultFS);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);      }      if (null == fileSystem) {            String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: ",                  defaultFS);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, message);      }      return fileSystem;    }    /** 刷新krb内容信息 */    public static void refreshConfig() {      try {            sun.security.krb5.Config.refresh();            Field defaultRealmField = KerberosName.class.getDeclaredField("defaultRealm");            defaultRealmField.setAccessible(true);            defaultRealmField.set(                  null,                  org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm());            // reload java.security.auth.login.config            javax.security.auth.login.Configuration.setConfiguration(null);      } catch (Exception e) {            LOG.warn(                  "resetting default realm failed, current default realm will still be used.", e);      }    }    public   void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf,String krb5ConfPath) {      hadoopConf.set("hadoop.security.authentication", "kerberos");      hadoopConf.set("hive.security.authentication", "kerberos");      hadoopConf.set("hadoop.security.authorization", "true");      hadoopConf.set("dfs.permissions","false");      hadoopConf.set("hadoop.security.auth_to_local","RULE:(.*@CDHDEV.COM)s/.*/hadoop/ \n" +                "      DEFAULT");      if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {            UserGroupInformation.setConfiguration(hadoopConf);            KerberosName.resetDefaultRealm();            try {                LOG.info("开始认证");                UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);            }            catch (Exception e) {                LOG.info("kerberos认证失败");                String message = String.format("kerberos认证失败,请检查 " +                              "kerberosKeytabFilePath[%s] 和 kerberosPrincipal[%s]",                        kerberosKeytabFilePath, kerberosPrincipal);                e.printStackTrace();                throw DataXException.asDataXException(HdfsWriterErrorCode.KERBEROS_LOGIN_ERROR, message, e);            }      }    }    /**   * 获取指定目录下的文件列表   *   * @param dir 需要搜索的目录   * @return 文件数组,文件是全路径,   * eg:hdfs://10.101.204.12:9000/user/hive/warehouse/writer.db/text/test.textfile   */    public Path[] hdfsDirList(String dir)    {      Path path = new Path(dir);      Path[] files;      try {            FileStatus[] status = fileSystem.listStatus(path);            files = new Path;            for (int i = 0; i < status.length; i++) {                files = status.getPath();            }      }      catch (IOException e) {            String message = String.format("获取目录[%s]文件列表时发生网络IO异常,请检查您的网络是否正常!", dir);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);      }      return files;    }//    public boolean isPathExists(String filePath) {////      Path path = new Path(filePath);//      boolean exist;//      try {//            exist = fileSystem.exists(path);//      }//      catch (IOException e) {//            String message = String.format("判断文件路径[%s]是否存在时发生网络IO异常,请检查您的网络是否正常!",//                  "message:filePath =" + filePath);//            e.printStackTrace();//            LOG.error(message);//            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);//      }//      return exist;//    }    public boolean isPathDir(String filePath)    {      Path path = new Path(filePath);      boolean isDir;      try {            isDir = fileSystem.getFileStatus(path).isDirectory();      }      catch (IOException e) {            String message = String.format("判断路径[%s]是否是目录时发生网络IO异常,请检查您的网络是否正常!", filePath);            LOG.error(message);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);      }      return isDir;    }    public void deleteFilesFromDir(Path dir)    {      try {            final RemoteIterator files = fileSystem.listFiles(dir, false);            while (files.hasNext()) {                final LocatedFileStatus next = files.next();                fileSystem.deleteOnExit(next.getPath());            }      }      catch (FileNotFoundException fileNotFoundException) {            throw new DataXException(HdfsWriterErrorCode.FILE_NOT_FOUND, fileNotFoundException.getMessage());      }      catch (IOException ioException) {            throw new DataXException(HdfsWriterErrorCode.IO_ERROR, ioException.getMessage());      }    }    public void deleteDir(Path path)    {      LOG.info("start delete tmp dir [{}] .", path);      try {            if (fileSystem.exists(path)) {                fileSystem.delete(path, true);            }      }      catch (Exception e) {            LOG.error("删除临时目录[{}]时发生IO异常,请检查您的网络是否正常!", path);            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);      }      LOG.info("finish delete tmp dir [{}] .", path);    }    /**   * move all files in sourceDir to targetDir   *   * @param sourceDir the source directory   * @param targetDir the target directory   */    public void moveFilesToDest(Path sourceDir, Path targetDir)    {      try {            final FileStatus[] fileStatuses = fileSystem.listStatus(sourceDir);            for (FileStatus file : fileStatuses) {                if (file.isFile() && file.getLen() > 0) {                  LOG.info("start move file [{}] to dir [{}].", file.getPath(), targetDir.getName());                  fileSystem.rename(file.getPath(), new Path(targetDir, file.getPath().getName()));                }            }      }      catch (IOException e) {            throw DataXException.asDataXException(HdfsWriterErrorCode.IO_ERROR, e);      }      LOG.info("finish move file(s).");    }    //关闭FileSystem    public void closeFileSystem()    {      try {            fileSystem.close();      }      catch (IOException e) {            LOG.error("关闭FileSystem时发生IO异常,请检查您的网络是否正常!");            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);      }    }    // 写text file类型文件    public void textFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,                                 TaskPluginCollector taskPluginCollector)    {      char fieldDelimiter = config.getChar(Key.FIELD_DELIMITER);      List columns = config.getListConfiguration(Key.COLUMN);      String compress = config.getString(Key.COMPRESS, "NONE").toUpperCase().trim();      SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");      String attempt = "attempt_" + dateFormat.format(new Date()) + "_0001_m_000000_0";      conf.set(JobContext.TASK_ATTEMPT_ID, attempt);      if (!"NONE".equals(compress)) {            // fileName must remove suffix, because the FileOutputFormat will add suffix            fileName = fileName.substring(0, fileName.lastIndexOf("."));            Class
页: [1]
查看完整版本: DataX二次开发——HdfsReader和HdfsWriter插件增加parquet文件读写