Kafka入门-生产者

[复制链接]
发表于 2025-6-21 15:33:04 | 显示全部楼层 |阅读模式
生产者

生产者发送流程:

延迟时间为0ms时,也就意味着每当有数据就会直接发送
异步发送API

异步发送和同步发送的差异在于:异步发送不需要等待效果,同步发送必须等待效果才气进行下一步发送。
平常异步发送

首先导入所需的kafka依靠
  1. <dependency>
  2.     <groupId>org.apache.kafka</groupId>
  3.     <artifactId>kafka-clients</artifactId>
  4.     <version>3.0.0</version>
  5. </dependency>
复制代码
  1. public class CustomProducer {
  2.     public static void main(String[] args) {
  3.         //配置
  4.         Properties properties = new Properties();
  5.         //连接集群
  6.         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.27.101:9092,192.168.27.102:9092");
  7.         //指定对应的key和value的序列化类型
  8.         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  9.         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  10.         //创建Kafka生产者对象
  11.         KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  12.         //异步发送数据
  13.         kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));
  14.         for (int i = 0; i < 8; i++) {
  15.             kafkaProducer.send(new ProducerRecord<>("first","learning Kafka-"+i));
  16.         }
  17.         //关闭资源
  18.         kafkaProducer.close();
  19.     }
  20. }
复制代码
带回调函数的异步发送

回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数:元数据信息和非常信息,假如非常信息为null,说明消息发送成功,假如非常征象不为null,说明消息发送失败。
修改发送方法,采用回调
  1. //异步发送数据,并有回调函数
  2. kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));
  3. for (int i = 0; i < 8; i++) {
  4.     kafkaProducer.send(new ProducerRecord<>("first", "learning Kafka-" + i), new Callback() {
  5.         @Override
  6.         public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  7.             if(e == null){
  8.                 System.out.println("主题: "+ recordMetadata.topic() +" 分区:"+recordMetadata.partition());
  9.             }
  10.         }
  11.     });
  12. }
复制代码
运行方法就能看到返回的主题、分区
  1. 主题: first 分区:2
复制代码
同步发送

同步发送只需更改发送方式
  1. //同步发送数据
  2.         kafkaProducer.send(new ProducerRecord<>("first","XXX learn Kafka"));
  3.         for (int i = 0; i < 8; i++) {
  4.             kafkaProducer.send(new ProducerRecord<>("first","learning Kafka-"+i)).get();
  5.         }
复制代码
为什么要分区


  • 便于公道使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上,公道控制分区的使命,可以实现负载均衡的效果。
  • 提高并行度,生产者可以以分区为单元发送数据,而消耗者则可以以分区为单元进行消耗
分区策略:

  • 默认分区策略:

    • 假如在记录中指定了分区,那么直接使用指定的分区
      例如在send方法指定分区2,key为""
      1.     kafkaProducer.send(new ProducerRecord<>("first",2,"", "learning Kafka-" + i), new Callback() {
      2.                 @Override
      3.                 public void onCompletion(RecordMetadata recordMetadata, Exception e) {
      4.                     if(e == null){
      5.                         System.out.println("主题: "+ recordMetadata.topic() +" 分区:"+recordMetadata.partition());
      6.                     }
      7.                 }
      8.             });
      复制代码
    • 假如未指定分区但存在键key,则根据key的哈希值与topic的partition数量进行取余选择分区
      例如在send方法中不指定分区,设置key
      1. kafkaProducer.send(new ProducerRecord<>("first","haha", "learning Kafka-" + i), new Callback() {
      2.                 @Override
      3.                 public void onCompletion(RecordMetadata recordMetadata, Exception e) {
      4.                     if(e == null){
      5.                         System.out.println("主题: "+ recordMetadata.topic() +" 分区:"+recordMetadata.partition());
      6.                     }
      7.                 }
      8.             });
      复制代码
    • 假如不存在分区也没有键key,那么使用黏性分区,会随机选择一个分区而且尽可能一直使用该分区,假如该分区batch已满大概已完成,kafka会再随机一个分区进使用用(和上一个分区差异)。

自定义分区器

首先自定义一个分区器
  1. public class MyPartitioner implements Partitioner {
  2.     @Override
  3.     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  4.         //获取数据
  5.         String msgValues = value.toString();
  6.         int partition;
  7.         //如果发送的数据包含aha字段则发送到0号分区,不包含则发往1号分区
  8.         if(msgValues.contains("aha")){
  9.             partition = 0;
  10.         }else {
  11.             partition = 1;
  12.         }
  13.         return partition;
  14.     }
  15.     @Override
  16.     public void close() {
  17.     }
  18.     @Override
  19.     public void configure(Map<String, ?> configs) {
  20.     }
  21. }
复制代码
在创建Kafka对象之前设置配置,选择自定义的分区器
  1. //关联自定义分区
  2. properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.hzp.kafka.producer.MyPartitioner");
复制代码
注意:假如使用自定义分区的同时,还在send方法内指定分区,那么以指定分区为准。
生产者提高吞吐量

