ToB企服应用市场:ToB评测及商务社交产业平台

标题: Kafka Stream实战教程 [打印本页]

作者: 三尺非寒    时间: 2024-11-21 20:35
标题: Kafka Stream实战教程
Kafka Stream实战教程

1. Kafka Streams 基础入门

1.1 什么是 Kafka Streams

Kafka Streams 是 Kafka 生态中用于 处理实时流数据 的一款轻量级流处理库。它利用 Kafka 作为数据来源和数据输出,可以让开发者轻松地对实时数据进行处理,好比计数、聚合、过滤等操作。Kafka Streams 的一个显著特点是其设计简洁,帮助我们快速构建和部署实时流处理应用,而不需要复杂的集群管理。
对比传统流处理框架(如 Spark Streaming):传统流处理框架通常需要独立的集群支持,并有较重的计算资源需求。而 Kafka Streams 内置在 Kafka 中,既不需要单独的集群支持,性能上也更轻量,得当需要实时响应的场景,好比在线日记监控、实时订单处理等。
Kafka Streams 的应用场景

1.2 Kafka Streams 核心概念

要理解 Kafka Streams,先相识几个核心概念:

表二元性描述了流和表之间的紧密关系。


kafka文档
1.3 开发环境搭建

搭建 Kafka Streams 开发环境的步调如下:

完成以上步调后,你已经实现了第一个简朴的 Kafka Streams 应用。这个应用读取“input-topic”中的消息,将其内容转换为大写后写入“output-topic”中。
2. Kafka Streams 实现原理

在理解和利用 Kafka Streams 进行流处理之前,深入相识着实现原理可以帮助我们更好地优化应用性能和处理策略。Kafka Streams 作为一个轻量级、分布式的数据处理库,提供了流处理的易用性和强大的实时性。这一节将介绍 Kafka Streams 的实现原理,包罗其架构设计和核心组件。
1. Kafka Streams 架构概述

Kafka Streams 是构建在 Kafka 消息体系之上的一个流处理库,它提供了一些特性,使得其轻易集成到现有的 Kafka 基础办法中进行实时数据流的处理。Kafka Streams 的主要组成部分包罗:

2. 核心组件

1. 流处理拓扑(Topology)
流处理的核心是通过界说流处理拓扑来实现的。拓扑由多个处理节点(Processor)、source 和 sink 组成。每个节点负责执行特定的数据转换逻辑。


kafka stream core-concepts
   Stream Processing Topology

  
  There are two special processors in the topology:
  
  Note that in normal processor nodes other remote systems can also be accessed while processing the current record. Therefore the processed results can either be streamed back into Kafka or written to an external system.
  Example:
  1. StreamsBuilder builder = new StreamsBuilder();
  2. KStream<String, String> source = builder.stream("input-topic");
  3. // Data processing logic
  4. KStream<String, String> processed = source.filter((key, value) -> value.contains("important"));
  5. processed.to("output-topic");
复制代码
2. 状态存储(State Store)
Kafka Streams 支持有状态流处理,利用状态存储(如 RocksDB)来生存中心效果。每个处理节点都可以维护自己的状态,以便实现如计数、聚合等操作。

3. 时间语义
Kafka Streams 提供了三种时间语义,用于进行窗口化的流分析:

4. 错误处理
通过自界说的异常处理机制(如 DeserializationExceptionHandler),Kafka Streams 能够继续处理别的数据而不影响整体流程。
3. 任务执行

Kafka Streams 将应用程序拓扑根据 Kafka 主题的分区自动分别为多个任务(Task),这些任务可以在多个线程中并行执行。每个 Task 负责处理特定的分区数据,因此从根本上进步了水平扩展能力。

4. 线程与实例


总结

理解 Kafka Streams 的实现原理能够帮助我们更高效地开发和部署实时流应用。通过公道设计流处理拓扑、利用状态存储、制定抗故障策略,以及搭配适当的时间语义,Kafka Streams 能够有用地应对复杂的数据流处理场景。最终,这种深刻的理解可以在体系性能优化和调优中发挥关键作用。
3. Kafka Streams 的基础操作

在完成第一个 Kafka Streams 应用后,我们将进一步相识 Kafka Streams 的基础操作,重点关注一些常用的流数据处理方法,包罗数据过滤、映射、聚合、分组、和窗口操作等。这些操作让我们可以针对差别业务需求进行丰富的流数据转换和处理。

3.1 基础操作方法概览

在 Kafka Streams 中,我们通常会用 KStream 和 KTable 来表示数据流。以下是一些常见的操作方法:


3.2 数据过滤(Filter)

过滤操作答应我们筛选出符合条件的数据。例如,假如只想要某个主题中纪录的特定字段,我们可以利用 filter 方法进行筛选。
示例:假设我们有一个主题 orders,每条纪录包含订单的信息。我们想要过滤出金额大于100的订单:
  1. KStream<String, Order> ordersStream = builder.stream("orders");
  2. // 过滤金额大于100的订单
  3. KStream<String, Order> filteredOrders = ordersStream.filter(
  4.     (key, order) -> order.getAmount() > 100
  5. );
  6. filteredOrders.to("filtered-orders");
