一、择要
Apache Spark 是一个快速、通用的大数据处置惩罚引擎,提供了丰富的 API 用于数据处置惩罚和分析。HBase 是一个分布式、可扩展的 NoSQL 数据库,适合存储海量结构化和半结构化数据。Spark 与 HBase 的结合可以充分发挥两者的上风,实现高效的数据处置惩罚和分析。
Spark 可以通过 HBase 的 Java API 大概专用的连接器来读取 HBase 中的数据。在读取数据时,Spark 可以将 HBase 表中的数据转换为 RDD(弹性分布式数据集)大概 DataFrame,然后使用 Spark 的各种操作进行数据处置惩罚和分析。
本文以Spark2.3.2读取HBase1.4.8中的hbase_emp_table表数据进行简单分析,用户实现相关的业务逻辑。
二、实现过程
- 在IDEA创建工程SparkReadHBaseData
- 在pom.xml文件中添加依赖
- 新建com.lpssfxy的package
- 在该package下新建名为SparkReadHBaseData的Object,编写步伐实现业务逻辑:
- import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
- import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
- import org.apache.hadoop.hbase.util.Bytes
- import org.apache.spark.sql.{SparkSession}
- /**
- * Employee样例类
- *
- * @param empNo
- * @param eName
- * @param job
- * @param mgr
- * @param hireDate
- * @param salary
- * @param comm
- * @param deptNo
- */
- case class Employee(empNo: Int, eName: String, job: String, mgr: Int, hireDate: String, salary: Double, comm: Double, deptNo: Int)
- object SparkReadHBaseData {
- private val TABLE_NAME = "hbase_emp_table"
- private val INFO_CF = "info"
- def main(args: Array[String]): Unit = {
- val spark = SparkSession.builder()
- .appName("SparkHBaseIntegration")
- .master("local[*]")
- .getOrCreate()
- val conf = HBaseConfiguration.create()
- conf.set("hbase.zookeeper.quorum", "s1,s2,s3")
- conf.set("hbase.zookeeper.property.clientPort", "2181")
- val connection = ConnectionFactory.createConnection(conf)
- val table = connection.getTable(TableName.valueOf(TABLE_NAME))
- val scan = new Scan()
- scan.addFamily(Bytes.toBytes(INFO_CF))
- // 扫描 HBase 表并转换为 RDD
- val results = table.getScanner(scan)
- val data = Iterator.continually(results.next()).takeWhile(_ != null).map { result =>
- val rowKey = Bytes.toString(result.getRow())
- val eName = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("ename")))
- val job = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("job")))
- val mgrString = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("mgr")))
- var mgr: Int = 0
- if (!"".equals(mgrString) && null != mgrString) {
- mgr = mgrString.toInt
- }
- val hireDate = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("hiredate")))
- val salary = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("sal")))
- val commString = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("comm")))
- var comm: Double = 0
- if (!"".equals(commString) && null != commString) {
- comm = commString.toDouble
- }
- val deptNo = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("deptno")))
- (rowKey.toInt, eName, job, mgr, hireDate, salary.toDouble, comm, deptNo.toInt)
- }.toList
- // 转换为 DataFrame
- import spark.implicits._
- val df = spark.sparkContext.parallelize(data).map(item => {
- Employee(item._1, item._2, item._3, item._4, item._5, item._6, item._7, item._8)
- }).toDF()
- // 将df注册成临时表
- df.createOrReplaceTempView("emp")
- // 需求1:统计各个部门总支出
- val totalExpense = spark.sql("select deptNo,sum(salary) as total from emp group by deptNo order by total desc")
- totalExpense.show()
- // 需求2: 统计各个部门总的支出(包括工资和奖金),并按照总支出升序排
- val totalExpense2 = spark.sql("select deptNo,sum(salary + comm) as total from emp group by deptNo order by total")
- totalExpense2.show()
- // TODO:需求3-结合dept部门表来实现多表关联查询,请同学自行实现
- // 关闭连接
- connection.close()
- // 停止spark,释放资源
- spark.stop()
- }
- }
复制代码 - 为了没有大量无关日记输出,在resources目录下新建log4j.properties,添加如下内容:
- log4j.rootLogger=ERROR,stdout
- # write to stdout
- log4j.appender.stdout=org.apache.log4j.ConsoleAppender
- log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
- log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
复制代码 - 启动虚拟机中的hdfs、zookeeper和hbase
- start-dfs.sh
- zkServer.sh start
- start-hbase.sh
复制代码 - 运行代码,查看实行结果
三、小结
- 本实验仅仅演示Spark读取HBase表数据并简单分析的过程,可以作为复杂的业务逻辑分析的基础。
- Spark 读取并分析 HBase 数据具有高性能、丰富的数据分析功能、可扩展性、灵活性和实时性等上风。然而,也存在数据一致性、复杂的配置和管理、资源消耗和兼容性等不敷。在实际应用中,必要根据详细的需求和场景来选择是否使用 Spark 和 HBase 的组合,并注意解决大概出现的问题。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |