1. Background
The official open-source version of DataX supports reading and writing files on HDFS, but as of now it does not support reading or writing Parquet files. Thanks to DataX's excellent synchronization performance, most of our company's projects last year adopted it as the data synchronization tool, yet two common scenarios remained unsupported: syncing Parquet data out of a CDH cluster, and writing data from other sources into HDFS in Parquet format. So I had to extend the HdfsReader and HdfsWriter plugins myself to add Parquet read and write support.
2. HdfsReader Plugin
The plugin is fairly simple and consists of five classes. The classes and the corresponding changes are:
- DFSUtil: add a method that checks whether a file is a Parquet file, and a method that reads Parquet files and converts the records.
- HdfsConstant: add a constant for the Parquet file type.
- HdfsReader: add a branch that handles the case where fileType is configured as Parquet.
- HdfsReaderErrorCode: no changes needed.
- Type: no changes needed.
Only the three classes that change need to be modified. The core read path is sketched first, and the complete code follows:
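The reader goes through the parquet-avro binding: open the file with AvroParquetReader, register a DecimalConversion so DECIMAL logical types come back as BigDecimal, and loop until read() returns null. Below is a minimal, hedged sketch of that loop; the wrapper class, main() and the file path are illustrative, and the plugin does the equivalent inside DFSUtil.parquetFileStartRead.
[code]
// Hedged sketch: the same AvroParquetReader-based loop that DFSUtil.parquetFileStartRead uses.
// The wrapper class, main() and the file path are illustrative only.
import org.apache.avro.Conversions;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ParquetReadSketch
{
    public static void main(String[] args) throws Exception
    {
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        // Let INT96 timestamps come back as 12-byte fixed values instead of failing.
        conf.set("parquet.avro.readInt96AsFixed", "true");
        // Register a decimal conversion so DECIMAL logical types are returned as BigDecimal.
        GenericData dataModel = new GenericData();
        dataModel.addLogicalTypeConversion(new Conversions.DecimalConversion());
        Path file = new Path(args[0]); // e.g. an HDFS path to a .parquet file
        try (ParquetReader<GenericData.Record> reader = AvroParquetReader
                .<GenericData.Record>builder(HadoopInputFile.fromPath(file, conf))
                .withDataModel(dataModel)
                .withConf(conf)
                .build()) {
            GenericData.Record record;
            while ((record = reader.read()) != null) {
                // The plugin maps each Avro field to a DataX Column here.
                System.out.println(record);
            }
        }
    }
}
[/code]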
DFSUtil
- package com.alibaba.datax.plugin.reader.hdfsreader;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.google.common.primitives.Ints;
- import com.google.common.primitives.Longs;
- import com.alibaba.datax.common.base.Key;
- import com.alibaba.datax.common.element.BoolColumn;
- import com.alibaba.datax.common.element.BytesColumn;
- import com.alibaba.datax.common.element.Column;
- import com.alibaba.datax.common.element.ColumnEntry;
- import com.alibaba.datax.common.element.DateColumn;
- import com.alibaba.datax.common.element.DoubleColumn;
- import com.alibaba.datax.common.element.LongColumn;
- import com.alibaba.datax.common.element.Record;
- import com.alibaba.datax.common.element.StringColumn;
- import com.alibaba.datax.common.exception.DataXException;
- import com.alibaba.datax.common.plugin.RecordSender;
- import com.alibaba.datax.common.plugin.TaskPluginCollector;
- import com.alibaba.datax.common.util.Configuration;
- import com.alibaba.datax.storage.reader.StorageReaderErrorCode;
- import com.alibaba.datax.storage.reader.StorageReaderUtil;
- import org.apache.avro.Conversions;
- import org.apache.avro.Schema;
- import org.apache.avro.generic.GenericData;
- import org.apache.commons.lang3.StringUtils;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
- import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
- import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
- import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
- import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
- import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
- import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
- import org.apache.hadoop.hive.ql.io.RCFile;
- import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
- import org.apache.hadoop.hive.ql.io.orc.OrcFile;
- import org.apache.hadoop.hive.ql.io.orc.Reader;
- import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
- import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.SequenceFile;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapred.FileSplit;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.security.UserGroupInformation;
- import org.apache.hadoop.util.ReflectionUtils;
- import org.apache.orc.TypeDescription;
- import org.apache.parquet.avro.AvroParquetReader;
- import org.apache.parquet.example.data.Group;
- import org.apache.parquet.hadoop.ParquetReader;
- import org.apache.parquet.hadoop.example.GroupReadSupport;
- import org.apache.parquet.hadoop.util.HadoopInputFile;
- import org.apache.parquet.io.api.Binary;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.IOException;
- import java.io.InputStream;
- import java.math.BigDecimal;
- import java.math.RoundingMode;
- import java.nio.ByteBuffer;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Date;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Set;
- import java.util.concurrent.TimeUnit;
- import static com.alibaba.datax.common.base.Key.COLUMN;
- import static com.alibaba.datax.common.base.Key.NULL_FORMAT;
- public class DFSUtil
- {
- private static final Logger LOG = LoggerFactory.getLogger(DFSUtil.class);
- // the offset of julian, 2440588 is 1970/1/1
- private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588;
- private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
- private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
- private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
- private final org.apache.hadoop.conf.Configuration hadoopConf;
- private final boolean haveKerberos;
- private final HashSet<String> sourceHDFSAllFilesList = new HashSet<>();
- private String specifiedFileType = null;
- private String kerberosKeytabFilePath;
- private String kerberosPrincipal;
- public DFSUtil(Configuration taskConfig)
- {
- hadoopConf = new org.apache.hadoop.conf.Configuration();
- //io.file.buffer.size 性能参数
- //http://blog.csdn.net/yangjl38/article/details/7583374
- Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
- JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));
- if (null != hadoopSiteParams) {
- Set<String> paramKeys = hadoopSiteParams.getKeys();
- for (String each : paramKeys) {
- hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
- }
- }
- hadoopConf.set(HdfsConstant.HDFS_DEFAULT_KEY, taskConfig.getString(Key.DEFAULT_FS));
- //是否有Kerberos认证
- this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);
- if (haveKerberos) {
- this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
- this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
- this.hadoopConf.set(HdfsConstant.HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
- }
- this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);
- LOG.info("hadoopConfig details:{}", JSON.toJSONString(this.hadoopConf));
- }
- private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath)
- {
- if (haveKerberos && StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
- UserGroupInformation.setConfiguration(hadoopConf);
- try {
- UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
- }
- catch (Exception e) {
- String message = String.format("kerberos认证失败,请确定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填写正确",
- kerberosKeytabFilePath, kerberosPrincipal);
- throw DataXException.asDataXException(HdfsReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);
- }
- }
- }
- /**
- * 获取指定路径列表下符合条件的所有文件的绝对路径
- *
- * @param srcPaths 路径列表
- * @param specifiedFileType 指定文件类型
- * @return set of string
- */
- public Set<String> getAllFiles(List<String> srcPaths, String specifiedFileType)
- {
- this.specifiedFileType = specifiedFileType;
- if (!srcPaths.isEmpty()) {
- for (String eachPath : srcPaths) {
- LOG.info("get HDFS all files in path = [{}]", eachPath);
- getHDFSAllFiles(eachPath);
- }
- }
- return sourceHDFSAllFilesList;
- }
- private void addSourceFileIfNotEmpty(FileStatus f)
- {
- if (f.isFile()) {
- String filePath = f.getPath().toString();
- if (f.getLen() > 0) {
- addSourceFileByType(filePath);
- }
- else {
- LOG.warn("文件[{}]长度为0,将会跳过不作处理!", filePath);
- }
- }
- }
- public void getHDFSAllFiles(String hdfsPath)
- {
- try {
- FileSystem hdfs = FileSystem.get(hadoopConf);
- //判断hdfsPath是否包含正则符号
- if (hdfsPath.contains("*") || hdfsPath.contains("?")) {
- Path path = new Path(hdfsPath);
- FileStatus[] stats = hdfs.globStatus(path);
- for (FileStatus f : stats) {
- if (f.isFile()) {
- addSourceFileIfNotEmpty(f);
- }
- else if (f.isDirectory()) {
- getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
- }
- }
- }
- else {
- getHDFSAllFilesNORegex(hdfsPath, hdfs);
- }
- }
- catch (IOException e) {
- String message = String.format("无法读取路径[%s]下的所有文件,请确认您的配置项fs.defaultFS, path的值是否正确," +
- "是否有读写权限,网络是否已断开!", hdfsPath);
- LOG.error(message);
- throw DataXException.asDataXException(HdfsReaderErrorCode.PATH_CONFIG_ERROR, e);
- }
- }
- private void getHDFSAllFilesNORegex(String path, FileSystem hdfs)
- throws IOException
- {
- // 获取要读取的文件的根目录
- Path listFiles = new Path(path);
- // If the network disconnected, this method will retry 45 times
- // each time the retry interval for 20 seconds
- // 获取要读取的文件的根目录的所有二级子文件目录
- FileStatus[] stats = hdfs.listStatus(listFiles);
- for (FileStatus f : stats) {
- // 判断是不是目录,如果是目录,递归调用
- if (f.isDirectory()) {
- LOG.info("[{}] 是目录, 递归获取该目录下的文件", f.getPath());
- getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
- }
- else if (f.isFile()) {
- addSourceFileIfNotEmpty(f);
- }
- else {
- String message = String.format("该路径[%s]文件类型既不是目录也不是文件,插件自动忽略。", f.getPath());
- LOG.info(message);
- }
- }
- }
- // 根据用户指定的文件类型,将指定的文件类型的路径加入sourceHDFSAllFilesList
- private void addSourceFileByType(String filePath)
- {
- // 检查file的类型和用户配置的fileType类型是否一致
- boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType);
- if (isMatchedFileType) {
- String msg = String.format("[%s]是[%s]类型的文件, 将该文件加入source files列表", filePath, this.specifiedFileType);
- LOG.info(msg);
- sourceHDFSAllFilesList.add(filePath);
- }
- else {
- String message = String.format("文件[%s]的类型与用户配置的fileType类型不一致," +
- "请确认您配置的目录下面所有文件的类型均为[%s]"
- , filePath, this.specifiedFileType);
- LOG.error(message);
- throw DataXException.asDataXException(
- HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message);
- }
- }
- public InputStream getInputStream(String filepath)
- {
- InputStream inputStream;
- Path path = new Path(filepath);
- try {
- FileSystem fs = FileSystem.get(hadoopConf);
- //If the network disconnected, this method will retry 45 times
- //each time the retry interval for 20 seconds
- inputStream = fs.open(path);
- return inputStream;
- }
- catch (IOException e) {
- String message = String.format("读取文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filepath, filepath);
- throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
- }
- }
- public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration readerSliceConfig,
- RecordSender recordSender, TaskPluginCollector taskPluginCollector)
- {
- LOG.info("Start Read sequence file [{}].", sourceSequenceFilePath);
- Path seqFilePath = new Path(sourceSequenceFilePath);
- try (SequenceFile.Reader reader = new SequenceFile.Reader(this.hadoopConf,
- SequenceFile.Reader.file(seqFilePath))) {
- //获取SequenceFile.Reader实例
- //获取key 与 value
- Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), this.hadoopConf);
- Text value = new Text();
- while (reader.next(key, value)) {
- if (StringUtils.isNotBlank(value.toString())) {
- StorageReaderUtil.transportOneRecord(recordSender, readerSliceConfig, taskPluginCollector, value.toString());
- }
- }
- }
- catch (Exception e) {
- String message = String.format("SequenceFile.Reader读取文件[%s]时出错", sourceSequenceFilePath);
- LOG.error(message);
- throw DataXException.asDataXException(HdfsReaderErrorCode.READ_SEQUENCE_FILE_ERROR, message, e);
- }
- }
- public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceConfig,
- RecordSender recordSender, TaskPluginCollector taskPluginCollector)
- {
- LOG.info("Start Read rc-file [{}].", sourceRcFilePath);
- List<ColumnEntry> column = StorageReaderUtil
- .getListColumnEntry(readerSliceConfig, COLUMN);
- // warn: no default value '\N'
- String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
- Path rcFilePath = new Path(sourceRcFilePath);
- RCFileRecordReader recordReader = null;
- try (FileSystem fs = FileSystem.get(rcFilePath.toUri(), hadoopConf)) {
- long fileLen = fs.getFileStatus(rcFilePath).getLen();
- FileSplit split = new FileSplit(rcFilePath, 0, fileLen, (String[]) null);
- recordReader = new RCFileRecordReader(hadoopConf, split);
- LongWritable key = new LongWritable();
- BytesRefArrayWritable value = new BytesRefArrayWritable();
- Text txt = new Text();
- while (recordReader.next(key, value)) {
- String[] sourceLine = new String[value.size()];
- txt.clear();
- for (int i = 0; i < value.size(); i++) {
- BytesRefWritable v = value.get(i);
- txt.set(v.getData(), v.getStart(), v.getLength());
- sourceLine[i] = txt.toString();
- }
- StorageReaderUtil.transportOneRecord(recordSender,
- column, sourceLine, nullFormat, taskPluginCollector);
- }
- }
- catch (IOException e) {
- String message = String.format("读取文件[%s]时出错", sourceRcFilePath);
- LOG.error(message);
- throw DataXException.asDataXException(HdfsReaderErrorCode.READ_RCFILE_ERROR, message, e);
- }
- finally {
- try {
- if (recordReader != null) {
- recordReader.close();
- LOG.info("Finally, Close RCFileRecordReader.");
- }
- }
- catch (IOException e) {
- LOG.warn(String.format("finally: 关闭RCFileRecordReader失败, %s", e.getMessage()));
- }
- }
- }
- public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
- RecordSender recordSender, TaskPluginCollector taskPluginCollector)
- {
- LOG.info("Start Read orc-file [{}].", sourceOrcFilePath);
- List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
- String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
- try {
- Path orcFilePath = new Path(sourceOrcFilePath);
- Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(hadoopConf));
- TypeDescription schema = reader.getSchema();
- assert column != null;
- if (column.isEmpty()) {
- for (int i = 0; i < schema.getChildren().size(); i++) {
- ColumnEntry columnEntry = new ColumnEntry();
- columnEntry.setIndex(i);
- columnEntry.setType(schema.getChildren().get(i).getCategory().getName());
- column.add(columnEntry);
- }
- }
- VectorizedRowBatch rowBatch = schema.createRowBatch(1024);
- org.apache.orc.RecordReader rowIterator = reader.rows(reader.options().schema(schema));
- while (rowIterator.nextBatch(rowBatch)) {
- transportOrcRecord(rowBatch, column, recordSender, taskPluginCollector, nullFormat);
- }
- }
- catch (Exception e) {
- String message = String.format("从orc-file文件路径[%s]中读取数据发生异常,请联系系统管理员。"
- , sourceOrcFilePath);
- LOG.error(message);
- throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
- }
- }
- private void transportOrcRecord(VectorizedRowBatch rowBatch, List<ColumnEntry> columns, RecordSender recordSender,
- TaskPluginCollector taskPluginCollector, String nullFormat)
- {
- Record record;
- for (int row = 0; row < rowBatch.size; row++) {
- record = recordSender.createRecord();
- try {
- for (ColumnEntry column : columns) {
- Column columnGenerated;
- if (column.getValue() != null) {
- if (!"null".equals(column.getValue())) {
- columnGenerated = new StringColumn(column.getValue());
- }
- else {
- columnGenerated = new StringColumn();
- }
- record.addColumn(columnGenerated);
- continue;
- }
- int i = column.getIndex();
- String columnType = column.getType().toUpperCase();
- ColumnVector col = rowBatch.cols[i];
- Type type = Type.valueOf(columnType);
- if (col.isNull[row]) {
- record.addColumn(new StringColumn(null));
- continue;
- }
- switch (type) {
- case INT:
- case LONG:
- case BOOLEAN:
- case BIGINT:
- columnGenerated = new LongColumn(((LongColumnVector) col).vector[row]);
- break;
- case DATE:
- columnGenerated = new DateColumn(new Date(((LongColumnVector) col).vector[row]));
- break;
- case DOUBLE:
- columnGenerated = new DoubleColumn(((DoubleColumnVector) col).vector[row]);
- break;
- case DECIMAL:
- columnGenerated = new DoubleColumn(((DecimalColumnVector) col).vector[row].doubleValue());
- break;
- case BINARY:
- BytesColumnVector b = (BytesColumnVector) col;
- byte[] val = Arrays.copyOfRange(b.vector[row], b.start[row], b.start[row] + b.length[row]);
- columnGenerated = new BytesColumn(val);
- break;
- case TIMESTAMP:
- columnGenerated = new DateColumn(((TimestampColumnVector) col).getTime(row));
- break;
- default:
- // type is string or other
- String v = ((BytesColumnVector) col).toString(row);
- columnGenerated = v.equals(nullFormat) ? new StringColumn() : new StringColumn(v);
- break;
- }
- record.addColumn(columnGenerated);
- }
- recordSender.sendToWriter(record);
- }
- catch (Exception e) {
- if (e instanceof DataXException) {
- throw (DataXException) e;
- }
- taskPluginCollector.collectDirtyRecord(record, e.getMessage());
- }
- }
- }
- public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig,
- RecordSender recordSender, TaskPluginCollector taskPluginCollector)
- {
- LOG.info("Start Read parquet-file [{}].", sourceParquetFilePath);
- List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
- String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
- Path parquetFilePath = new Path(sourceParquetFilePath);
- hadoopConf.set("parquet.avro.readInt96AsFixed", "true");
- JobConf conf = new JobConf(hadoopConf);
- GenericData decimalSupport = new GenericData();
- decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
- try (ParquetReader<GenericData.Record> reader = AvroParquetReader
- .<GenericData.Record>builder(HadoopInputFile.fromPath(parquetFilePath, hadoopConf))
- .withDataModel(decimalSupport)
- .withConf(conf)
- .build()) {
- GenericData.Record gRecord = reader.read();
- Schema schema = gRecord.getSchema();
- if (null == column || column.isEmpty()) {
- column = new ArrayList<>(schema.getFields().size());
- String sType;
- // 用户没有填写具体的字段信息,需要从parquet文件构建
- for (int i = 0; i < schema.getFields().size(); i++) {
- ColumnEntry columnEntry = new ColumnEntry();
- columnEntry.setIndex(i);
- Schema type;
- if (schema.getFields().get(i).schema().getType() == Schema.Type.UNION) {
- type = schema.getFields().get(i).schema().getTypes().get(1);
- }
- else {
- type = schema.getFields().get(i).schema();
- }
- sType = type.getProp("logicalType") != null ? type.getProp("logicalType") : type.getType().getName();
- if (sType.startsWith("timestamp")) {
- columnEntry.setType("timestamp");
- }
- else {
- columnEntry.setType(sType);
- }
- column.add(columnEntry);
- }
- }
- while (gRecord != null) {
- transportParquetRecord(column, gRecord, recordSender, taskPluginCollector, nullFormat);
- gRecord = reader.read();
- }
- }
- catch (IOException e) {
- String message = String.format("从parquet file文件路径[%s]中读取数据发生异常,请联系系统管理员。"
- , sourceParquetFilePath);
- LOG.error(message);
- throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
- }
- }
- /*
- * create a transport record for Parquet file
- *
- *
- */
- private void transportParquetRecord(List<ColumnEntry> columnConfigs, GenericData.Record gRecord, RecordSender recordSender,
- TaskPluginCollector taskPluginCollector, String nullFormat)
- {
- Record record = recordSender.createRecord();
- Column columnGenerated;
- int scale = 10;
- try {
- for (ColumnEntry columnEntry : columnConfigs) {
- String columnType = columnEntry.getType();
- Integer columnIndex = columnEntry.getIndex();
- String columnConst = columnEntry.getValue();
- String columnValue = null;
- if (null != columnIndex) {
- if (null != gRecord.get(columnIndex)) {
- columnValue = gRecord.get(columnIndex).toString();
- }
- else {
- record.addColumn(new StringColumn(null));
- continue;
- }
- }
- else {
- columnValue = columnConst;
- }
- if (columnType.startsWith("decimal(")) {
- String ps = columnType.replace("decimal(", "").replace(")", "");
- columnType = "decimal";
- if (ps.contains(",")) {
- scale = Integer.parseInt(ps.split(",")[1].trim());
- }
- else {
- scale = 0;
- }
- }
- Type type = Type.valueOf(columnType.toUpperCase());
- if (StringUtils.equals(columnValue, nullFormat)) {
- columnValue = null;
- }
- try {
- switch (type) {
- case STRING:
- columnGenerated = new StringColumn(columnValue);
- break;
- case INT:
- case LONG:
- columnGenerated = new LongColumn(columnValue);
- break;
- case DOUBLE:
- columnGenerated = new DoubleColumn(columnValue);
- break;
- case DECIMAL:
- if (null == columnValue) {
- columnGenerated = new DoubleColumn((Double) null);
- }
- else {
- columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, RoundingMode.HALF_UP));
- }
- break;
- case BOOLEAN:
- columnGenerated = new BoolColumn(columnValue);
- break;
- case DATE:
- if (columnValue == null) {
- columnGenerated = new DateColumn((Date) null);
- }
- else {
- String formatString = columnEntry.getFormat();
- if (StringUtils.isNotBlank(formatString)) {
- // 用户自己配置的格式转换
- SimpleDateFormat format = new SimpleDateFormat(
- formatString);
- columnGenerated = new DateColumn(
- format.parse(columnValue));
- }
- else {
- // 框架尝试转换
- columnGenerated = new DateColumn(new StringColumn(columnValue).asDate());
- }
- }
- break;
- case TIMESTAMP:
- if (null == columnValue) {
- columnGenerated = new DateColumn();
- }
- else if (columnValue.startsWith("[")) {
- // INT96 https://github.com/apache/parquet-mr/pull/901
- GenericData.Fixed fixed = (GenericData.Fixed) gRecord.get(columnIndex);
- Date date = new Date(getTimestampMills(fixed.bytes()));
- columnGenerated = new DateColumn(date);
- }
- else {
- columnGenerated = new DateColumn(Long.parseLong(columnValue) * 1000);
- }
- break;
- case BINARY:
- columnGenerated = new BytesColumn(((ByteBuffer) gRecord.get(columnIndex)).array());
- break;
- default:
- String errorMessage = String.format("您配置的列类型暂不支持 : [%s]", columnType);
- LOG.error(errorMessage);
- throw DataXException.asDataXException(StorageReaderErrorCode.NOT_SUPPORT_TYPE, errorMessage);
- }
- }
- catch (Exception e) {
- throw new IllegalArgumentException(String.format(
- "类型转换错误, 无法将[%s] 转换为[%s], %s", columnValue, type, e));
- }
- record.addColumn(columnGenerated);
- } // end for
- recordSender.sendToWriter(record);
- }
- catch (IllegalArgumentException | IndexOutOfBoundsException iae) {
- taskPluginCollector.collectDirtyRecord(record, iae.getMessage());
- }
- catch (Exception e) {
- if (e instanceof DataXException) {
- throw (DataXException) e;
- }
- // 每一种转换失败都是脏数据处理,包括数字格式 & 日期格式
- taskPluginCollector.collectDirtyRecord(record, e.getMessage());
- }
- }
- private TypeDescription getOrcSchema(String filePath)
- {
- Path path = new Path(filePath);
- try {
- Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(hadoopConf));
- // return reader.getTypes().get(0).getSubtypesCount()
- return reader.getSchema();
- }
- catch (IOException e) {
- String message = "读取orc-file column列数失败,请联系系统管理员";
- throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
- }
- }
- public boolean checkHdfsFileType(String filepath, String specifiedFileType)
- {
- Path file = new Path(filepath);
- try (FileSystem fs = FileSystem.get(hadoopConf); FSDataInputStream in = fs.open(file)) {
- if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.ORC)) {
- return isORCFile(file, fs, in);
- }
- else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.RC)) {
- return isRCFile(filepath, in);
- }
- else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.SEQ)) {
- return isSequenceFile(file, in);
- }
- else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.PARQUET)) {
- return isParquetFile(file);
- }
- else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.CSV)
- || StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.TEXT)) {
- return true;
- }
- }
- catch (Exception e) {
- String message = String.format("检查文件[%s]类型失败,目前支持 %s 格式的文件," +
- "请检查您文件类型和文件是否正确。", filepath, HdfsConstant.SUPPORT_FILE_TYPE);
- LOG.error(message);
- throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
- }
- return false;
- }
- // 判断file是否是ORC File
- private boolean isORCFile(Path file, FileSystem fs, FSDataInputStream in)
- {
- try {
- // figure out the size of the file using the option or filesystem
- long size = fs.getFileStatus(file).getLen();
- //read last bytes into buffer to get PostScript
- int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
- in.seek(size - readSize);
- ByteBuffer buffer = ByteBuffer.allocate(readSize);
- in.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(),
- buffer.remaining());
- //read the PostScript
- //get length of PostScript
- int psLen = buffer.get(readSize - 1) & 0xff;
- String orcMagic = org.apache.orc.OrcFile.MAGIC;
- int len = orcMagic.length();
- if (psLen < len + 1) {
- return false;
- }
- int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1
- - len;
- byte[] array = buffer.array();
- // now look for the magic string at the end of the postscript.
- if (Text.decode(array, offset, len).equals(orcMagic)) {
- return true;
- }
- else {
- // If it isn't there, this may be the 0.11.0 version of ORC.
- // Read the first 3 bytes of the file to check for the header
- in.seek(0);
- byte[] header = new byte[len];
- in.readFully(header, 0, len);
- // if it isn't there, this isn't an ORC file
- if (Text.decode(header, 0, len).equals(orcMagic)) {
- return true;
- }
- }
- }
- catch (IOException e) {
- LOG.info("检查文件类型: [{}] 不是ORC File.", file);
- }
- return false;
- }
- // 判断file是否是RC file
- private boolean isRCFile(String filepath, FSDataInputStream in)
- {
- // The first version of RCFile used the sequence file header.
- final byte[] originalMagic = {(byte) 'S', (byte) 'E', (byte) 'Q'};
- // The 'magic' bytes at the beginning of the RCFile
- final byte[] rcMagic = {(byte) 'R', (byte) 'C', (byte) 'F'};
- // the version that was included with the original magic, which is mapped
- // into ORIGINAL_VERSION
- final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6;
- // All the versions should be place in this list.
- final int ORIGINAL_VERSION = 0; // version with SEQ
- // version with RCF
- // final int NEW_MAGIC_VERSION = 1
- // final int CURRENT_VERSION = NEW_MAGIC_VERSION
- final int CURRENT_VERSION = 1;
- byte version;
- byte[] magic = new byte[rcMagic.length];
- try {
- in.seek(0);
- in.readFully(magic);
- if (Arrays.equals(magic, originalMagic)) {
- if (in.readByte() != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
- return false;
- }
- version = ORIGINAL_VERSION;
- }
- else {
- if (!Arrays.equals(magic, rcMagic)) {
- return false;
- }
- // Set 'version'
- version = in.readByte();
- if (version > CURRENT_VERSION) {
- return false;
- }
- }
- if (version == ORIGINAL_VERSION) {
- try {
- Class<?> keyCls = hadoopConf.getClassByName(Text.readString(in));
- Class<?> valCls = hadoopConf.getClassByName(Text.readString(in));
- if (!keyCls.equals(RCFile.KeyBuffer.class) || !valCls.equals(RCFile.ValueBuffer.class)) {
- return false;
- }
- }
- catch (ClassNotFoundException e) {
- return false;
- }
- }
- // boolean decompress = in.readBoolean(); // is compressed?
- if (version == ORIGINAL_VERSION) {
- // is block-compressed? it should be always false.
- boolean blkCompressed = in.readBoolean();
- return !blkCompressed;
- }
- return true;
- }
- catch (IOException e) {
- LOG.info("检查文件类型: [{}] 不是RC File.", filepath);
- }
- return false;
- }
- // 判断file是否是Sequence file
- private boolean isSequenceFile(Path filepath, FSDataInputStream in)
- {
- final byte[] seqMagic = {(byte) 'S', (byte) 'E', (byte) 'Q'};
- byte[] magic = new byte[seqMagic.length];
- try {
- in.seek(0);
- in.readFully(magic);
- return Arrays.equals(magic, seqMagic);
- }
- catch (IOException e) {
- LOG.info("检查文件类型: [{}] 不是Sequence File.", filepath);
- }
- return false;
- }
- //判断是否为parquet(考虑判断parquet文件的schema是否不为空)
- private boolean isParquetFile(Path file)
- {
- try {
- GroupReadSupport readSupport = new GroupReadSupport();
- ParquetReader.Builder<Group> reader = ParquetReader.builder(readSupport, file);
- ParquetReader<Group> build = reader.build();
- if (build.read() != null) {
- return true;
- }
- }
- catch (IOException e) {
- LOG.info("检查文件类型: [{}] 不是Parquet File.", file);
- }
- return false;
- }
- /**
- * Returns GMT's timestamp from binary encoded parquet timestamp (12 bytes - julian date + time of day nanos).
- *
- * @param timestampBinary INT96 parquet timestamp
- * @return timestamp in millis, GMT timezone
- */
- public static long getTimestampMillis(Binary timestampBinary)
- {
- if (timestampBinary.length() != 12) {
- return 0;
- }
- byte[] bytes = timestampBinary.getBytes();
- return getTimestampMills(bytes);
- }
- public static long getTimestampMills(byte[] bytes)
- {
- assert bytes.length == 12;
- // little endian encoding - need to invert byte order
- long timeOfDayNanos = Longs.fromBytes(bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]);
- int julianDay = Ints.fromBytes(bytes[11], bytes[10], bytes[9], bytes[8]);
- return julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND);
- }
- private static long julianDayToMillis(int julianDay)
- {
- return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
- }
- }
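As a quick sanity check on the INT96 timestamp handling above (getTimestampMills expects eight little-endian bytes of nanos-of-day followed by four little-endian bytes of Julian day), here is a hedged sketch that builds such a value by hand. The numbers and the wrapper class are illustrative; one day plus one hour after the epoch should print 90000000 milliseconds.
[code]
// Hedged sketch: feeds a hand-built INT96 value to DFSUtil.getTimestampMills.
// The values are illustrative; the import assumes DFSUtil sits in the hdfsreader package shown below.
import com.alibaba.datax.plugin.reader.hdfsreader.DFSUtil;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class Int96TimestampCheck
{
    public static void main(String[] args)
    {
        // INT96 layout: 8 little-endian bytes of nanos-of-day, then 4 little-endian bytes of Julian day.
        long nanosOfDay = 3_600_000_000_000L; // 01:00:00
        int julianDay = 2_440_589;            // one day after 1970-01-01 (Julian day 2440588)
        byte[] int96 = ByteBuffer.allocate(12)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putLong(nanosOfDay)
                .putInt(julianDay)
                .array();
        // One day plus one hour after the epoch: expect 90000000 milliseconds.
        System.out.println(DFSUtil.getTimestampMills(int96));
    }
}
[/code]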
HdfsConstant
- package com.alibaba.datax.plugin.reader.hdfsreader;
- import com.alibaba.datax.common.base.Constant;
- import java.util.Arrays;
- import java.util.List;
- public class HdfsConstant
- extends Constant
- {
- public static final String SOURCE_FILES = "sourceFiles";
- public static final String TEXT = "TEXT";
- public static final String ORC = "ORC";
- public static final String CSV = "CSV";
- public static final String SEQ = "SEQ";
- public static final String RC = "RC";
- public static final String PARQUET = "PARQUET"; //新增parquet文件类型
- public static final String HDFS_DEFAULT_KEY = "fs.defaultFS";
- public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
- protected static final List<String> SUPPORT_FILE_TYPE =
- Arrays.asList(HdfsConstant.CSV, HdfsConstant.ORC, HdfsConstant.RC, HdfsConstant.SEQ, HdfsConstant.TEXT, HdfsConstant.PARQUET);
- private HdfsConstant() {}
- }
HdfsReader
- package com.alibaba.datax.plugin.reader.hdfsreader;
- import com.alibaba.datax.common.base.Key;
- import com.alibaba.datax.common.exception.DataXException;
- import com.alibaba.datax.common.plugin.RecordSender;
- import com.alibaba.datax.common.spi.Reader;
- import com.alibaba.datax.common.util.Configuration;
- import com.alibaba.datax.storage.reader.StorageReaderUtil;
- import com.alibaba.datax.storage.util.FileHelper;
- import org.apache.commons.io.Charsets;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.InputStream;
- import java.nio.charset.UnsupportedCharsetException;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashSet;
- import java.util.List;
- import static com.alibaba.datax.common.base.Key.COLUMN;
- import static com.alibaba.datax.common.base.Key.ENCODING;
- import static com.alibaba.datax.common.base.Key.INDEX;
- import static com.alibaba.datax.common.base.Key.TYPE;
- import static com.alibaba.datax.common.base.Key.VALUE;
- public class HdfsReader
- extends Reader
- {
- /**
- * Job 中的方法仅执行一次,Task 中方法会由框架启动多个 Task 线程并行执行。
- * <p>
- * 整个 Reader 执行流程是:
- * <pre>
- * Job类init-->prepare-->split
- * Task类init-->prepare-->startRead-->post-->destroy
- * Task类init-->prepare-->startRead-->post-->destroy
- * Job类post-->destroy
- * </pre>
- */
- public static class Job
- extends Reader.Job
- {
- private static final Logger LOG = LoggerFactory.getLogger(Job.class);
- private Configuration readerOriginConfig = null;
- private HashSet<String> sourceFiles;
- private String specifiedFileType = null;
- private DFSUtil dfsUtil = null;
- private List<String> path = null;
- @Override
- public void init()
- {
- LOG.info("init() begin...");
- this.readerOriginConfig = getPluginJobConf();
- validate();
- dfsUtil = new DFSUtil(readerOriginConfig);
- LOG.info("init() ok and end...");
- }
- public void validate()
- {
- readerOriginConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsReaderErrorCode.DEFAULT_FS_NOT_FIND_ERROR);
- // path check
- String pathInString = readerOriginConfig.getNecessaryValue(Key.PATH, HdfsReaderErrorCode.REQUIRED_VALUE);
- if (!pathInString.startsWith("[") && !pathInString.endsWith("]")) {
- path = Collections.singletonList(pathInString);
- }
- else {
- path = readerOriginConfig.getList(Key.PATH, String.class);
- if (null == path || path.isEmpty()) {
- throw DataXException.asDataXException(HdfsReaderErrorCode.REQUIRED_VALUE, "您需要指定待读取的源目录或文件");
- }
- for (String eachPath : path) {
- if (!eachPath.startsWith("/")) {
- String message = String.format("请检查参数path:[%s],需要配置为绝对路径", eachPath);
- LOG.error(message);
- throw DataXException.asDataXException(HdfsReaderErrorCode.ILLEGAL_VALUE, message);
- }
- }
- }
- specifiedFileType = readerOriginConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE).toUpperCase();
- if (!HdfsConstant.SUPPORT_FILE_TYPE.contains(specifiedFileType)) {
- String message = "HdfsReader插件目前支持 " + HdfsConstant.SUPPORT_FILE_TYPE + " 几种格式的文件";
- throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_ERROR, message);
- }
- String encoding = this.readerOriginConfig.getString(ENCODING, "UTF-8");
- try {
- Charsets.toCharset(encoding);
- }
- catch (UnsupportedCharsetException uce) {
- throw DataXException.asDataXException(
- HdfsReaderErrorCode.ILLEGAL_VALUE,
- String.format("不支持的编码格式 : [%s]", encoding), uce);
- }
- catch (Exception e) {
- throw DataXException.asDataXException(
- HdfsReaderErrorCode.ILLEGAL_VALUE,
- String.format("运行配置异常 : %s", e.getMessage()), e);
- }
- //check Kerberos
- boolean haveKerberos = readerOriginConfig.getBool(Key.HAVE_KERBEROS, false);
- if (haveKerberos) {
- readerOriginConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, HdfsReaderErrorCode.REQUIRED_VALUE);
- readerOriginConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HdfsReaderErrorCode.REQUIRED_VALUE);
- }
- // validate the Columns
- validateColumns();
- // validate compress
- String compress = readerOriginConfig.getString(Key.COMPRESS, "NONE");
- if ("gzip".equalsIgnoreCase(compress)) {
- // correct to gz
- readerOriginConfig.set(Key.COMPRESS, "gz");
- }
- }
- private void validateColumns()
- {
- // 检测是column 是否为 ["*"] 若是则填为空
- List<Configuration> column = this.readerOriginConfig.getListConfiguration(COLUMN);
- if (null != column && 1 == column.size()
- && (""*"".equals(column.get(0).toString()) || "'*'".equals(column.get(0).toString()))) {
- readerOriginConfig.set(COLUMN, new ArrayList<String>());
- }
- else {
- // column: 1. index type 2.value type 3.when type is Data, may be has dateFormat value
- List<Configuration> columns = readerOriginConfig.getListConfiguration(COLUMN);
- if (null == columns || columns.isEmpty()) {
- throw DataXException.asDataXException(HdfsReaderErrorCode.CONFIG_INVALID_EXCEPTION, "您需要指定 columns");
- }
- for (Configuration eachColumnConf : columns) {
- eachColumnConf.getNecessaryValue(TYPE, HdfsReaderErrorCode.REQUIRED_VALUE);
- Integer columnIndex = eachColumnConf.getInt(INDEX);
- String columnValue = eachColumnConf.getString(VALUE);
- if (null == columnIndex && null == columnValue) {
- throw DataXException.asDataXException(
- HdfsReaderErrorCode.NO_INDEX_VALUE,
- "由于您配置了type, 则至少需要配置 index 或 value, 当前配置为:" + eachColumnConf);
- }
- if (null != columnIndex && null != columnValue) {
- throw DataXException.asDataXException(HdfsReaderErrorCode.MIXED_INDEX_VALUE,
- "您混合配置了index, value, 每一列同时仅能选择其中一种");
- }
- }
- }
- }
- @Override
- public void prepare()
- {
- LOG.info("prepare(), start to getAllFiles...");
- this.sourceFiles = (HashSet<String>) dfsUtil.getAllFiles(path, specifiedFileType);
- LOG.info("您即将读取的文件数为: [{}], 列表为: [{}]", sourceFiles.size(), sourceFiles);
- }
- @Override
- public List<Configuration> split(int adviceNumber)
- {
- LOG.info("split() begin...");
- List<Configuration> readerSplitConfigs = new ArrayList<>();
- // warn:每个slice拖且仅拖一个文件,
- int splitNumber = sourceFiles.size();
- if (0 == splitNumber) {
- throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,
- String.format("未能找到待读取的文件,请确认您的配置项path: %s", readerOriginConfig.getString(Key.PATH)));
- }
- List<List<String>> splitSourceFiles = FileHelper.splitSourceFiles(new ArrayList<>(sourceFiles), splitNumber);
- for (List<String> files : splitSourceFiles) {
- Configuration splitConfig = readerOriginConfig.clone();
- splitConfig.set(HdfsConstant.SOURCE_FILES, files);
- readerSplitConfigs.add(splitConfig);
- }
- return readerSplitConfigs;
- }
- @Override
- public void post()
- {
- //
- }
- @Override
- public void destroy()
- {
- //
- }
- }
- public static class Task
- extends Reader.Task
- {
- private static final Logger LOG = LoggerFactory.getLogger(Task.class);
- private Configuration taskConfig;
- private List<String> sourceFiles;
- private String specifiedFileType;
- private DFSUtil dfsUtil = null;
- @Override
- public void init()
- {
- this.taskConfig = getPluginJobConf();
- this.sourceFiles = taskConfig.getList(HdfsConstant.SOURCE_FILES, String.class);
- this.specifiedFileType = taskConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE);
- this.dfsUtil = new DFSUtil(taskConfig);
- }
- @Override
- public void prepare()
- {
- //
- }
- @Override
- public void startRead(RecordSender recordSender)
- {
- LOG.info("read start");
- for (String sourceFile : this.sourceFiles) {
- LOG.info("reading file : [{}]", sourceFile);
- if (specifiedFileType.equalsIgnoreCase(HdfsConstant.TEXT) || specifiedFileType.equalsIgnoreCase(HdfsConstant.CSV)) {
- InputStream inputStream = dfsUtil.getInputStream(sourceFile);
- StorageReaderUtil.readFromStream(inputStream, sourceFile, taskConfig, recordSender, getTaskPluginCollector());
- }
- else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.ORC)) {
- dfsUtil.orcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
- }
- else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.SEQ)) {
- dfsUtil.sequenceFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
- }
- else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.RC)) {
- dfsUtil.rcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
- }
- else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.PARQUET)) {
- dfsUtil.parquetFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
- }
- else {
- String message = "HdfsReader插件目前支持ORC, TEXT, CSV, SEQUENCE, RC、PARQUET等六种格式的文件," +
- "请将fileType选项的值配置为ORC, TEXT, CSV, SEQUENCE, RC, PARQUET";
- throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message);
- }
- if (recordSender != null) {
- recordSender.flush();
- }
- }
- LOG.info("end read source files...");
- }
- @Override
- public void post()
- {
- //
- }
- @Override
- public void destroy()
- {
- //
- }
- }
- }
3. HdfsWriter Plugin
This plugin is also fairly simple and consists of five classes. The classes and the corresponding changes are:
- HdfsHelper: add Parquet write support: converting DataX records to Avro GenericRecords, generating the Parquet schema from the column configuration, and the Parquet file write method.
- HdfsWriter: add the Parquet file type entry.
- SupportHiveDataType: no changes needed.
- HdfsWriterErrorCode: no changes needed.
- Type: no changes needed.
Only the two classes that change need to be modified. The core write path is sketched first, and the code follows:
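The writer turns each DataX Record into an Avro GenericRecord and hands it to an AvroParquetWriter, which is what the Parquet-related imports in HdfsHelper below point to. Here is a minimal, hedged sketch of that write path; the wrapper class, schema and output path are illustrative, not the plugin's actual code.
[code]
// Hedged sketch: the AvroParquetWriter-based write path implied by the imports in HdfsHelper.
// The wrapper class, schema and output path are illustrative, not the plugin's actual code.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetWriteSketch
{
    public static void main(String[] args) throws Exception
    {
        // A trivial two-column nullable schema; the plugin derives its schema from the writer's column config.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"m\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":[\"null\",\"long\"],\"default\":null},"
                        + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(new Path(args[0])) // e.g. an HDFS path ending in .parquet
                .withSchema(schema)
                .withConf(conf)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build()) {
            GenericRecord row = new GenericRecordBuilder(schema)
                    .set("id", 1L)
                    .set("name", "datax")
                    .build();
            writer.write(row); // the plugin does this once per incoming DataX Record
        }
    }
}
[/code]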
HdfsHelper
[code]package com.alibaba.datax.plugin.writer.hdfswriter;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.alibaba.datax.common.base.Constant;import com.alibaba.datax.common.base.Key;import com.alibaba.datax.common.element.Column;import com.alibaba.datax.common.element.Record;import com.alibaba.datax.common.exception.DataXException;import com.alibaba.datax.common.plugin.RecordReceiver;import com.alibaba.datax.common.plugin.TaskPluginCollector;import com.alibaba.datax.common.util.Configuration;import com.alibaba.datax.unstructuredstorage.util.ColumnTypeUtil;import com.alibaba.datax.unstructuredstorage.util.HdfsUtil;import org.apache.avro.Conversions;import org.apache.avro.LogicalTypes;import org.apache.avro.Schema;import org.apache.avro.generic.GenericData;import org.apache.avro.generic.GenericRecord;import org.apache.avro.generic.GenericRecordBuilder;import org.apache.commons.lang3.StringUtils;import org.apache.commons.lang3.Validate;import org.apache.commons.lang3.tuple.MutablePair;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.LocatedFileStatus;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.RemoteIterator;import org.apache.hadoop.hive.common.type.HiveDecimal;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.JobContext;import org.apache.hadoop.mapred.RecordWriter;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.security.UserGroupInformation;import org.apache.hadoop.security.authentication.util.KerberosName;import org.apache.orc.CompressionKind;import org.apache.orc.OrcFile;import org.apache.orc.TypeDescription;import org.apache.orc.Writer;import org.apache.parquet.avro.AvroParquetWriter;import org.apache.parquet.column.ParquetProperties;import org.apache.parquet.hadoop.ParquetWriter;import org.apache.parquet.hadoop.metadata.CompressionCodecName;import org.apache.parquet.schema.OriginalType;import org.apache.parquet.schema.PrimitiveType;import org.apache.parquet.schema.Types;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.FileNotFoundException;import java.io.IOException;import java.lang.reflect.Field;import java.math.BigDecimal;import java.nio.charset.StandardCharsets;import java.sql.Timestamp;import java.text.SimpleDateFormat;import java.util.*;public class HdfsHelper{ public static final Logger LOG = LoggerFactory.getLogger(HdfsHelper.class); public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication"; public static final String HDFS_DEFAULT_FS_KEY = "fs.defaultFS"; private FileSystem fileSystem = null; private JobConf conf = null; private org.apache.hadoop.conf.Configuration hadoopConf = null; // Kerberos private boolean haveKerberos = false; private String 
kerberosKeytabFilePath; private String kerberosPrincipal; private String krb5ConfPath; public static MutablePair transportOneRecord( Record record, char fieldDelimiter, List columnsConfiguration, TaskPluginCollector taskPluginCollector) { MutablePair transportResultList = transportOneRecord(record, columnsConfiguration, taskPluginCollector); //保存 MutablePair transportResult = new MutablePair(); transportResult.setRight(false); Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter)); transportResult.setRight(transportResultList.getRight()); transportResult.setLeft(recordResult); return transportResult; } public static MutablePair transportOneRecord( Record record, List columnsConfiguration, TaskPluginCollector taskPluginCollector) { MutablePair transportResult = new MutablePair(); transportResult.setRight(false); List recordList = new ArrayList(); int recordLength = record.getColumnNumber(); if (0 != recordLength) { Column column; for (int i = 0; i < recordLength; i++) { column = record.getColumn(i); if (null != column.getRawData()) { String rowData = column.getRawData().toString(); SupportHiveDataType columnType = SupportHiveDataType.valueOf( columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase()); //根据writer端类型配置做类型转换 try { switch (columnType) { case TINYINT: recordList.add(Byte.valueOf(rowData)); break; case SMALLINT: recordList.add(Short.valueOf(rowData)); break; case INT: case INTEGER: recordList.add(Integer.valueOf(rowData)); break; case BIGINT: recordList.add(column.asLong()); break; case FLOAT: recordList.add(Float.valueOf(rowData)); break; case DOUBLE: recordList.add(column.asDouble()); break; case STRING: case VARCHAR: case CHAR: recordList.add(column.asString()); break; case DECIMAL: recordList.add(HiveDecimal.create(column.asBigDecimal())); break; case BOOLEAN: recordList.add(column.asBoolean()); break; case DATE: recordList.add(org.apache.hadoop.hive.common.type.Date.valueOf(column.asString())); break; case TIMESTAMP: recordList.add(Timestamp.valueOf(column.asString())); break; case BINARY: recordList.add(column.asBytes()); break; default: throw DataXException .asDataXException( HdfsWriterErrorCode.ILLEGAL_VALUE, String.format( "您的配置文件中的列配置信息有误. 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%s]. 
请修改表中该字段的类型或者不同步该字段.", columnsConfiguration.get(i).getString(Key.NAME), columnsConfiguration.get(i).getString(Key.TYPE))); } } catch (Exception e) { // warn: 此处认为脏数据 e.printStackTrace(); String message = String.format( "字段类型转换错误:你目标字段为[%s]类型,实际字段值为[%s].", columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData()); taskPluginCollector.collectDirtyRecord(record, message); transportResult.setRight(true); break; } } else { // warn: it's all ok if nullFormat is null recordList.add(null); } } } transportResult.setLeft(recordList); return transportResult; } public static GenericRecord transportParRecord( Record record, List columnsConfiguration, TaskPluginCollector taskPluginCollector, GenericRecordBuilder builder) { int recordLength = record.getColumnNumber(); if (0 != recordLength) { Column column; for (int i = 0; i < recordLength; i++) { column = record.getColumn(i); String colName = columnsConfiguration.get(i).getString(Key.NAME); String typename = columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase(); if (null == column || column.getRawData() == null) { builder.set(colName, null); } else { String rowData = column.getRawData().toString(); SupportHiveDataType columnType = SupportHiveDataType.valueOf(typename); //根据writer端类型配置做类型转换 try { switch (columnType) { case INT: case INTEGER: builder.set(colName, Integer.valueOf(rowData)); break; case LONG: builder.set(colName, column.asLong()); break; case FLOAT: builder.set(colName, Float.valueOf(rowData)); break; case DOUBLE: builder.set(colName, column.asDouble()); break; case STRING: builder.set(colName, column.asString()); break; case DECIMAL: builder.set(colName, new BigDecimal(column.asString()).setScale(columnsConfiguration.get(i).getInt(Key.SCALE), BigDecimal.ROUND_HALF_UP)); break; case BOOLEAN: builder.set(colName, column.asBoolean()); break; case BINARY: builder.set(colName, column.asBytes()); break; case TIMESTAMP: builder.set(colName, column.asLong() / 1000); break; default: throw DataXException .asDataXException( HdfsWriterErrorCode.ILLEGAL_VALUE, String.format( "您的配置文件中的列配置信息有误. 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%s]. 
请修改表中该字段的类型或者不同步该字段.", columnsConfiguration.get(i).getString(Key.NAME), columnsConfiguration.get(i).getString(Key.TYPE))); } } catch (Exception e) { // warn: 此处认为脏数据 String message = String.format( "字段类型转换错误:目标字段为[%s]类型,实际字段值为[%s].", columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData()); taskPluginCollector.collectDirtyRecord(record, message); break; } } } } return builder.build(); } public static String generateParquetSchemaFromColumnAndType(List columns) { Map decimalColInfo = new HashMap(16); ColumnTypeUtil.DecimalInfo PARQUET_DEFAULT_DECIMAL_INFO = new ColumnTypeUtil.DecimalInfo(10, 2); Types.MessageTypeBuilder typeBuilder = Types.buildMessage(); for (Configuration column : columns) { String name = column.getString("name"); String colType = column.getString("type"); Validate.notNull(name, "column.name can't be null"); Validate.notNull(colType, "column.type can't be null"); switch (colType.toLowerCase()) { case "tinyint": case "smallint": case "int": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(name); break; case "bigint": case "long": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(name); break; case "float": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.FLOAT).named(name); break; case "double": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).named(name); break; case "binary": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(name); break; case "char": case "varchar": case "string": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(name); break; case "boolean": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BOOLEAN).named(name); break; case "timestamp": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT96).named(name); break; case "date": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).as(OriginalType.DATE).named(name); break; default: if (ColumnTypeUtil.isDecimalType(colType)) { ColumnTypeUtil.DecimalInfo decimalInfo = ColumnTypeUtil.getDecimalInfo(colType, PARQUET_DEFAULT_DECIMAL_INFO); typeBuilder.optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) .as(OriginalType.DECIMAL) .precision(decimalInfo.getPrecision()) .scale(decimalInfo.getScale()) .length(HdfsUtil.computeMinBytesForPrecision(decimalInfo.getPrecision())) .named(name); decimalColInfo.put(name, decimalInfo); } else { typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(name); } break; } } return typeBuilder.named("m").toString(); } public FileSystem getFileSystem(String defaultFS, Configuration taskConfig) { this.hadoopConf = new org.apache.hadoop.conf.Configuration(); Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG); JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG)); if (null != hadoopSiteParams) { Set paramKeys = hadoopSiteParams.getKeys(); for (String each : paramKeys) { hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each)); } } this.hadoopConf.set(HDFS_DEFAULT_FS_KEY, defaultFS); //是否有Kerberos认证 this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false); if (haveKerberos) { LOG.info("krb5.conf路径:【{}】 \n keytab路径:【{}】 \n principal:【{}】\n", taskConfig.getString(Key. KRB5_CONF_FILE_PATH), taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH), taskConfig.getString(Key.KERBEROS_PRINCIPAL)); this.krb5ConfPath = taskConfig.getString(Key. 
KRB5_CONF_FILE_PATH); this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH); this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL); LOG.info("检测到kerberos认证,正在进行认证"); } System.setProperty("java.security.krb5.conf",krb5ConfPath); System.setProperty("sun.security.krb5.Config",krb5ConfPath); refreshConfig(); this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath,hadoopConf,this.krb5ConfPath); conf = new JobConf(hadoopConf); try { fileSystem = FileSystem.get(conf); } catch (IOException e) { String message = String.format("获取FileSystem时发生网络IO异常,请检查您的网络是否正常!HDFS地址:[message:defaultFS = %s]", defaultFS); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } catch (Exception e) { String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [%s]", "message:defaultFS =" + defaultFS); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } if (null == fileSystem) { String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [message:defaultFS = %s]", defaultFS); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, message); } return fileSystem; } /** 刷新krb内容信息 */ public static void refreshConfig() { try { sun.security.krb5.Config.refresh(); Field defaultRealmField = KerberosName.class.getDeclaredField("defaultRealm"); defaultRealmField.setAccessible(true); defaultRealmField.set( null, org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm()); // reload java.security.auth.login.config javax.security.auth.login.Configuration.setConfiguration(null); } catch (Exception e) { LOG.warn( "resetting default realm failed, current default realm will still be used.", e); } } public void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf,String krb5ConfPath) { hadoopConf.set("hadoop.security.authentication", "kerberos"); hadoopConf.set("hive.security.authentication", "kerberos"); hadoopConf.set("hadoop.security.authorization", "true"); hadoopConf.set("dfs.permissions","false"); hadoopConf.set("hadoop.security.auth_to_local","RULE:[2:$1@$0](.*@CDHDEV.COM)s/.*/hadoop/ \n" + " DEFAULT"); if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) { UserGroupInformation.setConfiguration(hadoopConf); KerberosName.resetDefaultRealm(); try { LOG.info("开始认证"); UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath); } catch (Exception e) { LOG.info("kerberos认证失败"); String message = String.format("kerberos认证失败,请检查 " + "kerberosKeytabFilePath[%s] 和 kerberosPrincipal[%s]", kerberosKeytabFilePath, kerberosPrincipal); e.printStackTrace(); throw DataXException.asDataXException(HdfsWriterErrorCode.KERBEROS_LOGIN_ERROR, message, e); } } } /** * 获取指定目录下的文件列表 * * @param dir 需要搜索的目录 * @return 文件数组,文件是全路径, * eg:hdfs://10.101.204.12:9000/user/hive/warehouse/writer.db/text/test.textfile */ public Path[] hdfsDirList(String dir) { Path path = new Path(dir); Path[] files; try { FileStatus[] status = fileSystem.listStatus(path); files = new Path[status.length]; for (int i = 0; i < status.length; i++) { files = status.getPath(); } } catch (IOException e) { String message = String.format("获取目录[%s]文件列表时发生网络IO异常,请检查您的网络是否正常!", dir); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } return files; }// public 
boolean isPathExists(String filePath) {//// Path path = new Path(filePath);// boolean exist;// try {// exist = fileSystem.exists(path);// }// catch (IOException e) {// String message = String.format("判断文件路径[%s]是否存在时发生网络IO异常,请检查您的网络是否正常!",// "message:filePath =" + filePath);// e.printStackTrace();// LOG.error(message);// throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);// }// return exist;// } public boolean isPathDir(String filePath) { Path path = new Path(filePath); boolean isDir; try { isDir = fileSystem.getFileStatus(path).isDirectory(); } catch (IOException e) { String message = String.format("判断路径[%s]是否是目录时发生网络IO异常,请检查您的网络是否正常!", filePath); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } return isDir; } public void deleteFilesFromDir(Path dir) { try { final RemoteIterator files = fileSystem.listFiles(dir, false); while (files.hasNext()) { final LocatedFileStatus next = files.next(); fileSystem.deleteOnExit(next.getPath()); } } catch (FileNotFoundException fileNotFoundException) { throw new DataXException(HdfsWriterErrorCode.FILE_NOT_FOUND, fileNotFoundException.getMessage()); } catch (IOException ioException) { throw new DataXException(HdfsWriterErrorCode.IO_ERROR, ioException.getMessage()); } } public void deleteDir(Path path) { LOG.info("start delete tmp dir [{}] .", path); try { if (fileSystem.exists(path)) { fileSystem.delete(path, true); } } catch (Exception e) { LOG.error("删除临时目录[{}]时发生IO异常,请检查您的网络是否正常!", path); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } LOG.info("finish delete tmp dir [{}] .", path); } /** * move all files in sourceDir to targetDir * * @param sourceDir the source directory * @param targetDir the target directory */ public void moveFilesToDest(Path sourceDir, Path targetDir) { try { final FileStatus[] fileStatuses = fileSystem.listStatus(sourceDir); for (FileStatus file : fileStatuses) { if (file.isFile() && file.getLen() > 0) { LOG.info("start move file [{}] to dir [{}].", file.getPath(), targetDir.getName()); fileSystem.rename(file.getPath(), new Path(targetDir, file.getPath().getName())); } } } catch (IOException e) { throw DataXException.asDataXException(HdfsWriterErrorCode.IO_ERROR, e); } LOG.info("finish move file(s)."); } //关闭FileSystem public void closeFileSystem() { try { fileSystem.close(); } catch (IOException e) { LOG.error("关闭FileSystem时发生IO异常,请检查您的网络是否正常!"); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } } // 写text file类型文件 public void textFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName, TaskPluginCollector taskPluginCollector) { char fieldDelimiter = config.getChar(Key.FIELD_DELIMITER); List columns = config.getListConfiguration(Key.COLUMN); String compress = config.getString(Key.COMPRESS, "NONE").toUpperCase().trim(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm"); String attempt = "attempt_" + dateFormat.format(new Date()) + "_0001_m_000000_0"; conf.set(JobContext.TASK_ATTEMPT_ID, attempt); if (!"NONE".equals(compress)) { // fileName must remove suffix, because the FileOutputFormat will add suffix fileName = fileName.substring(0, fileName.lastIndexOf(".")); Class |