使用Mapreduce天生Hfile,并通过Bulkload导入到Hbase+源码剖析

[复制链接]
发表于 2025-8-12 17:03:19 | 显示全部楼层 |阅读模式

  • Mapper类:将输入数据转换为HBase的KeyValue格式
  1. import org.apache.hadoop.hbase.KeyValue;
  2. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  3. import org.apache.hadoop.hbase.util.Bytes;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import java.io.IOException;
  8. public class HFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
  9.    
  10.     private byte[] columnFamily;
  11.     private byte[] columnQualifier;
  12.     private ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
  13.     @Override
  14.     protected void setup(Context context) {
  15.         // 配置列族和列名(根据实际情况修改)
  16.         columnFamily = Bytes.toBytes("cf");
  17.         columnQualifier = Bytes.toBytes("col");
  18.     }
  19.     @Override
  20.     protected void map(LongWritable key, Text value, Context context)
  21.             throws IOException, InterruptedException {
  22.         
  23.         String[] fields = value.toString().split(","); // 假设输入格式为: rowkey,value
  24.         if (fields.length < 2) return;
  25.         String row = fields[0];
  26.         String val = fields[1];
  27.         
  28.         rowKey.set(Bytes.toBytes(row));
  29.         
  30.         // 创建KeyValue对象(时间戳可自定义或自动生成)
  31.         KeyValue kv = new KeyValue(
  32.             Bytes.toBytes(row),    // Row key
  33.             columnFamily,          // Column family
  34.             columnQualifier,       // Column qualifier
  35.             System.currentTimeMillis(), // Timestamp
  36.             Bytes.toBytes(val)     // Value
  37.         );
  38.         
  39.         context.write(rowKey, kv);
  40.     }
  41. }
复制代码


  •   Driver类:配置MapReduce作业
  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.hbase.HBaseConfiguration;
  4. import org.apache.hadoop.hbase.TableName;
  5. import org.apache.hadoop.hbase.client.Connection;
  6. import org.apache.hadoop.hbase.client.ConnectionFactory;
  7. import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  10. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  12. public class HFileGenerator {
  13.     public static void main(String[] args) throws Exception {
  14.         Configuration conf = HBaseConfiguration.create();
  15.         // 设置HBase ZooKeeper地址(根据实际情况修改)
  16.         conf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com");
  17.         
  18.         Job job = Job.getInstance(conf, "Generate HFile for HBase");
  19.         job.setJarByClass(HFileGenerator.class);
  20.         
  21.         // 设置Mapper
  22.         job.setMapperClass(HFileMapper.class);
  23.         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  24.         job.setMapOutputValueClass(KeyValue.class);
  25.         
  26.         // 设置输入格式和路径
  27.         job.setInputFormatClass(TextInputFormat.class);
  28.         FileInputFormat.addInputPath(job, new Path(args[0]));  // 输入路径
  29.         
  30.         // 设置输出格式
  31.         job.setOutputFormatClass(HFileOutputFormat2.class);
  32.         FileOutputFormat.setOutputPath(job, new Path(args[1])); // HFile输出路径
  33.         
  34.         // 获取HBase表信息
  35.         Connection conn = ConnectionFactory.createConnection(conf);
  36.         TableName tableName = TableName.valueOf("your_table_name"); // 目标表名
  37.         
  38.         // 配置HFileOutputFormat
  39.         HFileOutputFormat2.configureIncrementalLoad(
  40.             job,
  41.             conn.getTable(tableName),
  42.             conn.getRegionLocator(tableName)
  43.         );
  44.         
  45.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  46.     }
  47. }
复制代码
这里重点看一下org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.class源码,打开一看有一个解释:
  
  1. /**
  2. * Writes HFiles. Passed Cells must arrive in order. Writes current time as the sequence id for the
  3. * file. Sets the major compacted attribute on created {@link HFile}s. Calling write(null,null) will
  4. * forcibly roll all HFiles being written.
  5. * <p>
  6. * Using this class as part of a MapReduce job is best done using
  7. * {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
  8. */