复制代码
在此示例中,符合条件的订单将被写入 filtered-orders 主题。

3.3 数据映射(Map 和 MapValues)

映射操作用于修改流中的每条纪录。Kafka Streams 提供了 map 和 mapValues 两种方法:

示例:将每个订单的金额增加10%并保留其他信息:
  1. KStream<String, Order> updatedOrders = ordersStream.mapValues(
  2.     order -> {
  3.         order.setAmount(order.getAmount() * 1.1);
  4.         return order;
  5.     }
  6. );
  7. updatedOrders.to("updated-orders");
复制代码
这里我们用 mapValues 调解了每个订单的金额,更新后的订单数据会被写入 updated-orders 主题。

3.4 数据分组(GroupBy 和 GroupByKey)

分组操作将数据按指定键重新分组,通常用于聚合操作的前一步。分组后的数据会被存储在 KGroupedStream 中,便于后续的聚合操作。

示例:按用户 ID 对订单数据进行分组:
  1. KGroupedStream<String, Order> ordersByUser = ordersStream.groupBy(
  2.     (key, order) -> order.getUserId()
  3. );
复制代码
在这里,我们按用户 ID 重新分组,以便于在接下来的步调中对每个用户的订单进行聚合。

3.5 数据聚合(Count、Reduce 和 Aggregate)

聚合操作用于计算分组数据的汇总信息,如计数、求和等。

示例:计算每个用户的订单总金额
  1. KTable<String, Double> totalAmountPerUser = ordersByUser.aggregate(
  2.     () -> 0.0, // 初始化值
  3.     (userId, order, total) -> total + order.getAmount(),
  4.     Materialized.with(Serdes.String(), Serdes.Double())
  5. );
  6. totalAmountPerUser.toStream().to("total-amount-per-user");
复制代码
这里我们利用 aggregate 方法,按用户 ID 统计每个用户的订单总金额,效果会被写入 total-amount-per-user 主题。

3.6 窗口操作(WindowedBy)

窗口操作用于在时间窗口内对流数据进行分组和聚合,非常得当处理时序数据,例如每小时统计一次销售数据。常用的窗口类型有:

示例:每隔5分钟统计一次订单数目
  1. KTable<Windowed<String>, Long> orderCountByWindow = ordersByUser
  2.     .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
  3.     .count();
  4. orderCountByWindow.toStream().to("order-count-by-window");
复制代码
在这个示例中,我们按5分钟窗口统计每个用户的订单数目,效果会被写入 order-count-by-window 主题。
  1. @Override
  2. public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
  3. final Aggregator<? super K, ? super V, T> aggregator,
  4. final Merger<? super K, T> sessionMerger) {
  5. return aggregate(initializer, aggregator, sessionMerger, NamedInternal.empty());
  6. }
  7. @Override
  8. public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
  9. final Aggregator<? super K, ? super V, VR> aggregator,
  10. final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
  11. return aggregate(initializer, aggregator, NamedInternal.empty(), materialized);
  12. }
复制代码

3.7 实战案例

案例1:订单流数据处理示例

我们将多个操作组合起来,创建一个实际的订单数据处理流程。
需求:对 orders 主题中的订单数据进行以下处理:
代码实现
  1. KStream<String, Order> ordersStream = builder.stream("orders");
  2. // 1. 过滤金额大于100的订单
  3. KStream<String, Order> highValueOrders = ordersStream.filter(
  4.     (key, order) -> order.getAmount() > 100
  5. );
  6. highValueOrders.to("high-value-orders");
  7. // 2. 按用户 ID 分组
  8. KGroupedStream<String, Order> ordersByUser = highValueOrders.groupBy(
  9.     (key, order) -> order.getUserId()
  10. );
  11. // 3. 每小时统计一次订单数量
  12. KTable<Windowed<String>, Long> hourlyOrderCount = ordersByUser
  13.     .windowedBy(TimeWindows.of(Duration.ofHours(1)))
  14.     .count();
  15. // 4. 将统计结果写入主题
  16. hourlyOrderCount.toStream().to("order-count-by-hour");
复制代码

通过以上步调,我们利用 Kafka Streams 的基础操作完成了一个流数据的实时处理任务。
案例 2:销售额实时统计

本案例将带各人相识如何利用 Kafka Streams 实现销售额的实时统计。假设我们有一个主题 sales,每条纪录包含一个订单的销售信息,我们将计算每个商品的实时总销售额和每小时的销售额。

需求分析

我们需要从 sales 主题中读取订单纪录,并进行以下处理:

步调详解

以下是每个步调的具体实现和代码示例。
步调 1:过滤有用订单

我们首先从 sales 主题中读取订单流,并过滤掉销售金额小于或等于0的无效订单纪录。
  1. KStream<String, SaleOrder> salesStream = builder.stream("sales");
  2. // 过滤出有效的销售记录
  3. KStream<String, SaleOrder> validSalesStream = salesStream.filter(
  4.     (key, saleOrder) -> saleOrder.getAmount() > 0
  5. );
复制代码
在这个代码片段中,我们读取 sales 主题中的数据,利用 filter 方法筛选出 amount 大于0的有用销售纪录。

步调 2:按商品 ID 计算总销售额

接下来,我们将按商品 ID 对订单流重新分组,并计算每个商品的总销售额。
  1. KGroupedStream<String, SaleOrder> salesByProduct = validSalesStream.groupBy(
  2.     (key, saleOrder) -> saleOrder.getProductId()
  3. );
  4. KTable<String, Double> totalSalesByProduct = salesByProduct.aggregate(
  5.     () -> 0.0, // 初始化值
  6.     (productId, saleOrder, total) -> total + saleOrder.getAmount(),
  7.     Materialized.with(Serdes.String(), Serdes.Double())
  8. );
  9. totalSalesByProduct.toStream().to("total-sales-by-product");
复制代码
在这段代码中:


步调 3:按小时计算每个商品的销售额

我们为每个商品创建一个滚动窗口,每小时计算一次销售额。这有助于我们按时间区间相识每个商品的销售趋势。
  1. KTable<Windowed<String>, Double> hourlySalesByProduct = salesByProduct
  2.     .windowedBy(TimeWindows.of(Duration.ofHours(1)))
  3.     .aggregate(
  4.         () -> 0.0,
  5.         (productId, saleOrder, total) -> total + saleOrder.getAmount(),
  6.         Materialized.with(Serdes.String(), Serdes.Double())
  7.     );
  8. hourlySalesByProduct.toStream().to("hourly-sales-by-product");
复制代码
在这段代码中:


步调 4:综合输出

将上述两种统计效果分别输出到 total-sales-by-product 和 hourly-sales-by-product 主题中,消耗者可以订阅这两个主题,获取商品的实时销售额及每小时的销售额动态变革。

完整代码示例

将上述步调组合成完整的 Kafka Streams 程序代码如下:
  1. StreamsBuilder builder = new StreamsBuilder();// 1. 从 'sales' 主题读取数据KStream<String, SaleOrder> salesStream = builder.stream("sales");// 2. 过滤有用的销售纪录KStream<String, SaleOrder> validSalesStream = salesStream.filter(    (key, saleOrder) -> saleOrder.getAmount() > 0);// 3. 按商品 ID 计算总销售额KGroupedStream<String, SaleOrder> salesByProduct = validSalesStream.groupBy(    (key, saleOrder) -> saleOrder.getProductId());KTable<String, Double> totalSalesByProduct = salesByProduct.aggregate(    () -> 0.0,    (productId, saleOrder, total) -> total + saleOrder.getAmount(),    Materialized.with(Serdes.String(), Serdes.Double()));totalSalesByProduct.toStream().to("total-sales-by-product");// 4. 按小时计算每个商品的销售额KTable<Windowed<String>, Double> hourlySalesByProduct = salesByProduct
  2.     .windowedBy(TimeWindows.of(Duration.ofHours(1)))
  3.     .aggregate(
  4.         () -> 0.0,
  5.         (productId, saleOrder, total) -> total + saleOrder.getAmount(),
  6.         Materialized.with(Serdes.String(), Serdes.Double())
  7.     );
  8. hourlySalesByProduct.toStream().to("hourly-sales-by-product");
  9. // 启动流处理应用程序KafkaStreams streams = new KafkaStreams(builder.build(), config);streams.start();
复制代码

总结

通过该案例,我们完成了:

这套流程可广泛用于实时数据分析,帮助业务监控产物销量、把握销售动态等。
4.Kafka Streams 状态管理与持久化

在数据流处理过程中,有时需要维护一些中心状态或纪录,以便进行更复杂的操作。这一章将介绍 Kafka Streams 的状态管理功能,包罗如何利用内置的状态存储,以及如何实现自界说的状态存储。
4.1 状态存储(State Store)

概述
Kafka Streams 提供了本地状态存储的能力,答应我们在进行流处理时纪录和查询中心状态。这是进行高级流计算操作的基础,好比保持当前计数、天生聚合效果等。
内部状态存储的类型
状态存储与拓扑的关系
状态存储紧密集成在 Kafka Streams 的流处理拓扑中,可以在流处理逻辑中随时读取或更新状态。
实践:创建一个状态存储
在 Kafka Streams 程序中利用 store 方法,将状态存储与流处理连接起来:
  1. KStream<String, Long> views = builder.stream("user-views");
  2. KTable<String, Long> viewCounts = views
  3.     .groupByKey()
  4.     .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("view-counts-store"));
  5. // “view-counts-store” 是用于保存当前视图计数的状态存储
