ToB企服应用市场:ToB评测及商务社交产业平台

标题: 【2024】kafka streams结合案例分析举行实际项目开辟使用(3) [打印本页]

作者: 不到断气不罢休    时间: 2024-10-24 07:07
标题: 【2024】kafka streams结合案例分析举行实际项目开辟使用(3)
一、前沿先容

前面已经大致先容了kafka streams的基本使用了,这里结合一个实际案例来举行训练使用kafka streams。
下面案例是一个阛阓购物的场景,就比如我们去一个购物阛阓购买东西时,在购买的时候。阛阓会记录下来我们这一次消耗的信息,一样平常首先会先把银行卡等信息举行一个加***隐蔽,然后再把信息分别发送发送给需要的topic,如累计积分的,把购买的金额转为积分返回给用户账号;根据购买产品的不同发送给不同的topic。具体如下:
   实行流程
    

使用到的实体类

二、代码实现

1、依靠

和前面类似,重要是kafka的相关依靠
  1.     <dependency>
  2.       <groupId>org.apache.kafka</groupId>
  3.       <artifactId>kafka-clients</artifactId>
  4.     </dependency>
  5.     <dependency>
  6.       <groupId>org.apache.kafka</groupId>
  7.       <artifactId>kafka-streams</artifactId>
  8.     </dependency>
  9.         <dependency>
  10.       <groupId>cn.hutool</groupId>
  11.       <artifactId>hutool-all</artifactId>
  12.       <version>5.8.20</version>
  13.     </dependency>
  14.     <dependency>
  15.       <groupId>org.projectlombok</groupId>
  16.       <artifactId>lombok</artifactId>
  17.       <optional>true</optional>
  18.     </dependency>
  19.         <dependency>
  20.       <groupId>com.google.code.gson</groupId>
  21.       <artifactId>gson</artifactId>
  22.       <version>2.8.7</version>
  23.     </dependency>
复制代码
2、实体类

2.1、Transaction

写入的购买信息
  1. @Data
  2. @Builder
  3. public class Transaction {
  4.     /**性*/
  5.     private String lastName;
  6.     /**名*/
  7.     private String firstName;
  8.     /**顾客id*/
  9.     private String customerId;
  10.     /**银行卡号*/
  11.     private String creditCardNumber;
  12.     /**商品名称*/
  13.     private String itemPurchased;
  14.     /**店铺名称*/
  15.     private String department;
  16.     /**数量*/
  17.     private Integer quantity;
  18.     /**价格*/
  19.     private Double price;
  20.     /**购买时间*/
  21.     private String purchaseDate;
  22.     /**邮政编码*/
  23.     private String zipCode;
  24. }
复制代码
2.2、 TransactionKey

用作标记,转为key的
  1. @Data
  2. @Builder
  3. public class TransactionKey {
  4.     private String customerId;
  5.     private String purchaseDate;
  6. }
复制代码
2.3、TransactionPattern

记录购买信息
  1. @Data
  2. @Builder
  3. public class TransactionPattern {
  4.     private String zipCode;
  5.     private String item;
  6.     private String date;
  7.     private Double amount;
  8. }
复制代码
2.4、CustomerReward

  1. @Data
  2. @Builder
  3. public class CustomerReward {
  4.     private String customerId;
  5.     private String purchaseTotal;
  6.     private Integer rewardPoints;
  7. }
复制代码
3、序列化工具类

3.1、序列化

  1. /**
  2. * 序列化
  3. * @param <T>
  4. */
  5. public class JsonSerializer<T> implements Serializer<T> {
  6.     private Gson gson=  new Gson();
  7.     public void configure(Map<String ,?> map, boolean b) {
  8.     }
  9.     public byte[] serialize(String topic, T t) {
  10.         return gson.toJson(t).getBytes();
  11.     }
  12.     @Override
  13.     public void close() {
  14.         Serializer.super.close();
  15.     }
  16. }
