【Spark分析HBase数据】Spark读取并分析HBase数据

打印 上一主题 下一主题

主题 2084|帖子 2084|积分 6252

一、择要

Apache Spark 是一个快速、通用的大数据处置惩罚引擎,提供了丰富的 API 用于数据处置惩罚和分析。HBase 是一个分布式、可扩展的 NoSQL 数据库,适合存储海量结构化和半结构化数据。Spark 与 HBase 的结合可以充分发挥两者的上风,实现高效的数据处置惩罚和分析。
Spark 可以通过 HBase 的 Java API 大概专用的连接器来读取 HBase 中的数据。在读取数据时,Spark 可以将 HBase 表中的数据转换为 RDD(弹性分布式数据集)大概 DataFrame,然后使用 Spark 的各种操作进行数据处置惩罚和分析。
本文以Spark2.3.2读取HBase1.4.8中的hbase_emp_table表数据进行简单分析,用户实现相关的业务逻辑。
二、实现过程


  • 在IDEA创建工程SparkReadHBaseData
  • 在pom.xml文件中添加依赖
    1. <properties>
    2.     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    3.     <maven.compiler.source>1.8</maven.compiler.source>
    4.     <maven.compiler.target>1.8</maven.compiler.target>
    5.     <scala.version>2.11.8</scala.version>
    6.     <spark.version>2.3.3</spark.version>
    7.     <hbase.version>1.4.8</hbase.version>
    8. </properties>
    9. <dependencies>
    10.     <!-- Spark 依赖 -->
    11.     <dependency>
    12.         <groupId>org.apache.spark</groupId>
    13.         <artifactId>spark-core_2.11</artifactId>
    14.         <version>${spark.version}</version>
    15.     </dependency>
    16.     <dependency>
    17.         <groupId>org.apache.spark</groupId>
    18.         <artifactId>spark-sql_2.11</artifactId>
    19.         <version>${spark.version}</version>
    20.     </dependency>
    21.     <!-- HBase 依赖 -->
    22.     <dependency>
    23.         <groupId>org.apache.hbase</groupId>
    24.         <artifactId>hbase-client</artifactId>
    25.         <version>${hbase.version}</version>
    26.     </dependency>
    27.     <dependency>
    28.         <groupId>org.apache.hbase</groupId>
    29.         <artifactId>hbase-common</artifactId>
    30.         <version>${hbase.version}</version>
    31.     </dependency>
    32.     <dependency>
    33.         <groupId>org.apache.hbase</groupId>
    34.         <artifactId>hbase-server</artifactId>
    35.         <version>${hbase.version}</version>
    36.     </dependency>
    37.     <dependency>
    38.         <groupId>org.apache.hbase</groupId>
    39.         <artifactId>hbase-hadoop-compat</artifactId>
    40.         <version>${hbase.version}</version>
    41.     </dependency>
    42.     <!-- Hadoop 依赖 -->
    43.     <dependency>
    44.         <groupId>org.apache.hadoop</groupId>
    45.         <artifactId>hadoop-client</artifactId>
    46.         <version>2.7.4</version>
    47.         <scope>provided</scope>
    48.     </dependency>
    49.     <!-- 处理依赖冲突 -->
    50.     <dependency>
    51.         <groupId>com.google.guava</groupId>
    52.         <artifactId>guava</artifactId>
    53.         <version>12.0.1</version>
    54.     </dependency>
    55.     <!-- 使用scala2.11.8进行编译和打包 -->
    56.     <dependency>
    57.         <groupId>org.scala-lang</groupId>
    58.         <artifactId>scala-library</artifactId>
    59.         <version>${scala.version}</version>
    60.     </dependency>
    61. </dependencies>
    62. <build>
    63.     <!-- 指定scala源代码所在的目录 -->
    64.     <sourceDirectory>src/main/scala</sourceDirectory>
    65.     <plugins>
    66.         <!-- Scala 编译插件 -->
    67.         <plugin>
    68.             <groupId>net.alchim31.maven</groupId>
    69.             <artifactId>scala-maven-plugin</artifactId>
    70.             <version>3.4.6</version>
    71.             <executions>
    72.                 <execution>
    73.                     <goals>
    74.                         <goal>compile</goal>
    75.                         <goal>testCompile</goal>
    76.                     </goals>
    77.                 </execution>
    78.             </executions>
    79.         </plugin>
    80.         <plugin>
    81.             <groupId>org.apache.maven.plugins</groupId>
    82.             <artifactId>maven-assembly-plugin</artifactId>
    83.             <version>3.6.0</version>
    84.             <configuration>
    85.                 <archive>
    86.                     <!-- 项目中有多个主类时,采用不指定主类规避pom中只能配置一个主类的问题 -->
    87.                     <manifest/>
    88.                 </archive>
    89.                 <descriptorRefs>
    90.                     <descriptorRef>jar-with-dependencies</descriptorRef>
    91.                 </descriptorRefs>
    92.             </configuration>
    93.             <executions>
    94.                 <execution>
    95.                     <id>make-assembly</id>
    96.                     <phase>package</phase>
    97.                     <goals>
    98.                         <goal>single</goal>
    99.                     </goals>
    100.                 </execution>
    101.             </executions>
    102.         </plugin>
    103.     </plugins>
    104. </build>
    复制代码
  • 新建com.lpssfxy的package

  • 在该package下新建名为SparkReadHBaseData的Object,编写步伐实现业务逻辑:
    1. import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    2. import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
    3. import org.apache.hadoop.hbase.util.Bytes
    4. import org.apache.spark.sql.{SparkSession}
    5. /**
    6. * Employee样例类
    7. *
    8. * @param empNo
    9. * @param eName
    10. * @param job
    11. * @param mgr
    12. * @param hireDate
    13. * @param salary
    14. * @param comm
    15. * @param deptNo
    16. */
    17. case class Employee(empNo: Int, eName: String, job: String, mgr: Int, hireDate: String, salary: Double, comm: Double, deptNo: Int)
    18. object SparkReadHBaseData {
    19.   private val TABLE_NAME = "hbase_emp_table"
    20.   private val INFO_CF = "info"
    21.   def main(args: Array[String]): Unit = {
    22.     val spark = SparkSession.builder()
    23.       .appName("SparkHBaseIntegration")
    24.       .master("local[*]")
    25.       .getOrCreate()
    26.     val conf = HBaseConfiguration.create()
    27.     conf.set("hbase.zookeeper.quorum", "s1,s2,s3")
    28.     conf.set("hbase.zookeeper.property.clientPort", "2181")
    29.     val connection = ConnectionFactory.createConnection(conf)
    30.     val table = connection.getTable(TableName.valueOf(TABLE_NAME))
    31.     val scan = new Scan()
    32.     scan.addFamily(Bytes.toBytes(INFO_CF))
    33.     // 扫描 HBase 表并转换为 RDD
    34.     val results = table.getScanner(scan)
    35.     val data = Iterator.continually(results.next()).takeWhile(_ != null).map { result =>
    36.       val rowKey = Bytes.toString(result.getRow())
    37.       val eName = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("ename")))
    38.       val job = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("job")))
    39.       val mgrString = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("mgr")))
    40.       var mgr: Int = 0
    41.       if (!"".equals(mgrString) && null != mgrString) {
    42.         mgr = mgrString.toInt
    43.       }
    44.       val hireDate = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("hiredate")))
    45.       val salary = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("sal")))
    46.       val commString = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("comm")))
    47.       var comm: Double = 0
    48.       if (!"".equals(commString) && null != commString) {
    49.         comm = commString.toDouble
    50.       }
    51.       val deptNo = Bytes.toString(result.getValue(Bytes.toBytes(INFO_CF), Bytes.toBytes("deptno")))
    52.       (rowKey.toInt, eName, job, mgr, hireDate, salary.toDouble, comm, deptNo.toInt)
    53.     }.toList
    54.     // 转换为 DataFrame
    55.     import spark.implicits._
    56.     val df = spark.sparkContext.parallelize(data).map(item => {
    57.       Employee(item._1, item._2, item._3, item._4, item._5, item._6, item._7, item._8)
    58.     }).toDF()
    59.     // 将df注册成临时表
    60.     df.createOrReplaceTempView("emp")
    61.     // 需求1:统计各个部门总支出
    62.     val totalExpense = spark.sql("select deptNo,sum(salary) as total from emp group by deptNo order by total desc")
    63.     totalExpense.show()
    64.     // 需求2: 统计各个部门总的支出(包括工资和奖金),并按照总支出升序排
    65.         val totalExpense2 = spark.sql("select deptNo,sum(salary + comm) as total from emp group by deptNo order by total")
    66.         totalExpense2.show()
    67.         // TODO:需求3-结合dept部门表来实现多表关联查询,请同学自行实现
    68.     // 关闭连接
    69.     connection.close()
    70.     // 停止spark,释放资源
    71.     spark.stop()
    72.   }
    73. }
    复制代码
  • 为了没有大量无关日记输出,在resources目录下新建log4j.properties,添加如下内容:
    1. log4j.rootLogger=ERROR,stdout
    2. # write to stdout
    3. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    4. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    5. log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
    复制代码
  • 启动虚拟机中的hdfs、zookeeper和hbase
    1. start-dfs.sh
    2. zkServer.sh start
    3. start-hbase.sh
    复制代码
  • 运行代码,查看实行结果

三、小结


  • 本实验仅仅演示Spark读取HBase表数据并简单分析的过程,可以作为复杂的业务逻辑分析的基础。
  • Spark 读取并分析 HBase 数据具有高性能、丰富的数据分析功能、可扩展性、灵活性和实时性等上风。然而,也存在数据一致性、复杂的配置和管理、资源消耗和兼容性等不敷。在实际应用中,必要根据详细的需求和场景来选择是否使用 Spark 和 HBase 的组合,并注意解决大概出现的问题。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

钜形不锈钢水箱

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表