ToB企服应用市场:ToB评测及商务社交产业平台

标题: HBase Shell操作&Flink写入HBase [打印本页]

作者: 天空闲话    时间: 2024-3-3 03:07
标题: HBase Shell操作&Flink写入HBase
一、HBase Shell操作

1、基本操作

1)进入HBase客户端命令行
  1.     [root@bigdata1 hbase]$ bin/hbase shell
复制代码
2)查看帮助命令
  1.     hbase(main):001:0> help
复制代码
3)查看当前数据库中有哪些表
  1.     hbase(main):002:0> list
复制代码
2、表的操作

1)创建表
  1.     hbase(main):002:0> create 'student','info'
复制代码
2)插入数据到表
  1.     hbase(main):003:0> put 'student','1001','info:sex','male'
  2.     hbase(main):004:0> put 'student','1001','info:age','18'
  3.     hbase(main):005:0> put 'student','1002','info:name','Janna'
  4.     hbase(main):006:0> put 'student','1002','info:sex','female'
  5.     hbase(main):007:0> put 'student','1002','info:age','20'
复制代码
3)扫描查看表数据
  1.     hbase(main):008:0> scan 'student'
  2.     hbase(main):009:0> scan 'student',{STARTROW => '1001', STOPROW  => '1001'}
  3.     hbase(main):010:0> scan 'student',{STARTROW => '1001'}
复制代码
4)查看表结构
  1.     hbase(main):011:0> describe 'student'
复制代码
5)更新指定字段的数据
  1.     hbase(main):012:0> put 'student','1001','info:name','Nick'
  2.     hbase(main):013:0> put 'student','1001','info:age','100'
复制代码
6)查看“指定行”或“指定列族:列”的数据
  1.     hbase(main):014:0> get 'student','1001'
  2.     hbase(main):015:0> get 'student','1001','info:name'
复制代码
7)统计表数据行数
  1.     hbase(main):021:0> count 'student'
复制代码
8)删除数据
  1.     删除某rowkey的全部数据:
  2.     hbase(main):016:0> deleteall 'student','1001'
  3.     删除某rowkey的某一列数据:
  4.     hbase(main):017:0> delete 'student','1002','info:sex'
复制代码
9)清空表数据
  1.     hbase(main):018:0> truncate 'student'
  2.     提示:清空表的操作顺序为先disable,然后再truncate。
复制代码
10)删除表
  1.     首先需要先让该表为disable状态:
  2.     hbase(main):019:0> disable 'student'
  3.     然后才能drop这个表:
  4.     hbase(main):020:0> drop 'student'
  5.     提示:如果直接drop表,会报错:ERROR: Table student is enabled. Disable it first.
复制代码
11)变更表信息
  1.     将info列族中的数据存放3个版本:
  2.     hbase(main):022:0> alter 'student',{NAME=>'info',VERSIONS=>3}
  3.     hbase(main):022:0> get 'student','1001',{COLUMN=>'info:name',VERSIONS=>3}
复制代码
二、Flink整合HBase写入操作

现在需要将Flink处理的数据存入HBase数据库(namespace)shtd_result的order_info表中,rowkey为id的值,然后在Linux的HBase shell命令行中查询列consignee,并查询出任意5条
表空间为:shtd_result,表为order_info,列族为:info
表结构为:
字段类型注释rowkeystringHBase的主键,值为ididbigintconsigneestringconsignee_telstringfinal_total_amountdoubleorder_statusstringuser_idbigintdelivery_addressstringorder_commentstringout_trade_nostringtrade_bodystringcreate_timestring转成yyyy-MM-dd hh:mm:ss格式的的字符串operate_timestring转成yyyy-MM-dd hh:mm:ss格式的的字符串expire_timestring转成yyyy-MM-dd hh:mm:ss格式的的字符串tracking_nostringparent_order_idbigintimg_urlstringprovince_idintbenefit_reduce_amountdouble我们需要写一个WriteToHBase类,集成自RichSinkFunction,RichSinkFunction 是一个抽象类,提供了一个更为丰富的接口,用于实现自定义的 Sink(接收器)功能。
在Scala api中RichSinkFunction的主要方法有open,invoke以及close。
了解了这些方法后,我们来写一下WriteToHBase
一、WriteToHBase的实现
  1.   class WriteToHBase extends RichSinkFunction[OrderData] {
  2.     @transient private var connection: Connection = _
  3.     @transient private var table: Table = _
  4.     override def open(parameters: Configuration): Unit = {
  5.       val config = HBaseConfiguration.create()
  6.       // 设置HBase配置, 如Zookeeper地址等
  7.        config.set("hbase.zookeeper.quorum", "bigdata1:2181")
  8.       connection = ConnectionFactory.createConnection(config)
  9.       table = connection.getTable(TableName.valueOf("shtd_result:order_info"))
  10.     }
  11.     override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
  12.       // 将 id 转换为行键(假设 id 是唯一的)
  13.       val rowKey = Bytes.toBytes(value.id.toString)
  14.       // 为该行创建一个新的 Put 实例
  15.       val put = new Put(rowKey)
  16.       // 向 Put 实例中添加列
  17.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("consignee"), Bytes.toBytes(value.consignee))
  18.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("consignee_tel"), Bytes.toBytes(value.consignee_tel))
  19.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("final_total_amount"), Bytes.toBytes(value.final_total_amount))
  20.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("order_status"), Bytes.toBytes(value.order_status))
  21.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("user_id"), Bytes.toBytes(value.user_id))
  22.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("delivery_address"), Bytes.toBytes(value.delivery_address))
  23.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("order_comment"), Bytes.toBytes(value.order_comment))
  24.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("out_trade_no"), Bytes.toBytes(value.out_trade_no))
  25.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("trade_body"), Bytes.toBytes(value.trade_body))
  26.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("operate_time"), Bytes.toBytes(value.operate_time))
  27.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("expire_time"), Bytes.toBytes(value.expire_time))
  28.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("tracking_no"), Bytes.toBytes(value.tracking_no))
  29.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("parent_order_id"), Bytes.toBytes(value.parent_order_id))
  30.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("img_url"), Bytes.toBytes(value.img_url))
  31.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("province_id"), Bytes.toBytes(value.province_id))
  32.       put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("benefit_reduce_amount"), Bytes.toBytes(value.benefit_reduce_amount))
  33.       table.put(put)
  34.     }
  35.     override def close(): Unit = {
  36.       if (table != null) {
  37.         table.close()
  38.       }
  39.       if (connection != null) {
  40.         connection.close()
  41.       }
  42.     }
  43.   }
