【HBase分布式数据库】第七章 数据的导入导出 (2-5)

打印 上一主题 下一主题

主题 1003|帖子 1003|积分 3009

7.2 bulkload导入数据

使命目的

掌握引入外部依靠包的方法
掌握eclipse打包的方法
掌握bulkload导入数据的逻辑代码
使命清单



  • 使命1:引入外部依靠包
  • 使命2:bulkload导入数据
使命步骤

使命1:引入外部依靠包

Bulkload是通过一个MapReduce Job来实现的,通过Job直接生成一个HBase的内部HFile格式文件来形成一个特别的HBase数据表,然后直接将数据文件加载到运行的集群中。使用bulk load功能最简单的方式就是使用importtsv 工具。importtsv 是从TSV文件直接加载内容至HBase的一个内置工具。它通过运行一个MapReduce Job,将数据从TSV文件中直接写入HBase的表或者写入一个HBase的自有格式数据文件。在编写代码逻辑之前,我们首先要引入程序依靠的jar包,步骤如下:
1、右键项目,选择【build path】>【configure build path】

8.2-1
2、在弹出的对话框内,单击【libraries】> 【add external jars】

8.2-2
3、弹出的对话框中,找到Hadoop存放jar包的路径,路径如图所示。

8.2-3
4、当前页面下的文件夹包罗MapReduce、hdfs、yarn和common下的全部jar包,选中之后,单击底部的open按钮。需要注意的是,这4个包每个包需要单独打开,单独选中。全部添加完毕之后,单击【apply and close】
使命2:bulkload导入数据
构建BulkLoadJob类
  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.FsShell;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.hbase.HBaseConfiguration;
  5. import org.apache.hadoop.hbase.KeyValue;
  6. import org.apache.hadoop.hbase.client.HTable;
  7. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  8. import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
  9. import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
  10. import org.apache.hadoop.hbase.util.Bytes;
  11. import org.apache.hadoop.io.LongWritable;
  12. import org.apache.hadoop.io.Text;
  13. import org.apache.hadoop.mapreduce.Job;
  14. import org.apache.hadoop.mapreduce.Mapper;
  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  16. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  18. import org.apache.hadoop.util.GenericOptionsParser;
  19. import org.slf4j.Logger;
  20. import org.slf4j.LoggerFactory;
  21. import java.io.IOException;
  22. public class BulkLoadJob {
  23.             //指定的类BulkLoadJob初始化日志对象,方便在日志输出的时候,可以打印出日志信息所属的类。
  24.         static Logger logger = LoggerFactory.getLogger(BulkLoadJob.class);
  25.                 //构建map端输入
  26.         public static class BulkLoadMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
  27.                                 //map方法
  28.                 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  29.                                                 //对输入数据进行切分
  30.                         String[] valueStrSplit = value.toString().split("\t");
  31.                             //拿到行键
  32.                         String hkey = valueStrSplit[0];
  33.                             //拿到列族
  34.                         String family = valueStrSplit[1].toString().split(":")[0];
  35.                             //拿到列
  36.                         String column = valueStrSplit[1].toString().split(":")[1];
  37.                             //拿到数值
  38.                         String hvalue = valueStrSplit[2];
  39.                             //行键转换成不可变型的字节
  40.                         final byte[] rowKey = Bytes.toBytes(hkey);
  41.                         final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey);
  42.                         //把行键、列族、列和值封装成KV对儿
  43.                             KeyValue kv = new KeyValue(rowKey, Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(hvalue));
  44.                         //写到磁盘
  45.                             context.write(HKey, kv);
  46.                 }
  47.         }
  48.         public static void main(String[] args) throws Exception {
  49.                     //配置信息的创建
  50.                 Configuration conf = HBaseConfiguration.create();
  51.                 conf.set("hbase.zookeeper.property.clientPort", "2181");
  52.                 conf.set("hbase.zookeeper.quorum", "localhost");
  53.                     //指定数据的输入和输出
  54.                 String[] dfsArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  55.                 String inputPath = dfsArgs[0];
  56.                 System.out.println("source: " + dfsArgs[0]);
  57.                 String outputPath = dfsArgs[1];
  58.                 System.out.println("dest: " + dfsArgs[1]);
  59.                 HTable hTable = null;
  60.                 Job job = Job.getInstance(conf, "Test Import HFile & Bulkload");
  61.                 job.setJarByClass(BulkLoadJob.class);
  62.                 job.setMapperClass(BulkLoadJob.BulkLoadMap.class);
  63.                 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  64.                 job.setMapOutputValueClass(KeyValue.class);
  65.                 // 避免测试task
  66.                 job.setSpeculativeExecution(false);
  67.                 job.setReduceSpeculativeExecution(false);
  68.                 // 输入输出端的格式
  69.                 job.setInputFormatClass(TextInputFormat.class);
  70.                 job.setOutputFormatClass(HFileOutputFormat2.class);
  71.                 FileInputFormat.setInputPaths(job, inputPath);
  72.                 FileOutputFormat.setOutputPath(job, new Path(outputPath));
  73.                     //指定表名
  74.                 hTable = new HTable(conf, "ns:t_table");
  75.                 HFileOutputFormat2.configureIncrementalLoad(job, hTable);
  76.                 if (job.waitForCompletion(true)) {
  77.                         FsShell shell = new FsShell(conf);
  78.                         try {
  79.                                 shell.run(new String[] { "-chmod", "-R", "777", dfsArgs[1] });
  80.                         } catch (Exception e) {
  81.                                 logger.error("Couldnt change the file permissions ", e);
  82.                                 throw new IOException(e);
  83.                         }
  84.                         //数据导入hbase表
  85.                         LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
  86.                         loader.doBulkLoad(new Path(outputPath), hTable);
  87.                 } else {
  88.                         logger.error("loading failed.");
  89.                         System.exit(1);
  90.                 }
  91.                 if (hTable != null) {
  92.                         hTable.close();
  93.                 }
  94.         }
  95. }
复制代码
创建测试表
进入hbase的shell环境,创建测试命名空间和测试表。
  1. bin/hbase shell
  2. create_namespace 'ns'
  3. create 'ns:t_table','cf1','cf2'
复制代码

8.2-4
数据上传
HDFS上创建目录,把数据bulkdata.csv上传到HDFS。
  1. cat bulkdata.csv
  2. hadoop fs -mkdir -p /data/input
  3. hadoop fs -put bulkdata.csv /data/input
复制代码

8.2-5
完成了数据和程序之后,就要对程序打包了。
1、选中程序所在包,右键选择【export】

8.2-6
2、弹出的对话框中选择【Java】下的【jar file】,单击next。

8.2-7
3、在弹出的对话框,勾选依靠,指定jar包的输出路径。单击next。

8.2-8
4、本对话框不需要操纵

8.2-9
5、在接下来的对话框中,需要指定运行主类。末了单击finish

8.2-10
运行jar包
使用Hadoop运行jar包的命令,执行导入数据操纵。
  1. hadoop jar /headless/Desktop/test.jar /data/input/bulkdata.csv /data/output/bulk_out
复制代码

8.2-11
检察效果
进入shell环境,检察表中是否有数据。
  1. bin/hbase shell
  2. scan 'ns:t_table'
复制代码

7.3 HBase的WordCount

使命目的
实践hbase的Wordcount
使命清单
使命1:预备工作
使命2:WordCount
使命步骤

使命1:预备工作

测试命名空间和测试表
进入shell环境。创建测试命名空间ns以及测试表src_table和dest_table,两张表都只有一个列族cf。
  1. bin/hbase shell
  2. create_namespace 'ns'
  3. create 'ns:src_table','cf'
  4. create 'ns:dest_table','cf'
复制代码
测试数据
为测试表插入测试数据。
  1. put 'ns:src_table','1','cf:word','hello'
  2. put 'ns:src_table','2','cf:word','Java'
  3. put 'ns:src_table','3','cf:word','hello'
  4. put 'ns:src_table','4','cf:word','Scala'
复制代码

8.3-1
使命2:WordCount

