I. Introduction
The previous articles gave an overview of the basic usage of Kafka Streams. This one puts Kafka Streams into practice with a concrete example.
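The example models shopping at a mall. When we buy something at a mall, the mall records the details of that purchase. Typically it first masks sensitive data such as the bank card number, then sends the record to each topic that needs it. For example, a rewards topic converts the purchase amount into points credited to the customer's account, and records are routed to different topics depending on the product purchased. Specifically: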
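Processing flow
- Use split to route each purchase record into a branch according to the product
- Coffee purchases are written to the coffee processor
- Electronics purchases are written to the electronics processor
- A summary of the purchase (zip code, item, date, amount) is sent to the pattern processor
- Records keyed by TransactionKey, with the original record as the value, are sent to the purchase processor (only purchases priced above 5)
- The amount paid is converted to points and sent to the reward processor
- Finally, all original records are written to the data warehouse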
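To make the fan-out concrete, here is a minimal plain-Java sketch of which output topics a single purchase record reaches. The class `ShoppingRoutingSketch` and its `topicsFor` method are hypothetical and not part of the original code; the topic names and rules (the split on "coffee"/"elect" and the price > 5 filter) are taken from the Streams code in section 4. The data-warehouse step, which sees every record, is left out.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not in the original code): lists the output topics a single
 * purchase record ends up in, following the rules used by ShoppingStreams.
 */
final class ShoppingRoutingSketch {

    static List<String> topicsFor(String itemPurchased, double price) {
        List<String> topics = new ArrayList<>();
        // Every record produces a purchase pattern and a reward
        topics.add("sell.pattern.transaction");
        topics.add("sell.rewards.transaction");
        // Only purchases priced above 5 are re-keyed by TransactionKey
        if (price > 5) {
            topics.add("sell.purchase.transaction");
        }
        // split(): coffee and electronics get their own topics; other items match no branch
        if (itemPurchased.equalsIgnoreCase("coffee")) {
            topics.add("sell.coffee.transaction");
        } else if (itemPurchased.equalsIgnoreCase("elect")) {
            topics.add("sell.elect.transaction");
        }
        return topics;
    }

    public static void main(String[] args) {
        System.out.println(topicsFor("coffee", 3.0));
        System.out.println(topicsFor("elect", 100.0));
    }
}
```

Note that the two branch checks are mutually exclusive, mirroring `split()`, where a record goes to the first branch whose predicate matches.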
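Entity classes used: Transaction, TransactionKey, TransactionPattern and CustomerReward (defined in section 2 below).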
II. Code Implementation
1. Dependencies
As before, the main dependencies are the Kafka ones:
```xml
<!-- kafka-clients and kafka-streams have no <version> here: this assumes the version
     is managed by a parent POM / BOM (e.g. Spring Boot); otherwise specify one -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.20</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.7</version>
</dependency>
```
2. Entity Classes
2.1 Transaction
The purchase record written to the source topic.
```java
import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class Transaction {
    /** Last name (surname) */
    private String lastName;
    /** First name */
    private String firstName;
    /** Customer ID */
    private String customerId;
    /** Bank card number */
    private String creditCardNumber;
    /** Name of the item purchased */
    private String itemPurchased;
    /** Store name */
    private String department;
    /** Quantity */
    private Integer quantity;
    /** Price */
    private Double price;
    /** Purchase time */
    private String purchaseDate;
    /** Postal code */
    private String zipCode;
}
```
2.2 TransactionKey
Identifies a purchase; used as the record key.
```java
import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class TransactionKey {
    /** Customer ID */
    private String customerId;
    /** Purchase time */
    private String purchaseDate;
}
```
2.3 TransactionPattern
A summary of the purchase (the purchase pattern).
```java
import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class TransactionPattern {
    private String zipCode;
    private String item;
    private String date;
    private Double amount;
}
```
2.4 CustomerReward
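```java
import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class CustomerReward {
    /** Customer ID */
    private String customerId;
    /** Total amount spent (stored as a string) */
    private String purchaseTotal;
    /** Reward points earned */
    private Integer rewardPoints;
}
```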
3. Serialization Utilities
3.1 Serializer
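```java
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Serializes any object to JSON bytes with Gson.
 * @param <T> the type being serialized
 */
public class JsonSerializer<T> implements Serializer<T> {
    private final Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null; // keep null values as null rather than the string "null"
        }
        // Explicit charset, so the bytes do not depend on the JVM default
        return gson.toJson(data).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // nothing to release
    }
}
```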
3.2 Deserializer
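```java
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * Deserializes JSON bytes into an instance of the target class with Gson.
 * @param <T> the target type
 */
public class JsonDeserializer<T> implements Deserializer<T> {
    private final Gson gson = new Gson();
    private Class<T> deserializeClass;

    public JsonDeserializer(Class<T> deserializeClass) {
        this.deserializeClass = deserializeClass;
    }

    /** No-arg constructor: the target class is then read from the "serializedClass" config in configure() */
    public JsonDeserializer() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> configs, boolean isKey) {
        if (deserializeClass == null) {
            deserializeClass = (Class<T>) configs.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        // Decode with the same charset the serializer uses
        return gson.fromJson(new String(data, StandardCharsets.UTF_8), deserializeClass);
    }

    @Override
    public void close() {
    }
}
```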
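Why an explicit charset matters here: the test data contains Chinese names, and a serializer and deserializer that fall back on different JVM default charsets would corrupt them. The JDK-only sketch below (the class `CharsetRoundTrip` is hypothetical) compares a matching UTF-8 round trip with a mismatched ISO-8859-1 decode; Unicode escapes are used so the source file compiles under any encoding.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical demo: encoding and decoding must agree on the charset,
 * otherwise non-ASCII characters in the JSON payload are corrupted.
 */
final class CharsetRoundTrip {

    // Same conversion as JsonSerializer: String -> UTF-8 bytes
    static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Same conversion as JsonDeserializer: UTF-8 bytes -> String
    static String decodeUtf8(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    // A mismatched decoder, e.g. a JVM whose default charset is ISO-8859-1
    static String decodeLatin1(byte[] data) {
        return new String(data, StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) {
        // {"lastName":"李","firstName":"四"} written with Unicode escapes
        String json = "{\"lastName\":\"\u674e\",\"firstName\":\"\u56db\"}";
        byte[] bytes = encode(json);
        System.out.println(decodeUtf8(bytes).equals(json));   // true
        System.out.println(decodeLatin1(bytes).equals(json)); // false
    }
}
```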
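3.3 Serde Factory
This class makes a JSON-based Serde available through a single static call, modeled on Kafka's own Serdes factory methods such as Serdes.String(). Kafka's Serdes.serdeFrom(serializer, deserializer) can build the same kind of wrapper.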
```java
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

/**
 * JSON serializer/deserializer pairs for the entity classes.
 */
public class JsonSerdes {
    /** Serde getters */
    public static Serde<TransactionPattern> TransactionPattern() {
        return new TransactionPatternWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(TransactionPattern.class));
    }
    public static Serde<TransactionKey> TransactionKey() {
        return new TransactionKeyWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(TransactionKey.class));
    }
    public static Serde<CustomerReward> CustomerReward() {
        return new CustomerRewardWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(CustomerReward.class));
    }
    public static Serde<Transaction> Transaction() {
        return new TransactionWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(Transaction.class));
    }

    /** Concrete Serde classes */
    private static final class TransactionPatternWrapSerde extends WrapSerde<TransactionPattern> {
        public TransactionPatternWrapSerde(Serializer<TransactionPattern> serializer, Deserializer<TransactionPattern> deserializer) {
            super(serializer, deserializer);
        }
    }
    private static final class TransactionKeyWrapSerde extends WrapSerde<TransactionKey> {
        public TransactionKeyWrapSerde(Serializer<TransactionKey> serializer, Deserializer<TransactionKey> deserializer) {
            super(serializer, deserializer);
        }
    }
    private static final class CustomerRewardWrapSerde extends WrapSerde<CustomerReward> {
        public CustomerRewardWrapSerde(Serializer<CustomerReward> serializer, Deserializer<CustomerReward> deserializer) {
            super(serializer, deserializer);
        }
    }
    private static final class TransactionWrapSerde extends WrapSerde<Transaction> {
        public TransactionWrapSerde(Serializer<Transaction> serializer, Deserializer<Transaction> deserializer) {
            super(serializer, deserializer);
        }
    }

    /** Base class: pairs a serializer with a deserializer */
    private static class WrapSerde<T> implements Serde<T> {
        private final Serializer<T> serializer;
        private final Deserializer<T> deserializer;

        public WrapSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }

        @Override
        public Serializer<T> serializer() {
            return serializer;
        }

        @Override
        public Deserializer<T> deserializer() {
            return deserializer;
        }
    }
}
```
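4. The Streams Implementation
The code itself is fairly simple: it uses the operators covered in earlier articles to build processors that transform the data, then sends each result to its own topic. Once it is written, the topics it uses must be created (see section 6).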
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

@Slf4j
public class ShoppingStreams {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String APPLICATION_ID = "shopping-streams";
    private static final String SELL_TRANSACTION_SOURCE_TOPIC = "sell.transaction";
    private static final String SELL_TRANSACTION_PATTERN_TOPIC = "sell.pattern.transaction";
    private static final String SELL_TRANSACTION_REWARDS_TOPIC = "sell.rewards.transaction";
    private static final String SELL_TRANSACTION_COFFEE_TOPIC = "sell.coffee.transaction";
    private static final String SELL_TRANSACTION_ELECT_TOPIC = "sell.elect.transaction";
    private static final String SELL_TRANSACTION_PURCHASE_TOPIC = "sell.purchase.transaction";

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Log and skip records that cannot be deserialized instead of stopping the application
        properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
        StreamsConfig streamsConfig = new StreamsConfig(properties);
        Serde<String> stringSerde = Serdes.String();
        StreamsBuilder builder = new StreamsBuilder();

        // 1. Read purchase records from the source topic
        KStream<String, Transaction> k0 = builder
                .stream(SELL_TRANSACTION_SOURCE_TOPIC, Consumed.with(stringSerde, JsonSerdes.Transaction())
                        .withName("transaction-source")
                        // With no committed offset, or one out of range, start from the latest record
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST));

        // 2. Mask the card number: keep the first 4 characters, replace the rest with "****"
        KStream<String, Transaction> k1 = k0.peek((k, v) -> log.info("k:{},v:{}", k, v))
                .mapValues(v -> {
                    String masked = v.getCreditCardNumber().replaceAll("(?<=^.{4}).*", "****");
                    v.setCreditCardNumber(masked);
                    return v;
                }, Named.as("credit-card-masking"));

        // 3. Record the purchase pattern
        k1.mapValues(v -> TransactionPattern.builder()
                        .zipCode(v.getZipCode())
                        .item(v.getItemPurchased())
                        .date(v.getPurchaseDate())
                        .amount(v.getPrice())
                        .build()
                , Named.as("transaction-pattern"))
                .to(SELL_TRANSACTION_PATTERN_TOPIC, Produced.with(stringSerde, JsonSerdes.TransactionPattern()));

        // 4. Reward points: the amount paid, truncated to whole points
        k1.mapValues(v -> CustomerReward.builder()
                        .customerId(v.getCustomerId())
                        .purchaseTotal(String.valueOf(v.getPrice()))
                        .rewardPoints(v.getPrice().intValue())
                        .build()
                , Named.as("transaction-rewards"))
                .to(SELL_TRANSACTION_REWARDS_TOPIC, Produced.with(stringSerde, JsonSerdes.CustomerReward()));

        // 5. Keep purchases priced above 5, re-key them by TransactionKey and send them out
        k1.filter((k, v) -> v.getPrice() > 5)
                .selectKey((k, v) -> TransactionKey.builder()
                        .customerId(v.getCustomerId())
                        .purchaseDate(v.getPurchaseDate())
                        .build()
                , Named.as("transaction-purchase"))
                .to(SELL_TRANSACTION_PURCHASE_TOPIC, Produced.with(JsonSerdes.TransactionKey(), JsonSerdes.Transaction()));

        // 6. Route purchases to different topics by item type; other items are dropped (noDefaultBranch)
        k1.split(Named.as("branch-"))
                .branch((k, v) -> v.getItemPurchased().equalsIgnoreCase("coffee"),
                        Branched.withConsumer(ks -> ks.to(SELL_TRANSACTION_COFFEE_TOPIC, Produced.with(stringSerde, JsonSerdes.Transaction()))))
                .branch((k, v) -> v.getItemPurchased().equalsIgnoreCase("elect"),
                        Branched.withConsumer(ks -> ks.to(SELL_TRANSACTION_ELECT_TOPIC, Produced.with(stringSerde, JsonSerdes.Transaction()))))
                .noDefaultBranch();

        // 7. Simulate writing every record to the data warehouse
        k1.print(Printed.<String, Transaction>toSysOut().withName("DW"));
        k1.foreach((k, v) -> log.info("Stored in data warehouse =========> k:{}, v:{}", k, v));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
        CountDownLatch latch = new CountDownLatch(1);
        // Close the Streams client cleanly on JVM shutdown (e.g. Ctrl+C)
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaStreams.close();
            latch.countDown();
            log.info("Kafka Streams closed");
        }));
        kafkaStreams.start();
        log.info("Kafka Streams started >>>>");
        latch.await();
    }
}
```
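The masking regex in step 2 keeps only the first four characters. The JDK-only sketch below (the class `CardMasking` is hypothetical) shows its effect on one of the test card numbers, next to an alternative that is not used in the original: masking every character except the last four.

```java
/**
 * Hypothetical comparison of two masking rules for card numbers.
 */
final class CardMasking {

    // Rule used in ShoppingStreams: keep the first 4 characters, replace the rest with "****"
    static String maskKeepFirstFour(String card) {
        return card.replaceAll("(?<=^.{4}).*", "****");
    }

    // Alternative (assumption, not in the original): replace every character
    // that still has at least 4 characters after it, leaving the last 4 visible
    static String maskKeepLastFour(String card) {
        return card.replaceAll(".(?=.{4})", "*");
    }

    public static void main(String[] args) {
        String card = "4322-1223-1123-0001";
        System.out.println(maskKeepFirstFour(card)); // 4322****
        System.out.println(maskKeepLastFour(card));  // ***************0001
    }
}
```

The first rule also hides the card length, while the second preserves it; which one fits depends on how the masked value is used downstream.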
5. Testing Helpers
5.1 Producer
A class that produces purchase messages and sends them to the source topic:
```java
import cn.hutool.json.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Produces shopping messages to Kafka.
 */
@Slf4j
public class ShoppingProducer {
    private static final String TOPIC_NAME = "sell.transaction";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Connection settings
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Serializers: the value is sent as a JSON string
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // try-with-resources closes the producer when done
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 5; i++) {
                Transaction transaction = Transaction.builder()
                        .customerId("011223")
                        .itemPurchased("elect")
                        .quantity(i)
                        .zipCode("10100")
                        .lastName("李")
                        .firstName("四")
                        .price(i * 100.0)
                        .purchaseDate(new Date().toString())
                        .creditCardNumber("4322-1223-1123-000" + i)
                        .department("体育西路35号")
                        .build();
                String json = new JSONObject(transaction).toString();
                // ProducerRecord(topic, partition, key, value): partition 0 is set explicitly;
                // without it, the partition would be derived from the key
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0, "my-keyValue3", json);
                // Send synchronously
                send(producer, producerRecord);
            }
        }
    }

    /**
     * Sends a record synchronously.
     *
     * @param producer       the producer client
     * @param producerRecord the record to send
     */
    private static void send(KafkaProducer<String, String> producer, ProducerRecord<String, String> producerRecord)
            throws InterruptedException, ExecutionException {
        // get() blocks until the send completes
        RecordMetadata metadata = producer.send(producerRecord).get();
        log.info("Sent synchronously: topic-{}====partition:{}=====offset:{}",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
}
```
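Before running, it helps to know what to expect from this batch. The hypothetical sketch below recomputes in plain Java, for the five records above (price = i * 100.0), the reward points produced by `price.intValue()` and the number of records that pass the price > 5 filter. By the same rules, all five records go to sell.elect.transaction and none to sell.coffee.transaction, since every item is "elect".

```java
import java.util.Arrays;

/**
 * Hypothetical check (not in the original code): expected downstream results
 * for the five records sent by ShoppingProducer.
 */
final class ProducerBatchExpectation {

    // Price of the i-th test record, as in ShoppingProducer
    static double priceOf(int i) {
        return i * 100.0;
    }

    // Reward points per record: the rewards processor uses price.intValue()
    static int[] rewardPoints(int count) {
        int[] points = new int[count];
        for (int i = 0; i < count; i++) {
            points[i] = Double.valueOf(priceOf(i)).intValue();
        }
        return points;
    }

    // Records passing the "price > 5" filter, i.e. reaching sell.purchase.transaction
    static int purchaseCount(int count) {
        int passed = 0;
        for (int i = 0; i < count; i++) {
            if (priceOf(i) > 5) {
                passed++;
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(rewardPoints(5))); // [0, 100, 200, 300, 400]
        System.out.println(purchaseCount(5));                 // 4
    }
}
```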
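5.2 Logging Configuration
The Kafka clients log continuously, so a logging configuration is needed to suppress the DEBUG output.
Create a logback.xml file under the resources directory: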
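```xml
<configuration scan="true" scanPeriod="10 seconds">
    <!-- base.xml ships with Spring Boot; without Spring Boot, define an appender here instead -->
    <include resource="org/springframework/boot/logging/logback/base.xml"/>
    <!-- Suppress Kafka client DEBUG output -->
    <logger name="org.apache.kafka.clients" level="ERROR" />
</configuration>
```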
6. Creating the Topics
First you need a running Kafka installation; see my earlier article for how to set one up.
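The six topics can be created with the kafka-topics.sh tool shipped with Kafka. The hypothetical helper below simply prints one create command per topic used by ShoppingStreams and ShoppingProducer. One partition and a replication factor of 1 are assumptions suited to a single local broker; the `--bootstrap-server` flag applies to recent Kafka versions, and on Windows the script is kafka-topics.bat.

```java
import java.util.List;
import java.util.stream.Collectors;

/**
 * Hypothetical helper: prints a kafka-topics.sh create command for each topic.
 * Partition count and replication factor of 1 assume a single local broker.
 */
final class TopicCommands {

    static final String BOOTSTRAP_SERVERS = "localhost:9092";

    // Topic names copied from ShoppingStreams / ShoppingProducer
    static final List<String> TOPICS = List.of(
            "sell.transaction",
            "sell.pattern.transaction",
            "sell.rewards.transaction",
            "sell.coffee.transaction",
            "sell.elect.transaction",
            "sell.purchase.transaction");

    static String createCommand(String topic) {
        return "kafka-topics.sh --bootstrap-server " + BOOTSTRAP_SERVERS
                + " --create --topic " + topic
                + " --partitions 1 --replication-factor 1";
    }

    public static void main(String[] args) {
        System.out.println(TOPICS.stream()
                .map(TopicCommands::createCommand)
                .collect(Collectors.joining(System.lineSeparator())));
    }
}
```

The producer writes explicitly to partition 0, so any partition count of at least 1 works for the source topic.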