复制代码
4.2 定制状态存储

有时,内置的状态存储不能完全满足需求。Kafka Streams 提供了扩展 API,可以实现自界说状态存储。
自界说 State Store
通过实现 StateStore 接口以及创建自界说的 Processor,可以将流处理的状态生存到外部数据库或自界说存储中。
利用 Processor API 进行状态管理
Processor API 提供了低级别的流处理控制能力,答应我们直接操作状态存储,提供了更多灵活性。
实战:实时账户余额监控
设计一个实时账户余额监控体系,每当用户进行消耗或充值时,体系更新用户的账户余额并将其存于状态存储中。
步调:
代码示例
  1. public class BalanceProcessorSupplier implements ProcessorSupplier<String, Long> {
  2.     @Override
  3.     public Processor<String, Long> get() {
  4.         return new BalanceProcessor();
  5.     }
  6. }
  7. public class BalanceProcessor extends AbstractProcessor<String, Long> {
  8.     private KeyValueStore<String, Long> balanceStore;
  9.     @Override
  10.     public void init(ProcessorContext context) {
  11.         super.init(context);
  12.         balanceStore = (KeyValueStore) context.getStateStore("balance-store");
  13.     }
  14.     @Override
  15.     public void process(String accountId, Long amount) {
  16.         Long currentBalance = balanceStore.get(accountId);
  17.         Long updatedBalance = (currentBalance == null ? 0L : currentBalance) + amount;
  18.         balanceStore.put(accountId, updatedBalance);
  19.     }
  20. }
  21. // 示例拓扑结构
  22. Topology topology = new Topology();
  23. topology.addSource("Source", "transactions")
  24.         .addProcessor("Process", new BalanceProcessorSupplier(), "Source")
  25.         .addStateStore(Stores.keyValueStoreBuilder(
  26.                 Stores.persistentKeyValueStore("balance-store"),
  27.                 Serdes.String(),
  28.                 Serdes.Long()), "Process");
  29. KafkaStreams streams = new KafkaStreams(topology, props);
  30. streams.start();
复制代码
总结
通过学习这一章,你应把握 Kafka Streams 中的状态存储功能,包罗如何利用内置存储以及如何进行自界说存储。通过状态存储,流处理程序可以保持中心状态,为更复杂的计算提供支持。在实践中,可以利用状态存储来实现很多实时计算体系的关键功能。
5.Kafka Streams 的高级数据流操作

在进行基本的数据流操作之后,你会发现需要处理更加复杂的数据流场景,好比流的连接、复杂的拓扑界说以及更高级的数据转换。这一章将深入探讨 Kafka Streams 中的高级数据流操作。
5.1 数据连接(Join)操作

概述
连接(Join)操作是数据流处理中非常有用的能力,能够把多个数据流合并在一起,以便从差别来源的信息中获取更丰富的数据关系。Kafka Streams 支持多种类型的 Join,包罗 KStream 和 KTable 之间的差别组合。
差别类型的 Join 操作
时间窗口及其留意事项
Join 操作中的数据通常需要界说一个时间窗口,答应合并操作在流中差别步到达的数据间执行。重要的是选择符合的时间窗口以及处理时间的边界情况。
代码示例:KStream-KStream Join
以下代码示例展示了如何在 Kafka Streams 中进行 KStream-KStream Join 操作:
  1. KStream<String, Order> orders = builder.stream("orders");
  2. KStream<String, Payment> payments = builder.stream("payments");
  3. KStream<String, EnrichedOrder> enrichedOrders = orders.join(
  4.     payments,
  5.     (order, payment) -> new EnrichedOrder(order, payment),
  6.     JoinWindows.of(Duration.ofMinutes(5))
  7. );
  8. // orders 和 payments 流中的数据依据订单id进行连接,JoinWindows 指定了5分钟的时间窗口
复制代码
5.2 数据拓扑(Topology)与 Processor API

流处理拓扑的概念与布局
在 Kafka Streams 中,拓扑(Topology)是一系列有序的处理节点,界说了信息从输入到输出的流经路径。每个拓扑都包含一个或多个处理器节点,节点之间可以通过多个流进行连接。
Processor API 基本操作
Kafka Streams 提供的 Processor API 是一个更底层的 API,答应对流处理任务进行细粒度的可控操作。主要组件包罗:

创建自界说流处理拓扑
通过 Processor API,你可以创建自界说的流处理拓扑,以更灵活地处理流数据。
  1. Topology topology = new Topology();
  2. topology.addSource("Source", "source-topic")
  3.         .addProcessor("Process", MyProcessor::new, "Source")
  4.         .addSink("Sink", "output-topic", "Process");
  5. KafkaStreams streams = new KafkaStreams(topology, props);
  6. streams.start();