复制代码
在 Scala 和 Java 中,@transient 关键字用于标记一个类的成员变量为“暂时的”(transient),这意味着这个变量不会被默认的序列化过程序列化。
在 Flink中,通常用于:
在 WriteToHBase 类中,connection 和 table 作为 HBase 的连接和表实例,通常不支持序列化,也不应该被序列化。所以,它们被标记为 @transient。
上面的向 Put 实例中添加列过于冗长,可以用反射来代替:
  1. def addColumnsUsingReflection[T: TypeTag](put: Put, cf: String, data: T): Unit = {
  2.   //获取运行时镜像和实例镜像
  3.   val mirror = runtimeMirror(getClass.getClassLoader)
  4.   val instanceMirror = mirror.reflect(data)
  5.   //获取类成员并过滤方法
  6.   val members = typeOf[T].members.sorted.filterNot(_.isMethod)
  7.   //遍历字段并添加到 Put 实例
  8.   members.foreach { m =>
  9.     val fieldMirror = instanceMirror.reflectField(m.asTerm)
  10.     val name = m.name.toString.trim
  11.     val value = fieldMirror.get.toString
  12.     put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(name), Bytes.toBytes(value))
  13.   }
  14. }
  15. override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
  16.   val rowKey = Bytes.toBytes(value.id.toString)
  17.   val put = new Put(rowKey)
  18.   val infoCF = "info"
  19.   // 使用反射自动添加列
  20.   addColumnsUsingReflection(put, infoCF, value)
  21.   table.put(put)
  22. }
复制代码
现在来解释一下这段代码:
addColumnsUsingReflection 函数定义
  1. def addColumnsUsingReflection[T: TypeTag](put: Put, cf: String, data: T): Unit = {
复制代码
获取运行时镜像和实例镜像
  1.   val mirror = runtimeMirror(getClass.getClassLoader)
  2.   val instanceMirror = mirror.reflect(data)
复制代码
获取类成员并过滤方法
  1.   val members = typeOf[T].members.sorted.filterNot(_.isMethod)
复制代码
遍历字段并添加到 Put 实例
  1.   members.foreach { m =>
  2.     val fieldMirror = instanceMirror.reflectField(m.asTerm)
  3.     val name = m.name.toString.trim
  4.     val value = fieldMirror.get.toString
  5.     put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(name), Bytes.toBytes(value))
  6.   }
复制代码
invoke 方法
  1. override def invoke(value: OrderData, context: SinkFunction.Context): Unit = {
  2.   val rowKey = Bytes.toBytes(value.id.toString)
  3.   val put = new Put(rowKey)
  4.   val infoCF = "info"
  5.   addColumnsUsingReflection(put, infoCF, value)
  6.   table.put(put)
  7. }
复制代码
这段代码通过反射自动化了向 HBase Put 实例添加数据的过程,避免了手动为每个字段编写重复代码的需要。
然后我们需要对于dataStream应用刚才写的 WriteToHBase 类
应用 WriteToHBase 类
  1. dataStream.addSink(new WriteToHBase)
复制代码
二、HBase Shell操作

1. 启动 HBase Shell

首先,我们需要进入 HBase Shell。在命令行中输入:
  1. hbase shell
复制代码
2. 创建命名空间

如果命名空间 shtd_result 还不存在,需要先创建它。在 HBase Shell 中执行以下命令:
  1. create_namespace 'shtd_result'
复制代码
3. 创建表

接着,创建表 order_info。我们需要定义至少一个列族(在这个示例中,我将使用 info 作为列族名)。在 HBase Shell 中执行以下命令:
  1. create 'shtd_result:order_info', 'info'
复制代码
这里,'shtd_resultrder_info' 指定了完整的表名(包括命名空间),而 'info' 是列族名。
4. 验证表创建

最后,您可以列出所有表来验证新表是否已成功创建:
  1. list
复制代码
5. 查询
  1. scan 'shtd_result:order_info', {COLUMNS => ['info:consignee'], LIMIT => 5}
复制代码
这里的 scan 命令用于扫描 shtd_resultrder_info 表,COLUMNS 参数指定我们只关心 info:consignee 列(假设 consignee 存储在名为 info 的列族中),而 LIMIT => 5 指定我们只查看 5 条记录。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4