大数据技术(八)—— HBase数据读写流程和Api的使用

打印 上一主题 下一主题

主题 838|帖子 838|积分 2529

目录
一、读数据
二、写数据
1、写数据
2、数据刷到磁盘过程
3、数据合并过程
三、Api的使用
四、Phoenix
1、安装Phoenix
2、简单使用
2.1、查看表信息
2.2、创建表
2.3、插入数据
2.4、查看数据
2.5、修改数据
2.6、删除数据
2.7、退出
2.8、导入数据
3、API的使用
五、整合SpringBoot+MyBatis
六、参考


一、读数据







  • Client先访问zookeeper,哀求获取Meta表信息。从上面图可以看到Zookeeper中存储了HBase的元数据信息。
  • Zookeeper返回meta数据信息。
  • 根据namespace、表名和rowkey在meta表的信息去获取数据所在的RegionServer和Region
  • 找到这个region对应的regionserver;
  • 查找对应的region;
  • 先从MemStore找数据,假如没有,再到BlockCache里面读;
  • BlockCache还没有,再到StoreFile上读(为了读取的服从);
  • 假如是从StoreFile里面读取的数据,不是直接返回给客户端,而是先写入BlockCache,再返回给客户端。
二、写数据

1、写数据



写数据和读数据前几步的过程都是一样的,首先从Zookeeper获取Meta表信息,根据meta表信息确认RegionServer 和Region

  • Client向RegionServer发送写哀求。
  • RegionServer将数据写到HLog(write ahead log)。为了数据的持久化和恢复(避免数据在内存还没刷到磁盘就宕机丢失数据)。
  • RegionServer将数据写到内存(MemStore),此时就认为写数据完成了,至于什么时间数据写进HDFS,它不关心。
  • 反馈Client写成功。
2、数据刷到磁盘过程

当MemStore数据达到阈值(默认是128M),将数据刷到硬盘,将内存中的数据删除,同时删除HLog(Write Ahead Log)中的汗青数据;并将数据存储到HDFS中;在HLog中做标记点。
3、数据合并过程


  • 当数据块达到3块,Hmaster触发合并操作,Region将数据块加载到本地,进行合并。
  • 当合并的数据超过256M,进行拆分,将拆分后的Region分配给不同的HregionServer管理。
  • 当RegionServer宕机后,将RegionServer上的hlog拆分,然后分配给不同的RegionServer加载,修改.META。
  • HLog的数据会同步到HDFS,保证数据的可靠性。
