'林子雨大数据' 实验3 HBase操作与接口编程

打印 上一主题 下一主题

主题 697|帖子 697|积分 2091

“林子雨大数据” 实验3 HBase操作与接口编程

环境搭建

VM虚拟机和Ubuntu系统的安装
在Windows中使用VirtualBox安装Ubuntu虚拟机(2020年7月版本)_厦大数据库实验室博客 (xmu.edu.cn)
Hadoop安装(伪分布式)
Hadoop3.1.3安装教程_单机/伪分布式配置_Hadoop3.1.3/Ubuntu18.04(16.04)_厦大数据库实验室博客 (xmu.edu.cn)
HBase安装(伪分布式)
HBase2.2.2安装和编程实践指南_厦大数据库实验室博客 (xmu.edu.cn)
通过NAT转发使本地主机连接虚拟机
PowerShell SSH 连接 VirtualBox Ubuntu 虚拟机的具体步骤 - 小能日记 - 博客园 (cnblogs.com)
文件传输工具
FlashFXP - Secure FTP Client Software for Windows. Upload, Download, and Synchronize your files.
在VSCODE中使用SSH进行远程开发
在VScode中使用SSH进行远程开发_vscode ssh_Shipmaster_23的博客-CSDN博客
Linux中安装go环境并运行
【golang】linux中安装go环境并运行_linux 运行golang_沉默小管的博客-CSDN博客
理解Hbase

我终于看懂了HBase,太不容易了... - 知乎 (zhihu.com)
第三方软件包

tsuna/gohbase: Pure-Go HBase client (github.com)
hrpc package - github.com/tsuna/gohbase/hrpc - Go Packages
gin框架 · Go语言中文文档 (topgoer.com)
遇到过的问题

go build command fails with "$GOPATH/go.mod exists but should not" · Issue #38896 · golang/go (github.com)
Infinite Getting code actions from ''Go', 'Go', 'Go'' on save · Issue #3105 · microsoft/vscode-go (github.com)
Linux 查看端口占用情况 | 菜鸟教程 (runoob.com)
接口编程

实验过程

创建三个表

对应第2题答案
  1. create 'student','S_No','S_Name','S_Sex','S_Age'
  2. put 'student','s001','S_No','2015001'
  3. put 'student','s001','S_Name','zhangsan'
  4. put 'student','s001','S_Sex','male'
  5. put 'student','s001','S_Age','23'
  6. put 'student','s002','S_No','2015002'
  7. put 'student','s002','S_Name','Mary'
  8. put 'student','s002','S_Sex','female'
  9. put 'student','s002','S_Age','22'
  10. put 'student','s003','S_No','2015003'
  11. put 'student','s003','S_Name','Lisi'
  12. put 'student','s003','S_Sex','male'
  13. put 'student','s003','S_Age','24'
  14. create 'course','C_No','C_Name','C_Credit'
  15. put 'course','c001','C_No','123001'
  16. put 'course','c001','C_Name','Math'
  17. put 'course','c001','C_Credit','2.0'
  18. put 'course','c002','C_No','123002'
  19. put 'course','c002','C_Name','Computer'
  20. put 'course','c002','C_Credit','5.0'
  21. put 'course','c003','C_No','123003'
  22. put 'course','c003','C_Name','English'
  23. put 'course','c003','C_Credit','3.0'
  24. create 'SC','SC_Sno','SC_Cno','SC_Score'
  25. put 'SC','sc001','SC_Sno','2015001'
  26. put 'SC','sc001','SC_Cno','123001'
  27. put 'SC','sc001','SC_Score','86'
  28. put 'SC','sc002','SC_Sno','2015001'
  29. put 'SC','sc002','SC_Cno','123002'
  30. put 'SC','sc002','SC_Score','77'
  31. put 'SC','sc003','SC_Sno','2015002'
  32. put 'SC','sc003','SC_Cno','123002'
  33. put 'SC','sc003','SC_Score','77'
  34. put 'SC','sc004','SC_Sno','2015002'
  35. put 'SC','sc004','SC_Cno','123003'
  36. put 'SC','sc004','SC_Score','99'
  37. put 'SC','sc005','SC_Sno','2015003'
  38. put 'SC','sc005','SC_Cno','123001'
  39. put 'SC','sc005','SC_Score','98'
  40. put 'SC','sc006','SC_Sno','2015003'
  41. put 'SC','sc006','SC_Cno','123002'
  42. put 'SC','sc006','SC_Score','95'
复制代码
后端编程

