农妇山泉一亩田 发表于 2024-11-11 13:56:33

【HBase分布式数据库】第七章 数据的导入导出 (2-5)

7.2 bulkload导入数据

使命目的

掌握引入外部依靠包的方法
掌握eclipse打包的方法
掌握bulkload导入数据的逻辑代码
使命清单



[*]使命1:引入外部依靠包
[*]使命2:bulkload导入数据
使命步骤

使命1:引入外部依靠包

Bulkload是通过一个MapReduce Job来实现的,通过Job直接生成一个HBase的内部HFile格式文件来形成一个特别的HBase数据表,然后直接将数据文件加载到运行的集群中。使用bulk load功能最简单的方式就是使用importtsv 工具。importtsv 是从TSV文件直接加载内容至HBase的一个内置工具。它通过运行一个MapReduce Job,将数据从TSV文件中直接写入HBase的表或者写入一个HBase的自有格式数据文件。在编写代码逻辑之前,我们首先要引入程序依靠的jar包,步骤如下:
1、右键项目,选择【build path】>【configure build path】
https://i-blog.csdnimg.cn/direct/e00a6c5a6d4a44838f1422b6099478d5.png
8.2-1
2、在弹出的对话框内,单击【libraries】> 【add external jars】
https://i-blog.csdnimg.cn/direct/222b527fe9ac4b7b88ed589c4c7b2109.png
8.2-2
3、弹出的对话框中,找到Hadoop存放jar包的路径,路径如图所示。
https://i-blog.csdnimg.cn/direct/9123bfd33f3242018b5aa10c6607a38a.png
8.2-3
4、当前页面下的文件夹包罗MapReduce、hdfs、yarn和common下的全部jar包,选中之后,单击底部的open按钮。需要注意的是,这4个包每个包需要单独打开,单独选中。全部添加完毕之后,单击【apply and close】
使命2:bulkload导入数据
构建BulkLoadJob类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class BulkLoadJob {
            //指定的类BulkLoadJob初始化日志对象,方便在日志输出的时候,可以打印出日志信息所属的类。
      static Logger logger = LoggerFactory.getLogger(BulkLoadJob.class);
                //构建map端输入
      public static class BulkLoadMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
                                //map方法
                public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                                                //对输入数据进行切分
                        String[] valueStrSplit = value.toString().split("\t");
                          //拿到行键
                        String hkey = valueStrSplit;
                          //拿到列族
                        String family = valueStrSplit.toString().split(":");
                          //拿到列
                        String column = valueStrSplit.toString().split(":");
                          //拿到数值
                        String hvalue = valueStrSplit;
                          //行键转换成不可变型的字节
                        final byte[] rowKey = Bytes.toBytes(hkey);
                        final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey);
                        //把行键、列族、列和值封装成KV对儿
                          KeyValue kv = new KeyValue(rowKey, Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(hvalue));
                        //写到磁盘
                          context.write(HKey, kv);
                }
      }

      public static void main(String[] args) throws Exception {
                    //配置信息的创建
                Configuration conf = HBaseConfiguration.create();
                conf.set("hbase.zookeeper.property.clientPort", "2181");
                conf.set("hbase.zookeeper.quorum", "localhost");
                    //指定数据的输入和输出
                String[] dfsArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
                String inputPath = dfsArgs;
                System.out.println("source: " + dfsArgs);
                String outputPath = dfsArgs;
                System.out.println("dest: " + dfsArgs);
                HTable hTable = null;
                Job job = Job.getInstance(conf, "Test Import HFile & Bulkload");
                job.setJarByClass(BulkLoadJob.class);
                job.setMapperClass(BulkLoadJob.BulkLoadMap.class);
                job.setMapOutputKeyClass(ImmutableBytesWritable.class);
                job.setMapOutputValueClass(KeyValue.class);
                // 避免测试task
                job.setSpeculativeExecution(false);
                job.setReduceSpeculativeExecution(false);
                // 输入输出端的格式
                job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(HFileOutputFormat2.class);

                FileInputFormat.setInputPaths(job, inputPath);
                FileOutputFormat.setOutputPath(job, new Path(outputPath));
                    //指定表名
                hTable = new HTable(conf, "ns:t_table");
                HFileOutputFormat2.configureIncrementalLoad(job, hTable);

                if (job.waitForCompletion(true)) {
                        FsShell shell = new FsShell(conf);
                        try {
                              shell.run(new String[] { "-chmod", "-R", "777", dfsArgs });
                        } catch (Exception e) {
                              logger.error("Couldnt change the file permissions ", e);
                              throw new IOException(e);
                        }
                        //数据导入hbase表
                        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                        loader.doBulkLoad(new Path(outputPath), hTable);
                } else {
                        logger.error("loading failed.");
                        System.exit(1);
                }
                if (hTable != null) {
                        hTable.close();
                }
      }
}
创建测试表
进入hbase的shell环境,创建测试命名空间和测试表。
bin/hbase shell
create_namespace 'ns'
create 'ns:t_table','cf1','cf2'
https://i-blog.csdnimg.cn/direct/dfeaf6f50b194df0ade07d8c94e72c27.png
8.2-4
数据上传
HDFS上创建目录,把数据bulkdata.csv上传到HDFS。
cat bulkdata.csv
hadoop fs -mkdir -p /data/input
hadoop fs -put bulkdata.csv /data/input
https://i-blog.csdnimg.cn/direct/cbe4f29ae9564fec8a90cb8a41e2cd9a.png
8.2-5
完成了数据和程序之后,就要对程序打包了。
1、选中程序所在包,右键选择【export】
https://i-blog.csdnimg.cn/direct/b103cddeda1f4410b05aadd0f951c0c8.png
8.2-6
2、弹出的对话框中选择【Java】下的【jar file】,单击next。
https://i-blog.csdnimg.cn/direct/fe567dd2700e4fcf88d9e7697e05c49e.png
8.2-7
3、在弹出的对话框,勾选依靠,指定jar包的输出路径。单击next。
https://i-blog.csdnimg.cn/direct/6d7f7a573f9d42d19a09ce6bd96f83f9.png
8.2-8
4、本对话框不需要操纵
https://i-blog.csdnimg.cn/direct/4d96a91b031343f69d82709309e7d1b4.png
8.2-9
5、在接下来的对话框中,需要指定运行主类。末了单击finish
https://i-blog.csdnimg.cn/direct/07981420fd474e12a92b79c124b34e42.png
8.2-10
运行jar包
使用Hadoop运行jar包的命令,执行导入数据操纵。
hadoop jar /headless/Desktop/test.jar /data/input/bulkdata.csv /data/output/bulk_out
https://i-blog.csdnimg.cn/direct/07c190c14403454cba5ec98a6dd30e51.png
8.2-11
检察效果
进入shell环境,检察表中是否有数据。
bin/hbase shell
scan 'ns:t_table'
https://i-blog.csdnimg.cn/direct/d8d6a98e1b9c4565a516377d5788bf25.png
7.3 HBase的WordCount

使命目的
实践hbase的Wordcount
使命清单
使命1:预备工作
使命2:WordCount
使命步骤

使命1:预备工作

测试命名空间和测试表
进入shell环境。创建测试命名空间ns以及测试表src_table和dest_table,两张表都只有一个列族cf。
bin/hbase shell
create_namespace 'ns'
create 'ns:src_table','cf'
create 'ns:dest_table','cf'
测试数据
为测试表插入测试数据。
put 'ns:src_table','1','cf:word','hello'
put 'ns:src_table','2','cf:word','Java'
put 'ns:src_table','3','cf:word','hello'
put 'ns:src_table','4','cf:word','Scala'
https://i-blog.csdnimg.cn/direct/150f7d8858a14ff7aa5ffe16800380cf.png
8.3-1
使命2:WordCount

程序逻辑
新建wordcount包,在包下新建HbaseWordCount类。
package wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HbaseWordCount {
                //日志输出的时候,可以打印出日志信息所在类
      static Logger logger = LoggerFactory.getLogger(HbaseWordCount.class);
            //设置服务器端口以及服务器地址
      static Configuration conf = null;
      static {
                conf = HBaseConfiguration.create();
                conf.set("hbase.zookeeper.property.clientPort", "2181");
                conf.set("hbase.zookeeper.quorum", "localhost");
      }

      public static class HBMapper extends TableMapper<Text,IntWritable>{
                private static IntWritable one = new IntWritable(1);
                private static Text word = new Text();
                @Override
                protected void map(ImmutableBytesWritable key, Result value,
                              Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
                              throws IOException, InterruptedException {
                        for(Cell cell : value.rawCells()) {
                              word.set(CellUtil.cloneValue(cell));
                              context.write(word, one);
                        }
                }
      }

      public static class HBReducer extends TableReducer<Text,IntWritable,ImmutableBytesWritable>{

                @SuppressWarnings("deprecation")
                @Override
                protected void reduce(Text key, Iterable<IntWritable> values,
                              Reducer<Text, IntWritable, ImmutableBytesWritable, Mutation>.Context context)
                              throws IOException, InterruptedException {
                        int sum = 0;
                        for(IntWritable value : values) {
                              sum += value.get();
                        }
                        //把单词作为行键进行存储
                        Put put = new Put(Bytes.toBytes(key.toString()));
                        //数据存储到hbase表,列族为cf,列为col,值为sum
                        put.add(Bytes.toBytes("cf"),
                                        Bytes.toBytes("col"),
                                        Bytes.toBytes(String.valueOf(sum)));
                        //写到hbase中的需要指定行键和put
                        context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
                }
      }

      public static void main(String [] args) throws IOException, ClassNotFoundException, InterruptedException {
                @SuppressWarnings("deprecation")
                Job job = new Job(conf,"hbase wordcount");
                Scan scan = new Scan();
                    //使用TableMapReduceUtil工具类初始化map,扫描源表中数据执行map操作
                TableMapReduceUtil.initTableMapperJob(
                              "ns:src_table",
                              scan,
                              HBMapper.class,
                              Text.class,
                              IntWritable.class,
                              job);
                    //使用TableMapReduceUtil工具类初始化reduce,把reduce之后的结果存储到目标表
                TableMapReduceUtil.initTableReducerJob(
                              "ns:dest_table",
                              HBReducer.class,
                              job);
                job.waitForCompletion(true);
                System.out.println("finished");
      }
}
执行效果
程序完成之后,运行程序。当我们看到“finished”后,进入shell环境检察目标表中数据。
scan 'ns:dest_table'
https://i-blog.csdnimg.cn/direct/f0c7a6d70e32470787c7d789948e96da.png
8.3-2
8.4 HDFS数据导入HBase

使命目的
掌握Hadoop与HBase的集成使用
使命清单
使命1:HDFS数据导入HBase
使命步骤

使命1:HDFS数据导入HBase

上传数据
在HDFS上新建/data/input目录,把hbase目录下的testdata中的csvdata.txt上传到该目录。
hadoop fs -mkdir -p /data/input
hadoop fs -put ./csvdata.txt /data/input
https://i-blog.csdnimg.cn/direct/df8a695c30ea4633875f01943bb7d5c6.png
8.4-1
程序
新建一个hdfsandhbase包,包下新建一个Hdfs2Hbase类。
package hdfsandhbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class Hdfs2Hbase {

      public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>{
                @Override
                protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                              throws IOException, InterruptedException {
                        String line = value.toString();
                        String[] lines = line.split(",");
                        for (String s : lines) {
                              context.write(new Text(s), new Text(1+""));
                        }
                }
      }

      public static class MyReduce extends TableReducer<Text,Text,ImmutableBytesWritable>{
                @Override
                protected void reduce(Text key, Iterable<Text> value,Context context)
                              throws IOException, InterruptedException {
                        int counter = 0;
                        for(Text t:value) {
                              counter += Integer.parseInt(t.toString());
                        }
                        //写出到hbase中去
                        Put put = new Put(Bytes.toBytes(key.toString()));
                        put.addColumn("data".getBytes(), "count".getBytes(), (counter+"").getBytes());
                        context.write(new ImmutableBytesWritable(key.getBytes()), put);
                }
      }

      public static void main(String [] args) throws IOException, ClassNotFoundException, InterruptedException {
                Configuration conf = new Configuration();
                conf.set("fs.defaultFS", "hdfs://localhost:9000");
                conf.set("hbase.zookeeper.quorum", "localhost");
                TableName tn = TableName.valueOf("ns:test");
                    //对hbase进行操作
                Connection conn = ConnectionFactory.createConnection(conf);
                Admin admin = conn.getAdmin();
                    //创建命名空间
                NamespaceDescriptor nsd = NamespaceDescriptor.create("ns").build();
                admin.createNamespace(nsd);
                    //创建表
                HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("ns:test"));
                HColumnDescriptor hcd = new HColumnDescriptor("data");
                htd.addFamily(hcd);
                    //判断表是否存在
                if(admin.tableExists(tn)) {
                        if(admin.isTableEnabled(tn)) {
                              admin.disableTable(tn);
                        }
                        admin.deleteTable(tn);
                }
                admin.createTable(htd);
                    //定义job
                Job job = Job.getInstance(conf,"hdfs2hbase");
                job.setMapperClass(MyMapper.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);
                    //数据输入路径
                FileInputFormat.addInputPath(job, new Path("/data/input/csvdata.txt"));
                    //使用TableMapreduceUtil初始化reduce
                TableMapReduceUtil.initTableReducerJob(
                              "ns:test",
                              MyReduce.class,
                              job);
                job.waitForCompletion(true);
                System.out.println("finished");
      }
}
检察效果
运行程序没有报错的情况下,进入shell环境,检察test表中数据。
bin/hbase shell
scan 'ns:test'
https://i-blog.csdnimg.cn/direct/44f3f6a056ea416a86cfed6d3616f0ce.png
8.4-2
8.5 HBase数据导入HDFS

