点一下关注吧!!!非常感谢!!持续更新!!!
Java篇开始了!
- MyBatis 更新完毕
- 现在开始更新 Spring,一起深入浅出!
现在已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(已更完)
- ClickHouse(已更完)
- Kudu(已更完)
- Druid(已更完)
- Kylin(已更完)
- Elasticsearch(已更完)
- DataX(已更完)
- Tez(已更完)
- 数据挖掘(已更完)
- Prometheus(已更完)
- Grafana(已更完)
- 离线数仓(已更完)
- 实时数仓(正在更新…)
章节内容
根本介绍
在 Kafka 中写入维度表(DIM)通常涉及将实时或批处理惩罚数据从 Kafka 主题(Topic)读取,并根据数据流中的信息更新维度表(DIM),这在数据堆栈或数据湖的 ETL(提取、转换、加载)过程中非经常见。维度表(DIM)存储的是与业务数据相干的维度信息,例如客户、产品、地理位置等,用于支持 OLAP(联机分析处理惩罚)查询。
明白 Kafka 数据流
Kafka 是一个分布式流平台,用于高吞吐量的消息通报。在 ETL 过程中,Kafka 通常用作数据的消息队列大概流处理惩罚的来源。每当新数据生成时,它会被发布到 Kafka 中的某个主题(Topic),然后消耗者(Consumer)可以从主题中获取数据进行处理惩罚。
设计维度表(DIM)
维度表通常包含业务实体的详细信息,如产品名称、客户信息、时间维度等。与事实表(Fact)不同,维度表的数据较为静态,但大概会随着时间更新(例如,客户地址变更或产品种别更新)。每个维度表通常有一个唯一的主键(如 customer_id 或 product_id)来标识记录。
Kafka 消耗者(Consumer)
为了从 Kafka 中读取维度数据,需要创建一个消耗者(Consumer),它会从 Kafka 的某个主题(Topic)中读取消息。这些消息通常是 JSON 格式,包含需要写入维度表的信息。消耗者将从 Kafka 主题中获取数据,大概包括以下步骤:
- 连接到 Kafka 集群。
- 订阅一个或多个主题(Topics)。
- 消耗消息并将其通报给后续的处理惩罚逻辑。
- 消耗者的实现可以使用 Kafka 提供的客户端库,例如 Kafka 的 Java 客户端、Python 的 confluent-kafka 等。
数据处理惩罚和转换
在读取到 Kafka 消息后,消耗者需要对数据进行须要的处理惩罚和转换。对于维度数据,处理惩罚逻辑大概包括:
- 数据剖析:将消息从 Kafka 中的格式(例如 JSON)剖析成结构化数据。
- 校验数据:检查数据是否符合业务规则,是否完整,是否有用。
- 维度数据更新:如果 Kafka 中的消息包含的维度信息已经存在,则更新相干记录;如果是新维度,则插入新记录。
维度表的更新
维度表的更新通常有两种常见的方式:
- 全量更新:每次从 Kafka 获取到新的数据时,都将其覆盖到维度表中。这种方式适用于数据变动较少大概可以接受重写的场景。
- 增量更新:根据时间戳、有用性标记或版本号等信息,更新已有的维度记录。这种方式适用于数据会有更新(如地址或状态变更)的场
景。
增量更新时,通常会执行以下操纵:
- 查找是否已有该维度记录(例如通过 dimension_id)。
- 如果存在且数据发生变革,则更新该记录,同时更新 valid_to 时间,并插入一条新的记录,设置 valid_from 和 valid_to 时间。
- 如果不存在该记录,则直接插入新的维度数据。
写入到目标存储(DIM)
在数据处理惩罚后,需要将更新后的维度数据写入目标存储。这通常是一个数据库(例如 MySQL、PostgreSQL 或 NoSQL 数据库)或数据堆栈(例如 Snowflake、Google BigQuery、Redshift)中的维度表(DIM)。
数据存储更新(事务性思量)
对于维度表的更新,通常需要确保数据的一致性。可以使用事务来确保数据在更新过程中的一致性,防止数据丢失或重复。例如,可以在事务中执行所有的更新和插入操纵,确保如果操纵失败,可以回滚。
TableObject
创建样例 TableObject
- case class TableObject(database: String, tableName: String, typeInfo: String, dataInfo: String) extends Serializable
复制代码 AreaInfo
- case class AreaInfo(
- id: String,
- name: String,
- pid: String,
- sname: String,
- level: String,
- citycode: String,
- yzcode: String,
- mername: String,
- Lng: String,
- Lat: String,
- pinyin: String
- )
复制代码 DataInfo
- case class DataInfo(
- modifiedTime: String,
- orderNo: String,
- isPay: String,
- orderId: String,
- tradeSrc: String,
- payTime: String,
- productMoney: String,
- totalMoney: String,
- dataFlag: String,
- userId: String,
- areaId: String,
- createTime: String,
- payMethod: String,
- isRefund: String,
- tradeType: String,
- status: String
- )
复制代码 ConnHBase
- class ConnHBase {
- def connToHbase:Connection ={
- val conf : Configuration = HBaseConfiguration.create()
- conf.set("hbase.zookeeper.quorum","h121.wzk.icu,h122.wzk.icu,h123.wzk.icu")
- conf.set("hbase.zookeeper.property.clientPort","2181")
- conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,30000)
- conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,30000)
- val connection = ConnectionFactory.createConnection(conf)
- connection
- }
- }
复制代码 SinkHBase
- class SinkHBase extends RichSinkFunction[util.ArrayList[TableObject]] {
- var connection : Connection = _
- var hbTable : Table = _
- override def open(parameters: Configuration): Unit = {
- connection = new ConnHBase().connToHbase
- hbTable = connection.getTable(TableName.valueOf("wzk_area"))
- }
- override def close(): Unit = {
- if (hbTable != null) {
- hbTable.close()
- }
- if (connection != null) {
- connection.close()
- }
- }
- override def invoke(value: util.ArrayList[TableObject], context: SinkFunction.Context[_]): Unit = {
- value.forEach(x => {
- println(x.toString)
- val database: String = x.database
- val tableName: String = x.tableName
- val typeInfo: String = x.typeInfo
- if ((database.equalsIgnoreCase("dwshow") && tableName.equalsIgnoreCase("wzk_trade_orders"))) {
- if (typeInfo.equalsIgnoreCase("insert")) {
- value.forEach(x => {
- val info: DataInfo = JSON.parseObject(x.dataInfo, classOf[DataInfo])
- insertTradeOrders(hbTable, info)
- })
- } else if (typeInfo.equalsIgnoreCase("update")) {
- } else if (typeInfo.equalsIgnoreCase("delete")) {
- }
- }
- if (database.equalsIgnoreCase("dwshow") && tableName.equalsIgnoreCase("wzk_area")) {
- if (typeInfo.equalsIgnoreCase("insert")) {
- value.forEach(x => {
- val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])
- insertArea(hbTable, info)
- })
- } else if (typeInfo.equalsIgnoreCase("update")) {
- value.forEach(x => {
- val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])
- insertArea(hbTable, info)
- })
- } else if (typeInfo.equalsIgnoreCase("delete")) {
- value.forEach(x => {
- val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])
- deleteArea(hbTable, info)
- })
- }
- }
- })
- }
- def insertTradeOrders(hbTable: Table, dataInfo: DataInfo): Unit = {
- val tableName = "wzk_trade_orders"
- val columnFamily = "f1"
- // 如果表不存在则创建
- createTableIfNotExists(connection, tableName, columnFamily)
- val put = new Put(dataInfo.orderId.getBytes)
- put.addColumn("f1".getBytes, "modifiedTime".getBytes, dataInfo.modifiedTime.getBytes())
- put.addColumn("f1".getBytes, "orderNo".getBytes, dataInfo.orderNo.getBytes())
- put.addColumn("f1".getBytes, "isPay".getBytes, dataInfo.isPay.getBytes())
- put.addColumn("f1".getBytes, "orderId".getBytes, dataInfo.orderId.getBytes())
- put.addColumn("f1".getBytes, "tradeSrc".getBytes, dataInfo.tradeSrc.getBytes())
- put.addColumn("f1".getBytes, "payTime".getBytes, dataInfo.payTime.getBytes())
- put.addColumn("f1".getBytes, "productMoney".getBytes, dataInfo.productMoney.getBytes())
- put.addColumn("f1".getBytes, "totalMoney".getBytes, dataInfo.totalMoney.getBytes())
- put.addColumn("f1".getBytes, "dataFlag".getBytes, dataInfo.dataFlag.getBytes())
- put.addColumn("f1".getBytes, "userId".getBytes, dataInfo.userId.getBytes())
- put.addColumn("f1".getBytes, "areaId".getBytes, dataInfo.areaId.getBytes())
- put.addColumn("f1".getBytes, "createTime".getBytes, dataInfo.createTime.getBytes())
- put.addColumn("f1".getBytes, "payMethod".getBytes, dataInfo.payMethod.getBytes())
- put.addColumn("f1".getBytes, "isRefund".getBytes, dataInfo.isRefund.getBytes())
- put.addColumn("f1".getBytes, "tradeType".getBytes, dataInfo.tradeType.getBytes())
- put.addColumn("f1".getBytes, "status".getBytes, dataInfo.status.getBytes())
- hbTable.put(put)
- }
- def insertArea(hbTable: Table, areaInfo: AreaInfo): Unit = {
- // val tableName = "wzk_area"
- // val columnFamily = "f1"
- // 如果表不存在则创建
- // createTableIfNotExists(connection, tableName, columnFamily)
- println(areaInfo.toString)
- val put = new Put(areaInfo.id.getBytes())
- put.addColumn("f1".getBytes(), "name".getBytes(), areaInfo.name.getBytes())
- put.addColumn("f1".getBytes(), "pid".getBytes(), areaInfo.pid.getBytes())
- put.addColumn("f1".getBytes(), "sname".getBytes(), areaInfo.sname.getBytes())
- put.addColumn("f1".getBytes(), "level".getBytes(), areaInfo.level.getBytes())
- put.addColumn("f1".getBytes(), "citycode".getBytes(), areaInfo.citycode.getBytes())
- put.addColumn("f1".getBytes(), "yzcode".getBytes(), areaInfo.yzcode.getBytes())
- put.addColumn("f1".getBytes(), "mername".getBytes(), areaInfo.mername.getBytes())
- put.addColumn("f1".getBytes(), "lng".getBytes(), areaInfo.Lng.getBytes())
- put.addColumn("f1".getBytes(), "lat".getBytes(), areaInfo.Lat.getBytes())
- put.addColumn("f1".getBytes(), "pinyin".getBytes(), areaInfo.pinyin.getBytes())
- hbTable.put(put)
- }
- def deleteArea(hbTable: Table, areaInfo: AreaInfo): Unit = {
- val delete = new Delete(areaInfo.id.getBytes)
- hbTable.delete(delete)
- }
- def createTableIfNotExists(connection: Connection, tableName: String, columnFamily: String): Unit = {
- val admin = connection.getAdmin
- try {
- val table = TableName.valueOf(tableName)
- // 检查表是否存在
- if (!admin.tableExists(table)) {
- val tableDescriptor = new HTableDescriptor(table)
- val columnDescriptor = new HColumnDescriptor(columnFamily.getBytes())
- tableDescriptor.addFamily(columnDescriptor)
- // 创建表
- admin.createTable(tableDescriptor)
- println(s"表 $tableName 创建成功")
- } else {
- println(s"表 $tableName 已存在")
- }
- } finally {
- admin.close()
- }
- }
- }
复制代码 SourceKafka
- class SourceKafka {
- def getKafkaSource(topicName: String) : FlinkKafkaConsumer[String] = {
- val props = new Properties()
- props.setProperty("bootstrap.servers", "h121.wzk.icu:9092")
- props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
- props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
- props.setProperty("group.id", "hbase-test")
- props.setProperty("auto.offset.reset", "earliest")
- new FlinkKafkaConsumer[String](topicName, new SimpleStringSchema(), props)
- }
- }
复制代码 KafkaToHBase
- object KafkaToHBase {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val kafkaConsumer = new SourceKafka().getKafkaSource("dwshow")
- kafkaConsumer.setStartFromLatest()
- val sourceStream = env.addSource(kafkaConsumer)
- val mapped: DataStream[util.ArrayList[TableObject]] = sourceStream.map(x => {
- val jsonObj: JSONObject = JSON.parseObject(x)
- val database: AnyRef = jsonObj.get("database")
- val table: AnyRef = jsonObj.get("table")
- val typeInfo: AnyRef = jsonObj.get("type")
- val objects = new util.ArrayList[TableObject]()
- jsonObj.getJSONArray("data").forEach(x => {
- objects.add(TableObject(database.toString, table.toString, typeInfo.toString, x.toString))
- println(x.toString)
- })
- objects
- })
- mapped.addSink(new SinkHBase)
- env.execute()
- }
- }
复制代码 启动项目
我们对表进行修改:
可以看到控制台对饮输出了内容:
别的表也实行修改一下:
查看 HBase 可以看到数据已经有了:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |