滴水恩情 发表于 2024-8-19 02:14:11

Kafka生产与斲丧详解

Kafka生产与斲丧全流程

Kafka是一款消息中心件,消息中心件本质就是收消息与发消息,以是这节课我们会从一条消息开始生产出发,去了解生产端的运行流程,然后简单的了解一下broker的存储流程,最后这条消息是如何被斲丧者斲丧掉的。其中最核心的有以下内容。
1、Kafka客户端是如何去设计一个非常良好的生产级的保证高吞吐的一个缓冲机制
2、斲丧端的原理:每个斲丧组的群主如何选择,斲丧组的群组协调器如何选择,分区分配的方法,分布式斲丧的实现机制,拉取消息的原理,offset提交的原理。
Kafka一条消息发送和斲丧的流程(非集群)

https://i-blog.csdnimg.cn/blog_migrate/c33144e51690cb78305c8325c6f00405.png
简单入门

我们这里利用Kafka内置的客户端API开辟kafka应用程序。因为我们是Java程序员,以是这里我们利用Maven,利用较新的版本
<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.3.1</version>
</dependency>
生产者

先创建一个主题,推荐在消息发送时创建对应的主题。当然就算没有创建主题,Kafka也能主动创建。
auto.create.topics.enable
是否答应主动创建主题。如果设为true,那么produce(生产者往主题写消息),consume(斲丧者从主题读消息)或者fetch metadata(恣意客户端向主题发送元数据哀求时)一个不存在的主题时,就会主动创建。缺省为true。
num.partitions
每个新建主题的分区个数(分区个数只能增长,不能减少 )。这个参数默认值是1(最新版本)
必选属性

创建生产者对象时有三个属性必须指定。
bootstrap.servers

该属性指定broker的地点清单,地点的格式为host:port。
清单里不必要包含所有的broker地点,生产者会从给定的broker里查询其他broker的信息。不过最少提供2个broker的信息(用逗号分隔,好比:127.0.0.1:9092,192.168.0.13:9092),一旦其中一个宕机,生产者仍能毗连到集群上。
key.serializer

生产者接口答应利用参数化类型,可以把Java对象作为键和值传broker,但是broker希望收到的消息的键和值都是字节数组,以是,必须提供将对象序列化成字节数组的序列化器。
key.serializer必须设置为实现org.apache.kafka.common.serialization.Serializer的接口类
Kafka的客户端默认提供了ByteArraySerializer,IntegerSerializer,StringSerializer,也可以实现自定义的序列化器。
value.serializer

同 key.serializer。
三种发送方式

我们通过生成者的send方法进行发送。send方法会返回一个包含RecordMetadata的Future对象。RecordMetadata里包含了目标主题,分区信息和消息的偏移量。
发送并忘记

忽略send方法的返回值,不做任那边理。大多数环境下,消息会正常到达,而且生产者会主动重试,但有时会丢失消息。
package com.msb.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
* 类说明:kafak生产者
*/
public class HelloKafkaProducer {

    public static void main(String[] args) {
      // 设置属性
      Properties properties = new Properties();
      // 指定连接的kafka服务器的地址
      properties.put("bootstrap.servers","127.0.0.1:9092");
      // 设置String的序列化
      properties.put("key.serializer", StringSerializer.class);
      properties.put("value.serializer", StringSerializer.class);

      // 构建kafka生产者对象
      KafkaProducer<String,String> producer= new KafkaProducer<String, String>(properties);
      try {
            ProducerRecord<String,String> record;
            try {
                // 构建消息
                record = new ProducerRecord<String,String>("msb", "teacher","lijin");
                // 发送消息
                producer.send(record);
                System.out.println("message is sent.");
            } catch (Exception e) {
                e.printStackTrace();
            }
      } finally {
            // 释放连接
            producer.close();
      }
    }


}

同步发送

获得send方法返回的Future对象,在符合的时间调用Future的get方法。参见代码。
package com.msb.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

/**
* 类说明:发送消息--同步模式
*/
public class SynProducer {

    public static void main(String[] args) {
      // 设置属性
      Properties properties = new Properties();
      // 指定连接的kafka服务器的地址
      properties.put("bootstrap.servers","127.0.0.1:9092");
      // 设置String的序列化
      properties.put("key.serializer", StringSerializer.class);
      properties.put("value.serializer", StringSerializer.class);

      // 构建kafka生产者对象
      KafkaProducer<String,String> producer= new KafkaProducer<String, String>(properties);
      try {
            ProducerRecord<String,String> record;
            try {
                // 构建消息
                record = new ProducerRecord<String,String>("msb", "teacher2333","lijin");
                // 发送消息
                Future<RecordMetadata> future =producer.send(record);
                RecordMetadata recordMetadata = future.get();
                if(null!=recordMetadata){
                  System.out.println("offset:"+recordMetadata.offset()+","
                            +"partition:"+recordMetadata.partition());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
      } finally {
            // 释放连接
            producer.close();
      }
    }




}

异步发送

实现接口org.apache.kafka.clients.producer.Callback,然后将实现类的实例作为参数传递给send方法。
package com.msb.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

/**
* 类说明:发送消息--异步模式
*/
public class AsynProducer {

    public static void main(String[] args) {
      // 设置属性
      Properties properties = new Properties();
      // 指定连接的kafka服务器的地址
      properties.put("bootstrap.servers","127.0.0.1:9092");
      // 设置String的序列化
      properties.put("key.serializer", StringSerializer.class);
      properties.put("value.serializer", StringSerializer.class);

      // 构建kafka生产者对象
      KafkaProducer<String,String> producer= new KafkaProducer<String, String>(properties);

      try {
            ProducerRecord<String,String> record;
            try {
                // 构建消息
                record = new ProducerRecord<String,String>("msb", "teacher","lijin");
                // 发送消息
                producer.send(record, new Callback() {
                  @Override
                  public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e == null){
                            // 没有异常,输出信息到控制台
                            System.out.println("offset:"+recordMetadata.offset()+"," +"partition:"+recordMetadata.partition());
                        } else {
                            // 出现异常打印
                            e.printStackTrace();
                        }
                  }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
      } finally {
            // 释放连接
            producer.close();
      }
    }




}

斲丧者

斲丧者的含义,同一般消息中心件中斲丧者的概念。在高并发的环境下,生产者产生消息的速度是远大于斲丧者斲丧的速度,单个斲丧者很可能会负担不起,此时有必要对斲丧者进行横向伸缩,于是我们可以利用多个斲丧者从同一个主题读取消息,对消息进行分流。
必选属性

创建斲丧者对象时一般有四个属性必须指定。
bootstrap.servers、value.Deserializer key.Deserializer 含义同生产者
可选属性

group.id 并非完全必需,它指定了斲丧者属于哪一个群组,但是创建不属于任何一个群组的斲丧者并没有问题。不过绝大部分环境我们都会利用群组斲丧。
斲丧者群组

Kafka里斲丧者从属于斲丧者群组,一个群组里的斲丧者订阅的都是同一个主题,每个斲丧者接收主题一部分分区的消息。
https://i-blog.csdnimg.cn/blog_migrate/77f4519c5c507461692db399dd8d4ebf.png
如上图,主题T有4个分区,群组中只有一个斲丧者,则该斲丧者将收到主题T1全部4个分区的消息。
https://i-blog.csdnimg.cn/blog_migrate/b0d8fb6ea59a35430f78d57ea2670ac4.png
如上图,在群组中增长一个斲丧者2,那么每个斲丧者将分别从两个分区接收消息,上图中就表现为斲丧者1接收分区1和分区3的消息,斲丧者2接收分区2和分区4的消息。
https://i-blog.csdnimg.cn/blog_migrate/14213ff6014346271693e80f09bfa035.png
如上图,在群组中有4个斲丧者,那么每个斲丧者将分别从1个分区接收消息。
https://i-blog.csdnimg.cn/blog_migrate/d17fd4b9c01437ec7372baa0ac4b3057.png
但是,当我们增长更多的斲丧者,超过了主题的分区数量,就会有一部分的斲丧者被闲置,不会接收到任何消息。
往斲丧者群组里增长斲丧者是进行横向伸缩能力的重要方式。以是我们有必要为主题设定符合规模的分区,在负载均衡的时间可以加入更多的斲丧者。但是要记着,一个群组里斲丧者数量超过了主题的分区数量,多出来的斲丧者是没有用处的。
序列化

创建生产者对象必须指定序列化器,默认的序列化器并不能满足我们所有的场景。我们完全可以自定义序列化器。只要实现org.apache.kafka.common.serialization.Serializer接口即可。
https://i-blog.csdnimg.cn/blog_migrate/d690a92886678575071279223bb483dc.png
自定义序列化

代码见:
https://i-blog.csdnimg.cn/blog_migrate/5939efc36c553cd36493707ed36d4795.png
代码中利用到了自定义序列化。
https://i-blog.csdnimg.cn/blog_migrate/0929e6def7d799573cc22c9b67b776a5.png
id的长度4个字节,字符串的长度描述4个字节, 字符串自己的长度nameSize个字节
https://i-blog.csdnimg.cn/blog_migrate/28a6fdd0a62448c3e950d3e5e2196d48.png
自定义序列化容易导致程序的脆弱性。举例,在我们上面的实现里,我们有多种类型的斲丧者,每个斲丧者对实体字段都有各自的需求,好比,有的将字段变更为long型,有的会增长字段,如许会出现新旧消息的兼容性问题。特别是在系统升级的时间,经常会出现一部分系统升级,其余系统被迫跟着升级的环境。
解决这个问题,可以思量利用自带格式描述以及语言无关的序列化框架。好比Protobuf,Kafka官方推荐的Apache Avro
分区

因为在Kafka中一个topic可以有多个partition,以是当一个生产发送消息,这条消息应该发送到哪个partition,这个过程就叫做分区。
当然,我们在新建消息的时间,我们可以指定partition,只要指定partition,那么分区器的策略则失效。
https://i-blog.csdnimg.cn/blog_migrate/f18dd37d51178170aef555bc32d89ced.png
系统分区器

在我们的代码中可以看到,生产者参数中是可以选择分区器的。
https://i-blog.csdnimg.cn/blog_migrate/a12de51cbba6bd7f03b34ab00ae96a76.png
https://i-blog.csdnimg.cn/blog_migrate/ab178b49cac96628a5e9bacdba3fcc82.png
DefaultPartitioner 默认分区策略

全路径类名:org.apache.kafka.clients.producer.internals.DefaultPartitioner


[*]如果消息中指定了分区,则利用它
[*]如果未指定分区但存在key,则根据序列化key利用murmur2哈希算法对分区数取模。
[*]如果不存在分区或key,则会利用粘性分区策略
采用默认分区的方式,键的重要用途有两个:
一,用来决定消息被写往主题的哪个分区,拥有雷同键的消息将被写往同一个分区。
二,还可以作为消息的附加消息。
RoundRobinPartitioner 分区策略

全路径类名:org.apache.kafka.clients.producer.internals.RoundRobinPartitioner


[*]如果消息中指定了分区,则利用它
[*]将消息平均的分配到每个分区中。
即key为null,那么这个时间一般也会采用RoundRobinPartitioner
UniformStickyPartitioner 纯粹的粘性分区策略

全路径类名:org.apache.kafka.clients.producer.internals.UniformStickyPartitioner
他跟DefaultPartitioner 分区策略的唯一区别就是。
DefaultPartitionerd 如果有key的话,那么它是按照key来决定分区的,这个时间并不会利用粘性分区
UniformStickyPartitioner 是不管你有没有key, 统一都用粘性分区来分配
另外关于粘性分区策略

从客户端最新的版本上来看(3.3.1),有两个序列化器已经进入 弃用阶段。
这个客户端在3.1.0都还不是如许。关于粘性分区策略
https://i-blog.csdnimg.cn/blog_migrate/fd93fb7099826d8cc108e25e4c98db9a.png
自定义分区器

我们完全可以去实现Partitioner接口,去实现有一个自定义的分区器

package com.msb.selfpartition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

/**
* 类说明:自定义分区器,以value值进行分区
*/
public class SelfPartitioner implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
      List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
      int num = partitionInfos.size();
      int parId = Utils.toPositive(Utils.murmur2(valueBytes)) % num;//来自DefaultPartitioner的处理
      return parId;
    }

    public void close() {
      //do nothing
    }

    public void configure(Map<String, ?> configs) {
      //do nothing
    }

}

https://i-blog.csdnimg.cn/blog_migrate/02e52100d4a719f54e8757238f864d38.png
生产缓冲机制

客户端发送消息给kafka服务器的时间、消息会先写入一个内存缓冲中,然后直到多条消息组成了一个Batch,才会一次网络通讯把Batch发送过去。重要有以下参数:
buffer.memory
设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果数据产生速度大于向broker发送的速度,导致生产者空间不足,producer会阻塞或者抛出异常。缺省33554432 (32M)
buffer.memory: 所有缓存消息的总体大小超过这个数值后,就会触发把消息发往服务器。此时会忽略batch.size和linger.ms的限定。
buffer.memory的默认数值是32 MB,对于单个 Producer 来说,可以保证充足的性能。 必要注意的是,如果您在同一个JVM中启动多个 Producer,那么每个 Producer 都有可能占用 32 MB缓存空间,此时便有可能触发 OOM。
batch.size
当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以利用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。但是生产者不肯定都会比及批次被填满才发送,半满甚至只包含一个消息的批次也有可能被发送(linger.ms控制)。缺省16384(16k) ,如果一条消息超过了批次的大小,会写不进去。
linger.ms
指定了生产者在发送批次前等候更多消息加入批次的时间。它和batch.size以先到者为先。也就是说,一旦我们获得消息的数量够batch.size的数量了,他将会立刻发送而不顾这项设置,然而如果我们获得消息字节数比batch.size设置要小的多,我们必要“linger”特定的时间以获取更多的消息。这个设置默以为0,即没有延迟。设定linger.ms=5,比方,将会减少哀求数量,但是同时会增长5ms的延迟,但也会提升消息的吞吐量。
为何要设计缓冲机制

1、减少IO的开销(单个 ->批次)但是这种环境根本上也只是linger.ms配置>0的环境下才会有,因为默认inger.ms=0的,以是根本上有消息进来了就发送了,跟单条发送是差不多!!
2、减少Kafka中Java客户端的GC。
好比缓冲池大小是32MB。然后把32MB划分为N多个内存块,好比说一个内存块是16KB(batch.size),如许的话这个缓冲池里就会有很多的内存块。
你必要创建一个新的Batch,就从缓冲池里取一个16KB的内存块就可以了,然后这个Batch就不断的写入消息
下次别人再要构建一个Batch的时间,再次利用缓冲池里的内存块就好了。如许就可以利用有限的内存,对他不停的反复重复的利用。因为如果你的Batch利用完了以后是把内存块还回到缓冲池中去,那么就不涉及到垃圾接纳了。
https://i-blog.csdnimg.cn/blog_migrate/1502f709438fd9f06333feaec2e9e60d.png
斲丧者偏移量提交

一般环境下,我们调用poll方法的时间,broker返回的是生产者写入Kafka同时kafka的斲丧者提交偏移量,如允许以确保斲丧者消息斲丧不丢失也不重复,以是一般环境下Kafka提供的原生的斲丧者是安全的,但是事情会这么完善吗?
主动提交

最简单的提交方式是让斲丧者主动提交偏移量。 如果enable.auto.commit被设为 true,斲丧者会主动把从poll()方法接收到的最大偏移量提交上去。提交时间隔断由auto.commit.interval.ms控制,默认值是5s。
主动提交是在轮询里进行的,斲丧者每次在进行轮询时会检査是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。
不过,在利用这种轻便的方式之前,必要知道它将会带来怎样的结果。
假设我们仍旧利用默认的5s提交时间隔断, 在近来一次提交之后的3s发生了再均衡,再均衡之后,斲丧者从最后一次提交的偏移量位置开始读取消息。这个时间偏移量已经掉队了3s,以是在这3s内到达的消息会被重复处理。可以通过修改提交时间隔断来更频繁地提交偏移量, 减小可能出现重复消息的时间窗, 不过这种环境是无法完全避免的。
在利用主动提交时,每次调用轮询方法都会把上一次调用返回的最大偏移量提交上去,它并不知道具体哪些消息已经被处理了,以是在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(enable.auto.comnit被设为 true时,在调用 close()方法之前也会进行主动提交)。一般环境下不会有什么问题,不过在处理异常或提前退出轮询时要格外鉴戒。
斲丧者的配置参数

auto.offset.reset
earliest
当各分区下有已提交的offset时,从提交的offset开始斲丧;无提交的offset时,重新开始斲丧
latest
当各分区下有已提交的offset时,从提交的offset开始斲丧;无提交的offset时,斲丧新产生的该分区下的数据
只要group.Id稳固,不管auto.offset.reset 设置成什么值,都从上一次的斲丧结束的地方开始斲丧。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Kafka生产与斲丧详解