复制代码
5.3实战:订单实时分析体系

在当代电子商务平台上,实时订单分析对于理解用户行为和优化业务运作至关重要。本实战项目将带你实现一个通过 Kafka Streams 进行的订单实时分析体系,团结订单流与用户信息,从而实现用户行为的实时洞察。
项目目的


步调详解

1. 数据流准备
在本项目中,假设我们有以下两种数据来源:

2. 界说数据连接
首先,我们需要从 Kafka Topic 中读取订单流和用户信息表。然后,利用 Kafka Streams 的 Join 操作,将两个数据流接洽在一起。
  1. KStream<String, Order> orders = builder.stream("orders");
  2. KTable<String, UserInfo> userInfo = builder.table("user-info");
  3. // 使用用户 ID 作为连接键,将订单流与用户信息表结合
  4. KStream<String, EnrichedOrder> enrichedOrders = orders.join(
  5.     userInfo,
  6.     (order, user) -> new EnrichedOrder(order, user)
  7. );
  8. // enrichedOrders 流现在包含了结合用户信息的完整订单记录
复制代码
3. 设置处理拓扑
在 Kafka Streams 中,我们需要界说数据从输入到输出经过的路径,即所谓的“拓扑布局”。
  1. Topology topology = new Topology();
  2. topology.addSource("OrderSource", "orders")
  3.         .addSource("UserSource", "user-info")
  4.         .addProcessor("JoinProcessor", () -> new JoinProcessor(), "OrderSource", "UserSource")
  5.         .addSink("EnrichedOrderSink", "enriched-orders", "JoinProcessor");
  6. KafkaStreams streams = new KafkaStreams(topology, props);
  7. streams.start();
复制代码
4. 开发自界说处理器
你可能需要更多定制的逻辑以增强流处理。在此项目中,可以编写一个自界说处理器(Processor)来复杂化分析,好比过滤订单、打标签、或变更格式。
  1. public class JoinProcessor extends AbstractProcessor<String, Order> {
  2.     private KeyValueStore<String, UserInfo> userStore;
  3.     @Override
  4.     public void init(ProcessorContext context) {
  5.         super.init(context);
  6.         userStore = (KeyValueStore<String, UserInfo>) context.getStateStore("user-info-store");
  7.     }
  8.     @Override
  9.     public void process(String key, Order order) {
  10.         UserInfo userInfo = userStore.get(order.getUserId());
  11.         if (userInfo != null) {
  12.             EnrichedOrder enrichedOrder = new EnrichedOrder(order, userInfo);
  13.             context().forward(key, enrichedOrder);
  14.         }
  15.     }
  16. }
  17. // 注:此处理器假定 "user-info-store" 是一个存储用户信息的状态存储。
复制代码
5. 部署与监控
完成逻辑开发后,部署 Kafka Streams 应用并设置监控以保障实时数据处理的可靠性。

总结与扩展

经过本实战项目的学习,你已经把握了如何通过 Kafka Streams 实现订单数据流和用户信息表的实时数据加工作业。选择符合的 Join 操作、公道设计拓扑布局,以及灵活运用自界说处理器,可以进步实时分析体系的精确性和服从。
扩展

总结
通过学习这一章,你将把握如何利用 Kafka Streams 进行高级数据流操作。这些技能使你有能力构建复杂的数据流网络,满足现实天下应用场景中对数据处理的高级需求。精确理解和利用 Join 操作和 Processor API,是实现高效流处理体系的关键。
6.错误处理、容错与调试

在构建实时数据处理体系时,错误处理、容错和调试是确保体系稳定性和可靠性的关键。这一章将介绍 Kafka Streams 如那边理错误,如何保障体系的容错能力,并提供调试技巧来帮助开发和维护。
6.1 错误处理

概述
在流处理过程中,可能会碰到各种错误,包罗数据格式错误、网络题目或体系异常。Kafka Streams 提供了多种机制来帮助处理这些错误,以包管流处理程序的健壮性。
错误处理策略
实践中的错误处理
在实现过程中,利用 try-catch 块掩护可能出现题目的处理逻辑,如先辈的解析或网络操作。
  1. KStream<String, String> stream = builder.stream("input-topic");
  2. stream.foreach((key, value) -> {
  3.     try {
  4.         // 业务逻辑
  5.     } catch (Exception e) {
  6.         // 错误处理逻辑
  7.         System.err.println("Error processing record: " + e.getMessage());
  8.     }
  9. });
复制代码
6.2 容错机制

基本原理
Kafka Streams 自带强大的容错能力,包罗自我修复和状态恢复,以确保处理任务的持续运行及数据处理的一致性。
容错策略
示例设置
  1. Properties props = new Properties();
  2. props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); // 设置备用副本数
  3. props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);  // 状态主题的副本因子
复制代码
6.3 性能优化

