拉不拉稀肚拉稀 发表于 2024-11-6 18:09:29

Kafka 之序次消息

媒介:
在分布式消息系统中,消息的序次性是一个重要的问题,也是一个常见的业务场景,那 Kafka 作为一个高性能的分布式消息中间件,又是如何实现序次消息的呢?本篇我们将对 Kafka 的序次消息展开讨论。
Kafka 系列文章传送门
Kafka 简介及焦点概念解说
Spring Boot 整合 Kafka 详解
Kafka @KafkaListener 注解的详解及使用
Kafka 客户端工具使用分享【offsetexplorer】
Kafka 之消息同步/异步发送
Kafka 之批量消息发送斲丧
Kafka 之消息广播斲丧
Kafka 之消息并发斲丧
序次消息的使用场景
序次消息的使用场景众多,这里我简单列举几个如下:


[*]即时消息中的单对单聊天和群聊,包管发送方消息发送序次与接收方的序次同等。
[*]电商中下单后,订单创建、付出、订单发货和物流更新的序次性。
[*]手机充值过程中的扣款短信和重置成功的短信应该有序次性。
[*]。。。。等等等场景。
Kafka 如何包管消息的序次性
讨论 Kafka 消息的序次性,需要分单分区和多分区来讨论,具体如下:


[*]单分区:单分区的消息序次性相对简单,因为消息在单分区中是相对有序的,只需要包管消息发送序次和斲丧序次即可。
[*]多分区:多分区要包管消息有序,就需要额外的计划来包管消息全局有序了。
根据上面的简单分析,我们知道 Kafka 单分区的消息有序相对简单,接下来我们分析一下 Kafka 如何包管单分区消息有序。
Kafka 如何包管单分区消息有序
Kafka 包管单分区消息有序需要从两个方面来讲,一个是消息生产者,一个是消息斲丧者,具体如下:
消息生产者:


[*]使用相同的分区键(Partition Key):生产者发送消息时,指定相同的分区键,使得所有消息都发送到同一个分区。
[*]指定消息 key,如果没有指定分区,我们指定一个相同的消息 Key,Kafka 会根据 Key 举行 Hash 盘算出一个分区号,如果消息的 Key 相同,那么也管帐算一个相同的分区号,消息也会发送到同一个分区了。
[*]自定义分区器:如果想要实现更复杂的分区逻辑,可以实现自定义分区器,来达到消息最终到达同一个分区。
消息斲丧者:
生产这已经包管了斲丧的发送有序,因此消息斲丧者使用单线程斲丧即可。
Kafka 序次消息实现案例

上面我们对 Kafka 序次消息的实现做了基天职析,下面我们就使用代码来实现 Kafka 的序次消息。
Kafka 序次消息 Producer
在 Producer 中分别实现了两种序次消息的方式,分别是指定分区和指定 Key,具体代码如下:
package com.order.service.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutionException;

/**
* @ClassName: MyKafkaOrderlyProducer
* @Author: Author
* @Date: 2024/10/22 19:22
* @Description: 顺序消息发送者
*/
@Slf4j
@Component
public class MyKafkaOrderlyProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;


    //指定分区
    public void sendOrderlyByPartitionMessage() {
      try {
            this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666创建").get();
            this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666支付").get();
            this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666发货").get();
      } catch (InterruptedException e) {
            e.printStackTrace();
      } catch (ExecutionException e) {
            e.printStackTrace();
      }
    }

    //指定 key
    public void sendOrderlyByKeyMessage() {
      try {
            this.kafkaTemplate.send("my-topic", "666", "Key--订单666创建").get();
            this.kafkaTemplate.send("my-topic", "666", "Key--订单666支付").get();
            this.kafkaTemplate.send("my-topic", "666", "Key--订单666发货").get();
      } catch (InterruptedException e) {
            e.printStackTrace();
      } catch (ExecutionException e) {
            e.printStackTrace();
      }
    }

}
在 Producer 代码中我们使用了 Kafka 的同步发送消息。
Kafka 序次消息 Consumer
序次消息的斲丧者代码十分简单,照旧使用 @KafkaListener 完成消息斲丧,注意是单线程斲丧即可。
package com.order.service.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
* @ClassName: MyKafkaConsumer
* @Author: zhangyong
* @Date: 2024/10/22 19:22
* @Description: MyKafkaOrderlyConsumer
*/
@Slf4j
@Component
public class MyKafkaOrderlyConsumer {

    @KafkaListener(id = "my-kafka-order-consumer",
            groupId = "my-kafka-consumer-groupId",
            topics = "my-topic",
            containerFactory = "myContainerFactory")
    public void listen(String message) {
      log.info("消息消费成功消息内容:{}", message);
    }

}
Kafka 序次消息发送斲丧验证
验证指定分区情况下的序次消息:
2024-10-28 20:55:18.495INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer: 消息消费成功消息内容:Partition--订单666创建
2024-10-28 20:55:18.599INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer: 消息消费成功消息内容:Partition--订单666支付
2024-10-28 20:55:18.704INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer: 消息消费成功消息内容:Partition--订单666发货
消息是按照发送序次来斲丧的,结果符合预期。
验证指定 Key 情况下的序次消息:
2024-10-28 20:56:13.238INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer: 消息消费成功消息内容:Key--订单666创建
2024-10-28 20:56:13.341INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer: 消息消费成功消息内容:Key--订单666支付
2024-10-28 20:56:13.443INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer: 消息消费成功消息内容:Key--订单666发货
消息是按照发送序次来斲丧的,结果符合预期。
Kafka 自定义分区器
自定义分区器就是按本身的规则来指定消息最终要发送的分区,可以根据本身的需求灵活实现,案例代码中先获取分区数目,然后使用的是 key 的 Hash 值举行 Hash 取模的方式获取分区,具体代码如下:
package com.order.service.kafka;

import com.order.service.exception.BusinessException;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

/**
* @ClassName: CustomPartitioner
* @Author: Author
* @Date: 2024/10/28 20:57
* @Description:
*/
public class CustomPartitioner implements Partitioner {


    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
      //获取 分区数量
      List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);

         if (key == null || keyBytes == null && !(key instanceof String)) {
            throw new BusinessException("key 不能为空且需要是字符串类型");
      }
      String keyStr = key.toString();
      int partition = keyStr.hashCode() % partitionInfos.size();
      return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
配置自定义分区器
自定义了分区器后还需要再 Kafka 配置中配置上我们自定义的分区器,关键配置如下:
//自定义分区器配置
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
完整的配置 KafkaProducerConfig 配置如下:
package com.order.service.config;

import com.order.service.kafka.CustomPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
* @author :author
* @description:
* @modified By:
* @version: V1.0
*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.producer.batch-size}")
    private String batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private String bufferMemory;

    @Value("${spring.kafka.producer.properties.linger.ms}")
    private String lingerMs;

    @Bean("myProducerKafkaProps")
    public Map<String, Object> getMyKafkaProps() {
      Map<String, Object> props = new HashMap<>(10);
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      //批量发送消息的大小 默认 16KB
      props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);
      //生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数默认 32M
      props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);
      //批量发送的的最大时间间隔,单位是毫秒
      props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);
      //自定义分区器配置
      props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
      return props;
    }

    @Bean
    public ProducerFactory<String, String> newProducerFactory() {
      return new DefaultKafkaProducerFactory<>(getMyKafkaProps());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
      return new KafkaTemplate<>(newProducerFactory());
    }



}

自定义分区 Consumer 代码案例
自定义分区 Consumer 代码没有什么特殊之处,指定一个 key 即可,key 同等就可以包管消息发送到同一个 Partition 中,包管消息的序次,具体代码如下:
//自定义分区发送消息
public void sendOrderlyByCustomPartitionerMessage() {
        try {
                this.kafkaTemplate.send("my-topic", "666", "Key--订单666创建").get();
                this.kafkaTemplate.send("my-topic", "666", "Key--订单666支付").get();
                this.kafkaTemplate.send("my-topic", "666", "Key--订单666发货").get();
        } catch (InterruptedException e) {
                e.printStackTrace();
        } catch (ExecutionException e) {
                e.printStackTrace();
        }
}
自定义分区序次消息验证
触发消息发送后 debugger 如下:
https://i-blog.csdnimg.cn/direct/1fb03dbdec0b4b4bac01afa31dd7a4df.png
控制台记载斲丧日记如下:
2024-10-30 17:24:52.716INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer: 消息消费成功消息内容:Key--订单666创建
2024-10-30 17:24:52.819INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer: 消息消费成功消息内容:Key--订单666支付
2024-10-30 17:24:52.921INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer: 消息消费成功消息内容:Key--订单666发货
消息是按序次斲丧的,结果符合预期。
总结:Kafka 只能在单个 Partition 中保持消息的序次存储,要想包管消息的序次性就必须让需要保持序次的消息发送到同一个 Partition,对于斲丧端,斲丧消息的序次性只需要包管使用单线程举行斲丧即可,一样寻常来说比力少用到 Kafka 的序次消息,这里分享一下照旧希望可以资助到有需要的朋友。
如有不正确的地方接待各位指出改正。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Kafka 之序次消息