ToB企服应用市场:ToB评测及商务社交产业平台

标题: springboot~kafka-stream实现实时统计 [打印本页]

作者: 大连全瓷种植牙齿制作中心    时间: 2024-3-27 07:48
标题: springboot~kafka-stream实现实时统计
实时统计,也可以理解为流式计算,一个输入流,一个输出流,源源不断。
Kafka Stream

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。
Kafka Stream的特点

相关术语

源处理器和Sink处理器是Kafka Streams中的两个重要组件,它们分别用于从输入流获取数据并将处理后的数据发送到输出流。以下是它们的工作流程的文字图示表达:
  1. [Source Processor] -> [Processor Topology] -> [Sink Processor]
复制代码
通过这种处理流程,Kafka Streams可以实现对数据流的灵活处理和转换,使得你能够方便地构建实时流处理应用程序。
kafka stream demo

依赖
  1. <dependency>
  2.     <groupId>org.springframework.kafka</groupId>
  3.     <artifactId>spring-kafka</artifactId>
  4.     <version>2.5.5.RELEASE</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>org.apache.kafka</groupId>
  8.     <artifactId>kafka-clients</artifactId>
  9.     <version>2.5.1</version>
  10. </dependency>
  11. <dependency>
  12.     <groupId>org.apache.kafka</groupId>
  13.     <artifactId>kafka-streams</artifactId>
  14.   <version>2.5.1</version>
  15. </dependency>
复制代码
环境准备

业务代码

  1. @Configuration
  2. @EnableKafkaStreams
  3. public class KafkaStreamConfig {
  4.         private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
  5.         @Value("${spring.kafka.bootstrap-servers}")
  6.         private String hosts;
  7.         @Value("${spring.kafka.consumer.group-id}")
  8.         private String group;
  9.         @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  10.         public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
  11.                 Map<String, Object> props = new HashMap<>();
  12.                 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
  13.                 props.put(StreamsConfig.APPLICATION_ID_CONFIG, group + "_stream_aid");
  14.                 props.put(StreamsConfig.CLIENT_ID_CONFIG, group + "_stream_cid");
  15.                 props.put(StreamsConfig.RETRIES_CONFIG, 3);
  16.                 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");//从最近的消息开始消费
  17.                 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  18.                 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  19.                 return new KafkaStreamsConfiguration(props);
  20.         }
  21. }
复制代码
  1. @Configuration
  2. @Slf4j
  3. public class KafkaStreamListener {
  4.         @Autowired
  5.         ReportLoginTypeMapper reportLoginTypeMapper;
  6.         @KafkaListener(topics = "total_record")
  7.         public void listen(ConsumerRecord<String, String> record) {
  8.                 // 将时间戳转换为本地日期时间
  9.                 LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneId.systemDefault());
  10.                 ReportLoginType reportLoginType=new ReportLoginType();
  11.                 reportLoginType.setLoginType(record.key());
  12.                 reportLoginType.setCreateAt(dateTime);
  13.                 reportLoginType.setCount(Integer.parseInt(record.value()));
  14.                 reportLoginTypeMapper.insert(reportLoginType);
  15.         }
  16.         @Bean
  17.         public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
  18.                 KStream<String, String> stream = streamsBuilder.stream("KC_LOGIN");
  19.                 KStream<String, String> serializedStream = stream.mapValues(jsonString -> {
  20.                         // 分组依据
  21.                         if (JSONUtil.parseObj(jsonString).containsKey("details")) {
  22.                                 JSONObject details = JSONUtil.parseObj(jsonString).getJSONObject("details");
  23.                                 if (details.containsKey("loginType")) {
  24.                                         String loginType = details.getStr("loginType");
  25.                                         return loginType;
  26.                                 }
  27.                                 return "";
  28.                         }
  29.                         else {
  30.                                 return "";
  31.                         }
  32.                 });
  33.                 /**
  34.                  * 处理消息的value
  35.                  */
  36.                 serializedStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
  37.                         @Override
  38.                         public Iterable<String> apply(String value) {
  39.                                 return Arrays.asList(value.split(" "));
  40.                         }
  41.                 }).filter((key, value) -> !value.equals(""))
  42.                                 // 按照value进行聚合处理
  43.                                 .groupBy((key, value) -> value)// 这进而的value是kafka的消息内容
  44.                                 // 时间窗口
  45.                                 .windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
  46.                                 // 统计单词的个数
  47.                                 .count()
  48.                                 // 转换为kStream
  49.                                 .toStream().map((key, value) -> {
  50.                                         // key是分组的key,value是分组count的结果
  51.                                         return new KeyValue<>(key.key().toString(), value.toString());
  52.                                 })
  53.                                 // 发送消息
  54.                                 .to("topic-out");
  55.                 return stream;
  56.         }
  57. }
复制代码
最后看一下total_record的内容



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4