优化策略
在流数据处理过程中,性能调优是实现高效处理的关键。Kafka Streams 提供的多种设置可以帮助我们实现性能优化。
6.4 实战:构建高可用的实时订单处理体系

在实际业务环境中,实时订单处理体系需要处理大量的订单数据,并进行高效可靠的处理。在这部分,我们将团结错误处理、容错机制和性能优化知识,构建一个高可用的实时订单处理体系。
体系设计目的


步调详解

1. 界说数据流处理逻辑
我们假定我们的订单流包含订单 ID、用户 ID、订单金额、产物信息等。我们将从订单数据中分析出每个用户的实时消耗情况。
  1. // 创建 Kafka Streams Builder
  2. StreamsBuilder builder = new StreamsBuilder();
  3. // 从 "orders" 主题读取订单流
  4. KStream<String, Order> orders = builder.stream("orders");
  5. // 示例处理:计算每个用户的总消费
  6. KGroupedStream<String, Order> groupedByUser = orders.groupBy((key, order) -> order.getUserId());
  7. KTable<String, Double> totalSpentByUser = groupedByUser
  8.     .aggregate(
  9.         () -> 0.0,
  10.         (key, order, total) -> total + order.getAmount(),
  11.         Materialized.with(Serdes.String(), Serdes.Double())
  12.     );
复制代码
2. 实施错误处理
根据差别的场景设置错误处理逻辑,特别是反序列化错误。在本例中,接纳 LogAndContinueExceptionHandler,确保即便碰到数据题目,也不会影响整体流处理。
  1. Properties props = new Properties();
  2. props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
  3.           LogAndContinueExceptionHandler.class.getName());
复制代码
3. 设置容错策略
确保在节点故障时体系能够迅速恢复。设置应用的容错机制,包罗设置 Replica 和 Standby 副本,制止单点故障。
  1. props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
  2. props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
复制代码
4. 性能调优
确保体系能够以高效能运行,即便在订单高峰期。

5. 部署与监控
部署流处理体系时,考虑到实际的生产环境,保举利用 Docker 或 Kubernetes 等工具来管理应用的生命周期。

总结
本章介绍了处理实时流处理中常见题目的方法,包罗错误处理、提供容灾步伐以及性能调优。通过公道化的策略和设置,可以大大进步 Kafka Streams 应用程序的稳定性和服从。最后的实战案例展示了如何将这些概念应用于构建高可用的数据处理体系。
7.Kafka Streams 部署与监控

在完成 Kafka Streams 应用的开发后,部署和监控是确保其在生产环境中高效稳定运行的关键步调。本章将介绍如何在差别环境下部署 Kafka Streams 应用,以及如何对其进行监控,以实时发现并办理潜在题目。
7.1 Kafka Streams 部署

概述
Kafka Streams 应用的部署需要考虑运行环境的条件和特点,也需要做好相应的设置以满足性能和稳定性要求。常见的部署方式包罗本地部署、容器化部署(如 Docker)和 Kubernetes 部署。
1. 本地部署
在本地环境下,Kafka Streams 可以通过直接运行 Java 应用程序来部署。这种方式便于开发和调试,但不实用于生产环境。

  1. java -jar your-kafka-streams-app.jar --server.port=8080
复制代码

2. 容器化部署(Docker)
利用 Docker 可以创建 Kafka Streams 应用的轻量级容器,使其具有跨平台的兼容性。

3. Kubernetes 部署
Kubernetes 提供了更强大的编排功能,得当在生产环境中管理和扩展 Kafka Streams 应用。

7.2 Kafka Streams 监控

概述
在生产中监控 Kafka Streams 应用的状态和性能是确保其正常运行的基础。监控涉及到延迟、吞吐量、状态存储等多个指标。
1. 利用内置 JMX 指标
Kafka Streams 支持通过 JMX 输出应用的运行指标。这些指标可以被其他监控体系(如 Prometheus)收集和分析。

2. 利用 Prometheus 和 Grafana
Prometheus 可以从 Kafka Streams 收集 JMX 指标,Grafana 则用于将这些指标进行可视化,以便于监控和分析。

总结
本章中,我们具体介绍了 Kafka Streams 应用的部署和监控方法,覆盖了从本地简朴部署到生产级的容器化及 Kubernetes 部署。监控部分强调了通过 JMX 以及 Prometheus 和 Grafana 进行体系运行状态的检测,这些技能是维持 Kafka Streams 应用稳定性的核心。本章所学将帮助你在差别环境下以最佳实践方式管理和监控你的 Kafka Streams 项目。
8.springboot集成kafka 与kafkaStream

