Kafka Stream实战教程

打印 上一主题 下一主题

主题 848|帖子 848|积分 2544

Kafka Stream实战教程

1. Kafka Streams 基础入门

1.1 什么是 Kafka Streams

Kafka Streams 是 Kafka 生态中用于 处理实时流数据 的一款轻量级流处理库。它利用 Kafka 作为数据来源和数据输出,可以让开发者轻松地对实时数据进行处理,好比计数、聚合、过滤等操作。Kafka Streams 的一个显著特点是其设计简洁,帮助我们快速构建和部署实时流处理应用,而不需要复杂的集群管理。
对比传统流处理框架(如 Spark Streaming):传统流处理框架通常需要独立的集群支持,并有较重的计算资源需求。而 Kafka Streams 内置在 Kafka 中,既不需要单独的集群支持,性能上也更轻量,得当需要实时响应的场景,好比在线日记监控、实时订单处理等。
Kafka Streams 的应用场景


  • 实时数据分析:如热门商品实时排名、网站的热点数据追踪
  • 实时监控和告警:如体系指标监控,异常行为检测
  • 数据洗濯与格式转换:如从原始数据中抽取特定字段、转换格式用于下游体系
  • 复杂变乱处理:如订单状态跟踪、用户行为关联分析
1.2 Kafka Streams 核心概念

要理解 Kafka Streams,先相识几个核心概念:


  • Stream(数据流):一个数据流是源源不停的数据纪录流(类似于消息流)。在 Kafka 中,每个数据流对应 Kafka 的一个主题(topic)。
  • Table(表):类似于数据库中的表,是数据的快照,通常包含每个键的最新状态。Kafka Streams 通过将流(Stream)聚合为表(Table),提供了在实时数据上进行去重和合并的能力。
  • KStream 和 KTable

    • KStream:一个纪录的无状态流,适适用于过滤、转换等操作,得当处理简朴的逐条消息处理。
    • KTable:类似于数据库的表,有键值对的布局,得当做聚合、去重、统计等操作。
    • 两者可以相互转换,好比可以将一个 KStream 聚合成 KTable,也可以从 KTable 中天生 KStream。

  • 时间语义:Kafka Streams 提供了变乱时间(Event Time)、处理时间(Processing Time)、摄取时间(Ingestion Time)三种时间语义,帮助用户更灵活地处理时序数据。
  • 状态存储和窗口(Windows):Kafka Streams 提供内置的状态存储来生存流的中心状态,如用户登录状态等。窗口操作(windowing)答应我们在肯定的时间隔断内对流数据进行聚合和分组操作,好比每 5 分钟统计一次某产物的点击量。
表二元性描述了流和表之间的紧密关系。


  • 流作为表:流可以被视为表的变更日记,此中流中的每个数据纪录都捕捉表的状态变革。因此,流是伪装的表,可以通过从头到尾重放变更日记来重建表,从而轻松地将其转换为“真实”表。同样,在更一般的类比中,聚合流中的数据纪录(例如从页面欣赏变乱流中计算用户的总页面欣赏量)将返回一个表(此处的键和值分别是用户及其对应的页面欣赏量)。
  • 表作为流:表可以被视为某个时间点的快照,是流中每个键的最新值(流的数据纪录是键值对)。因此,表是伪装的流,通过迭代表中的每个键值条目,可以轻松地将其转换为“真实”流。

kafka文档
1.3 开发环境搭建

搭建 Kafka Streams 开发环境的步调如下:

  • 安装 Kafka

    • 下载安装 Kafka,然后启动 Kafka 服务和 Zookeeper 服务。
    • 常用命令:启动 Kafka 服务器,bin/kafka-server-start.sh config/server.properties

  • 创建 Kafka Streams 项目

    • 新建一个 Maven 或 Gradle 项目,并添加 Kafka Streams 的依赖:
      1. <!-- Maven 依赖 -->
      2. <dependency>
      3.     <groupId>org.apache.kafka</groupId>
      4.     <artifactId>kafka-streams</artifactId>
      5.     <version>3.0.0</version>
      6. </dependency>
      复制代码

  • 开发Hello Kafka Streams 应用

    • 创建一个简朴的 Kafka Streams 应用,读取输入流,进行简朴的数据处理,然后输出效果。
    1. import org.apache.kafka.streams.KafkaStreams;
    2. import org.apache.kafka.streams.StreamsBuilder;
    3. import org.apache.kafka.streams.StreamsConfig;
    4. import org.apache.kafka.streams.kstream.KStream;
    5. import java.util.Properties;
    6. public class HelloKafkaStreams {
    7.     public static void main(String[] args) {
    8.         // 配置 Kafka Streams
    9.         Properties props = new Properties();
    10.         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hello-streams-app");
    11.         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    12.         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
    13.         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
    14.         // 构建流处理拓扑
    15.         StreamsBuilder builder = new StreamsBuilder();
    16.         KStream<String, String> inputStream = builder.stream("input-topic");
    17.         // 进行简单的处理,比如将消息转换为大写
    18.         KStream<String, String> processedStream = inputStream.mapValues(value -> value.toUpperCase());
    19.         // 将处理后的流写入输出主题
    20.         processedStream.to("output-topic");
    21.         // 创建并启动 Kafka Streams
    22.         KafkaStreams streams = new KafkaStreams(builder.build(), props);
    23.         streams.start();
    24.         // 添加关闭钩子
    25.         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    26.     }
    27. }
    复制代码

  • 运行 Kafka Streams 应用

    • 确保 Kafka 服务已启动,运行该应用,将消息发到“input-topic”主题,观察“output-topic”主题中的转换效果。


完成以上步调后,你已经实现了第一个简朴的 Kafka Streams 应用。这个应用读取“input-topic”中的消息,将其内容转换为大写后写入“output-topic”中。
2. Kafka Streams 实现原理

在理解和利用 Kafka Streams 进行流处理之前,深入相识着实现原理可以帮助我们更好地优化应用性能和处理策略。Kafka Streams 作为一个轻量级、分布式的数据处理库,提供了流处理的易用性和强大的实时性。这一节将介绍 Kafka Streams 的实现原理,包罗其架构设计和核心组件。
1. Kafka Streams 架构概述

Kafka Streams 是构建在 Kafka 消息体系之上的一个流处理库,它提供了一些特性,使得其轻易集成到现有的 Kafka 基础办法中进行实时数据流的处理。Kafka Streams 的主要组成部分包罗:


  • 流处理拓扑(Topology):描述了应用中各个流处理过程的图布局,包罗数据的源、处理逻辑和输出。
  • 任务(Tasks):一个 Kafka Streams 应用程序被分配为多个任务,每个任务负责处理特定的分区数据。
  • 线程模子:每个 Kafka Streams 实例可以通过设置线程数来实现并行处理。
2. 核心组件