程序逻辑
新建wordcount包,在包下新建HbaseWordCount类。
  1. package wordcount;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.hbase.Cell;
  5. import org.apache.hadoop.hbase.CellUtil;
  6. import org.apache.hadoop.hbase.HBaseConfiguration;
  7. import org.apache.hadoop.hbase.client.Mutation;
  8. import org.apache.hadoop.hbase.client.Put;
  9. import org.apache.hadoop.hbase.client.Result;
  10. import org.apache.hadoop.hbase.client.Scan;
  11. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  12. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  13. import org.apache.hadoop.hbase.mapreduce.TableMapper;
  14. import org.apache.hadoop.hbase.mapreduce.TableReducer;
  15. import org.apache.hadoop.hbase.util.Bytes;
  16. import org.apache.hadoop.io.IntWritable;
  17. import org.apache.hadoop.io.Text;
  18. import org.apache.hadoop.mapreduce.Job;
  19. import org.apache.hadoop.mapreduce.Mapper;
  20. import org.apache.hadoop.mapreduce.Reducer;
  21. import org.slf4j.Logger;
  22. import org.slf4j.LoggerFactory;
  23. public class HbaseWordCount {
  24.                 //日志输出的时候,可以打印出日志信息所在类
  25.         static Logger logger = LoggerFactory.getLogger(HbaseWordCount.class);
  26.             //设置服务器端口以及服务器地址
  27.         static Configuration conf = null;
  28.         static {
  29.                 conf = HBaseConfiguration.create();
  30.                 conf.set("hbase.zookeeper.property.clientPort", "2181");
  31.                 conf.set("hbase.zookeeper.quorum", "localhost");
  32.         }
  33.         public static class HBMapper extends TableMapper<Text,IntWritable>{
  34.                 private static IntWritable one = new IntWritable(1);
  35.                 private static Text word = new Text();
  36.                 @Override
  37.                 protected void map(ImmutableBytesWritable key, Result value,
  38.                                 Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
  39.                                 throws IOException, InterruptedException {
  40.                         for(Cell cell : value.rawCells()) {
  41.                                 word.set(CellUtil.cloneValue(cell));
  42.                                 context.write(word, one);
  43.                         }
  44.                 }
  45.         }
  46.         public static class HBReducer extends TableReducer<Text,IntWritable,ImmutableBytesWritable>{
  47.                 @SuppressWarnings("deprecation")
  48.                 @Override
  49.                 protected void reduce(Text key, Iterable<IntWritable> values,
  50.                                 Reducer<Text, IntWritable, ImmutableBytesWritable, Mutation>.Context context)
  51.                                 throws IOException, InterruptedException {
  52.                         int sum = 0;
  53.                         for(IntWritable value : values) {
  54.                                 sum += value.get();
  55.                         }
  56.                         //把单词作为行键进行存储
  57.                         Put put = new Put(Bytes.toBytes(key.toString()));
  58.                         //数据存储到hbase表,列族为cf,列为col,值为sum
  59.                         put.add(Bytes.toBytes("cf"),
  60.                                         Bytes.toBytes("col"),
  61.                                         Bytes.toBytes(String.valueOf(sum)));
  62.                         //写到hbase中的需要指定行键和put
  63.                         context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
  64.                 }
  65.         }
  66.         public static void main(String [] args) throws IOException, ClassNotFoundException, InterruptedException {
  67.                 @SuppressWarnings("deprecation")
  68.                 Job job = new Job(conf,"hbase wordcount");
  69.                 Scan scan = new Scan();
  70.                     //使用TableMapReduceUtil工具类初始化map,扫描源表中数据执行map操作
  71.                 TableMapReduceUtil.initTableMapperJob(
  72.                                 "ns:src_table",
  73.                                 scan,
  74.                                 HBMapper.class,
  75.                                 Text.class,
  76.                                 IntWritable.class,
  77.                                 job);
  78.                     //使用TableMapReduceUtil工具类初始化reduce,把reduce之后的结果存储到目标表
  79.                 TableMapReduceUtil.initTableReducerJob(
  80.                                 "ns:dest_table",
  81.                                 HBReducer.class,
  82.                                 job);
  83.                 job.waitForCompletion(true);
  84.                 System.out.println("finished");
  85.         }
  86. }
复制代码
执行效果
程序完成之后,运行程序。当我们看到“finished”后,进入shell环境检察目标表中数据。
  1. scan 'ns:dest_table'
复制代码

8.3-2
8.4 HDFS数据导入HBase

使命目的
掌握Hadoop与HBase的集成使用
使命清单
使命1:HDFS数据导入HBase
使命步骤

使命1:HDFS数据导入HBase

上传数据
在HDFS上新建/data/input目录,把hbase目录下的testdata中的csvdata.txt上传到该目录。
  1. hadoop fs -mkdir -p /data/input
  2. hadoop fs -put ./csvdata.txt /data/input
复制代码

8.4-1
程序
新建一个hdfsandhbase包,包下新建一个Hdfs2Hbase类。
  1. package hdfsandhbase;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.hbase.HColumnDescriptor;
  6. import org.apache.hadoop.hbase.HTableDescriptor;
  7. import org.apache.hadoop.hbase.NamespaceDescriptor;
  8. import org.apache.hadoop.hbase.TableName;
  9. import org.apache.hadoop.hbase.client.Admin;
  10. import org.apache.hadoop.hbase.client.Connection;
  11. import org.apache.hadoop.hbase.client.ConnectionFactory;
  12. import org.apache.hadoop.hbase.client.Put;
  13. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  14. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  15. import org.apache.hadoop.hbase.mapreduce.TableReducer;
  16. import org.apache.hadoop.hbase.util.Bytes;
  17. import org.apache.hadoop.io.LongWritable;
  18. import org.apache.hadoop.io.Text;
  19. import org.apache.hadoop.mapreduce.Job;
  20. import org.apache.hadoop.mapreduce.Mapper;
  21. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  22. public class Hdfs2Hbase {
  23.         public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>{
  24.                 @Override
  25.                 protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
  26.                                 throws IOException, InterruptedException {
  27.                         String line = value.toString();
  28.                         String[] lines = line.split(",");
  29.                         for (String s : lines) {
  30.                                 context.write(new Text(s), new Text(1+""));
  31.                         }
  32.                 }
  33.         }
  34.         public static class MyReduce extends TableReducer<Text,Text,ImmutableBytesWritable>{
  35.                 @Override
  36.                 protected void reduce(Text key, Iterable<Text> value,Context context)
  37.                                 throws IOException, InterruptedException {
  38.                         int counter = 0;
  39.                         for(Text t:value) {
  40.                                 counter += Integer.parseInt(t.toString());
  41.                         }
  42.                         //写出到hbase中去
  43.                         Put put = new Put(Bytes.toBytes(key.toString()));
  44.                         put.addColumn("data".getBytes(), "count".getBytes(), (counter+"").getBytes());
  45.                         context.write(new ImmutableBytesWritable(key.getBytes()), put);
  46.                 }
  47.         }
  48.         public static void main(String [] args) throws IOException, ClassNotFoundException, InterruptedException {
  49.                 Configuration conf = new Configuration();
  50.                 conf.set("fs.defaultFS", "hdfs://localhost:9000");
  51.                 conf.set("hbase.zookeeper.quorum", "localhost");
  52.                 TableName tn = TableName.valueOf("ns:test");
  53.                     //对hbase进行操作
  54.                 Connection conn = ConnectionFactory.createConnection(conf);
  55.                 Admin admin = conn.getAdmin();
  56.                     //创建命名空间
  57.                 NamespaceDescriptor nsd = NamespaceDescriptor.create("ns").build();
  58.                 admin.createNamespace(nsd);
  59.                     //创建表
  60.                 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("ns:test"));
  61.                 HColumnDescriptor hcd = new HColumnDescriptor("data");
  62.                 htd.addFamily(hcd);
  63.                     //判断表是否存在
  64.                 if(admin.tableExists(tn)) {
  65.                         if(admin.isTableEnabled(tn)) {
  66.                                 admin.disableTable(tn);
  67.                         }
  68.                         admin.deleteTable(tn);
  69.                 }
  70.                 admin.createTable(htd);
  71.                     //定义job
  72.                 Job job = Job.getInstance(conf,"hdfs2hbase");
  73.                 job.setMapperClass(MyMapper.class);
  74.                 job.setMapOutputKeyClass(Text.class);
  75.                 job.setMapOutputValueClass(Text.class);
  76.                     //数据输入路径
  77.                 FileInputFormat.addInputPath(job, new Path("/data/input/csvdata.txt"));
  78.                     //使用TableMapreduceUtil初始化reduce
  79.                 TableMapReduceUtil.initTableReducerJob(
  80.                                 "ns:test",
  81.                                 MyReduce.class,
  82.                                 job);
  83.                 job.waitForCompletion(true);
  84.                 System.out.println("finished");
  85.         }
  86. }