后端在启动时分别与本地HBase建立管理员客户端、普通客户端的连接。管理员客户端管理所有表,普通客户端管理表数据的增删改查。并启动HTTP服务提供一系列API接口。
  1. package variable
  2. import "github.com/tsuna/gohbase"
  3. var AdminClient gohbase.AdminClient
  4. var Client gohbase.Client
复制代码
  1. package main
  2. import (
  3.         "github.com/gin-gonic/gin"
  4.         "github.com/tsuna/gohbase"
  5.         "wolflong.com/hbase_gin/router"
  6.         "wolflong.com/hbase_gin/variable"
  7. )
  8. func init() {
  9.         variable.AdminClient = gohbase.NewAdminClient("127.0.0.1")
  10.         variable.Client = gohbase.NewClient("127.0.0.1")
  11. }
  12. func main() {
  13.         r := gin.Default()
  14.         router.Router(r)
  15.         r.Run(":1313")
  16. }
复制代码
给定一个处理错误的通用方法
  1. package controller
  2. import "github.com/gin-gonic/gin"
  3. func checkError(err error, c *gin.Context, handlers ...gin.HandlerFunc) {
  4.         if err != nil {
  5.                 c.JSON(500, gin.H{"error": "致命错误", "back": err.Error()})
  6.                 panic(err)
  7.         }
  8. }
复制代码
1.1 列出所有表的相关信息

HBase Shell 对应代码
  1. list
复制代码
从管理客户端发出请求,获取所有表,遍历表将表的命名空间与表名存储至切片中以json格式返回
  1. type table struct {
  2.         Namespace string `json:"namespace"`
  3.         Qualifier string `json:"qualifier"`
  4. }
  5. func ShowTableList(c *gin.Context) {
  6.         var tables []table
  7.         t, err := hrpc.NewListTableNames(context.Background())
  8.         checkError(err, c)
  9.         res, err := variable.AdminClient.ListTableNames(t)
  10.         checkError(err, c)
  11.         for _, v := range res {
  12.                 tables = append(tables, table{string(v.GetNamespace()), string(v.GetQualifier())})
  13.         }
  14.         fmt.Println(tables)
  15.         c.JSON(200, tables)
  16. }
复制代码
测试结果
  1. [
  2.     {
  3.         "namespace": "default",
  4.         "qualifier": "SC"
  5.     },
  6.     {
  7.         "namespace": "default",
  8.         "qualifier": "course"
  9.     },
  10.     {
  11.         "namespace": "default",
  12.         "qualifier": "student"
  13.     },
  14.     {
  15.         "namespace": "default",
  16.         "qualifier": "test"
  17.     }
  18. ]
复制代码
1.2 打印指定表的所有记录数据

HBase Shell 对应代码
  1. scan "course"
复制代码

获取请求参数table,从普通客户端发出请求,获取scanner,遍历scanner直到io.EOF。请注意,当查询的表没数据时,第一条拿到的是nil数据,需要进行判断。
  1. func ShowTableRows(c *gin.Context) {
  2.         var Cells [][]*hrpc.Cell
  3.         t, err := hrpc.NewScan(context.Background(), []byte(c.Query("table")))
  4.         checkError(err, c)
  5.         res := variable.Client.Scan(t)
  6.         row, err := res.Next()
  7.         for err != io.EOF && row != nil {
  8.                 Cells = append(Cells, row.Cells)
  9.                 fmt.Println(row.Cells)
  10.                 row, err = res.Next()
  11.         }
  12.         c.JSON(200, Cells)
  13. }