1.引入依赖

  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6.     <groupId>org.springframework.kafka</groupId>
  7.     <artifactId>spring-kafka</artifactId>
  8.     <exclusions>
  9.         <exclusion>
  10.             <groupId>org.apache.kafka</groupId>
  11.             <artifactId>kafka-clients</artifactId>
  12.         </exclusion>
  13.     </exclusions>
  14. </dependency>
  15. <dependency>
  16.     <groupId>org.apache.kafka</groupId>
  17.     <artifactId>kafka-clients</artifactId>
  18. </dependency>
  19. <dependency>
  20.     <groupId>org.apache.kafka</groupId>
  21.     <artifactId>kafka-streams</artifactId>
  22.     <exclusions>
  23.         <exclusion>
  24.             <artifactId>connect-json</artifactId>
  25.             <groupId>org.apache.kafka</groupId>
  26.         </exclusion>
  27.         <exclusion>
  28.             <groupId>org.apache.kafka</groupId>
  29.             <artifactId>kafka-clients</artifactId>
  30.         </exclusion>
  31.     </exclusions>
  32. </dependency>
  33. <dependency>
  34.     <groupId>org.projectlombok</groupId>
  35.     <artifactId>lombok</artifactId>
  36. </dependency>
  37. <dependency>
  38.     <groupId>org.springframework.boot</groupId>
  39.     <artifactId>spring-boot-starter-test</artifactId>
  40.     <scope>test</scope>
  41. </dependency>
  42. <dependency>
  43.     <groupId>org.springframework.boot</groupId>
  44.     <artifactId>spring-boot-autoconfigure</artifactId>
  45. </dependency>
复制代码
2.application设置文件

  1. server:
  2.   port: 8088
  3. spring:
  4.   application:
  5.     name: spring-kafka
  6.   kafka:
  7.     bootstrap-servers: kafka:9092
  8.     producer:
  9.       retries: 5
  10.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11.       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  12.     consumer:
  13.       group-id: ${spring.application.name}-test
  14.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  15.       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer  
  16.     properties:
  17.       # 序列化的时候,解决不信任kafka If you believe this class is safe to deserialize
  18.       spring.json.trusted.packages: "*"
  19. kafka:
  20.   hosts: kafka:9092
  21.   group: ${spring.application.name}
复制代码
3.kafka stream的设置需要单独配一下

  1. package com.example.springkafka.config;
  2. import com.example.springkafka.serializer.OrderDeserializer;
  3. import lombok.Getter;
  4. import lombok.Setter;
  5. import org.apache.kafka.common.serialization.Serdes;
  6. import org.apache.kafka.streams.StreamsConfig;
  7. import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
  8. import org.springframework.boot.context.properties.ConfigurationProperties;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.kafka.annotation.EnableKafkaStreams;
  12. import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
  13. import org.springframework.kafka.config.KafkaStreamsConfiguration;
  14. import org.springframework.kafka.support.serializer.JsonSerde;
  15. import java.util.HashMap;
  16. import java.util.Map;
  17. @Setter
  18. @Getter
  19. @Configuration
  20. @EnableKafkaStreams
  21. @ConfigurationProperties(prefix="kafka")
  22. public class KafkaStreamConfig {
  23.     private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
  24.     private String hosts;
  25.     private String group;
  26.     @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  27.     public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
  28.         Map<String, Object> props = new HashMap<>();
  29.         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
  30.         props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
  31.         props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_id");
  32.         props.put(StreamsConfig.RETRIES_CONFIG, 5);
  33.         // 序列化方式
  34.         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
  35.         // 反序列化方式
  36.         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
  37.         props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
  38.         // 自定义实体时,防止报路径不信任错误
  39.         props.put("spring.json.trusted.packages", "*");
  40.         return new KafkaStreamsConfiguration(props);
  41.     }
  42. }
复制代码
4.消息实体

  1. @NoArgsConstructor
  2. @AllArgsConstructor
  3. @Accessors(chain = true)
  4. @Data
  5. public class Order implements Serializable{
  6.     private String orderId;
  7.     private String userId;
  8.     private String userName;
  9.     private String productId;
  10.     private String productName;
  11.     private Integer amount;
  12.    
  13. }
复制代码
5.自界说消息监听者stream listener,获取topic消息,进行流处理

  1. import com.example.springkafka.entity.Order;
  2. import org.apache.kafka.common.serialization.Serdes;
  3. import org.apache.kafka.streams.StreamsBuilder;
  4. import org.apache.kafka.streams.kstream.KStream;
  5. import org.apache.kafka.streams.kstream.Produced;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.kafka.support.serializer.JsonSerde;
  9. @Configuration
  10. public class StreamCountListener {
  11.     @Bean
  12.     public KStream<String, Order> upperCaseStream(StreamsBuilder streamsBuilder){
  13.                
  14.                 // 获取topic的消息
  15.         KStream<String, Order> inputStream = streamsBuilder.stream("order-topic");
  16.         // 进行简单的处理
  17.         // 1.查询获取订单金额大于100的订单数据
  18.         KStream<String, Order> processedStream = inputStream.filter((key, order) ->  order.getAmount() > 100);
  19.         //processedStream.foreach((key, value) -> System.out.println("------result Received message: "+ key +" : "+ value));
  20.         // 将处理后的流写入输出主题
  21.         processedStream.to("data-topic", Produced.with(Serdes.String(), new JsonSerde<>()));
  22.         return inputStream;
  23.     }
  24. }
