SnapshotScanMR速率比TableScanMR快10~30倍,那Spark如何实现SnapshotScanM ...

打印 上一主题 下一主题

主题 1501|帖子 1501|积分 4513

HBase 提供的 TableScanMR 和 SnapshotScanMR 是两种用于在大数据集中举行扫描的 MapReduce 作业,网上也有很多介绍Spark如何实现TableScanMR,但是对SnapshotScanMR的实现方式很少几乎没找到可用的,接下来我们先说说这两者的一些共同点以及不同的实现原理,再介绍Spark是如何实现的。
雷同点


  • 目的
    两者都是为了高效地扫描 HBase 中大量的数据,适用于数据分析和批处理使命。
  • 基于 MapReduce
    两个作业都基于 Hadoop 的 MapReduce 框架,实现分布式盘算,能够利用集群资源举行并行处理。
  • 支持高并发
    这两种扫描方式都能充实利用 HBase 的分布式特性,支持高并发的读操作,适合处理大规模数据集。
不同点


  • 数据来源

    • TableScanMR
      直接对 HBase 实时表举行访问,读取当前的数据状态,包含所有的行和最新的变革。
    • SnapshotScanMR
      读取特定快照的数据,快照创建时数据的一个静态视图,不会反映后续的更改。

  • 一致性

    • TableScanMR
      由于是实时访问,扫描期间大概受到其他写操作的影响,因此返回的数据大概存在一致性问题。
    • SnapshotScanMR
      提供了对某一时刻的数据访问,因此在扫描时数据是一致的,不受后续操作影响。

  • 性能特性

    • TableScanMR
      性能大概受到数据库实时写入和更新的影响,适用于实时数据分析,但在写入繁重的环境中性能大概波动。
    • SnapshotScanMR
      通常具有较稳定的性能,适合于对历史数据的分析和处理,性能受后续写入影响较小。

  • 实现的原理

    • TableScanMR
      TableScan现实上还是一种并行的ScanApi,它离不开RegionServer,所有的Scan请求都会打到RegionServer上,所以假如RegionServer有压力时这种Scan效果并不理想。比如我们在Scan的同时,服务还在大量的compact 大概另有其他的bulkload的等操作影响RegionServer压力的时,Scan效果不是很理想。工作原理如下

    • SnapshotScanMR
      它的Scan方式是直接绕过了RegionServer,直接读取Hbase的HDFS文件,所以RegionServer的压力对他无影响。影响它的就是磁盘IO大概网络。所以当执利用命和Hbase数据是在同机房时SnapshotScan的速率是TableScan 的10~30倍假如跨机房数据量大了大概还不如TableScan,所以得包管同机房。工作原理如下


  • 使用场景

    • TableScanMR

      • 适用于必要访问最新数据的场景,如实时盘算、在线分析等。
      • 适用于数据分布均匀的场景

    • SnapshotScanMR

      • 适用于Bulkload后直接Scan的场景
      • 适用于缓解RegionServer压力大大概压力分布不均匀的场景(前提可以忽略实时的写入数据,究竟使用的是快照)
      • 适用于大部分表数据分布均匀,但是部分大表数据分布不均匀,经常大量长时的compact影响单RegionServer压力,导致在Scan均匀表时出现部分长尾使命的情况(这种也是我所遇到的)。
      • 盘算使命和数据在同机房的情况这点很重要,假如跨机房数据复制的RPC就会非常的耗时


  • Spark实现方式

    • TableScanMR

  1.    val sc = new SparkContext(sparkConf)
  2.    val scan = new Scan();
  3.    scan.addFamily(Bytes.toBytes("c"))
  4.    scan.setTimeStamp(timeStamp)
  5.    scan.withStartRow(Bytes.toBytes(startRow))
  6.    scan.withStopRow(Bytes.toBytes(stopRow))
  7.    
  8.    val hbaseConf = new Configuration()
  9.    hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
  10.    hbaseConf.set(TableInputFormat.INPUT_TABLE, "Hbase的表名")
  11.    val dataRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
  12.   //  处理后续逻辑
复制代码


  • SnapshotScanMR
  1.   //1.定义扫描范围
  2.    val sc = new SparkContext(sparkConf)
  3.    val scan = new Scan();
  4.    scan.addFamily(Bytes.toBytes("c"))
  5.    scan.setTimeStamp(timeStamp)
  6.    scan.withStartRow(Bytes.toBytes(startRow))
  7.    scan.withStopRow(Bytes.toBytes(stopRow))
  8.   //2.创建快照
  9.    val table = TableName.valueOf("Hbase的表名")
  10.    val snapshotName = "快照名称"
  11.    val tmpRestoreDir = new Path("临时目录")
  12.    //创建快照(可以参考Hbase创建快照)
  13.    Admin.snapshot(snapshotName, table);
  14.   //3.创建SnapshotScan并返回一个RDD
  15.    val job: Job = Job.getInstance()
  16.    TableMapReduceUtil.initCredentials(job)
  17.    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,classOf[IdentityTableMapper], null, null, job, true, tmpRestoreDir)
  18.   //最后一个参数是HBaseContext 可以根据具体的实现传入
  19.   val dataRDD: RDD[(ImmutableBytesWritable, Result)] = new NewHBaseRDD(sc,classOf[TableSnapshotInputFormat],classOf[ImmutableBytesWritable],classOf[Result], job.getConfiguration,HBaseContext)
  20.   //  处理后续逻辑
  21. //4.删除快照和目录
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

来自云龙湖轮廓分明的月亮

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表