复制代码
localhost:1313/TableRows?table=course 测试结果。
请注意,Gin框架在转换二进制数据时使用BASE64编码,使得控制字符在网络上正常传输。
  1. [
  2.     [
  3.         {
  4.             "row": "YzAwMQ==",
  5.             "family": "Q19DcmVkaXQ=",
  6.             "timestamp": 1680431640294,
  7.             "cell_type": 4,
  8.             "value": "Mi4w"
  9.         },
  10.         {
  11.             "row": "YzAwMQ==",
  12.             "family": "Q19DcmVkaXQ=",
  13.             "qualifier": "bmV3",
  14.             "timestamp": 1680432352886,
  15.             "cell_type": 4,
  16.             "value": "NS4w"
  17.         },
  18.         {
  19.             "row": "YzAwMQ==",
  20.             "family": "Q19OYW1l",
  21.             "timestamp": 1680431640279,
  22.             "cell_type": 4,
  23.             "value": "TWF0aA=="
  24.         },
  25.         {
  26.             "row": "YzAwMQ==",
  27.             "family": "Q19Obw==",
  28.             "timestamp": 1680431640250,
  29.             "cell_type": 4,
  30.             "value": "MTIzMDAx"
  31.         }
  32.     ],
  33.     [
  34.         {
  35.             "row": "YzAwMg==",
  36.             "family": "Q19DcmVkaXQ=",
  37.             "timestamp": 1680431640328,
  38.             "cell_type": 4,
  39.             "value": "NS4w"
  40.         },
  41.         {
  42.             "row": "YzAwMg==",
  43.             "family": "Q19OYW1l",
  44.             "timestamp": 1680431640318,
  45.             "cell_type": 4,
  46.             "value": "Q29tcHV0ZXI="
  47.         },
  48.         {
  49.             "row": "YzAwMg==",
  50.             "family": "Q19Obw==",
  51.             "timestamp": 1680431640305,
  52.             "cell_type": 4,
  53.             "value": "MTIzMDAy"
  54.         }
  55.     ],
  56.     [
  57.         {
  58.             "row": "YzAwMw==",
  59.             "family": "Q19DcmVkaXQ=",
  60.             "timestamp": 1680431640363,
  61.             "cell_type": 4,
  62.             "value": "My4w"
  63.         },
  64.         {
  65.             "row": "YzAwMw==",
  66.             "family": "Q19OYW1l",
  67.             "timestamp": 1680431640352,
  68.             "cell_type": 4,
  69.             "value": "RW5nbGlzaA=="
  70.         },
  71.         {
  72.             "row": "YzAwMw==",
  73.             "family": "Q19Obw==",
  74.             "timestamp": 1680431640343,
  75.             "cell_type": 4,
  76.             "value": "MTIzMDAz"
  77.         }
  78.     ]
  79. ]
复制代码
1.3 向已创建好的表添加和删除指定的列族或列

HBase Shell 对应代码
  1. put 'course','c001','C_Credit:new','5.0'
  2. delete 'course','c001','C_Credit:new'
复制代码
使用普通客户端进行put操作,需要准备一个item数据包含当前操作的列族或列以及对应的值。支持覆盖重写与新增。
  1. func TableInsertRowCol(c *gin.Context) {
  2.         table := c.PostForm("table")
  3.         rowKey := c.PostForm("rowKey")
  4.         colFamily := c.PostForm("colFamily")
  5.         col := c.PostForm("col")
  6.         val := c.PostForm("val")
  7.         var item map[string]map[string][]byte = make(map[string]map[string][]byte)
  8.         item[colFamily] = make(map[string][]byte)
  9.         item[colFamily][col] = []byte(val)
  10.         fmt.Println(item)
  11.         t, err := hrpc.NewPutStr(context.Background(), table, rowKey, item)
  12.         checkError(err, c)
  13.         res, err := variable.Client.Put(t)
  14.         checkError(err, c)
  15.         c.JSON(200, res)
  16. }
复制代码
测试结果
  1. {
  2.     "Cells": null,
  3.     "Stale": false,
  4.     "Partial": false,
  5.     "Exists": null
  6. }
复制代码

删除列族或列,不需要val数据。列可以为空字符串,仅删除列族,不为空时则删除指定的列
  1. func TableDeleteRowCol(c *gin.Context) {
  2.         table := c.PostForm("table")
  3.         rowKey := c.PostForm("rowKey")
  4.         colFamily := c.PostForm("colFamily")
  5.         col := c.PostForm("col")
  6.         // val := c.PostForm("val")
  7.         var item map[string]map[string][]byte = make(map[string]map[string][]byte)
  8.         item[colFamily] = make(map[string][]byte)
  9.         item[colFamily][col] = []byte{}
  10.         fmt.Println(item)
  11.         t, err := hrpc.NewPutStr(context.Background(), table, rowKey, item)
  12.         checkError(err, c)
  13.         res, err := variable.Client.Delete(t)
  14.         checkError(err, c)
  15.         c.JSON(200, res)
  16. }
复制代码
1.4 清空指定表的所有数据

HBase Shell 对应代码
  1. truncate 'course'
复制代码
先disable表,再删除表。
  1. func TableDelete(c *gin.Context) {
  2.         t := hrpc.NewDisableTable(context.Background(), []byte(c.Query("table")))
  3.         err := variable.AdminClient.DisableTable(t)
  4.         checkError(err, c)
  5.         t2 := hrpc.NewDeleteTable(context.Background(), []byte(c.Query("table")))
  6.         err = variable.AdminClient.DeleteTable(t2)
  7.         checkError(err, c)
  8.         c.JSON(200, gin.H{"result": "删除成功"})
  9. }
复制代码
1.5 统计表的行数

HBase Shell 对应代码
  1. count 'course'
复制代码
修改1.2的代码
  1. func ShowTableRowsCount(c *gin.Context) {
  2.         var count int
  3.         t, err := hrpc.NewScan(context.Background(), []byte(c.Query("table")))
  4.         checkError(err, c)
  5.         res := variable.Client.Scan(t)
  6.         row, err := res.Next()
  7.         for err != io.EOF && row != nil {
  8.                 count++
  9.                 row, err = res.Next()
  10.         }
  11.         c.JSON(200, count)
  12. }
复制代码
localhost:1313/TableRowsCount?table=course 测试结果
  1. 3
复制代码
3.1 创建表
  1. func TableCreate(c *gin.Context) {
  2.         table := c.PostForm("table")
  3.         fs := c.PostForm("fields")
  4.         var fields []string
  5.         // fmt.Println(table, fs)
  6.         err := json.Unmarshal([]byte(fs), &fields)
  7.         checkError(err, c)
  8.         // 验证是否存在表
  9.         flag := false
  10.         t, err := hrpc.NewListTableNames(context.Background())
  11.         checkError(err, c)
  12.         res, err := variable.AdminClient.ListTableNames(t)
  13.         checkError(err, c)
  14.         for _, v := range res {
  15.                 if string(v.GetQualifier()) == table {
  16.                         flag = true
  17.                 }
  18.         }
  19.         // 如存在删除表
  20.         if flag {
  21.                 t := hrpc.NewDisableTable(context.Background(), []byte(table))
  22.                 err := variable.AdminClient.DisableTable(t)
  23.                 checkError(err, c)
  24.                 t2 := hrpc.NewDeleteTable(context.Background(), []byte(table))
  25.                 err = variable.AdminClient.DeleteTable(t2)
  26.                 checkError(err, c)
  27.         }
  28.         // 插入新表
  29.         var items map[string]map[string]string = make(map[string]map[string]string)
  30.         for _, v := range fields {
  31.                 items[v] = make(map[string]string)
  32.         }
  33.         t2 := hrpc.NewCreateTable(context.Background(), []byte(table), items)
  34.         err = variable.AdminClient.CreateTable(t2)
  35.         checkError(err, c)
  36.         c.JSON(200, gin.H{"result": "创建成功"})
  37. }
复制代码
通过1.1函数接口我们可知test表已经存在,现在我们使用3.1函数接口重新创建该表,并为接下来的3.2函数接口调用做准备。
  1. {
  2.     "result": "创建成功"
  3. }
复制代码
3.2 新增记录

为了增强健壮性,我们需要判断传入的fields、values参数个数是否一致,否则应当主动报错。
  1. func TableInsertRow(c *gin.Context) {
  2.         table := c.PostForm("table")
  3.         rowKey := c.PostForm("rowKey")
  4.         fs := c.PostForm("fields")
  5.         vs := c.PostForm("values")
  6.         var fields []string
  7.         var values []string
  8.         err := json.Unmarshal([]byte(fs), &fields)
  9.         checkError(err, c)
  10.         err = json.Unmarshal([]byte(vs), &values)
  11.         checkError(err, c)
  12.         if len(fields) != len(values) {
  13.                 checkError(fmt.Errorf("数量不一致"), c)
  14.         }
  15.         var item map[string]map[string][]byte = make(map[string]map[string][]byte)
  16.         for i, v := range fields {
  17.                 vs := strings.Split(v, ":")
  18.                 item[vs[0]] = make(map[string][]byte)
  19.                 if len(vs) > 1 {
  20.                         item[vs[0]][vs[1]] = []byte(values[i])
  21.                 } else {
  22.                         item[vs[0]][""] = []byte(values[i])
  23.                 }
  24.         }
  25.         fmt.Println(item)
  26.         t, err := hrpc.NewPutStr(context.Background(), table, rowKey, item)
  27.         checkError(err, c)
  28.         res, err := variable.Client.Put(t)
  29.         checkError(err, c)
  30.         c.JSON(200, res)
  31. }
复制代码
3.3 通过列过滤数据