使命目的
掌握hbase数据导入到HDFS的程序逻辑
使命清单
使命1:HBase数据导入HDFS
使命步骤

使命1:HBase数据导入HDFS

原始表和数据
进入shell环境,创建测试表“ns:test”,包罗一个列族cf和一个列col,并插入两条数据。
create_namespace 'ns'
create 'ns:test','cf'
put 'ns:test','1','cf:col','value1'
put 'ns:test','2','cf:col','value2'
https://i-blog.csdnimg.cn/direct/4c003457058d45369cff503e5a02eab0.png
8.5-1
程序
在hdfsandhbase包下新建Hbase2Hdfs类。
package hdfsandhbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Hbase2Hdfs {
      public static class MyMapper extends TableMapper<Text,Text>{
                @Override
                protected void map(ImmutableBytesWritable key, Result value,
                              Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                              throws IOException, InterruptedException {
                        //获取对应的列族和列,设置为utf-8
                        String cfandc = new String(value.getValue("cf".getBytes(),
                                        "col".getBytes()),"utf-8");
                        context.write(new Text(""), new Text(cfandc));
                }
      }

      public static class MyReducer extends Reducer<Text,Text,Text,Text>{

                    //实例化Text用来存储获取到的数据
                private Text result = new Text();
                @Override
                protected void reduce(Text key, Iterable<Text> values, Context context)
                              throws IOException, InterruptedException {
                        for(Text t : values) {
                              result.set(t);
                              context.write(key, result);
                        }

                }
      }

      public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                //配置相关信息
                    Configuration conf = new Configuration();
                conf.set("fs.defaultFS", "hdfs://localhost:9000");
                conf.set("hbase.zookeeper.quorum", "localhost");
                    //实例化任务
                Job job = Job.getInstance(conf,"hbase2hdfs");
                //设置运行主类
                    job.setJarByClass(Hbase2Hdfs.class);
                Scan scan = new Scan();
                TableMapReduceUtil.initTableMapperJob(
                              "ns:test",
                              scan,
                              MyMapper.class,
                              Text.class,
                              Text.class,
                              job);
                job.setReducerClass(MyReducer.class);
                    //设置输出路径
                FileOutputFormat.setOutputPath(job, new Path("/data/output/out"));
                job.waitForCompletion(true);
                System.out.println("finished");
      }
}
执行效果
退出shell环境,检察输出路径下的文件效果。
quit
hadoop fs -cat /data/output/out/part-r-00000
https://i-blog.csdnimg.cn/direct/280ebee6cfc44758a968d61c1a6918b1.png
8.5-2

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 【HBase分布式数据库】第七章 数据的导入导出 (2-5)