Mapper class: converts each input record into HBase's KeyValue format.
```java
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class HFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    private byte[] columnFamily;
    private byte[] columnQualifier;
    private ImmutableBytesWritable rowKey = new ImmutableBytesWritable();

    @Override
    protected void setup(Context context) {
        // Configure the column family and qualifier (adjust to your schema)
        columnFamily = Bytes.toBytes("cf");
        columnQualifier = Bytes.toBytes("col");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] fields = value.toString().split(","); // expected input format: rowkey,value
        if (fields.length < 2) return;
        String row = fields[0];
        String val = fields[1];

        rowKey.set(Bytes.toBytes(row));

        // Build the KeyValue (the timestamp can be customized or auto-generated)
        KeyValue kv = new KeyValue(
            Bytes.toBytes(row),         // Row key
            columnFamily,               // Column family
            columnQualifier,            // Column qualifier
            System.currentTimeMillis(), // Timestamp
            Bytes.toBytes(val)          // Value
        );

        context.write(rowKey, kv);
    }
}
```
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HFileGenerator {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // HBase ZooKeeper quorum (adjust to your environment)
        conf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com");

        Job job = Job.getInstance(conf, "Generate HFile for HBase");
        job.setJarByClass(HFileGenerator.class);

        // Mapper
        job.setMapperClass(HFileMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        // Input format and path
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0])); // input path

        // Output format
        job.setOutputFormatClass(HFileOutputFormat2.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // HFile output path

        // Look up the target HBase table
        Connection conn = ConnectionFactory.createConnection(conf);
        TableName tableName = TableName.valueOf("your_table_name"); // target table name

        // Configure HFileOutputFormat2 (reducer, partitioner, reduce task count)
        HFileOutputFormat2.configureIncrementalLoad(
            job,
            conn.getTable(tableName),
            conn.getRegionLocator(tableName)
        );
        conn.close(); // region boundaries have already been captured during configuration

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
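Generating the HFiles is only half of a bulkload; the files then have to be handed over to the RegionServers. Below is a minimal sketch of that final step, assuming HBase 2.x (where org.apache.hadoop.hbase.tool.BulkLoadHFiles is available; older releases use LoadIncrementalHFiles for the same purpose) and assuming args[0] of this small program points at the HFile directory produced by HFileGenerator above:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

public class HFileLoader {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com");

        // Hand the generated HFiles under args[0] over to the RegionServers
        BulkLoadHFiles.create(conf).bulkLoad(
            TableName.valueOf("your_table_name"), // same target table as above
            new Path(args[0]));                   // directory produced by HFileGenerator
    }
}
```

The same step can also be done from the shell with the completebulkload tool that ships with HBase.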
Now let's take a closer look at the source of org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2. Opening it, the first thing you see is this class-level comment:
```java
/**
 * Writes HFiles. Passed Cells must arrive in order. Writes current time as the sequence id for the
 * file. Sets the major compacted attribute on created {@link HFile}s. Calling write(null,null) will
 * forcibly roll all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done using
 * {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
 */
```
In plain terms: this OutputFormat writes HFiles, the Cells passed to it must arrive in sorted order, the current time is used as the file's sequence id, the major-compacted attribute is set on the created HFiles, and calling write(null, null) forcibly rolls all HFiles being written. The key point: the data must already be sorted before it is written (!!!!)
A question worth asking: do we need to set the reduce tasks manually?
No. HFileOutputFormat2.configureIncrementalLoad() picks the correct Reducer automatically, based on the map output value class. The supported value types are KeyValue.class, MapReduceExtendedCell.class, Put.class and Text.class (a Put-based mapper sketch follows the source snippet below).
In addition, it sets the number of reduce tasks and the Partitioner according to the table's current region layout:
```java
static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
    Class<? extends OutputFormat<?, ?>> cls) throws IOException {
  ...........
  // Based on the configured map output class, set the correct reducer to properly
  // sort the incoming values.
  // TODO it would be nice to pick one or the other of these formats.
  if (
    KeyValue.class.equals(job.getMapOutputValueClass())
      || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
  ) {
    job.setReducerClass(CellSortReducer.class);
  } else if (Put.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(PutSortReducer.class);
  } else if (Text.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(TextSortReducer.class);
  } else {
    LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
  }
  ...........
  List<ImmutableBytesWritable> startKeys =
    getRegionStartKeys(regionLocators, writeMultipleTables);
  // Use table's region boundaries for TOP split points.
  LOG.info("Configuring " + startKeys.size() + " reduce partitions "
    + "to match current region count for all tables");
  job.setNumReduceTasks(startKeys.size());
  configurePartitioner(job, startKeys, writeMultipleTables);
  ...........
}
```
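As the branch above shows, Put is also an accepted map output value class (sorted by PutSortReducer), so the mapper from earlier could just as well emit Put objects instead of KeyValue. A minimal sketch of that variant, reusing the same hypothetical cf/col layout (the driver would then call job.setMapOutputValueClass(Put.class)):

```java
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Variant mapper that emits Put instead of KeyValue; with Put as the map output
// value class, configureIncrementalLoad() selects PutSortReducer automatically.
public class HFilePutMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private static final byte[] CF = Bytes.toBytes("cf");
    private static final byte[] COL = Bytes.toBytes("col");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(","); // expected format: rowkey,value
        if (fields.length < 2) return;

        byte[] row = Bytes.toBytes(fields[0]);
        Put put = new Put(row);
        put.addColumn(CF, COL, Bytes.toBytes(fields[1]));
        context.write(new ImmutableBytesWritable(row), put);
    }
}
```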
Initializing the Partitioner:
```java
/**
 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
 * <code>splitPoints</code>. Cleans up the partitions file after job exists.
 */
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
    boolean writeMultipleTables) throws IOException {
  Configuration conf = job.getConfiguration();
  // create the partitions file
  FileSystem fs = FileSystem.get(conf);
  String hbaseTmpFsDir =
    conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
  Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
  fs.makeQualified(partitionsPath);
  writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
  fs.deleteOnExit(partitionsPath);
  // configure job to use it
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
```
The partitioner: org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner
(The source is too long to reproduce here.)
TotalOrderPartitioner implements globally ordered partitioning. Its getPartition() method decides which partition a given key/value pair belongs to. It relies on a lookup structure (Node) built beforehand from the split points, and the lookup logic differs by case:
1. The key type is BinaryComparable and natural order is enabled
If the key type is BinaryComparable and the configuration enables natural order, the partitioner builds a prefix tree (trie) and getPartition() locates the partition by walking it. The trie is built from the byte representation of the keys: each node represents one byte, and a key's byte sequence determines its path through the trie. Because this exploits the natural order of the keys' binary representation, it can locate the correct partition very quickly. The trie depth is controlled by the configuration parameter total.order.partitioner.max.trie.depth, which defaults to 200.
2. The key type is not BinaryComparable, or natural order is disabled
In this case the partitioner falls back to binary search: getPartition() calls findPartition() on a BinarySearchNode, which binary-searches the set of split points for the given key. The time complexity is O(log n), where n is the number of split points, as sketched below.
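The following is not the actual TotalOrderPartitioner source, just a simplified sketch of the binary-search lookup described in case 2, using HBase's Bytes.BYTES_COMPARATOR on hypothetical split points:

```java
import java.util.Arrays;
import org.apache.hadoop.hbase.util.Bytes;

// Simplified illustration of the binary-search lookup strategy.
// splitPoints are the region start keys written to the partitions file,
// sorted ascending; with N regions there are N-1 split points and N partitions.
public class BinarySearchPartitionLookup {

    private final byte[][] splitPoints;

    public BinarySearchPartitionLookup(byte[][] sortedSplitPoints) {
        this.splitPoints = sortedSplitPoints;
    }

    // Returns the partition (reducer) index for the given row key:
    // effectively the number of split points that are <= the key.
    public int findPartition(byte[] rowKey) {
        int pos = Arrays.binarySearch(splitPoints, rowKey, Bytes.BYTES_COMPARATOR) + 1;
        return (pos < 0) ? -pos : pos;
    }

    public static void main(String[] args) {
        byte[][] splits = { Bytes.toBytes("g"), Bytes.toBytes("p") }; // 3 partitions
        BinarySearchPartitionLookup lookup = new BinarySearchPartitionLookup(splits);
        System.out.println(lookup.findPartition(Bytes.toBytes("apple"))); // 0
        System.out.println(lookup.findPartition(Bytes.toBytes("hbase"))); // 1
        System.out.println(lookup.findPartition(Bytes.toBytes("zk")));    // 2
    }
}
```

A key equal to a split point lands in the higher partition, which matches the region semantics of an inclusive start key.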
Reference: Hbase Bulkload 原理 | 面试必备 - 知乎, https://zhuanlan.zhihu.com/p/363044046