复制代码
检察效果
运行程序没有报错的情况下,进入shell环境,检察test表中数据。
  1. bin/hbase shell
  2. scan 'ns:test'
复制代码

8.4-2
8.5 HBase数据导入HDFS

使命目的
掌握hbase数据导入到HDFS的程序逻辑
使命清单
使命1:HBase数据导入HDFS
使命步骤

使命1:HBase数据导入HDFS

原始表和数据
进入shell环境,创建测试表“ns:test”,包罗一个列族cf和一个列col,并插入两条数据。
  1. create_namespace 'ns'
  2. create 'ns:test','cf'
  3. put 'ns:test','1','cf:col','value1'
  4. put 'ns:test','2','cf:col','value2'
复制代码

8.5-1
程序
在hdfsandhbase包下新建Hbase2Hdfs类。
  1. package hdfsandhbase;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.hbase.client.Result;
  6. import org.apache.hadoop.hbase.client.Scan;
  7. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  8. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  9. import org.apache.hadoop.hbase.mapreduce.TableMapper;
  10. import org.apache.hadoop.io.Text;
  11. import org.apache.hadoop.mapreduce.Job;
  12. import org.apache.hadoop.mapreduce.Mapper;
  13. import org.apache.hadoop.mapreduce.Reducer;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. public class Hbase2Hdfs {
  16.         public static class MyMapper extends TableMapper<Text,Text>{
  17.                 @Override
  18.                 protected void map(ImmutableBytesWritable key, Result value,
  19.                                 Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
  20.                                 throws IOException, InterruptedException {
  21.                         //获取对应的列族和列,设置为utf-8
  22.                         String cfandc = new String(value.getValue("cf".getBytes(),
  23.                                         "col".getBytes()),"utf-8");
  24.                         context.write(new Text(""), new Text(cfandc));
  25.                 }
  26.         }
  27.         public static class MyReducer extends Reducer<Text,Text,Text,Text>{
  28.                     //实例化Text  用来存储获取到的数据
  29.                 private Text result = new Text();
  30.                 @Override
  31.                 protected void reduce(Text key, Iterable<Text> values, Context context)
  32.                                 throws IOException, InterruptedException {
  33.                         for(Text t : values) {
  34.                                 result.set(t);
  35.                                 context.write(key, result);
  36.                         }
  37.                 }
  38.         }
  39.         public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  40.                 //配置相关信息
  41.                     Configuration conf = new Configuration();
  42.                 conf.set("fs.defaultFS", "hdfs://localhost:9000");
  43.                 conf.set("hbase.zookeeper.quorum", "localhost");
  44.                     //实例化任务
  45.                 Job job = Job.getInstance(conf,"hbase2hdfs");
  46.                 //设置运行主类
  47.                     job.setJarByClass(Hbase2Hdfs.class);
  48.                 Scan scan = new Scan();
  49.                 TableMapReduceUtil.initTableMapperJob(
  50.                                 "ns:test",
  51.                                 scan,
  52.                                 MyMapper.class,
  53.                                 Text.class,
  54.                                 Text.class,
  55.                                 job);
  56.                 job.setReducerClass(MyReducer.class);
  57.                     //设置输出路径
  58.                 FileOutputFormat.setOutputPath(job, new Path("/data/output/out"));
  59.                 job.waitForCompletion(true);
  60.                 System.out.println("finished");
  61.         }
  62. }
复制代码
执行效果
退出shell环境,检察输出路径下的文件效果。
  1. quit
  2. hadoop fs -cat /data/output/out/part-r-00000
复制代码

8.5-2

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

农妇山泉一亩田

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表