使用scanner实现,也可以通过过滤器实现。
  1. // TODO USE FILTER
  2. type item struct {
  3.         Row       string       `json:"row"`
  4.         Family    string       `json:"family"`
  5.         Qualifier string       `json:"qualifier"`
  6.         Timestamp *uint64      `json:"timestamp"`
  7.         Cell_type *pb.CellType `json:"cell_type"`
  8.         Value     string       `json:"value"`
  9. }
  10. func TableColumnScan(c *gin.Context) {
  11.         table := c.Query("table")
  12.         column := c.Query("column")
  13.         vs := strings.Split(column, ":")
  14.         var items []item
  15.         t, err := hrpc.NewScan(context.Background(), []byte(table))
  16.         checkError(err, c)
  17.         res := variable.Client.Scan(t)
  18.         row, err := res.Next()
  19.         for err != io.EOF && row != nil {
  20.                 for _, v := range row.Cells {
  21.                         if string(v.Family) != vs[0] {
  22.                                 continue
  23.                         }
  24.                         if len(vs) > 1 {
  25.                                 if string(v.Qualifier) != vs[1] {
  26.                                         continue
  27.                                 }
  28.                         }
  29.                         fmt.Println(row.Cells)
  30.                         items = append(items, item{
  31.                                 Row:       string(v.Row),
  32.                                 Family:    string(v.Family),
  33.                                 Qualifier: string(v.Qualifier),
  34.                                 Timestamp: v.Timestamp,
  35.                                 Cell_type: v.CellType,
  36.                                 Value:     string(v.Value),
  37.                         })
  38.                 }
  39.                 row, err = res.Next()
  40.         }
  41.         c.JSON(200, items)
  42. }
复制代码
再执行一遍1.3添加列的函数,调用接口,执行结果如下。
localhost:1313/TableColumnScan?table=course&column=C_Credit
  1. [
  2.     {
  3.         "row": "c001",
  4.         "family": "C_Credit",
  5.         "qualifier": "",
  6.         "timestamp": 1680431640294,
  7.         "cell_type": 4,
  8.         "value": "2.0"
  9.     },
  10.     {
  11.         "row": "c001",
  12.         "family": "C_Credit",
  13.         "qualifier": "new",
  14.         "timestamp": 1680434951646,
  15.         "cell_type": 4,
  16.         "value": "5.0"
  17.     },
  18.     {
  19.         "row": "c002",
  20.         "family": "C_Credit",
  21.         "qualifier": "",
  22.         "timestamp": 1680431640328,
  23.         "cell_type": 4,
  24.         "value": "5.0"
  25.     },
  26.     {
  27.         "row": "c003",
  28.         "family": "C_Credit",
  29.         "qualifier": "",
  30.         "timestamp": 1680431640363,
  31.         "cell_type": 4,
  32.         "value": "3.0"
  33.     }
  34. ]
复制代码
localhost:1313/TableColumnScan?table=course&column=C_Credit:new
  1. [
  2.     {
  3.         "row": "c001",
  4.         "family": "C_Credit",
  5.         "qualifier": "new",
  6.         "timestamp": 1680434951646,
  7.         "cell_type": 4,
  8.         "value": "5.0"
  9.     }
  10. ]
复制代码
3.4 修改行数据

与 1.3 函数代码一致
3.5 删除表指定记录
  1. package controller
  2. import (
  3.         "context"
  4.         "github.com/gin-gonic/gin"
  5.         "github.com/tsuna/gohbase/hrpc"
  6.         "wolflong.com/hbase_gin/variable"
  7. )
  8. func TableDeleteRow(c *gin.Context) {
  9.         table := c.PostForm("table")
  10.         rowKey := c.PostForm("rowKey")
  11.         t, err := hrpc.NewDelStr(context.Background(), table, rowKey, nil)
  12.         checkError(err, c)
  13.         res, err := variable.Client.Delete(t)
  14.         checkError(err, c)
  15.         c.JSON(200, res)
  16. }
复制代码
  1. {
  2.     "Cells": null,
  3.     "Stale": false,
  4.     "Partial": false,
  5.     "Exists": null
  6. }
复制代码
再次调用1.5函数接口,执行结果符合预期。
  1. 2
复制代码

总结

装环境2小时,代码和文档编写4.5小时,代码编写过程中查阅官方文档和解决问题的时间为3小时。
总共花费6.5个小时,共编写333行代码,56行表数据。
代码编写能力得到了提升。提高了自己对HBase的理解,作为一个典型的NoSQL数据库,其一大优点是可在廉价PC服务器上搭建起大规模结构化存储集群,并提供易使用的HBase Shell操作数据集,水平扩展方便。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦见你的名字

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表