复制代码
3.2、反序列化

  1. /**
  2. * 反序列化
  3. * @param <T>
  4. */
  5. public class JsonDeserializer<T> implements Deserializer<T> {
  6.     private Gson gson=  new Gson();
  7.     private Class<T> deserializeClass;
  8.     public JsonDeserializer(Class<T> deserializeClass){
  9.         this.deserializeClass=deserializeClass;
  10.     }
  11.     public JsonDeserializer(){
  12.     }
  13.     @Override
  14.     @SuppressWarnings("unchecked")
  15.     public void configure(Map<String,?> map, boolean b){
  16.         if (deserializeClass == null){
  17.             deserializeClass = (Class<T>) map.get("serializedClass");
  18.         }
  19.     }
  20.     @Override
  21.     public T deserialize(String topic, byte[] data) {
  22.         if (data == null){
  23.             return null;
  24.         }
  25.         return gson.fromJson(new String(data),deserializeClass);
  26.     }
  27.     @Override
  28.     public void close() {
  29.     }
  30. }
复制代码
3.3、Serde仓库

用做直接通过调用实现Serde使用json序列化转换,也可以参考Serdes方法实现
  1. /**
  2. * 序列化和反序列化
  3. */
  4. public class JsonSerdes {
  5.     /**获取Serde*/
  6.     public static TransactionPatternWrapSerde TransactionPattern() {
  7.         return new TransactionPatternWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(TransactionPattern.class));
  8.     }
  9.     public static TransactionKeyWrapSerde TransactionKey() {
  10.         return new TransactionKeyWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(TransactionKey.class));
  11.     }
  12.     public static CustomerRewardWrapSerde CustomerReward() {
  13.         return new CustomerRewardWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(CustomerReward.class));
  14.     }
  15.     public static TransactionWrapSerde Transaction() {
  16.         return new TransactionWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(Transaction.class));
  17.     }
  18.     /**创建Serde*/
  19.     private final static class TransactionPatternWrapSerde extends WrapSerde<TransactionPattern>{
  20.         public TransactionPatternWrapSerde(Serializer<TransactionPattern> serializer, Deserializer<TransactionPattern> deserializer) {
  21.             super(serializer, deserializer);
  22.         }
  23.     }
  24.     private final static class TransactionKeyWrapSerde extends WrapSerde<TransactionKey>{
  25.         public TransactionKeyWrapSerde(Serializer<TransactionKey> serializer, Deserializer<TransactionKey> deserializer) {
  26.             super(serializer, deserializer);
  27.         }
  28.     }
  29.     private final static class CustomerRewardWrapSerde extends WrapSerde<CustomerReward>{
  30.         public CustomerRewardWrapSerde(Serializer<CustomerReward> serializer, Deserializer<CustomerReward> deserializer) {
  31.             super(serializer, deserializer);
  32.         }
  33.     }
  34.     private final static class TransactionWrapSerde extends WrapSerde<Transaction>{
  35.         public TransactionWrapSerde(Serializer<Transaction> serializer, Deserializer<Transaction> deserializer) {
  36.             super(serializer, deserializer);
  37.         }
  38.     }
  39.     /** WrapSerde父类*/
  40.     private static class WrapSerde<T> implements Serde<T>{
  41.         private final Serializer<T> serializer;
  42.         private final Deserializer<T> deserializer;
  43.         public WrapSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
  44.             this.serializer = serializer;
  45.             this.deserializer = deserializer;
  46.         }
  47.         @Override
  48.         public Serializer<T> serializer() {
  49.             return serializer;
  50.         }
  51.         @Override
  52.         public Deserializer<T> deserializer() {
  53.             return deserializer;
  54.         }
  55.     }
  56. }
复制代码
4、具体streams实现