1. 流处理拓扑(Topology)
流处理的核心是通过界说流处理拓扑来实现的。拓扑由多个处理节点(Processor)、source 和 sink 组成。每个节点负责执行特定的数据转换逻辑。


  • **Source Processor **:从 Kafka 主题读取数据。
  • Processor Node:应用具体的数据处理逻辑,如过滤、转换、聚合等。
  • **Sink Processor **:将处理效果输出到 Kafka 主题。

kafka stream core-concepts
   Stream Processing Topology

  

  • A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.
  • A stream processing application is any program that makes use of the Kafka Streams library. It defines its computational logic through one or more processor topologies, where a processor topology is a graph of stream processors (nodes) that are connected by streams (edges).
  • A stream processor is a node in the processor topology; it represents a processing step to transform data in streams by receiving one input record at a time from its upstream processors in the topology, applying its operation to it, and may subsequently produce one or more output records to its downstream processors.
  There are two special processors in the topology:
  

  • Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its down-stream processors.
  • Sink Processor: A sink processor is a special type of stream processor that does not have down-stream processors. It sends any received records from its up-stream processors to a specified Kafka topic.
  Note that in normal processor nodes other remote systems can also be accessed while processing the current record. Therefore the processed results can either be streamed back into Kafka or written to an external system.
  Example:
  1. StreamsBuilder builder = new StreamsBuilder();
  2. KStream<String, String> source = builder.stream("input-topic");
  3. // Data processing logic
  4. KStream<String, String> processed = source.filter((key, value) -> value.contains("important"));
  5. processed.to("output-topic");
复制代码
2. 状态存储(State Store)
Kafka Streams 支持有状态流处理,利用状态存储(如 RocksDB)来生存中心效果。每个处理节点都可以维护自己的状态,以便实现如计数、聚合等操作。


  • Persistent State Store:通过内存和磁盘存储队列实现持久化。
  • Changelog Topics:每次对状态的更新都会被纪录到 Kafka 中的 changelog 主题,确保数据的恢复能力。
3. 时间语义
Kafka Streams 提供了三种时间语义,用于进行窗口化的流分析:


  • Event Time:变乱或数据纪录发生的时间点,即最初在“源头”创建的时间点。**例如:**假如变乱是汽车中的 GPS 传感器陈诉的地理位置变革,则相干变乱时间将是 GPS 传感器捕捉位置变革的时间。
  • Processing Time: 变乱或数据纪录恰好被流处理应用程序处理的时间点,即纪录被利用的时间点。处理时间可能比原始变乱时间晚几毫秒、几小时或几天等。
  • Ingestion Time:变乱被纪录进入 Kafka 的时间。
4. 错误处理
通过自界说的异常处理机制(如 DeserializationExceptionHandler),Kafka Streams 能够继续处理别的数据而不影响整体流程。
3. 任务执行

Kafka Streams 将应用程序拓扑根据 Kafka 主题的分区自动分别为多个任务(Task),这些任务可以在多个线程中并行执行。每个 Task 负责处理特定的分区数据,因此从根本上进步了水平扩展能力。


  • 独立性:每个 Task 具有独立的状态和处理逻辑,与其他 Task 相互隔离。
  • 自动负载均衡:当 Kafka Streams 实例的数目改变时,任务会自动重新分配,以实现负载均衡。
4. 线程与实例



  • 线程设置:通过设置线程数,应用程序可以在单个实例中并行处理多个任务。
    1. Properties props = new Properties();
    2. props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); // 设置应用程序使用两个线程
    复制代码
  • 实例扩缩:多个实例共同构成 Stream 应用,可以水平扩展应用性能,实例之间通过协调协议共享状态。
总结

理解 Kafka Streams 的实现原理能够帮助我们更高效地开发和部署实时流应用。通过公道设计流处理拓扑、利用状态存储、制定抗故障策略,以及搭配适当的时间语义,Kafka Streams 能够有用地应对复杂的数据流处理场景。最终,这种深刻的理解可以在体系性能优化和调优中发挥关键作用。
3. Kafka Streams 的基础操作

在完成第一个 Kafka Streams 应用后,我们将进一步相识 Kafka Streams 的基础操作,重点关注一些常用的流数据处理方法,包罗数据过滤、映射、聚合、分组、和窗口操作等。这些操作让我们可以针对差别业务需求进行丰富的流数据转换和处理。

3.1 基础操作方法概览

在 Kafka Streams 中,我们通常会用 KStream 和 KTable 来表示数据流。以下是一些常见的操作方法:


  • 过滤(filter):筛选符合条件的纪录
  • 映射(map, mapValues):转换每条纪录的键和值
  • 分组(groupByKey, groupBy):将纪录按指定键分组,为聚合操作做准备
  • 聚合(count, reduce, aggregate):对纪录进行汇总,如计数、求和等
  • 窗口操作(windowedBy):按时间窗口进行分组聚合

3.2 数据过滤(Filter)

过滤操作答应我们筛选出符合条件的数据。例如,假如只想要某个主题中纪录的特定字段,我们可以利用 filter 方法进行筛选。
示例:假设我们有一个主题 orders,每条纪录包含订单的信息。我们想要过滤出金额大于100的订单:
  1. KStream<String, Order> ordersStream = builder.stream("orders");
  2. // 过滤金额大于100的订单
  3. KStream<String, Order> filteredOrders = ordersStream.filter(
  4.     (key, order) -> order.getAmount() > 100
  5. );
  6. filteredOrders.to("filtered-orders");
复制代码
在此示例中,符合条件的订单将被写入 filtered-orders 主题。

3.3 数据映射(Map 和 MapValues)

映射操作用于修改流中的每条纪录。Kafka Streams 提供了 map 和 mapValues 两种方法:


  • map 可以对纪录的键和值进行转换;
  • mapValues 只会对值进行转换,保留键稳定。
示例:将每个订单的金额增加10%并保留其他信息:
  1. KStream<String, Order> updatedOrders = ordersStream.mapValues(
  2.     order -> {
  3.         order.setAmount(order.getAmount() * 1.1);
  4.         return order;
  5.     }
  6. );
  7. updatedOrders.to("updated-orders");
复制代码
这里我们用 mapValues 调解了每个订单的金额,更新后的订单数据会被写入 updated-orders 主题。

3.4 数据分组(GroupBy 和 GroupByKey)

分组操作将数据按指定键重新分组,通常用于聚合操作的前一步。分组后的数据会被存储在 KGroupedStream 中,便于后续的聚合操作。


  • groupByKey:按现有键分组
  • groupBy:可指定新的分组键
示例:按用户 ID 对订单数据进行分组:
  1. KGroupedStream<String, Order> ordersByUser = ordersStream.groupBy(
  2.     (key, order) -> order.getUserId()
  3. );
复制代码
在这里,我们按用户 ID 重新分组,以便于在接下来的步调中对每个用户的订单进行聚合。

3.5 数据聚合(Count、Reduce 和 Aggregate)

聚合操作用于计算分组数据的汇总信息,如计数、求和等。


  • count:统计每组纪录的数目
  • reduce:可以实现自界说的聚合逻辑,例如最大值、最小值等
  • aggregate:实现更灵活的聚合操作,可创建复杂的聚合效果