三、Api的使用

  1. package com.xiaojie.hadoop.utils;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.commons.lang3.StringUtils;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.hbase.HBaseConfiguration;
  6. import org.apache.hadoop.hbase.NamespaceDescriptor;
  7. import org.apache.hadoop.hbase.TableName;
  8. import org.apache.hadoop.hbase.client.*;
  9. import org.apache.hadoop.hbase.filter.FilterList;
  10. import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
  11. import org.apache.hadoop.hbase.quotas.QuotaSettings;
  12. import org.apache.hadoop.hbase.quotas.QuotaType;
  13. import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
  14. import org.apache.hadoop.hbase.util.Bytes;
  15. import org.apache.hadoop.hbase.util.Pair;
  16. import java.io.IOException;
  17. import java.util.List;
  18. /**
  19. * @author 熟透的蜗牛
  20. * @version 1.0
  21. * @description: hbase工具类
  22. * @date 2025/1/3 9:27
  23. */
  24. @Slf4j
  25. public class HBaseUtil {
  26.     private static Connection connection;
  27.     static {
  28.         Configuration configuration = HBaseConfiguration.create();
  29.         //设置端口号
  30.         configuration.set("hbase.zookeeper.property.clientPort", "2181");
  31.         //设置zk连接
  32.         configuration.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3");
  33.         //创建连接
  34.         try {
  35.             connection = ConnectionFactory.createConnection(configuration);
  36.         } catch (IOException e) {
  37.             throw new RuntimeException(e);
  38.         }
  39.     }
  40.     /**
  41.      * @param namespace
  42.      * @description: 创建namespace
  43.      * @return: boolean
  44.      * @author 熟透的蜗牛
  45.      * @date: 2025/1/3 11:03
  46.      */
  47.     public static boolean createNameSpace(String namespace) {
  48.         try {
  49.             Admin admin = connection.getAdmin();
  50.             NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();
  51.             admin.createNamespace(namespaceDescriptor);
  52.             log.info(">>>>>>>>>>>>创建namespace成功");
  53.             return true;
  54.         } catch (IOException e) {
  55.             throw new RuntimeException(e);
  56.         }
  57.     }
  58.     /**
  59.      * @param namespace
  60.      * @description: 删除ns
  61.      * @return: boolean
  62.      * @author 熟透的蜗牛
  63.      * @date: 2025/1/3 11:05
  64.      */
  65.     public static boolean deleteNameSpace(String namespace) {
  66.         try {
  67.             Admin admin = connection.getAdmin();
  68.             admin.deleteNamespace(namespace);
  69.             log.info(">>>>>>>>>>>>删除namespace成功");
  70.             return true;
  71.         } catch (IOException e) {
  72.             throw new RuntimeException(e);
  73.         }
  74.     }
  75.     /**
  76.      * @param tableName      表名
  77.      * @param columnFamilies 列族
  78.      * @description: 创建表
  79.      * @return: boolean
  80.      * @author 熟透的蜗牛
  81.      * @date: 2025/1/3 9:35
  82.      */
  83.     public static boolean createTable(String tableName, List<String> columnFamilies, String nameSpace) throws IOException {
  84.         Admin admin = connection.getAdmin();
  85.         boolean exists = admin.tableExists(TableName.valueOf(tableName));
  86.         //创建表
  87.         if (!exists) {
  88.             //如果namespace是空值则会使用default,作为命名空间
  89.             TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(nameSpace, tableName));
  90.             columnFamilies.forEach(cf -> {
  91.                 ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf));
  92.                 columnFamilyDescriptorBuilder.setMaxVersions(1);
  93.                 ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptorBuilder.build();
  94.                 tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
  95.             });
  96.             admin.createTable(tableDescriptorBuilder.build());
  97.             return true;
  98.         } else {
  99.             log.info("table exists>>>>>>>>");
  100.             return false;
  101.         }
  102.     }
  103.     /**
  104.      * @param tableName 表名
  105.      * @description: 删除表
  106.      * @return: boolean
  107.      * @author 熟透的蜗牛
  108.      * @date: 2025/1/3 10:16
  109.      */
  110.     public static boolean deleteTable(String tableName) {
  111.         try {
  112.             Admin admin = connection.getAdmin();
  113.             //先禁用表
  114.             admin.disableTable(TableName.valueOf(tableName));
  115.             //再删除
  116.             admin.deleteTable(TableName.valueOf(tableName));
  117.             return true;
  118.         } catch (IOException e) {
  119.             throw new RuntimeException(e);
  120.         }
  121.     }
  122.     /**
  123.      * @param tableName        表名
  124.      * @param rowKey           rowkey
  125.      * @param columnFamilyName 列族
  126.      * @param qualifier        列标识
  127.      * @param value            数据
  128.      * @description: 插入数据
  129.      * @return: boolean
  130.      * @author 熟透的蜗牛
  131.      * @date: 2025/1/3 16:46
  132.      */
  133.     public static boolean putRow(String nameSpace, String tableName, String rowKey, String columnFamilyName, String qualifier,
  134.                                  String value) {
  135.         Table table = null;
  136.         try {
  137.             table = connection.getTable(TableName.valueOf(nameSpace, tableName));
  138.             Put put = new Put(Bytes.toBytes(rowKey));
  139.             put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
  140.             table.put(put);
  141.             log.info(">>>>>>>插入数据成功");
  142.             return true;
  143.         } catch (IOException e) {
  144.             throw new RuntimeException(e);
  145.         } finally {
  146.             if (table != null) {
  147.                 try {
  148.                     table.close();
  149.                 } catch (IOException e) {
  150.                     throw new RuntimeException(e);
  151.                 }
  152.             }
  153.         }
  154.     }
  155.     /**
  156.      * @param nameSpace
  157.      * @param tableName
  158.      * @param rowKey
  159.      * @param columnFamilyName
  160.      * @param pairList         键值对集合
  161.      * @description: 插入数据
  162.      * @return: boolean
  163.      * @author 熟透的蜗牛
  164.      * @date: 2025/1/3 17:32
  165.      */
  166.     public static boolean putRow(String nameSpace, String tableName, String rowKey, String columnFamilyName, List<Pair<String, String>> pairList) {
  167.         Table table = null;
  168.         try {
  169.             table = connection.getTable(TableName.valueOf(nameSpace, tableName));
  170.             Put put = new Put(Bytes.toBytes(rowKey));
  171.             pairList.forEach(pair -> {
  172.                 put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getFirst()), Bytes.toBytes(pair.getSecond()));
  173.             });
  174.             table.put(put);
  175.             log.info(">>>>>>>插入数据成功");
  176.             return true;
  177.         } catch (IOException e) {
  178.             throw new RuntimeException(e);
  179.         } finally {
  180.             if (table != null) {
  181.                 try {
  182.                     table.close();
  183.                 } catch (IOException e) {
  184.                     throw new RuntimeException(e);
  185.                 }
  186.             }
  187.         }
  188.     }
  189.     /**
  190.      * @param nameSpace
  191.      * @param tableName
  192.      * @param rowKey
  193.      * @description:根据Rowkey查询
  194.      * @return: org.apache.hadoop.hbase.client.Result
  195.      * @author 熟透的蜗牛
  196.      * @date: 2025/1/3 17:42
  197.      */
  198.     public static Result getRowByRowKey(String nameSpace, String tableName, String rowKey) {
  199.         try {
  200.             Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
  201.             Result result = table.get(new Get(Bytes.toBytes(rowKey)));
  202.             return result;
  203.         } catch (IOException e) {
  204.             throw new RuntimeException(e);
  205.         }
  206.     }
  207.     /**
  208.      * @param nameSpace
  209.      * @param tableName
  210.      * @description: 查询所有数据, 和范围查询
  211.      * @return: org.apache.hadoop.hbase.client.ResultScanner
  212.      * @author 熟透的蜗牛
  213.      * @date: 2025/1/3 18:12
  214.      */
  215.     public static ResultScanner getAll(String nameSpace, String tableName) {
  216.         try {
  217.             Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
  218.             Scan scan = new Scan();
  219.             scan.setCacheBlocks(true);//设置读缓存
  220.             scan.withStartRow(Bytes.toBytes("1002")); //rowkey的起始值
  221.             scan.withStopRow(Bytes.toBytes("1003"));  //rowkey的结束值,返回结果不包含该值
  222.             ResultScanner scanner = table.getScanner(scan);
  223.             return scanner;
  224.         } catch (IOException e) {
  225.             throw new RuntimeException(e);
  226.         }
  227.     }
  228.     /**
  229.      * @param nameSpace
  230.      * @param tableName
  231.      * @param filterList
  232.      * @description: 查找过滤
  233.      * @return: org.apache.hadoop.hbase.client.ResultScanner
  234.      * @author 熟透的蜗牛
  235.      * @date: 2025/1/3 20:47
  236.      */
  237.     public static ResultScanner getScanner(String nameSpace, String tableName, FilterList filterList) {
  238.         try {
  239.             Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
  240.             Scan scan = new Scan();
  241.             scan.setFilter(filterList);
  242.             ResultScanner scanner = table.getScanner(scan);
  243.             return scanner;
  244.         } catch (IOException e) {
  245.             throw new RuntimeException(e);
  246.         }
  247.     }
  248.     /**
  249.      * @param tableName
  250.      * @param rowKey
  251.      * @param columnFamily 列族
  252.      * @param qualifier    限定符
  253.      * @description: 获取指定cell数据
  254.      * @return: java.lang.String
  255.      * @author 熟透的蜗牛
  256.      * @date: 2025/1/3 20:59
  257.      */
  258.     public static String getCell(String nameSpace, String tableName, String rowKey, String columnFamily, String qualifier) {
  259.         try {
  260.             Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
  261.             Get get = new Get(Bytes.toBytes(rowKey));
  262.             if (!get.isCheckExistenceOnly()) {
  263.                 get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
  264.                 Result result = table.get(get);
  265.                 byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
  266.                 return Bytes.toString(resultValue);
  267.             }
  268.         } catch (IOException e) {
  269.             throw new RuntimeException(e);
  270.         }
  271.         return null;
  272.     }
  273.     /**
  274.      * @param nameSpace
  275.      * @param tableName
  276.      * @param rowKey
  277.      * @description: 删除一行数据
  278.      * @return: boolean
  279.      * @author 熟透的蜗牛
  280.      * @date: 2025/1/3 21:34
  281.      */
  282.     public static boolean deleteRow(String nameSpace, String tableName, String rowKey) {
  283.         try {
  284.             Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
  285.             Delete delete = new Delete(Bytes.toBytes(rowKey));
  286.             table.delete(delete);
  287.         } catch (IOException e) {
  288.             e.printStackTrace();
  289.         }
  290.         return true;
  291.     }
  292.     /**
  293.      * @param nameSpace
  294.      * @param tableName
  295.      * @param rowKey
  296.      * @param familyName
  297.      * @param qualifier
  298.      * @description: 删除指定列
  299.      * @return: boolean
  300.      * @author 熟透的蜗牛
  301.      * @date: 2025/1/3 21:34
  302.      */
  303.     public static boolean deleteColumn(String nameSpace, String tableName, String rowKey, String familyName,
  304.                                        String qualifier) {
  305.         try {
  306.             Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
  307.             Delete delete = new Delete(Bytes.toBytes(rowKey));
  308.             delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
  309.             table.delete(delete);
  310.             table.close();
  311.         } catch (IOException e) {
  312.             e.printStackTrace();
  313.         }
  314.         return true;
  315.     }
  316. }
复制代码
四、Phoenix

Phoenix是 HBase 的开源 SQL 中心层,它允许你使用标准 JDBC 的方式来操作 HBase 上的数据。
1、安装Phoenix

  1. #解压文件
  2. tar -zxf phoenix-hbase-2.6-5.2.1-bin.tar.gz
  3. #移动文件
  4. mv phoenix-hbase-2.6-5.2.1-bin /usr/local/
  5. #复制jar到regionserver 和master的lib
  6. cp phoenix-server-hbase-2.6.jar /usr/local/hbase-2.6.1/lib
  7. #分发到其他服务器
  8. xsync /usr/local/hbase-2.6.1/
  9. #重启hbase
  10. ./hbase.sh stop
  11. ./hbase.sh start
  12. #启动phoenix
  13. /usr/local/phoenix-hbase-2.6-5.2.1-bin/bin/sqlline.py hadoop1,hadoop2,hadoop3
复制代码
2、简单使用

2.1、查看表信息

  1. #查看表信息
  2. !tables
复制代码


2.2、创建表

  1. CREATE TABLE IF NOT EXISTS us_population (
  2.       state CHAR(2) NOT NULL,
  3.       city VARCHAR NOT NULL,
  4.       population BIGINT
  5.       CONSTRAINT my_pk PRIMARY KEY (state, city));