使用上比力简单,重要是通过前面学的方法举行实现不同的处置处罚器转换数据,然后在发送到不同的topic中去,编写好之后,我们需要创建需要使用到的topic
  1. @Slf4j
  2. public class ShoppingStreams {
  3.     private static final String BOOTSTRAP_SERVERS = "localhost:9092";
  4.     private static final String APPLICATION_ID = "shopping-streams";
  5.     private static final String SELL_TRANSACTION_SOURCE_TOPIC = "sell.transaction";
  6.     private static final String SELL_TRANSACTION_PATTERN_TOPIC = "sell.pattern.transaction";
  7.     private static final String SELL_TRANSACTION_REWARDS_TOPIC = "sell.rewards.transaction";
  8.     private static final String SELL_TRANSACTION_COFFEE_TOPIC = "sell.coffee.transaction";
  9.     private static final String SELL_TRANSACTION_ELECT_TOPIC = "sell.elect.transaction";
  10.     private static final String SELL_TRANSACTION_PURCHASE_TOPIC = "sell.purchase.transaction";
  11.     public static void main(String[] args) throws InterruptedException {
  12.         Properties properties = new Properties();
  13.         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
  14.         properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
  15.         properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
  16.         StreamsConfig streamsConfig = new StreamsConfig(properties);
  17.         Serde<String> stringSerde = Serdes.String();
  18.         StreamsBuilder builder = new StreamsBuilder();
  19. //        1、到topic中去读取数据
  20.         KStream<String, Transaction> k0 = builder
  21.                 .stream(SELL_TRANSACTION_SOURCE_TOPIC, Consumed.with(stringSerde, JsonSerdes.Transaction())
  22.                         .withName("transaction-source")
  23.                         .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST)); //指定偏移重置策略。当没有初始偏移量或偏移量超出范围时,消费将从最新的记录开始。
  24. //       2、把原始数据进行加密
  25.         KStream<String, Transaction> k1 = k0.peek((k, v) -> log.info("k:{},v:{}", k, v))
  26.                 .mapValues(v -> {
  27.                     String encryption = v.getCreditCardNumber().replaceAll("(?<=^.{4}).*", "****");
  28.                     v.setCreditCardNumber(encryption);
  29.                     return v;
  30.                 }, Named.as("pattern-sink"));
  31. //         2、记录商品购买
  32.         k1.mapValues(v -> TransactionPattern.builder()
  33.                                         .zipCode(v.getZipCode())
  34.                                         .item(v.getItemPurchased())
  35.                                         .date(v.getPurchaseDate().toString())
  36.                                         .amount(v.getPrice())
  37.                                         .build()
  38.                         , Named.as("transaction-pattern"))
  39.                 .to(SELL_TRANSACTION_PATTERN_TOPIC, Produced.with(stringSerde, JsonSerdes.TransactionPattern()));
  40. //        3、奖励用户积分
  41.         k1.mapValues(v -> CustomerReward.builder()
  42.                                 .customerId(v.getCustomerId())
  43.                                 .purchaseTotal(v.getItemPurchased())
  44.                                 .rewardPoints(v.getPrice().intValue())
  45.                                 .build()
  46.                     , Named.as("transaction-rewards"))
  47.                 .to(SELL_TRANSACTION_REWARDS_TOPIC, Produced.with(stringSerde, JsonSerdes.CustomerReward()));
  48. //        4、把消费金额大于5的记录下来(标注为key,发送出去)
  49.         k1.filter((k, v) -> v.getPrice() > 5)
  50.                 .selectKey((k, v) -> TransactionKey.builder()
  51.                                         .customerId(v.getCustomerId())
  52.                                         .purchaseDate(v.getPurchaseDate())
  53.                                         .build()
  54.                     , Named.as("transaction-purchase"))
  55.                 .to(SELL_TRANSACTION_PURCHASE_TOPIC, Produced.with(JsonSerdes.TransactionKey(), JsonSerdes.Transaction()));
  56. //        5、把购买的商品根据类型分别发送到不同的topic中
  57.         k1.split(Named.as("branch-"))
  58.                 .branch((k, v) -> v.getItemPurchased().equalsIgnoreCase("coffee")
  59.                         , Branched.withConsumer(ks->ks.to(SELL_TRANSACTION_COFFEE_TOPIC, Produced.with(stringSerde, JsonSerdes.Transaction()))))
  60.                 .branch((k, v) -> v.getItemPurchased().equalsIgnoreCase("elect")
  61.                         , Branched.withConsumer(ks->ks.to(SELL_TRANSACTION_ELECT_TOPIC, Produced.with(stringSerde, JsonSerdes.Transaction()))))
  62.                 .noDefaultBranch();
  63. //        模拟把数据全部写入到数据仓库
  64.         k1.print(Printed.<String,Transaction>toSysOut().withName("DW"));
  65.         k1.foreach((k, v) -> log.info("数据存入数据仓库=========>,k:{},v:{}", k, v));
  66.         KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
  67.         CountDownLatch latch = new CountDownLatch(1);
  68.         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  69.             kafkaStreams.close();
  70.             latch.countDown();
  71.             log.info("The Kafka Streams 执行关闭!");
  72.         }));
  73.         kafkaStreams.start();
  74.         log.info("kafka streams 启动成功!>>>>");
  75.         latch.await();
  76.     }
  77. }