复制代码


  • 写入 HFiles,传入的 Cell 必须按顺序到达。将当前时间作为文件的序列 ID 写入。在创建的 HFile 上设置“主要压缩完成”属性。调用 write(null, null) 将逼迫滚动所有正在写入的 HFiles。 (说明写入前需要排好序。!!!!!)
  思考:需不需要手动设置ReduceTask?

不需要,HFileOutputFormat2.configureIncrementalLoad()会根据MapTask输出的Value范例,自动设置Reducer。
   支持的Value范例有:KeyValue.class、MapReduceExtendedCell.class、Put.class、Text.class。
  别的,还会根据表的region情况,设置reduce数目和Partitioner分区器
  1. static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
  2.     Class<? extends OutputFormat<?, ?>> cls) throws IOException {
  3.     ...........
  4.     // Based on the configured map output class, set the correct reducer to properly
  5.     // sort the incoming values.
  6.     // TODO it would be nice to pick one or the other of these formats.
  7.     if (
  8.       KeyValue.class.equals(job.getMapOutputValueClass())
  9.         || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
  10.     ) {
  11.       job.setReducerClass(CellSortReducer.class);
  12.     } else if (Put.class.equals(job.getMapOutputValueClass())) {
  13.       job.setReducerClass(PutSortReducer.class);
  14.     } else if (Text.class.equals(job.getMapOutputValueClass())) {
  15.       job.setReducerClass(TextSortReducer.class);
  16.     } else {
  17.       LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
  18.     }
  19.     ...........
  20.     List<ImmutableBytesWritable> startKeys =
  21.       getRegionStartKeys(regionLocators, writeMultipleTables);
  22.     // Use table's region boundaries for TOP split points.
  23.     LOG.info("Configuring " + startKeys.size() + " reduce partitions "
  24.       + "to match current region count for all tables");
  25.     job.setNumReduceTasks(startKeys.size());
  26.     configurePartitioner(job, startKeys, writeMultipleTables);
  27.     ...........
  28. }
复制代码
初始化Partitioner: 

  1. /**
  2.    * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
  3.    * <code>splitPoints</code>. Cleans up the partitions file after job exists.
  4.    */
  5.   static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
  6.     boolean writeMultipleTables) throws IOException {
  7.     Configuration conf = job.getConfiguration();
  8.     // create the partitions file
  9.     FileSystem fs = FileSystem.get(conf);
  10.     String hbaseTmpFsDir =
  11.       conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
  12.     Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
  13.     fs.makeQualified(partitionsPath);
  14.     writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
  15.     fs.deleteOnExit(partitionsPath);
  16.     // configure job to use it
  17.     job.setPartitionerClass(TotalOrderPartitioner.class);
  18.     TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
  19.   }
复制代码
分区器:org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner

(源码太长了,就不贴了)
可以实现全局有序分区,getPartition()方法用于确定给定键值对属于哪个分区。它依靠于之前构建的分区数据结构(Node),在差别情况下有差别的处理处罚逻辑:
1. 键范例为BinaryComparable且启用了自然顺序

如果键范例是BinaryComparable且配置启用了自然顺序,分区器会构建一个前缀树(Trie)。这种情况下,getPartition()方法通过前缀树来查找分区。前缀树的构建基于键的字节体现,前缀树的每个节点体现一个字节,键的字节序列决定了它在前缀树中的路径。这种方法利用了键的二进制体现的自然顺序,可以大概快速定位到正确的分区。前缀树的深度由配置参数total.order.partitioner.max.trie.depth控制,默以为200。
2. 键范例不是BinaryComparable或者自然顺序被禁用

如果键范例不是BinaryComparable或者自然顺序被禁用,分区器会使用二分查找来定位分区。这种情况下,getPartition()方法会调用BinarySearchNode类的findPartition()方法,使用二分查找算法在分区键集合中查找给定键。二分查找的时间复杂度为O(logn),其中n是分区键的数目。

参考文章:Hbase Bulkload 原理|口试必备 - 知乎
https://zhuanlan.zhihu.com/p/363044046

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

×
登录参与点评抽奖,加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表