大数据-268 实时数仓 - ODS层 将 Kafka 中的维度表写入 DIM ...

打印 上一主题 下一主题

主题 844|帖子 844|积分 2532

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!



  • MyBatis 更新完毕
  • 现在开始更新 Spring,一起深入浅出!
现在已经更新到了:



  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(已更完)
  • 实时数仓(正在更新…)
章节内容



  • ODS
  • Lambda架构
  • Kappa架构

根本介绍

在 Kafka 中写入维度表(DIM)通常涉及将实时或批处理惩罚数据从 Kafka 主题(Topic)读取,并根据数据流中的信息更新维度表(DIM),这在数据堆栈或数据湖的 ETL(提取、转换、加载)过程中非经常见。维度表(DIM)存储的是与业务数据相干的维度信息,例如客户、产品、地理位置等,用于支持 OLAP(联机分析处理惩罚)查询。
明白 Kafka 数据流

Kafka 是一个分布式流平台,用于高吞吐量的消息通报。在 ETL 过程中,Kafka 通常用作数据的消息队列大概流处理惩罚的来源。每当新数据生成时,它会被发布到 Kafka 中的某个主题(Topic),然后消耗者(Consumer)可以从主题中获取数据进行处理惩罚。
设计维度表(DIM)

维度表通常包含业务实体的详细信息,如产品名称、客户信息、时间维度等。与事实表(Fact)不同,维度表的数据较为静态,但大概会随着时间更新(例如,客户地址变更或产品种别更新)。每个维度表通常有一个唯一的主键(如 customer_id 或 product_id)来标识记录。
Kafka 消耗者(Consumer)

为了从 Kafka 中读取维度数据,需要创建一个消耗者(Consumer),它会从 Kafka 的某个主题(Topic)中读取消息。这些消息通常是 JSON 格式,包含需要写入维度表的信息。消耗者将从 Kafka 主题中获取数据,大概包括以下步骤:


  • 连接到 Kafka 集群。
  • 订阅一个或多个主题(Topics)。
  • 消耗消息并将其通报给后续的处理惩罚逻辑。
  • 消耗者的实现可以使用 Kafka 提供的客户端库,例如 Kafka 的 Java 客户端、Python 的 confluent-kafka 等。
数据处理惩罚和转换

在读取到 Kafka 消息后,消耗者需要对数据进行须要的处理惩罚和转换。对于维度数据,处理惩罚逻辑大概包括:


  • 数据剖析:将消息从 Kafka 中的格式(例如 JSON)剖析成结构化数据。
  • 校验数据:检查数据是否符合业务规则,是否完整,是否有用。
  • 维度数据更新:如果 Kafka 中的消息包含的维度信息已经存在,则更新相干记录;如果是新维度,则插入新记录。
维度表的更新

维度表的更新通常有两种常见的方式:


  • 全量更新:每次从 Kafka 获取到新的数据时,都将其覆盖到维度表中。这种方式适用于数据变动较少大概可以接受重写的场景。
  • 增量更新:根据时间戳、有用性标记或版本号等信息,更新已有的维度记录。这种方式适用于数据会有更新(如地址或状态变更)的场
    景。
增量更新时,通常会执行以下操纵:


  • 查找是否已有该维度记录(例如通过 dimension_id)。
  • 如果存在且数据发生变革,则更新该记录,同时更新 valid_to 时间,并插入一条新的记录,设置 valid_from 和 valid_to 时间。
  • 如果不存在该记录,则直接插入新的维度数据。
写入到目标存储(DIM)

在数据处理惩罚后,需要将更新后的维度数据写入目标存储。这通常是一个数据库(例如 MySQL、PostgreSQL 或 NoSQL 数据库)或数据堆栈(例如 Snowflake、Google BigQuery、Redshift)中的维度表(DIM)。
数据存储更新(事务性思量)

对于维度表的更新,通常需要确保数据的一致性。可以使用事务来确保数据在更新过程中的一致性,防止数据丢失或重复。例如,可以在事务中执行所有的更新和插入操纵,确保如果操纵失败,可以回滚。
TableObject

创建样例 TableObject
  1. case class TableObject(database: String, tableName: String, typeInfo: String, dataInfo: String) extends Serializable
复制代码
AreaInfo

  1. case class AreaInfo(
  2.   id: String,
  3.   name: String,
  4.   pid: String,
  5.   sname: String,
  6.   level: String,
  7.   citycode: String,
  8.   yzcode: String,
  9.   mername: String,
  10.   Lng: String,
  11.   Lat: String,
  12.   pinyin: String
  13.   )
复制代码
DataInfo

  1. case class DataInfo(
  2.   modifiedTime: String,
  3.   orderNo: String,
  4.   isPay: String,
  5.   orderId: String,
  6.   tradeSrc: String,
  7.   payTime: String,
  8.   productMoney: String,
  9.   totalMoney: String,
  10.   dataFlag: String,
  11.   userId: String,
  12.   areaId: String,
  13.   createTime: String,
  14.   payMethod: String,
  15.   isRefund: String,
  16.   tradeType: String,
  17.   status: String
  18. )
复制代码
ConnHBase

  1. class ConnHBase {
  2.   def connToHbase:Connection ={
  3.     val conf : Configuration = HBaseConfiguration.create()
  4.     conf.set("hbase.zookeeper.quorum","h121.wzk.icu,h122.wzk.icu,h123.wzk.icu")
  5.     conf.set("hbase.zookeeper.property.clientPort","2181")
  6.     conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,30000)
  7.     conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,30000)
  8.     val connection = ConnectionFactory.createConnection(conf)
  9.     connection
  10.   }
  11. }
复制代码
SinkHBase

  1. class SinkHBase extends RichSinkFunction[util.ArrayList[TableObject]] {
  2.   var connection : Connection = _
  3.   var hbTable : Table = _
  4.   override def open(parameters: Configuration): Unit = {
  5.     connection = new ConnHBase().connToHbase
  6.     hbTable = connection.getTable(TableName.valueOf("wzk_area"))
  7.   }
  8.   override def close(): Unit = {
  9.     if (hbTable != null) {
  10.       hbTable.close()
  11.     }
  12.     if (connection != null) {
  13.       connection.close()
  14.     }
  15.   }
  16.   override def invoke(value: util.ArrayList[TableObject], context: SinkFunction.Context[_]): Unit = {
  17.     value.forEach(x => {
  18.       println(x.toString)
  19.       val database: String = x.database
  20.       val tableName: String = x.tableName
  21.       val typeInfo: String = x.typeInfo
  22.       if ((database.equalsIgnoreCase("dwshow") && tableName.equalsIgnoreCase("wzk_trade_orders"))) {
  23.         if (typeInfo.equalsIgnoreCase("insert")) {
  24.           value.forEach(x => {
  25.             val info: DataInfo = JSON.parseObject(x.dataInfo, classOf[DataInfo])
  26.             insertTradeOrders(hbTable, info)
  27.           })
  28.         } else if (typeInfo.equalsIgnoreCase("update")) {
  29.         } else if (typeInfo.equalsIgnoreCase("delete")) {
  30.         }
  31.       }
  32.       if (database.equalsIgnoreCase("dwshow") && tableName.equalsIgnoreCase("wzk_area")) {
  33.         if (typeInfo.equalsIgnoreCase("insert")) {
  34.           value.forEach(x => {
  35.             val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])
  36.             insertArea(hbTable, info)
  37.           })
  38.         } else if (typeInfo.equalsIgnoreCase("update")) {
  39.           value.forEach(x => {
  40.             val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])
  41.             insertArea(hbTable, info)
  42.           })
  43.         } else if (typeInfo.equalsIgnoreCase("delete")) {
  44.           value.forEach(x => {
  45.             val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])
  46.             deleteArea(hbTable, info)
  47.           })
  48.         }
  49.       }
  50.     })
  51.   }
  52.   def insertTradeOrders(hbTable: Table, dataInfo: DataInfo): Unit = {
  53.     val tableName = "wzk_trade_orders"
  54.     val columnFamily = "f1"
  55.     // 如果表不存在则创建
  56.     createTableIfNotExists(connection, tableName, columnFamily)
  57.     val put = new Put(dataInfo.orderId.getBytes)
  58.     put.addColumn("f1".getBytes, "modifiedTime".getBytes, dataInfo.modifiedTime.getBytes())
  59.     put.addColumn("f1".getBytes, "orderNo".getBytes, dataInfo.orderNo.getBytes())
  60.     put.addColumn("f1".getBytes, "isPay".getBytes, dataInfo.isPay.getBytes())
  61.     put.addColumn("f1".getBytes, "orderId".getBytes, dataInfo.orderId.getBytes())
  62.     put.addColumn("f1".getBytes, "tradeSrc".getBytes, dataInfo.tradeSrc.getBytes())
  63.     put.addColumn("f1".getBytes, "payTime".getBytes, dataInfo.payTime.getBytes())
  64.     put.addColumn("f1".getBytes, "productMoney".getBytes, dataInfo.productMoney.getBytes())
  65.     put.addColumn("f1".getBytes, "totalMoney".getBytes, dataInfo.totalMoney.getBytes())
  66.     put.addColumn("f1".getBytes, "dataFlag".getBytes, dataInfo.dataFlag.getBytes())
  67.     put.addColumn("f1".getBytes, "userId".getBytes, dataInfo.userId.getBytes())
  68.     put.addColumn("f1".getBytes, "areaId".getBytes, dataInfo.areaId.getBytes())
  69.     put.addColumn("f1".getBytes, "createTime".getBytes, dataInfo.createTime.getBytes())
  70.     put.addColumn("f1".getBytes, "payMethod".getBytes, dataInfo.payMethod.getBytes())
  71.     put.addColumn("f1".getBytes, "isRefund".getBytes, dataInfo.isRefund.getBytes())
  72.     put.addColumn("f1".getBytes, "tradeType".getBytes, dataInfo.tradeType.getBytes())
  73.     put.addColumn("f1".getBytes, "status".getBytes, dataInfo.status.getBytes())
  74.     hbTable.put(put)
  75.   }
  76.   def insertArea(hbTable: Table, areaInfo: AreaInfo): Unit = {
  77.     // val tableName = "wzk_area"
  78.     // val columnFamily = "f1"
  79.     // 如果表不存在则创建
  80.     // createTableIfNotExists(connection, tableName, columnFamily)
  81.     println(areaInfo.toString)
  82.     val put = new Put(areaInfo.id.getBytes())
  83.     put.addColumn("f1".getBytes(), "name".getBytes(), areaInfo.name.getBytes())
  84.     put.addColumn("f1".getBytes(), "pid".getBytes(), areaInfo.pid.getBytes())
  85.     put.addColumn("f1".getBytes(), "sname".getBytes(), areaInfo.sname.getBytes())
  86.     put.addColumn("f1".getBytes(), "level".getBytes(), areaInfo.level.getBytes())
  87.     put.addColumn("f1".getBytes(), "citycode".getBytes(), areaInfo.citycode.getBytes())
  88.     put.addColumn("f1".getBytes(), "yzcode".getBytes(), areaInfo.yzcode.getBytes())
  89.     put.addColumn("f1".getBytes(), "mername".getBytes(), areaInfo.mername.getBytes())
  90.     put.addColumn("f1".getBytes(), "lng".getBytes(), areaInfo.Lng.getBytes())
  91.     put.addColumn("f1".getBytes(), "lat".getBytes(), areaInfo.Lat.getBytes())
  92.     put.addColumn("f1".getBytes(), "pinyin".getBytes(), areaInfo.pinyin.getBytes())
  93.     hbTable.put(put)
  94.   }
  95.   def deleteArea(hbTable: Table, areaInfo: AreaInfo): Unit = {
  96.     val delete = new Delete(areaInfo.id.getBytes)
  97.     hbTable.delete(delete)
  98.   }
  99.   def createTableIfNotExists(connection: Connection, tableName: String, columnFamily: String): Unit = {
  100.     val admin = connection.getAdmin
  101.     try {
  102.       val table = TableName.valueOf(tableName)
  103.       // 检查表是否存在
  104.       if (!admin.tableExists(table)) {
  105.         val tableDescriptor = new HTableDescriptor(table)
  106.         val columnDescriptor = new HColumnDescriptor(columnFamily.getBytes())
  107.         tableDescriptor.addFamily(columnDescriptor)
  108.         // 创建表
  109.         admin.createTable(tableDescriptor)
  110.         println(s"表 $tableName 创建成功")
  111.       } else {
  112.         println(s"表 $tableName 已存在")
  113.       }
  114.     } finally {
  115.       admin.close()
  116.     }
  117.   }
  118. }
复制代码
SourceKafka

  1. class SourceKafka {
  2.   def getKafkaSource(topicName: String) : FlinkKafkaConsumer[String] = {
  3.     val props = new Properties()
  4.     props.setProperty("bootstrap.servers", "h121.wzk.icu:9092")
  5.     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  6.     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  7.     props.setProperty("group.id", "hbase-test")
  8.     props.setProperty("auto.offset.reset", "earliest")
  9.     new FlinkKafkaConsumer[String](topicName, new SimpleStringSchema(), props)
  10.   }
  11. }
复制代码
KafkaToHBase

  1. object KafkaToHBase {
  2.   def main(args: Array[String]): Unit = {
  3.     val env = StreamExecutionEnvironment.getExecutionEnvironment
  4.     val kafkaConsumer = new SourceKafka().getKafkaSource("dwshow")
  5.     kafkaConsumer.setStartFromLatest()
  6.     val sourceStream = env.addSource(kafkaConsumer)
  7.     val mapped: DataStream[util.ArrayList[TableObject]] = sourceStream.map(x => {
  8.       val jsonObj: JSONObject = JSON.parseObject(x)
  9.       val database: AnyRef = jsonObj.get("database")
  10.       val table: AnyRef = jsonObj.get("table")
  11.       val typeInfo: AnyRef = jsonObj.get("type")
  12.       val objects = new util.ArrayList[TableObject]()
  13.       jsonObj.getJSONArray("data").forEach(x => {
  14.         objects.add(TableObject(database.toString, table.toString, typeInfo.toString, x.toString))
  15.         println(x.toString)
  16.       })
  17.       objects
  18.     })
  19.     mapped.addSink(new SinkHBase)
  20.     env.execute()
  21.   }
  22. }
复制代码
启动项目

我们对表进行修改:

可以看到控制台对饮输出了内容:

别的表也实行修改一下:

查看 HBase 可以看到数据已经有了:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

卖不甜枣

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表