示例:计算每个用户的订单总金额
  1. KTable<String, Double> totalAmountPerUser = ordersByUser.aggregate(
  2.     () -> 0.0, // 初始化值
  3.     (userId, order, total) -> total + order.getAmount(),
  4.     Materialized.with(Serdes.String(), Serdes.Double())
  5. );
  6. totalAmountPerUser.toStream().to("total-amount-per-user");
复制代码
这里我们利用 aggregate 方法,按用户 ID 统计每个用户的订单总金额,效果会被写入 total-amount-per-user 主题。

3.6 窗口操作(WindowedBy)

窗口操作用于在时间窗口内对流数据进行分组和聚合,非常得当处理时序数据,例如每小时统计一次销售数据。常用的窗口类型有:


  • Tumbling Window:固定长度的窗口,不重叠
  • Hopping Window:固定长度,答应窗口之间重叠
  • Session Window:根据活动时间自动调解的窗口
示例:每隔5分钟统计一次订单数目
  1. KTable<Windowed<String>, Long> orderCountByWindow = ordersByUser
  2.     .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
  3.     .count();
  4. orderCountByWindow.toStream().to("order-count-by-window");
复制代码
在这个示例中,我们按5分钟窗口统计每个用户的订单数目,效果会被写入 order-count-by-window 主题。
  1. @Override
  2. public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
  3. final Aggregator<? super K, ? super V, T> aggregator,
  4. final Merger<? super K, T> sessionMerger) {
  5. return aggregate(initializer, aggregator, sessionMerger, NamedInternal.empty());
  6. }
  7. @Override
  8. public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
  9. final Aggregator<? super K, ? super V, VR> aggregator,
  10. final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
  11. return aggregate(initializer, aggregator, NamedInternal.empty(), materialized);
  12. }
复制代码

3.7 实战案例

案例1:订单流数据处理示例

我们将多个操作组合起来,创建一个实际的订单数据处理流程。
需求:对 orders 主题中的订单数据进行以下处理:

  • 过滤出金额大于100的订单
  • 按用户 ID 重新分组
  • 计算每个用户过去1小时的订单数目(利用滚动窗口)
  • 将效果写入 high-value-orders 和 order-count-by-hour 主题
代码实现
  1. KStream<String, Order> ordersStream = builder.stream("orders");
  2. // 1. 过滤金额大于100的订单
  3. KStream<String, Order> highValueOrders = ordersStream.filter(
  4.     (key, order) -> order.getAmount() > 100
  5. );
  6. highValueOrders.to("high-value-orders");
  7. // 2. 按用户 ID 分组
  8. KGroupedStream<String, Order> ordersByUser = highValueOrders.groupBy(
  9.     (key, order) -> order.getUserId()
  10. );
  11. // 3. 每小时统计一次订单数量
  12. KTable<Windowed<String>, Long> hourlyOrderCount = ordersByUser
  13.     .windowedBy(TimeWindows.of(Duration.ofHours(1)))
  14.     .count();
  15. // 4. 将统计结果写入主题
  16. hourlyOrderCount.toStream().to("order-count-by-hour");
复制代码

通过以上步调,我们利用 Kafka Streams 的基础操作完成了一个流数据的实时处理任务。
案例 2:销售额实时统计

本案例将带各人相识如何利用 Kafka Streams 实现销售额的实时统计。假设我们有一个主题 sales,每条纪录包含一个订单的销售信息,我们将计算每个商品的实时总销售额和每小时的销售额。

需求分析

我们需要从 sales 主题中读取订单纪录,并进行以下处理:

  • 过滤出金额大于0的有用订单;
  • 按商品 ID 分组计算每个商品的总销售额;
  • 对每个商品进行时间窗口统计,计算每小时的销售额;
  • 将实时总销售额和每小时的销售额写入差别的 Kafka 主题。

步调详解

以下是每个步调的具体实现和代码示例。
步调 1:过滤有用订单

我们首先从 sales 主题中读取订单流,并过滤掉销售金额小于或等于0的无效订单纪录。
  1. KStream<String, SaleOrder> salesStream = builder.stream("sales");
  2. // 过滤出有效的销售记录
  3. KStream<String, SaleOrder> validSalesStream = salesStream.filter(
  4.     (key, saleOrder) -> saleOrder.getAmount() > 0
  5. );
复制代码
在这个代码片段中,我们读取 sales 主题中的数据,利用 filter 方法筛选出 amount 大于0的有用销售纪录。

步调 2:按商品 ID 计算总销售额

接下来,我们将按商品 ID 对订单流重新分组,并计算每个商品的总销售额。
  1. KGroupedStream<String, SaleOrder> salesByProduct = validSalesStream.groupBy(
  2.     (key, saleOrder) -> saleOrder.getProductId()
  3. );
  4. KTable<String, Double> totalSalesByProduct = salesByProduct.aggregate(
  5.     () -> 0.0, // 初始化值
  6.     (productId, saleOrder, total) -> total + saleOrder.getAmount(),
  7.     Materialized.with(Serdes.String(), Serdes.Double())
  8. );
  9. totalSalesByProduct.toStream().to("total-sales-by-product");
复制代码
在这段代码中:


  • 我们按商品 ID 分组;
  • 利用 aggregate 方法为每个商品累计销售额;
  • 将计算出的每个商品的总销售额效果写入 total-sales-by-product 主题。

步调 3:按小时计算每个商品的销售额

我们为每个商品创建一个滚动窗口,每小时计算一次销售额。这有助于我们按时间区间相识每个商品的销售趋势。
  1. KTable<Windowed<String>, Double> hourlySalesByProduct = salesByProduct
  2.     .windowedBy(TimeWindows.of(Duration.ofHours(1)))
  3.     .aggregate(
  4.         () -> 0.0,
  5.         (productId, saleOrder, total) -> total + saleOrder.getAmount(),
  6.         Materialized.with(Serdes.String(), Serdes.Double())
  7.     );
  8. hourlySalesByProduct.toStream().to("hourly-sales-by-product");
复制代码
在这段代码中:


  • windowedBy 方法界说了一个每小时的时间窗口;
  • aggregate 计算每小时的销售额;
  • 效果数据会写入 hourly-sales-by-product 主题,此中窗口包含商品 ID 和每小时的销售额。

步调 4:综合输出

将上述两种统计效果分别输出到 total-sales-by-product 和 hourly-sales-by-product 主题中,消耗者可以订阅这两个主题,获取商品的实时销售额及每小时的销售额动态变革。

完整代码示例

将上述步调组合成完整的 Kafka Streams 程序代码如下:
  1. StreamsBuilder builder = new StreamsBuilder();// 1. 从 'sales' 主题读取数据KStream<String, SaleOrder> salesStream = builder.stream("sales");// 2. 过滤有用的销售纪录KStream<String, SaleOrder> validSalesStream = salesStream.filter(    (key, saleOrder) -> saleOrder.getAmount() > 0);// 3. 按商品 ID 计算总销售额KGroupedStream<String, SaleOrder> salesByProduct = validSalesStream.groupBy(    (key, saleOrder) -> saleOrder.getProductId());KTable<String, Double> totalSalesByProduct = salesByProduct.aggregate(    () -> 0.0,    (productId, saleOrder, total) -> total + saleOrder.getAmount(),    Materialized.with(Serdes.String(), Serdes.Double()));totalSalesByProduct.toStream().to("total-sales-by-product");// 4. 按小时计算每个商品的销售额KTable<Windowed<String>, Double> hourlySalesByProduct = salesByProduct
  2.     .windowedBy(TimeWindows.of(Duration.ofHours(1)))
  3.     .aggregate(
  4.         () -> 0.0,
  5.         (productId, saleOrder, total) -> total + saleOrder.getAmount(),
  6.         Materialized.with(Serdes.String(), Serdes.Double())
  7.     );
  8. hourlySalesByProduct.toStream().to("hourly-sales-by-product");
  9. // 启动流处理应用程序KafkaStreams streams = new KafkaStreams(builder.build(), config);streams.start();
复制代码

总结

通过该案例,我们完成了:


  • 利用 filter 进行数据筛选;
  • 利用 aggregate 计算总销售额和窗口销售额;
  • 界说每小时窗口,帮助我们跟踪产物的实时销售趋势。
这套流程可广泛用于实时数据分析,帮助业务监控产物销量、把握销售动态等。
4.Kafka Streams 状态管理与持久化

在数据流处理过程中,有时需要维护一些中心状态或纪录,以便进行更复杂的操作。这一章将介绍 Kafka Streams 的状态管理功能,包罗如何利用内置的状态存储,以及如何实现自界说的状态存储。
4.1 状态存储(State Store)

概述
Kafka Streams 提供了本地状态存储的能力,答应我们在进行流处理时纪录和查询中心状态。这是进行高级流计算操作的基础,好比保持当前计数、天生聚合效果等。
内部状态存储的类型

  • 内存存储:实用于轻量级、快速的状态存储场景,但受到内存限定。
  • RocksDB:默认情况下,Kafka Streams 利用 RocksDB 作为嵌入式数据库来存储状态。它支持磁盘存储,得当大量数据的情况。
状态存储与拓扑的关系
状态存储紧密集成在 Kafka Streams 的流处理拓扑中,可以在流处理逻辑中随时读取或更新状态。
实践:创建一个状态存储
在 Kafka Streams 程序中利用 store 方法,将状态存储与流处理连接起来:
  1. KStream<String, Long> views = builder.stream("user-views");
  2. KTable<String, Long> viewCounts = views
  3.     .groupByKey()
  4.     .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("view-counts-store"));
  5. // “view-counts-store” 是用于保存当前视图计数的状态存储
复制代码
4.2 定制状态存储

有时,内置的状态存储不能完全满足需求。Kafka Streams 提供了扩展 API,可以实现自界说状态存储。
自界说 State Store
通过实现 StateStore 接口以及创建自界说的 Processor,可以将流处理的状态生存到外部数据库或自界说存储中。
利用 Processor API 进行状态管理
Processor API 提供了低级别的流处理控制能力,答应我们直接操作状态存储,提供了更多灵活性。
实战:实时账户余额监控
设计一个实时账户余额监控体系,每当用户进行消耗或充值时,体系更新用户的账户余额并将其存于状态存储中。
步调:

  • 界说处理逻辑:实现一个自界说 Processor 以更新账户余额。
  • 设置拓扑:利用 Topology 类来界说流处理的拓扑布局,包罗数据的来源、处理器、状态更新以及输出。
  • 部署与测试:将流处理任务部署到 Kafka Streams,进行实时数据处理和验证。
代码示例
  1. public class BalanceProcessorSupplier implements ProcessorSupplier<String, Long> {
  2.     @Override
  3.     public Processor<String, Long> get() {
  4.         return new BalanceProcessor();
  5.     }
  6. }
  7. public class BalanceProcessor extends AbstractProcessor<String, Long> {
  8.     private KeyValueStore<String, Long> balanceStore;
  9.     @Override
  10.     public void init(ProcessorContext context) {
  11.         super.init(context);
  12.         balanceStore = (KeyValueStore) context.getStateStore("balance-store");
  13.     }
  14.     @Override
  15.     public void process(String accountId, Long amount) {
  16.         Long currentBalance = balanceStore.get(accountId);
  17.         Long updatedBalance = (currentBalance == null ? 0L : currentBalance) + amount;
  18.         balanceStore.put(accountId, updatedBalance);
  19.     }
  20. }
  21. // 示例拓扑结构
  22. Topology topology = new Topology();
  23. topology.addSource("Source", "transactions")
  24.         .addProcessor("Process", new BalanceProcessorSupplier(), "Source")
  25.         .addStateStore(Stores.keyValueStoreBuilder(
  26.                 Stores.persistentKeyValueStore("balance-store"),
  27.                 Serdes.String(),
  28.                 Serdes.Long()), "Process");
  29. KafkaStreams streams = new KafkaStreams(topology, props);
  30. streams.start();
复制代码
总结
通过学习这一章,你应把握 Kafka Streams 中的状态存储功能,包罗如何利用内置存储以及如何进行自界说存储。通过状态存储,流处理程序可以保持中心状态,为更复杂的计算提供支持。在实践中,可以利用状态存储来实现很多实时计算体系的关键功能。
5.Kafka Streams 的高级数据流操作

在进行基本的数据流操作之后,你会发现需要处理更加复杂的数据流场景,好比流的连接、复杂的拓扑界说以及更高级的数据转换。这一章将深入探讨 Kafka Streams 中的高级数据流操作。
5.1 数据连接(Join)操作

概述
连接(Join)操作是数据流处理中非常有用的能力,能够把多个数据流合并在一起,以便从差别来源的信息中获取更丰富的数据关系。Kafka Streams 支持多种类型的 Join,包罗 KStream 和 KTable 之间的差别组合。
差别类型的 Join 操作

  • KStream-KStream Join:用于两个流之间的连接。每当一个流中收到新数据时,查找另一流中满足时间窗口条件的数据进行合并。
    用例:订单流和支付流的合并,产生包含订单支付状态的新纪录。
  • KStream-KTable Join:流和表之间的连接。得当需要查找静态或相对稳定的数据进行关联的场景。
    用例:用户购买行为流与用户信息表的连接,获取更具体的用户信息。
  • KTable-KTable Join:表和表之间的连接,得当静态信息的合并。
    用例:用户信息表和地点信息表的合并。
时间窗口及其留意事项
Join 操作中的数据通常需要界说一个时间窗口,答应合并操作在流中差别步到达的数据间执行。重要的是选择符合的时间窗口以及处理时间的边界情况。
代码示例:KStream-KStream Join
以下代码示例展示了如何在 Kafka Streams 中进行 KStream-KStream Join 操作:
  1. KStream<String, Order> orders = builder.stream("orders");
  2. KStream<String, Payment> payments = builder.stream("payments");
  3. KStream<String, EnrichedOrder> enrichedOrders = orders.join(
  4.     payments,
  5.     (order, payment) -> new EnrichedOrder(order, payment),
  6.     JoinWindows.of(Duration.ofMinutes(5))
  7. );
  8. // orders 和 payments 流中的数据依据订单id进行连接,JoinWindows 指定了5分钟的时间窗口
