ToB Enterprise Services App Market: ToB reviews and business social networking platform

Title: Big Data 156: Apache Druid Hands-On Case Study, Order Statistics with Scala and Kafka

Author: 刘俊凯    Time: 2024-10-1 17:54



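Chapter contents

The previous section covered the following:


Overall workflow


Case assumptions

Suppose we are building a user behavior analytics system that collects user click logs through Kafka and analyzes user behavior in real time with Druid.

Requirements analysis

Scenario analysis


Data description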

    {"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","products":[{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"},{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}]}

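Druid does not handle this nested JSON format well, so the data needs to be preprocessed and flattened: each entry of the products array becomes its own record that repeats the order-level fields. After flattening, the data looks like this (one record per line):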
    {"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"}}
    {"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}}
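The flattening step described above can be sketched in a few lines of Scala. This is only a minimal illustration, not the author's preprocessing code: the case classes Product, Order and FlatOrder and the object FlattenOrders are hypothetical names, they carry only a subset of the fields in the sample record, and JSON parsing and serialization are left out so the sketch stays dependency-free.

```scala
// Hypothetical model types; field names mirror a subset of the sample JSON.
final case class Product(productId: String, productName: String, price: Double, productNum: Int)
final case class Order(ts: Long, orderId: String, userId: String, payment: String, products: List[Product])
final case class FlatOrder(ts: Long, orderId: String, userId: String, payment: String, product: Product)

object FlattenOrders {
  // Emit one flat record per product, copying the order-level fields into each.
  def flatten(order: Order): List[FlatOrder] =
    order.products.map(p => FlatOrder(order.ts, order.orderId, order.userId, order.payment, p))

  def main(args: Array[String]): Unit = {
    val order = Order(
      1607499629841L, "1009388", "807134", "933.90",
      List(
        Product("102163", "贝合xxx+粉", 18.7, 3),
        Product("100349", "COxxx0C", 877.8, 1)
      )
    )
    // One order with two products yields two flat records.
    flatten(order).foreach(println)
  }
}
```

Because every flat record repeats orderId, userId and payment, any later per-order statistic has to deduplicate by orderId first.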
Kafka producer

I have not used Scala for a while, so let's write this one in Scala:
    package icu.wzk.kafka

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer

    import java.util.Properties
    import scala.io.{BufferedSource, Source}

    object KafkaProducerForDruid {
      def main(args: Array[String]): Unit = {
        val brokers = "h121.wzk.icu:9092"
        val topic = "druid2"

        val prop = new Properties()
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

        val producer = new KafkaProducer[String, String](prop)
        // orders1.json is expected to hold one flattened JSON record per line
        // (UTF-8 is assumed here because the records contain Chinese text)
        val source: BufferedSource = Source.fromFile("orders1.json", "UTF-8")
        try {
          // Send each line as one Kafka message, pausing briefly between sends
          source.getLines().foreach { line =>
            val msg = new ProducerRecord[String, String](topic, line)
            producer.send(msg)
            println(msg)
            Thread.sleep(10)
          }
        } finally {
          // close() flushes any buffered records before releasing resources
          producer.close()
          source.close()
        }
      }
    }
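The output of the run is shown in the figure below:

Importing data into Druid

I won't go into detail here, since the complete workflow was already covered in the earlier introductory sections:

Load the data source:

Flatten the JSON:

Timestamp:

Do not enable rollup:

The final result is shown in the figure below:

The computed result is shown in the figure below:

Running the test SQL works as expected!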

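Queries

Total number of orders

    -- Total number of orders
    -- Note: Druid SQL computes COUNT(DISTINCT ...) approximately by default
    -- (see the useApproximateCountDistinct setting)
    SELECT COUNT(DISTINCT orderId) AS orderscount
    FROM druid2

The result is shown in the figure below: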

Total number of users

    -- Total number of users
    SELECT COUNT(DISTINCT userId) AS usercount
    FROM druid2

The result is shown in the figure below:

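Number of orders per order status

    -- Number of orders for each order status.
    -- The inner query collapses the flattened rows to one row per order,
    -- so an order with several products is counted only once.
    SELECT orderStatus, COUNT(*)
    FROM (
      SELECT orderId, orderStatus
      FROM druid2
      GROUP BY orderId, orderStatus
    )
    GROUP BY orderStatus

The result is shown in the figure below: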
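To see why the query groups by orderId before counting, here is a small in-memory Scala sketch of the same logic. It is an illustration under assumptions, not part of the original pipeline: StatusRow and StatusCounts are hypothetical names, and the rows stand in for the flattened records in the druid2 datasource.

```scala
// Hypothetical row type: one entry per flattened (order, product) record.
final case class StatusRow(orderId: String, orderStatus: String)

object StatusCounts {
  // Mirrors the nested query: deduplicate (orderId, orderStatus) pairs first,
  // then count the remaining rows per status.
  def countByStatus(rows: Seq[StatusRow]): Map[String, Int] =
    rows.distinct
      .groupBy(_.orderStatus)
      .map { case (status, rs) => status -> rs.size }

  // Counting the flattened rows directly counts product lines, not orders.
  def naiveCount(rows: Seq[StatusRow]): Map[String, Int] =
    rows.groupBy(_.orderStatus).map { case (status, rs) => status -> rs.size }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      StatusRow("1009388", "已支付"), // order 1009388, first product
      StatusRow("1009388", "已支付"), // order 1009388, second product
      StatusRow("1009389", "已支付")  // a second, made-up order
    )
    println(countByStatus(rows)) // two orders
    println(naiveCount(rows))    // three product lines
  }
}
```

The same deduplicate-then-count pattern applies to any per-order dimension, such as the payment method.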

Number of orders per payment method

    -- Number of orders for each payment method
    SELECT payMode, COUNT(1)
    FROM (
      SELECT orderId, payMode
      FROM druid2
      GROUP BY orderId, payMode
    )
    GROUP BY payMode

The result is shown in the figure below:

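Top 10 orders by order amount

    -- Top 10 orders by order amount.
    -- payment is a quoted string in the source JSON, so it is cast to a number
    -- for ordering (assuming it was ingested as a string column).
    SELECT orderId, payment, COUNT(1) AS productcount, SUM("product.productNum") AS products
    FROM druid2
    GROUP BY orderId, payment
    ORDER BY CAST(payment AS DOUBLE) DESC
    LIMIT 10

The result is shown in the figure below: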
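A top-10 ranking over the flattened rows means grouping them back into orders, sorting by amount and keeping the first ten. The Scala sketch below walks through that logic in memory. OrderLine, OrderTotal and TopOrders are hypothetical names used only for illustration. Since payment appears as a quoted string in the sample data, the sketch converts it to a number before sorting; comparing the raw strings would place "99.00" above "933.90".

```scala
// Hypothetical types: OrderLine is one flattened row, OrderTotal one ranked order.
final case class OrderLine(orderId: String, payment: String, productNum: Int)
final case class OrderTotal(orderId: String, payment: Double, productCount: Int, products: Int)

object TopOrders {
  // Group rows by (orderId, payment), aggregate, then sort numerically by amount.
  def topByPayment(lines: Seq[OrderLine], n: Int): Seq[OrderTotal] =
    lines
      .groupBy(l => (l.orderId, l.payment))
      .toSeq
      .map { case ((id, pay), ls) =>
        // productCount = distinct product rows, products = total units ordered
        OrderTotal(id, pay.toDouble, ls.size, ls.map(_.productNum).sum)
      }
      .sortBy(t => -t.payment) // descending by numeric amount
      .take(n)

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      OrderLine("1009388", "933.90", 3),
      OrderLine("1009388", "933.90", 1),
      OrderLine("1009389", "99.00", 2) // made-up order for comparison
    )
    topByPayment(lines, 10).foreach(println)
  }
}
```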

Case summary


