Contents

1. Reading Data
2. Writing Data
   1. Write Path
   2. Flushing Data to Disk
   3. Compaction and Splitting
3. Using the API
4. Phoenix
   1. Installing Phoenix
   2. Basic Usage
      2.1 Viewing Table Info
      2.2 Creating a Table
      2.3 Inserting Data
      2.4 Querying Data
      2.5 Updating Data
      2.6 Deleting Data
      2.7 Exiting
      2.8 Bulk-Loading Data
   3. Using the API
5. Integrating Spring Boot + MyBatis
6. References
1. Reading Data
- The client first contacts ZooKeeper to request the location of the meta table. As the figure above shows, ZooKeeper stores HBase's metadata entry point.
- ZooKeeper returns the meta table's location.
- Using the namespace, table name, and rowkey, the client looks up the target RegionServer and Region in the meta table.
- The client contacts the RegionServer hosting that region;
- the RegionServer locates the region;
- the data is first looked up in the MemStore; on a miss, the BlockCache is checked;
- if the BlockCache misses too, the data is read from the StoreFiles (the caches exist for read efficiency);
- data read from a StoreFile is not returned to the client directly: it is first written into the BlockCache and then returned, so subsequent reads are faster.
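This whole lookup is transparent to client code, but the client API does expose it. A minimal sketch of asking where a row lives (assuming an open `Connection` named `connection` and an existing table `t1`; the names are illustrative):

```java
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;

// The client resolves row -> region -> RegionServer via hbase:meta;
// RegionLocator lets you observe the result of that resolution.
try (RegionLocator locator = connection.getRegionLocator(TableName.valueOf("t1"))) {
    HRegionLocation location = locator.getRegionLocation(Bytes.toBytes("row1"));
    System.out.println("row1 is served by " + location.getServerName()
            + ", region " + location.getRegion().getRegionNameAsString());
}
```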
2. Writing Data
1. Write Path
The first steps of a write are the same as a read: the client fetches the meta table location from ZooKeeper and uses the meta table to determine the target RegionServer and Region.
- The client sends the write request to the RegionServer.
- The RegionServer appends the data to the HLog (write-ahead log). This provides durability and recovery: if the server crashes before in-memory data reaches disk, the data can be replayed from the log.
- The RegionServer writes the data into memory (the MemStore). At this point the write is considered complete; when the data is actually flushed to HDFS is a separate concern.
- The RegionServer acknowledges the successful write to the client.
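The WAL step above can be tuned per mutation through the client API. A minimal sketch (again assuming an open `connection`; the table and family names are illustrative):

```java
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

try (Table table = connection.getTable(TableName.valueOf("t1"))) {
    Put put = new Put(Bytes.toBytes("row1"));
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    // SYNC_WAL syncs the HLog append before the write is acknowledged;
    // SKIP_WAL trades the durability described above for write speed.
    put.setDurability(Durability.SYNC_WAL);
    table.put(put);
}
```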
2. Flushing Data to Disk
When the MemStore reaches its threshold (128 MB by default), its contents are flushed to HDFS as a new StoreFile and removed from memory. The corresponding historical entries in the HLog (write-ahead log) are then obsolete and can be cleaned up, and a checkpoint marker is recorded in the HLog.
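The flush threshold is controlled by the `hbase.hregion.memstore.flush.size` property; a sketch of how it would appear in hbase-site.xml (the value is in bytes; 134217728 shown here is the 128 MB default):

```xml
<property>
  <name>hbase.hregion.memstore.flush.size</name>
  <!-- 134217728 bytes = 128 MB, the default per-region flush threshold -->
  <value>134217728</value>
</property>
```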
3. Compaction and Splitting
- When the number of StoreFiles reaches a threshold (3 here), a compaction is triggered: the region loads the files and merges them into one. (Compactions are planned and executed by the RegionServer, not the HMaster.) Both operations can also be triggered by hand, as shown after this list.
- When a region's data grows beyond the configured maximum (256 MB in early HBase versions; `hbase.hregion.max.filesize` defaults to 10 GB in recent releases), the region is split, and the HMaster assigns the resulting regions to different RegionServers.
- When a RegionServer goes down, its HLog is split up and replayed by the RegionServers that take over its regions, and hbase:meta (formerly .META.) is updated accordingly.
- HLog data is persisted to HDFS, which guarantees its durability.
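A manual trigger from the HBase shell looks like this; both commands exist in stock HBase (the table name is illustrative):

```shell
major_compact 'us_population'   # merge all StoreFiles of each region of the table
split 'us_population'           # request a split of the table's regions
```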
3. Using the API
```java
package com.xiaojie.hadoop.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

import java.io.IOException;
import java.util.List;

/**
 * HBase utility class.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2025/1/3 9:27
 */
@Slf4j
public class HBaseUtil {

    private static Connection connection;

    static {
        Configuration configuration = HBaseConfiguration.create();
        // ZooKeeper client port
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        // ZooKeeper quorum
        configuration.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3");
        // create the connection (heavyweight and thread-safe; one per JVM is enough)
        try {
            connection = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Create a namespace. */
    public static boolean createNameSpace(String namespace) {
        try {
            Admin admin = connection.getAdmin();
            NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();
            admin.createNamespace(namespaceDescriptor);
            log.info(">>>>>>>>>>>> namespace created");
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Delete a namespace (it must be empty). */
    public static boolean deleteNameSpace(String namespace) {
        try {
            Admin admin = connection.getAdmin();
            admin.deleteNamespace(namespace);
            log.info(">>>>>>>>>>>> namespace deleted");
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Create a table.
     *
     * @param tableName      table name
     * @param columnFamilies column families
     * @param nameSpace      namespace; if null, "default" is used
     */
    public static boolean createTable(String tableName, List<String> columnFamilies, String nameSpace) throws IOException {
        Admin admin = connection.getAdmin();
        // check existence under the same namespace the table would be created in
        boolean exists = admin.tableExists(TableName.valueOf(nameSpace, tableName));
        if (!exists) {
            TableDescriptorBuilder tableDescriptorBuilder =
                    TableDescriptorBuilder.newBuilder(TableName.valueOf(nameSpace, tableName));
            columnFamilies.forEach(cf -> {
                ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
                        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf));
                columnFamilyDescriptorBuilder.setMaxVersions(1);
                tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
            });
            admin.createTable(tableDescriptorBuilder.build());
            return true;
        } else {
            log.info("table exists >>>>>>>>");
            return false;
        }
    }

    /** Delete a table: disable it first, then drop it. */
    public static boolean deleteTable(String tableName) {
        try {
            Admin admin = connection.getAdmin();
            admin.disableTable(TableName.valueOf(tableName));
            admin.deleteTable(TableName.valueOf(tableName));
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Put a single cell.
     *
     * @param rowKey           row key
     * @param columnFamilyName column family
     * @param qualifier        column qualifier
     * @param value            cell value
     */
    public static boolean putRow(String nameSpace, String tableName, String rowKey, String columnFamilyName,
                                 String qualifier, String value) {
        try (Table table = connection.getTable(TableName.valueOf(nameSpace, tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            log.info(">>>>>>> row inserted");
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Put several cells of one row in a single call.
     *
     * @param pairList (qualifier, value) pairs
     */
    public static boolean putRow(String nameSpace, String tableName, String rowKey, String columnFamilyName,
                                 List<Pair<String, String>> pairList) {
        try (Table table = connection.getTable(TableName.valueOf(nameSpace, tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            pairList.forEach(pair ->
                    put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getFirst()),
                            Bytes.toBytes(pair.getSecond())));
            table.put(put);
            log.info(">>>>>>> row inserted");
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Get one row by row key. */
    public static Result getRowByRowKey(String nameSpace, String tableName, String rowKey) {
        try (Table table = connection.getTable(TableName.valueOf(nameSpace, tableName))) {
            return table.get(new Get(Bytes.toBytes(rowKey)));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Scan a row-key range (start/stop rows are hardcoded here as an example). */
    public static ResultScanner getAll(String nameSpace, String tableName) {
        try {
            Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
            Scan scan = new Scan();
            scan.setCacheBlocks(true);                 // use the block cache for this scan
            scan.withStartRow(Bytes.toBytes("1002"));  // start row key (inclusive)
            scan.withStopRow(Bytes.toBytes("1003"));   // stop row key (exclusive)
            return table.getScanner(scan);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Scan with a filter list. */
    public static ResultScanner getScanner(String nameSpace, String tableName, FilterList filterList) {
        try {
            Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
            Scan scan = new Scan();
            scan.setFilter(filterList);
            return table.getScanner(scan);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Get the value of a single cell.
     *
     * @param columnFamily column family
     * @param qualifier    column qualifier
     */
    public static String getCell(String nameSpace, String tableName, String rowKey, String columnFamily,
                                 String qualifier) {
        try (Table table = connection.getTable(TableName.valueOf(nameSpace, tableName))) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
            Result result = table.get(get);
            byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
            return Bytes.toString(resultValue);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Delete one row. */
    public static boolean deleteRow(String nameSpace, String tableName, String rowKey) {
        try (Table table = connection.getTable(TableName.valueOf(nameSpace, tableName))) {
            table.delete(new Delete(Bytes.toBytes(rowKey)));
            return true;
        } catch (IOException e) {
            log.error("delete row failed", e);
            return false;
        }
    }

    /** Delete a single column of a row. */
    public static boolean deleteColumn(String nameSpace, String tableName, String rowKey, String familyName,
                                       String qualifier) {
        try (Table table = connection.getTable(TableName.valueOf(nameSpace, tableName))) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));
            table.delete(delete);
            return true;
        } catch (IOException e) {
            log.error("delete column failed", e);
            return false;
        }
    }
}
```
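A quick driver showing how this utility class might be exercised, including a `FilterList` for `getScanner` (the namespace `test`, table `person`, and column family `info` are made-up illustration values; the sketch assumes the cluster from the configuration above is reachable):

```java
package com.xiaojie.hadoop.utils;

import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Arrays;

public class HBaseUtilDemo {
    public static void main(String[] args) throws Exception {
        // create a namespace and a table (illustrative names)
        HBaseUtil.createNameSpace("test");
        HBaseUtil.createTable("person", Arrays.asList("info"), "test");

        // write one cell and read it back
        HBaseUtil.putRow("test", "person", "1001", "info", "name", "tom");
        System.out.println(HBaseUtil.getCell("test", "person", "1001", "info", "name"));

        // filtered scan: rows whose info:name equals "tom"
        FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL,
                new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("name"),
                        CompareOperator.EQUAL, Bytes.toBytes("tom")));
        try (ResultScanner scanner = HBaseUtil.getScanner("test", "person", filters)) {
            for (Result result : scanner) {
                System.out.println(Bytes.toString(result.getRow()));
            }
        }
    }
}
```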
4. Phoenix
Phoenix is an open-source SQL layer on top of HBase that lets you read and write HBase data through standard JDBC.
1. Installing Phoenix
```shell
# unpack
tar -zxf phoenix-hbase-2.6-5.2.1-bin.tar.gz
# move into place
mv phoenix-hbase-2.6-5.2.1-bin /usr/local/
# copy the server jar into the lib directory of every RegionServer and the Master
cp phoenix-server-hbase-2.6.jar /usr/local/hbase-2.6.1/lib
# distribute to the other servers
xsync /usr/local/hbase-2.6.1/
# restart hbase
./hbase.sh stop
./hbase.sh start
# start the Phoenix shell
/usr/local/phoenix-hbase-2.6-5.2.1-bin/bin/sqlline.py hadoop1,hadoop2,hadoop3
```
2. Basic Usage
2.1 Viewing Table Info
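The original post shows a screenshot here. In the sqlline shell that Phoenix uses, tables are listed with the built-in `!tables` command:

```sql
!tables
```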
2.2 Creating a Table
```sql
CREATE TABLE IF NOT EXISTS us_population (
      state CHAR(2) NOT NULL,
      city VARCHAR NOT NULL,
      population BIGINT
      CONSTRAINT my_pk PRIMARY KEY (state, city));
```
2.3 Inserting Data
```sql
-- insert one row
UPSERT INTO us_population VALUES('NY','New York',8143197);
```
2.4 Querying Data
```sql
select * from us_population;
```
2.5 Updating Data
Phoenix has no separate UPDATE statement: running UPSERT again with the same primary key overwrites the existing row.
```sql
UPSERT INTO us_population VALUES('NY','New York',999999);
```
2.6 Deleting Data
```sql
DELETE FROM us_population WHERE city='New York';
-- drop the table
drop table us_population;
```
2.7 Exiting
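Screenshot in the original; sqlline is exited with its built-in `!quit` command:

```sql
!quit
```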
2.8 Bulk-Loading Data
us_population.csv
```
NY,New York,8143197
CA,Los Angeles,3844829
IL,Chicago,2842518
TX,Houston,2016582
PA,Philadelphia,1463281
AZ,Phoenix,1461575
TX,San Antonio,1256509
CA,San Diego,1255540
TX,Dallas,1213825
CA,San Jose,912332
```
us_population.sql
```sql
CREATE TABLE IF NOT EXISTS us_population (
      state CHAR(2) NOT NULL,
      city VARCHAR NOT NULL,
      population BIGINT
      CONSTRAINT my_pk PRIMARY KEY (state, city));
```
```shell
# load the schema and import the CSV data
./psql.py hadoop1,hadoop2,hadoop3 us_population.sql us_population.csv
```
Then, back in sqlline:
```sql
-- verify the data
select * from us_population;

-- aggregate query
SELECT state as "State",count(city) as "City Count",sum(population) as "Population Sum"
FROM us_population
GROUP BY state
ORDER BY sum(population) DESC;
```
3. Using the API
```java
package com.xiaojie;

import java.sql.*;

/**
 * Phoenix JDBC test.
 *
 * @author 熟透的蜗牛
 * @version 1.0
 * @date 2025/1/3 23:08
 */
public class PhoenixApi {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        // 1. load the driver
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        // 2. open a connection (the URL lists the ZooKeeper quorum)
        String url = "jdbc:phoenix:hadoop1,hadoop2,hadoop3";
        Connection connect = DriverManager.getConnection(url);
        // 3. prepare the query
        PreparedStatement preparedStatement = connect.prepareStatement("SELECT * FROM us_population");
        // 4. iterate over the results
        ResultSet resultSet = preparedStatement.executeQuery();
        while (resultSet.next()) {
            System.out.println(resultSet.getString("city") + " "
                    + resultSet.getInt("population"));
        }
        // 5. release resources (result set first, then statement, then connection)
        resultSet.close();
        preparedStatement.close();
        connect.close();
    }
}
```
5. Integrating Spring Boot + MyBatis
```java
package com.xiaojie.hadoop.hbase.phoenix.dao;

import com.xiaojie.hadoop.hbase.phoenix.bean.USPopulation;
import org.apache.ibatis.annotations.*;

import java.util.List;

@Mapper
public interface PopulationDao {

    @Select("SELECT * from us_population")
    List<USPopulation> queryAll();

    // Phoenix has no INSERT/UPDATE; UPSERT covers both
    @Insert("UPSERT INTO us_population VALUES( #{state}, #{city}, #{population} )")
    void save(USPopulation usPopulation);

    @Select("SELECT * FROM us_population WHERE state=#{state} AND city = #{city}")
    USPopulation queryByStateAndCity(@Param("state") String state, @Param("city") String city);

    @Delete("DELETE FROM us_population WHERE state=#{state} AND city = #{city}")
    void deleteByStateAndCity(@Param("state") String state, @Param("city") String city);
}
```
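The `USPopulation` bean referenced by the mapper is not shown in the original post; a minimal reconstruction matching the table's columns might look like this (the field names are assumptions inferred from the SQL above):

```java
package com.xiaojie.hadoop.hbase.phoenix.bean;

import lombok.Data;

// Hypothetical reconstruction: fields mirror the us_population columns.
@Data
public class USPopulation {
    private String state;    // CHAR(2), first part of the primary key
    private String city;     // VARCHAR, second part of the primary key
    private Long population; // BIGINT
}
```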
application.yml:
```yaml
server:
  port: 9999
spring:
  application:
    name: hadoop
  datasource:
    # ZooKeeper quorum
    url: jdbc:phoenix:hadoop1,hadoop2,hadoop3
    driver-class-name: org.apache.phoenix.jdbc.PhoenixDriver
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      # minimum number of idle connections kept in the pool
      minimum-idle: 10
      # maximum pool size, idle and in-use connections included
      maximum-pool-size: 20
      # default auto-commit behavior of connections returned by the pool (default: true)
      auto-commit: true
      # maximum idle time for a connection
      idle-timeout: 30000
      # user-defined pool name, shown in logs and the JMX console (default: auto-generated)
      pool-name: custom-hikari
      # maximum lifetime of a pooled connection; 0 means unlimited (default: 1800000, i.e. 30 minutes)
      max-lifetime: 1800000
      # connection timeout (default: 30000, i.e. 30 seconds)
      connection-timeout: 30000
      # connection test query; dialect-specific (e.g. "select 1 from dual" on Oracle)
      connection-test-query: SELECT 1

wssnail:
  hdfs:
    url: hdfs://hadoop1:8020
    user-name: root

# mybatis settings
mybatis:
  configuration:
    # print SQL statements (useful while debugging)
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
```
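To round the integration off, here is a hypothetical REST endpoint that exercises the mapper (this controller is not in the original post; it is a sketch of typical usage):

```java
package com.xiaojie.hadoop.hbase.phoenix.controller;

import com.xiaojie.hadoop.hbase.phoenix.bean.USPopulation;
import com.xiaojie.hadoop.hbase.phoenix.dao.PopulationDao;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

// Illustrative controller: injects the mapper and exposes a read endpoint.
@RestController
public class PopulationController {

    private final PopulationDao populationDao;

    public PopulationController(PopulationDao populationDao) {
        this.populationDao = populationDao;
    }

    @GetMapping("/population")
    public List<USPopulation> all() {
        return populationDao.queryAll();
    }
}
```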
6. References
https://phoenix.apache.org/Phoenix-in-15-minutes-or-less.html
Full source code: spring-boot: Spring Boot integration with Redis, message middleware, and related examples - Gitee.com