生产者发消息就相当于用货车从当地堆栈(缓冲区)送货到kafka,相关的参数有两个,一个是batch size批次巨细,一个是linger.ms等待时间。batch size默以为16k,相当于货车的容量巨细,假如货车装满了就发往kafka。但是通常环境下等待时间为0ms,也就是每当堆栈来了一箱货就直接送到kafka,不管货车是否装满。
因此提高吞吐量主要有以下方法:

  • 修改linger.ms,增长等待时间大概增长批次巨细,让货车尽量装多一点货乃至装满再发送。(等待时间会造成一定的延迟,通常控制在5-100ms)
  • 发送数据时,采取压缩的方式
  • 增大缓冲区巨细,缓冲区巨细通常为32m。相当于增长堆栈巨细,让堆栈能够存储更多的货品。
  1.         //缓冲区大小(单位为kb,默认32M)1024*1024*32
  2.         properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
  3.         //批次大小(单位为kb,默认16kb)1024*16
  4.         properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
  5.         //linger.ms (单位为ms)
  6.         properties.put(ProducerConfig.LINGER_MS_CONFIG,10);
  7.         //压缩 设置压缩类型为snappy,可配置的值有gzip、snappy、lz4、zstd
  8.         properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
复制代码
数据可靠性

数据可靠性与ACK应答级别有关
acks:


  • 0:生产者发送过来的数据,不需要等数据落盘就应答。
    假如不等数据落盘就应答,轻易造成数据丢失,生产者发送数据就不管了,可靠性差,效率高。
  • 1:生产者发送过来的数据,Leader收到数据后应答
    假如Leader接收到数据,而且应答之后,突然挂掉了,但是此时Leader还没有同步数据给其他节点,此时就造成数据丢失。生产者发送数据Leader应答,可靠性中等,效率中等。
  • -1:生产者发送的数据,Leader和ISR队列中的全部节点收齐数据后应答
    生产者发送数据需要Leader和ISR队列内里全部的Follower应答,可靠性高,效率低。
           假如Leader收到数据而且和Follower同步数据时,有一个Follower因为故障,长时间不能与Leader同步,这应该如何解决?
        解决方案:Leader维护了一个动态的in-sync replica set(ISR)也就是与Leader保持同步的Follower+Leader的集合(Leader:0,ISR:0,1,2)。假如Follower长时间未向Leader发送通讯请求大概同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。如许就不消长时间等待以故障的节点。
假如分区副本为1,那么ACK应答-1和1没有区别,挂了数据就直接丢失,假如ISR内里也只有一个(Leader:0,ISR:0),那么说明没有Follower跟Leader同步,那么仍然会数据丢失。因此可以得到:数据完全可靠的条件:ACK级别设置为-1、分区副本大于等于2、ISR应答里的最小副本数量大于等于2。
通常环境下,acks=0很少使用,acks=1主要用于传输平常日志日志(大量但并不重要的数据),允许个别数据丢失,acks=-1一般用于传输重要的数据好比金钱这类对可靠性要求比较高的场景。
   acks=-1仍然存在问题,好比如今Leader:0,ISR:0,1,2。生产者发送数据data,Leader:0接收到data后与1、2同步数据。同步数据完成之后,即将应答之前,Leader突然挂掉了,那么此时就会从1,2中选择一个成为新的Leader。假设1成为新的Leader,此时生产者没有收到应答,再次发送数据data,那么此时Leader:1就接收到了两份data数据,造成数据重复。
  java设置acks,以及重试次数
  1. //acks 设置为1
  2. properties.put(ProducerConfig.ACKS_CONFIG,"1");
  3. //重试次数 默认为int的最大值
  4. properties.put(ProducerConfig.RETRIES_CONFIG,3);
复制代码
数据去重

在刚刚的数据可靠性中,我们知道怎么让数据能够完全可靠,就是让ACK级别设置为-1、分区副本大于等于2、ISR应答里的最小副本数量大于等于2。从数据通报来看,这种设置就是数据通报至少一次(At Least One);而当ACK级别设置为0,那么数据通报最多一次(At Most One)。
At Least One可以保证数据不丢失,但是不能保证数据不重复,At Most One可以保证数据不重复,但是不能保证数据不丢失。那么假如既想数据不丢失,又想数据不重复,此时就要依靠幂等性和变乱。
幂等性

幂等性就是指Producer岂论向Broker发送多少次重复数据,Broker端都只会持久化一条数据,保证了不重复。
重复数据的判断标准就是<ID、Partition、SqlNumber>相同的消息,Broker只会持久化一条数据。Pid标识指的是ProducerId,生产者编号,Kafka每重启一次就分配一个新的;Partiton标识分区号;SqlNumber是单调自增的,因此幂等性能够保证在单分区、单会话内不重复。
幂等性的使用只需设置enable.idempotence即可,默以为true,关闭只需设置为false。
变乱

变乱开启之前,必须先开启幂等性。变乱底层依靠幂等性。
数据有序

Kafka单分区内有序,但是多分区时,分区与分区之间无序。
数据乱序

kafka保证数据单分区有序的条件是:

  • 假如没有开启幂等性,那么需要设置max.in.flight.request.per.connection的值为1
  • 假如开启幂等性,那么需要设置max.in.flight.request.per.connection的值小于等于5.
在kafka1.x版本之后当kafka启用幂等,那么kafka服务端会缓存producer发来的最近5个request的元数据,而幂等性的实现依靠单调递增的序号SqlNumber。假如发送时出现乱序,那么会根据单调递增的序号进行重排序。也就是说当开启了幂等性而且缓存的请求个数小于5,那么会在服务端进行一次重新排序,让数据有序。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

© 2001-2025 Discuz! Team. Powered by Discuz! X3.5

GMT+8, 2025-7-4 18:51 , Processed in 0.281166 second(s), 35 queries 手机版|qidao123.com技术社区-IT企服评测▪应用市场 ( 浙ICP备20004199 )|网站地图

快速回复 返回顶部 返回列表