复制代码
5.2 数据拓扑(Topology)与 Processor API

流处理拓扑的概念与布局
在 Kafka Streams 中,拓扑(Topology)是一系列有序的处理节点,界说了信息从输入到输出的流经路径。每个拓扑都包含一个或多个处理器节点,节点之间可以通过多个流进行连接。
Processor API 基本操作
Kafka Streams 提供的 Processor API 是一个更底层的 API,答应对流处理任务进行细粒度的可控操作。主要组件包罗:


  • Processor:流处理逻辑单位,可以处理输入、更新状态,以及天生输出。
  • Transformer:用于转换现有数据并可能保留处理状态。
  • Punctuator:可以在特定时间触发操作,实用于定时任务。
创建自界说流处理拓扑
通过 Processor API,你可以创建自界说的流处理拓扑,以更灵活地处理流数据。
  1. Topology topology = new Topology();
  2. topology.addSource("Source", "source-topic")
  3.         .addProcessor("Process", MyProcessor::new, "Source")
  4.         .addSink("Sink", "output-topic", "Process");
  5. KafkaStreams streams = new KafkaStreams(topology, props);
  6. streams.start();
复制代码
5.3实战:订单实时分析体系

在当代电子商务平台上,实时订单分析对于理解用户行为和优化业务运作至关重要。本实战项目将带你实现一个通过 Kafka Streams 进行的订单实时分析体系,团结订单流与用户信息,从而实现用户行为的实时洞察。
项目目的



  • 实现订单流与用户信息数据的实时关联查询。
  • 利用 Kafka Streams 的 Join 操作,团结差别类型的数据流。
  • 构建自界说的数据处理拓扑,实现特定的业务逻辑。
步调详解

1. 数据流准备
在本项目中,假设我们有以下两种数据来源:


  • 订单流(orders):包含订单的基本信息,如订单 ID、用户 ID、产物详情、代价等。
  • 用户信息表(user-info):包含用户的静态信息,如用户 ID、姓名、城市等。
2. 界说数据连接
首先,我们需要从 Kafka Topic 中读取订单流和用户信息表。然后,利用 Kafka Streams 的 Join 操作,将两个数据流接洽在一起。
  1. KStream<String, Order> orders = builder.stream("orders");
  2. KTable<String, UserInfo> userInfo = builder.table("user-info");
  3. // 使用用户 ID 作为连接键,将订单流与用户信息表结合
  4. KStream<String, EnrichedOrder> enrichedOrders = orders.join(
  5.     userInfo,
  6.     (order, user) -> new EnrichedOrder(order, user)
  7. );
  8. // enrichedOrders 流现在包含了结合用户信息的完整订单记录
复制代码
3. 设置处理拓扑
在 Kafka Streams 中,我们需要界说数据从输入到输出经过的路径,即所谓的“拓扑布局”。
  1. Topology topology = new Topology();
  2. topology.addSource("OrderSource", "orders")
  3.         .addSource("UserSource", "user-info")
  4.         .addProcessor("JoinProcessor", () -> new JoinProcessor(), "OrderSource", "UserSource")
  5.         .addSink("EnrichedOrderSink", "enriched-orders", "JoinProcessor");
  6. KafkaStreams streams = new KafkaStreams(topology, props);
  7. streams.start();
复制代码
4. 开发自界说处理器
你可能需要更多定制的逻辑以增强流处理。在此项目中,可以编写一个自界说处理器(Processor)来复杂化分析,好比过滤订单、打标签、或变更格式。
  1. public class JoinProcessor extends AbstractProcessor<String, Order> {
  2.     private KeyValueStore<String, UserInfo> userStore;
  3.     @Override
  4.     public void init(ProcessorContext context) {
  5.         super.init(context);
  6.         userStore = (KeyValueStore<String, UserInfo>) context.getStateStore("user-info-store");
  7.     }
  8.     @Override
  9.     public void process(String key, Order order) {
  10.         UserInfo userInfo = userStore.get(order.getUserId());
  11.         if (userInfo != null) {
  12.             EnrichedOrder enrichedOrder = new EnrichedOrder(order, userInfo);
  13.             context().forward(key, enrichedOrder);
  14.         }
  15.     }
  16. }
  17. // 注:此处理器假定 "user-info-store" 是一个存储用户信息的状态存储。
复制代码
5. 部署与监控
完成逻辑开发后,部署 Kafka Streams 应用并设置监控以保障实时数据处理的可靠性。


  • 部署:应用可以通过本地、容器(Docker)、大概 Kubernetes 等环境进行部署。
  • 监控:利用监控工具(如 Prometheus、Grafana)实时分析吞吐量、延迟等关键指标,确保流处理的性能和稳定性。
总结与扩展

经过本实战项目的学习,你已经把握了如何通过 Kafka Streams 实现订单数据流和用户信息表的实时数据加工作业。选择符合的 Join 操作、公道设计拓扑布局,以及灵活运用自界说处理器,可以进步实时分析体系的精确性和服从。
扩展


  • 增加进一步的分析功能,好比趋势分析、异常检测等。
  • 探索分布式体系设计优化,提升数据流处理的拓展性。
  • 实现更多异构数据源的整合,拓展数据处理链条。
总结
通过学习这一章,你将把握如何利用 Kafka Streams 进行高级数据流操作。这些技能使你有能力构建复杂的数据流网络,满足现实天下应用场景中对数据处理的高级需求。精确理解和利用 Join 操作和 Processor API,是实现高效流处理体系的关键。
6.错误处理、容错与调试

在构建实时数据处理体系时,错误处理、容错和调试是确保体系稳定性和可靠性的关键。这一章将介绍 Kafka Streams 如那边理错误,如何保障体系的容错能力,并提供调试技巧来帮助开发和维护。
6.1 错误处理

概述
在流处理过程中,可能会碰到各种错误,包罗数据格式错误、网络题目或体系异常。Kafka Streams 提供了多种机制来帮助处理这些错误,以包管流处理程序的健壮性。
错误处理策略

  • 全局异常处理器:可以通过 Kafka Streams 设置全局异常处理策略,以便在出现无法处理的异常时做出适当响应。
    1. Properties props = new Properties();
    2. props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    3.           LogAndContinueExceptionHandler.class.getName());
    复制代码

    • LogAndContinueExceptionHandler:日记纪录之后继续处理。
    • LogAndFailExceptionHandler:日记纪录后终止处理。

  • 局部异常处理:在特定的操作中捕捉和处理异常。例如,在 Java 代码中利用 try-catch 块来处理特定操作中的异常。
  • 自界说错误处理:可以实现自己的 DeserializationExceptionHandler 以处理反序列化过程中发生的错误。
实践中的错误处理
在实现过程中,利用 try-catch 块掩护可能出现题目的处理逻辑,如先辈的解析或网络操作。
  1. KStream<String, String> stream = builder.stream("input-topic");
  2. stream.foreach((key, value) -> {
  3.     try {
  4.         // 业务逻辑
  5.     } catch (Exception e) {
  6.         // 错误处理逻辑
  7.         System.err.println("Error processing record: " + e.getMessage());
  8.     }
  9. });
复制代码
6.2 容错机制

基本原理
Kafka Streams 自带强大的容错能力,包罗自我修复和状态恢复,以确保处理任务的持续运行及数据处理的一致性。
容错策略

  • State Store 的备份与恢复:利用 Kafka 的 changelog topic,确保数据在处理节点故障时可以恢复。RocksDB 提供了本地持久化存储,团结 changelog 作数据恢复。
  • 端点冗余节点:Kafka Streams 集群可以自动分配任务到多个实例上。当某一部分的实例失败,任务会在其他实例上重新分配和执行。
  • 自动检查与重新启动:Kafka Streams 的心跳机制会定期检查实例的状态,并在发现故障时自动重新启动处理任务。
示例设置
  1. Properties props = new Properties();
  2. props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); // 设置备用副本数
  3. props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);  // 状态主题的副本因子
复制代码
6.3 性能优化

优化策略
在流数据处理过程中,性能调优是实现高效处理的关键。Kafka Streams 提供的多种设置可以帮助我们实现性能优化。

  • 优化缓存和批处理

    • 适当加大缓存设置,以减少请求负荷。
      1. props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760L); // 10 MB 缓存大小
      复制代码
    • 设置批处理大小,顺应网络和处理能力。
      1. props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100); // 每分区的缓存记录数
      复制代码

  • 线程设置与资源管理

    • 适当设置线程数,确保充分利用 CPU 而不导致线程竞争。
      1. props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
      2. // 设定流处理线程数
      复制代码

  • 负载均衡和扩展

    • 增加更多的 Kafka 实例,负载均衡处理任务。

6.4 实战:构建高可用的实时订单处理体系

在实际业务环境中,实时订单处理体系需要处理大量的订单数据,并进行高效可靠的处理。在这部分,我们将团结错误处理、容错机制和性能优化知识,构建一个高可用的实时订单处理体系。
体系设计目的



  • 高可用性:通过冗余和真实状态的恢复能力,包管体系在故障后能够迅速恢复。
  • 高性能:确保体系可以在高并发情况下维持低延迟和高吞吐量。
  • 稳定性:有用处理和制止运行时错误,保障流处理正常运行。
步调详解

1. 界说数据流处理逻辑
我们假定我们的订单流包含订单 ID、用户 ID、订单金额、产物信息等。我们将从订单数据中分析出每个用户的实时消耗情况。
  1. // 创建 Kafka Streams Builder
  2. StreamsBuilder builder = new StreamsBuilder();
  3. // 从 "orders" 主题读取订单流
  4. KStream<String, Order> orders = builder.stream("orders");
  5. // 示例处理:计算每个用户的总消费
  6. KGroupedStream<String, Order> groupedByUser = orders.groupBy((key, order) -> order.getUserId());
  7. KTable<String, Double> totalSpentByUser = groupedByUser
  8.     .aggregate(
  9.         () -> 0.0,
  10.         (key, order, total) -> total + order.getAmount(),
  11.         Materialized.with(Serdes.String(), Serdes.Double())
  12.     );
复制代码
2. 实施错误处理
根据差别的场景设置错误处理逻辑,特别是反序列化错误。在本例中,接纳 LogAndContinueExceptionHandler,确保即便碰到数据题目,也不会影响整体流处理。
  1. Properties props = new Properties();
  2. props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
  3.           LogAndContinueExceptionHandler.class.getName());
复制代码
3. 设置容错策略
确保在节点故障时体系能够迅速恢复。设置应用的容错机制,包罗设置 Replica 和 Standby 副本,制止单点故障。
  1. props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
  2. props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
复制代码
4. 性能调优
确保体系能够以高效能运行,即便在订单高峰期。


  • 缓存与批处理:利用适当的缓存和批处理,将数据延迟降到最低。
    1. props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760L); // 设定10 MB的缓存
    复制代码
  • 线程设置:设置适当的流处理线程数,以充分利用体系资源。
    1. props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
    复制代码
5. 部署与监控
部署流处理体系时,考虑到实际的生产环境,保举利用 Docker 或 Kubernetes 等工具来管理应用的生命周期。


  • 监控关键指标:利用监控工具如 Prometheus 和 Grafana 实时监控体系的延迟、吞吐量和错误率,尽早发现并办理潜在题目。
    利用 Kafka Streams 内置的 JMX 指标导出器,配合 Prometheus 的 JMX Exporter 收集数据,Grafana 用于可视化展示。
总结
本章介绍了处理实时流处理中常见题目的方法,包罗错误处理、提供容灾步伐以及性能调优。通过公道化的策略和设置,可以大大进步 Kafka Streams 应用程序的稳定性和服从。最后的实战案例展示了如何将这些概念应用于构建高可用的数据处理体系。
7.Kafka Streams 部署与监控

在完成 Kafka Streams 应用的开发后,部署和监控是确保其在生产环境中高效稳定运行的关键步调。本章将介绍如何在差别环境下部署 Kafka Streams 应用,以及如何对其进行监控,以实时发现并办理潜在题目。
7.1 Kafka Streams 部署

概述
Kafka Streams 应用的部署需要考虑运行环境的条件和特点,也需要做好相应的设置以满足性能和稳定性要求。常见的部署方式包罗本地部署、容器化部署(如 Docker)和 Kubernetes 部署。
1. 本地部署
在本地环境下,Kafka Streams 可以通过直接运行 Java 应用程序来部署。这种方式便于开发和调试,但不实用于生产环境。


  • 步调

    • 将 Kafka Streams 应用打包为 JAR 文件。
    • 在运行时附带设置文件,利用 java 命令运行 JAR。

  1. java -jar your-kafka-streams-app.jar --server.port=8080
复制代码


  • 留意事项:确保本地安装的 Kafka 及其相干服务正常运行,并设置好网络和端口。
2. 容器化部署(Docker)
利用 Docker 可以创建 Kafka Streams 应用的轻量级容器,使其具有跨平台的兼容性。


  • 步调

    • 编写 Dockerfile 描述如何构建应用的 Docker 镜像。
      1. FROM openjdk:11-jre
      2. COPY target/your-kafka-streams-app.jar /usr/app/
      3. WORKDIR /usr/app
      4. CMD ["java", "-jar", "your-kafka-streams-app.jar"]
      复制代码
    • 利用 Docker 命令构建镜像并运行容器。
      1. docker build -t kafka-streams-app .
      2. docker run -d -p 8080:8080 kafka-streams-app
      复制代码

  • 留意事项:确保 Kafka 服务的网络设置能被 Docker 容器访问。
3. Kubernetes 部署
Kubernetes 提供了更强大的编排功能,得当在生产环境中管理和扩展 Kafka Streams 应用。


  • 步调

    • 编写 Kubernetes 部署设置文件(YAML)描述应用部署方式。
      1. apiVersion: apps/v1
      2. kind: Deployment
      3. metadata:
      4.   name: kafka-streams-app
      5. spec:
      6.   replicas: 3
      7.   selector:
      8.     matchLabels:
      9.       app: kafka-streams
      10.   template:
      11.     metadata:
      12.       labels:
      13.         app: kafka-streams
      14.     spec:
      15.       containers:
      16.       - name: kafka-streams-app
      17.         image: kafka-streams-app:latest
      18.         ports:
      19.         - containerPort: 8080
      复制代码
    • 利用 kubectl 命令进行部署。
      1. kubectl apply -f kafka-streams-deployment.yaml
      复制代码

  • 留意事项:设置 Kubernetes 集群以确保服务发现和负载均衡。
7.2 Kafka Streams 监控

概述
在生产中监控 Kafka Streams 应用的状态和性能是确保其正常运行的基础。监控涉及到延迟、吞吐量、状态存储等多个指标。
1. 利用内置 JMX 指标
Kafka Streams 支持通过 JMX 输出应用的运行指标。这些指标可以被其他监控体系(如 Prometheus)收集和分析。


  • 设置 Kafka Streams 以启用 JMX
    在应用启动参数中指定 JMX 端口。
    1. java -Dcom.sun.management.jmxremote \
    2.      -Dcom.sun.management.jmxremote.port=9010 \
    3.      -Dcom.sun.management.jmxremote.local.only=false \
    4.      -Dcom.sun.management.jmxremote.authenticate=false \
    5.      -Dcom.sun.management.jmxremote.ssl=false \
    6.      -jar your-kafka-streams-app.jar
    复制代码
  • 常见指标

    • 处理延迟:从吸收到消息随处理完成所需的时间。
    • 吞吐量:单位时间内处理的消息数目。
    • 错误率:处理数据时发生的错误数目。

2. 利用 Prometheus 和 Grafana
Prometheus 可以从 Kafka Streams 收集 JMX 指标,Grafana 则用于将这些指标进行可视化,以便于监控和分析。


  • 集成步调

    • 安装和设置 Prometheus 以抓取 Kafka Streams 应用的 JMX 指标。
    • 在 Grafana 上设置仪表板,通过 Prometheus 数据源展示实时指标。

  • 监控内容

    • 实时监控吞吐量和延迟:实时检测性能瓶颈。
    • 异常告警:设置告警规则,实时通知潜在题目。

总结
本章中,我们具体介绍了 Kafka Streams 应用的部署和监控方法,覆盖了从本地简朴部署到生产级的容器化及 Kubernetes 部署。监控部分强调了通过 JMX 以及 Prometheus 和 Grafana 进行体系运行状态的检测,这些技能是维持 Kafka Streams 应用稳定性的核心。本章所学将帮助你在差别环境下以最佳实践方式管理和监控你的 Kafka Streams 项目。
8.springboot集成kafka 与kafkaStream

1.引入依赖

  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6.     <groupId>org.springframework.kafka</groupId>
  7.     <artifactId>spring-kafka</artifactId>
  8.     <exclusions>
  9.         <exclusion>
  10.             <groupId>org.apache.kafka</groupId>
  11.             <artifactId>kafka-clients</artifactId>
  12.         </exclusion>
  13.     </exclusions>
  14. </dependency>
  15. <dependency>
  16.     <groupId>org.apache.kafka</groupId>
  17.     <artifactId>kafka-clients</artifactId>
  18. </dependency>
  19. <dependency>
  20.     <groupId>org.apache.kafka</groupId>
  21.     <artifactId>kafka-streams</artifactId>
  22.     <exclusions>
  23.         <exclusion>
  24.             <artifactId>connect-json</artifactId>
  25.             <groupId>org.apache.kafka</groupId>
  26.         </exclusion>
  27.         <exclusion>
  28.             <groupId>org.apache.kafka</groupId>
  29.             <artifactId>kafka-clients</artifactId>
  30.         </exclusion>
  31.     </exclusions>
  32. </dependency>
  33. <dependency>
  34.     <groupId>org.projectlombok</groupId>
  35.     <artifactId>lombok</artifactId>
  36. </dependency>
  37. <dependency>
  38.     <groupId>org.springframework.boot</groupId>
  39.     <artifactId>spring-boot-starter-test</artifactId>
  40.     <scope>test</scope>
  41. </dependency>
  42. <dependency>
  43.     <groupId>org.springframework.boot</groupId>
  44.     <artifactId>spring-boot-autoconfigure</artifactId>
  45. </dependency>
复制代码
2.application设置文件

  1. server:
  2.   port: 8088
  3. spring:
  4.   application:
  5.     name: spring-kafka
  6.   kafka:
  7.     bootstrap-servers: kafka:9092
  8.     producer:
  9.       retries: 5
  10.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11.       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  12.     consumer:
  13.       group-id: ${spring.application.name}-test
  14.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  15.       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer  
  16.     properties:
  17.       # 序列化的时候,解决不信任kafka If you believe this class is safe to deserialize
  18.       spring.json.trusted.packages: "*"
  19. kafka:
  20.   hosts: kafka:9092
  21.   group: ${spring.application.name}
复制代码
3.kafka stream的设置需要单独配一下

  1. package com.example.springkafka.config;
  2. import com.example.springkafka.serializer.OrderDeserializer;
  3. import lombok.Getter;
  4. import lombok.Setter;
  5. import org.apache.kafka.common.serialization.Serdes;
  6. import org.apache.kafka.streams.StreamsConfig;
  7. import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
  8. import org.springframework.boot.context.properties.ConfigurationProperties;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.kafka.annotation.EnableKafkaStreams;
  12. import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
  13. import org.springframework.kafka.config.KafkaStreamsConfiguration;
  14. import org.springframework.kafka.support.serializer.JsonSerde;
  15. import java.util.HashMap;
  16. import java.util.Map;
  17. @Setter
  18. @Getter
  19. @Configuration
  20. @EnableKafkaStreams
  21. @ConfigurationProperties(prefix="kafka")
  22. public class KafkaStreamConfig {
  23.     private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
  24.     private String hosts;
  25.     private String group;
  26.     @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  27.     public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
  28.         Map<String, Object> props = new HashMap<>();
  29.         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
  30.         props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
  31.         props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_id");
  32.         props.put(StreamsConfig.RETRIES_CONFIG, 5);
  33.         // 序列化方式
  34.         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
  35.         // 反序列化方式
  36.         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
  37.         props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
  38.         // 自定义实体时,防止报路径不信任错误
  39.         props.put("spring.json.trusted.packages", "*");
  40.         return new KafkaStreamsConfiguration(props);
  41.     }
  42. }