复制代码
5、其他测试使用

5.1、生产者

创建一个生产消息的类,往topic发送消息
  1. /**
  2. * 生产购物消息到kafka
  3. */
  4. @Slf4j
  5. public class ShoppingProducer {
  6.     private final static String TOPIC_NAME = "sell.transaction";
  7.     private static final String BOOTSTRAP_SERVERS = "localhost:9092";
  8.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  9.         Properties props = new Properties();
  10. //        设置参数
  11.         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
  12. //        设置序列化
  13.         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14.         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  15. //        连接客户端
  16.         KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  17.         //        发送的消息记录器(topic,partition(指定发到哪个),key(用于计算发到哪个partition),value)
  18.         for (int i = 0; i < 5; i++) {
  19.             Transaction transaction = Transaction.builder()
  20.                     .customerId("011223")
  21.                     .itemPurchased("elect")
  22.                     .quantity(i)
  23.                     .zipCode("10100")
  24.                     .firstName("李")
  25.                     .lastName("四")
  26.                     .price(i * 100.0)
  27.                     .purchaseDate(new Date().toString())
  28.                     .creditCardNumber("4322-1223-1123-000" + i)
  29.                     .department("体育西路35号")
  30.                     .build();
  31.             String json = new JSONObject(transaction).toString();
  32. //          默认partition数量和Broker创建的数量一致
  33.             ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0,"my-keyValue3", json);
  34. //        同步
  35.             send(producer,producerRecord);
  36.         }
  37.     }
  38.     /**
  39.      * @param producer: 客户端对象
  40.      * @return void
  41.      * 同步发送
  42.      * @date 2024/3/22 17:09
  43.      */
  44.     private static void send(KafkaProducer<String, String> producer,ProducerRecord<String, String> producerRecord) throws InterruptedException, ExecutionException {
  45. //          等待发送成功的阻塞方法
  46.         RecordMetadata metadata = producer.send(producerRecord).get();
  47.         log.info("同步发送消息"+ "topic-"+metadata.topic()+"====partition:"+metadata.partition()
  48.                 +"=====offset:"+metadata.offset());
  49.     }
  50. }
复制代码
5.2、日志文件

由于kafka一直会刷日志,以是需要有一个日志文件屏蔽debug类型的日志输出
在resources路径下创建一个logback.xml文件
  1. <configuration scon="true" scanPeriod="10 seconds">
  2.     <include resource="org/springframework/boot/logging/logback/base.xml"/>
  3.     <!-- 屏蔽kafka debug -->
  4.     <logger name="org.apache.kafka.clients" level="ERROR" />
  5. </configuration>
复制代码
6、创建topic

首先需要有自己的kafka,如何创建可以看我前面的文章




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4