复制代码


2.3、插入数据

  1. #插入数据
  2. UPSERT INTO us_population VALUES('NY','New York',8143197);
复制代码
2.4、查看数据

  1. select * from us_population;
复制代码
2.5、修改数据

  1. UPSERT INTO us_population VALUES('NY','New York',999999);
复制代码
2.6、删除数据

  1. DELETE FROM us_population WHERE city='New York';
  2. #删除表
  3. drop table us_population;
复制代码
2.7、退出

  1. !quit
复制代码
2.8、导入数据

us_population.csv
  1. NY,New York,8143197
  2. CA,Los Angeles,3844829
  3. IL,Chicago,2842518
  4. TX,Houston,2016582
  5. PA,Philadelphia,1463281
  6. AZ,Phoenix,1461575
  7. TX,San Antonio,1256509
  8. CA,San Diego,1255540
  9. TX,Dallas,1213825
  10. CA,San Jose,912332
复制代码

us_population.sql
  1. CREATE TABLE IF NOT EXISTS us_population (
  2.   state CHAR(2) NOT NULL,
  3.   city VARCHAR NOT NULL,
  4.   population BIGINT
  5.   CONSTRAINT my_pk PRIMARY KEY (state, city));
复制代码
  1. #导入数据
  2. ./psql.py hadoop1,hadoop2,hadoop3 us_population.sql us_population.csv
  3. #查看数据
  4. select * from us_population;
  5. #查询数据
  6. SELECT state as "State",count(city) as "City Count",sum(population) as "Population Sum"
  7. FROM us_population
  8. GROUP BY state
  9. ORDER BY sum(population) DESC;
复制代码


3、API的使用

  1. package com.xiaojie;
  2. import java.sql.*;
  3. /**
  4. * @author 熟透的蜗牛
  5. * @version 1.0
  6. * @description: 测试phoenix
  7. * @date 2025/1/3 23:08
  8. */
  9. public class PhoenixApi {
  10.     public static void main(String[] args) throws ClassNotFoundException, SQLException {
  11.         //1、加载驱动
  12.         Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
  13.         //2、创建连接
  14.         String url = "jdbc:phoenix:hadoop1,hadoop2,hadoop3";
  15.         Connection connect = DriverManager.getConnection(url);
  16.         //3、创建查询
  17.         PreparedStatement preparedStatement = connect.prepareStatement("SELECT * FROM us_population");
  18.         //4、遍历结果
  19.         ResultSet resultSet = preparedStatement.executeQuery();
  20.         while (resultSet.next()) {
  21.             System.out.println(resultSet.getString("city") + " "
  22.                                + resultSet.getInt("population"));
  23.         }
  24.         //5、关闭资源
  25.         preparedStatement.close();
  26.         resultSet.close();
  27.         connect.close();
  28.     }
  29. }
复制代码
五、整合SpringBoot+MyBatis

  1. package com.xiaojie.hadoop.hbase.phoenix.dao;
  2. import com.xiaojie.hadoop.hbase.phoenix.bean.USPopulation;
  3. import org.apache.ibatis.annotations.*;
  4. import java.util.List;
  5. @Mapper
  6. public interface PopulationDao {
  7.     @Select("SELECT * from us_population")
  8.     List<USPopulation> queryAll();
  9.     @Insert("UPSERT INTO us_population VALUES( #{state}, #{city}, #{population} )")
  10.     void save(USPopulation USPopulation);
  11.     @Select("SELECT * FROM us_population WHERE state=#{state} AND city = #{city}")
  12.     USPopulation queryByStateAndCity(String state, String city);
  13.     @Delete("DELETE FROM us_population WHERE state=#{state} AND city = #{city}")
  14.     void deleteByStateAndCity(String state, String city);
  15. }
复制代码
  1. server:
  2.   port: 9999
  3. spring:
  4.   application:
  5.     name: hadoop
  6.   datasource:
  7.     #zookeeper地址
  8.     url: jdbc:phoenix:hadoop1,hadoop2,hadoop3
  9.     driver-class-name: org.apache.phoenix.jdbc.PhoenixDriver
  10.     type: com.zaxxer.hikari.HikariDataSource
  11.     hikari:
  12.       # 池中维护的最小空闲连接数
  13.       minimum-idle: 10
  14.       # 池中最大连接数,包括闲置和使用中的连接
  15.       maximum-pool-size: 20
  16.       # 此属性控制从池返回的连接的默认自动提交行为。默认为true
  17.       auto-commit: true
  18.       # 允许最长空闲时间
  19.       idle-timeout: 30000
  20.       # 此属性表示连接池的用户定义名称,主要显示在日志记录和JMX管理控制台中,以标识池和池配置。 默认值:自动生成
  21.       pool-name: custom-hikari
  22.       #此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
  23.       max-lifetime: 1800000
  24.       # 数据库连接超时时间,默认30秒,即30000
  25.       connection-timeout: 30000
  26.       # 连接测试sql 这个地方需要根据数据库方言差异而配置 例如 oracle 就应该写成  select 1 from dual
  27.       connection-test-query: SELECT 1
  28. wssnail:
  29.   hdfs:
  30.     url: hdfs://hadoop1:8020
  31.     user-name: root
  32. # mybatis 相关配置
  33. mybatis:
  34.   configuration:
  35.     # 是否打印sql语句 调试的时候可以开启
  36.     log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
复制代码
六、参考

https://phoenix.apache.org/Phoenix-in-15-minutes-or-less.html
完整代码:spring-boot: Springboot整合redis、消息中心件等相关代码 - Gitee.com

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大连全瓷种植牙齿制作中心

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表