复制代码
4.消息实体

  1. @NoArgsConstructor
  2. @AllArgsConstructor
  3. @Accessors(chain = true)
  4. @Data
  5. public class Order implements Serializable{
  6.     private String orderId;
  7.     private String userId;
  8.     private String userName;
  9.     private String productId;
  10.     private String productName;
  11.     private Integer amount;
  12.    
  13. }
复制代码
5.自界说消息监听者stream listener,获取topic消息,进行流处理

  1. import com.example.springkafka.entity.Order;
  2. import org.apache.kafka.common.serialization.Serdes;
  3. import org.apache.kafka.streams.StreamsBuilder;
  4. import org.apache.kafka.streams.kstream.KStream;
  5. import org.apache.kafka.streams.kstream.Produced;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.kafka.support.serializer.JsonSerde;
  9. @Configuration
  10. public class StreamCountListener {
  11.     @Bean
  12.     public KStream<String, Order> upperCaseStream(StreamsBuilder streamsBuilder){
  13.                
  14.                 // 获取topic的消息
  15.         KStream<String, Order> inputStream = streamsBuilder.stream("order-topic");
  16.         // 进行简单的处理
  17.         // 1.查询获取订单金额大于100的订单数据
  18.         KStream<String, Order> processedStream = inputStream.filter((key, order) ->  order.getAmount() > 100);
  19.         //processedStream.foreach((key, value) -> System.out.println("------result Received message: "+ key +" : "+ value));
  20.         // 将处理后的流写入输出主题
  21.         processedStream.to("data-topic", Produced.with(Serdes.String(), new JsonSerde<>()));
  22.         return inputStream;
  23.     }
  24. }
复制代码
6.kafka消耗者担当消息并打印

  1. import com.example.springkafka.entity.Order;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class KafkaConsumer  {
  6.     @KafkaListener(topics = "data-topic", groupId = "spring-kafka-test")
  7.     public void listen(Order msg) {
  8.         System.out.printf("普通A message: %s%n", msg.toString());
  9.     }
  10. }
复制代码
7.发送消息

  1. @Autowired
  2. private KafkaTemplate<String, Order> kafkaTemplate;
  3. @Test
  4. public void testSend2() {
  5.     System.out.println("----------开始发送数据-----------");
  6.     Order order = new Order("1", "1", "张三", "1", "商品1", 200);
  7.     kafkaTemplate.send("order-topic", order);
  8. }
复制代码
8.执行效果


9. 实战项目:综合应用

在前面的章节中,我们已经学习了 Kafka Streams 的基础知识、高级操作、错误处理、容错和监控方法。现在,我们来进行一个综合性实战项目——构建一个用户订单实时分析体系。在这个项目中,你将利用到 Kafka Streams 的多种功能,并体验如何将这些技术团结在一起。
8.1 用户行为实时分析体系

项目目的


  • 实现一个能够实时分析用户订单数据的体系。
  • 解析、过滤并聚合来自用户的订单变乱。
  • 输出分析效果,如用户订单的总金额。
1. 项目布局
我们将设计一个由下列环节组成的数据处理管道:


  • 数据流输入:从 Kafka 主题中读取用户订单数据。
  • 数据处理:通过 Kafka Streams 进行实时分析,包罗数据过滤、转换和聚合。
  • 效果输出:处理效果写入到另一个 Kafka 主题或存储体系,以供后续分析或展示。
2. 数据流设计
假设我们有一个 Kafka Topic "order-topic",该主题中的每条纪录包含用户、订单金额 以实时间戳等字段。我们的目的是统计每个客户的在5分钟之内的订单总金额。
3. 数据处理逻辑
  1. @Bean
  2. public KStream<String, Order> countCustomerOrderStreamSession(StreamsBuilder streamsBuilder){
  3.     // 获取消息
  4.     KStream<String, Order> ordersStream = streamsBuilder.stream("order-topic");
  5.     KStream<String, KeyValue<String, Double>> orderAmountStream = ordersStream
  6.         .mapValues(order -> new KeyValue<>(order.getUserId(), order.getAmount()));
  7.     // 2.使用固定窗口函数统计3分钟之内,每个用户的订单总金额
  8.     KTable<Windowed<String>, Double> aggregate = orderAmountStream
  9.         .groupByKey() // 根据用户ID(key)进行分组
  10.         .windowedBy(SessionWindows.with(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1))) // 设置 3 分钟窗口
  11.         .aggregate(
  12.         () -> 0.0, // 初始化值
  13.         (key, orderAmount, aggAmount) -> orderAmount.value + aggAmount, // 聚合逻辑
  14.         (key, agg1, agg2) -> agg1 + agg2, // 合并多个会话
  15.         Materialized.<String, Double, SessionStore<Bytes, byte[]>>as("customer-order-session-store")
  16.         .withKeySerde(Serdes.String())
  17.         .withValueSerde(Serdes.Double()) // 配置序列化器
  18.     );
  19.     // 将聚合结果转换为普通流
  20.     KStream<String, OrderData> map = aggregate.toStream()
  21.         .filter((key, value) -> null != value) // 处理因为会话窗口合并而产生的脏数据
  22.         .map((key, value) -> KeyValue.pair(key.key(), new OrderData(key.key(), value)));
  23.     //map.foreach((key, value) -> System.out.println("windowedBy--------key:"+key+" value:"+value));
  24.     // 发送消息到下游
  25.     map.to("order-count-topic", Produced.with(Serdes.String(), new JsonSerde<>()));
  26.     return ordersStream;
  27. }
复制代码
4. 吸收处理后的数据
  1. @KafkaListener(topics = "order-count-topic", groupId = "order-count-test")
  2. public void consumeData(OrderData orderData) {
  3.     // 每次接收到消息时,会自动打印出用户ID和订单总金额
  4.     log.info("---Consumed Message - User ID: " + orderData.getUserId() + ", Total Amount: " + orderData.getTotalAmount());
  5. }
复制代码


  • 实时更新与持久化:计算每个页面的实时访问量,并可选择持久化效果用于历史数据分析。
  1. 2024-11-20 11:23:07.811  INFO 26000 --- [ntainer#1-0-C-1] c.e.springkafka.listener.KafkaConsumer   : ---Consumed Message - User ID: "1", Total Amount: 45.0
  2. 2024-11-20 11:23:07.811  INFO 26000 --- [ntainer#1-0-C-1] c.e.springkafka.listener.KafkaConsumer   : ---Consumed Message - User ID: "2", Total Amount: 215.0
复制代码
5. 体系部署与监控


  • 部署:可以选择在本地开发环境测试后,利用 Docker 或 Kubernetes 将应用部署到生产环境。
  • 监控:通过 JMX + Prometheus + Grafana 方案监控体系康健状态,例如延迟、处理错误和吞吐量。设置告警可以快速应对题目。
通过本实战项目,你已经实践了如何设计和实现一个用户订单分析体系。从数据洗濯、预处理到数据的统计与展示,每一步都突出了 Kafka Streams 在实时流处理中的强大功能。完成项目后,你不仅对 Kafka Streams 的各个功能有更深入的理解,且能实际应用于办理复杂的数据处理题目。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

三尺非寒

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表