复制代码
6.kafka消耗者担当消息并打印

  1. import com.example.springkafka.entity.Order;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class KafkaConsumer  {
  6.     @KafkaListener(topics = "data-topic", groupId = "spring-kafka-test")
  7.     public void listen(Order msg) {
  8.         System.out.printf("普通A message: %s%n", msg.toString());
  9.     }
  10. }
复制代码
7.发送消息

  1. @Autowired
  2. private KafkaTemplate<String, Order> kafkaTemplate;
  3. @Test
  4. public void testSend2() {
  5.     System.out.println("----------开始发送数据-----------");
  6.     Order order = new Order("1", "1", "张三", "1", "商品1", 200);
  7.     kafkaTemplate.send("order-topic", order);
  8. }
复制代码
8.执行效果


9. 实战项目:综合应用

在前面的章节中,我们已经学习了 Kafka Streams 的基础知识、高级操作、错误处理、容错和监控方法。现在,我们来进行一个综合性实战项目——构建一个用户订单实时分析体系。在这个项目中,你将利用到 Kafka Streams 的多种功能,并体验如何将这些技术团结在一起。
8.1 用户行为实时分析体系

项目目的

1. 项目布局
我们将设计一个由下列环节组成的数据处理管道:

2. 数据流设计
假设我们有一个 Kafka Topic "order-topic",该主题中的每条纪录包含用户、订单金额 以实时间戳等字段。我们的目的是统计每个客户的在5分钟之内的订单总金额。
3. 数据处理逻辑
  1. @Bean
  2. public KStream<String, Order> countCustomerOrderStreamSession(StreamsBuilder streamsBuilder){
  3.     // 获取消息
  4.     KStream<String, Order> ordersStream = streamsBuilder.stream("order-topic");
  5.     KStream<String, KeyValue<String, Double>> orderAmountStream = ordersStream
  6.         .mapValues(order -> new KeyValue<>(order.getUserId(), order.getAmount()));
  7.     // 2.使用固定窗口函数统计3分钟之内,每个用户的订单总金额
  8.     KTable<Windowed<String>, Double> aggregate = orderAmountStream
  9.         .groupByKey() // 根据用户ID(key)进行分组
  10.         .windowedBy(SessionWindows.with(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1))) // 设置 3 分钟窗口
  11.         .aggregate(
  12.         () -> 0.0, // 初始化值
  13.         (key, orderAmount, aggAmount) -> orderAmount.value + aggAmount, // 聚合逻辑
  14.         (key, agg1, agg2) -> agg1 + agg2, // 合并多个会话
  15.         Materialized.<String, Double, SessionStore<Bytes, byte[]>>as("customer-order-session-store")
  16.         .withKeySerde(Serdes.String())
  17.         .withValueSerde(Serdes.Double()) // 配置序列化器
  18.     );
  19.     // 将聚合结果转换为普通流
  20.     KStream<String, OrderData> map = aggregate.toStream()
  21.         .filter((key, value) -> null != value) // 处理因为会话窗口合并而产生的脏数据
  22.         .map((key, value) -> KeyValue.pair(key.key(), new OrderData(key.key(), value)));
  23.     //map.foreach((key, value) -> System.out.println("windowedBy--------key:"+key+" value:"+value));
  24.     // 发送消息到下游
  25.     map.to("order-count-topic", Produced.with(Serdes.String(), new JsonSerde<>()));
  26.     return ordersStream;
  27. }
复制代码
4. 吸收处理后的数据
  1. @KafkaListener(topics = "order-count-topic", groupId = "order-count-test")
  2. public void consumeData(OrderData orderData) {
  3.     // 每次接收到消息时,会自动打印出用户ID和订单总金额
  4.     log.info("---Consumed Message - User ID: " + orderData.getUserId() + ", Total Amount: " + orderData.getTotalAmount());
  5. }
复制代码

  1. 2024-11-20 11:23:07.811  INFO 26000 --- [ntainer#1-0-C-1] c.e.springkafka.listener.KafkaConsumer   : ---Consumed Message - User ID: "1", Total Amount: 45.0
  2. 2024-11-20 11:23:07.811  INFO 26000 --- [ntainer#1-0-C-1] c.e.springkafka.listener.KafkaConsumer   : ---Consumed Message - User ID: "2", Total Amount: 215.0
复制代码
5. 体系部署与监控

通过本实战项目,你已经实践了如何设计和实现一个用户订单分析体系。从数据洗濯、预处理到数据的统计与展示,每一步都突出了 Kafka Streams 在实时流处理中的强大功能。完成项目后,你不仅对 Kafka Streams 的各个功能有更深入的理解,且能实际应用于办理复杂的数据处理题目。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4