Kafka入门-分区及压缩

锦通  金牌会员 | 2024-9-23 04:30:15 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 912|帖子 912|积分 2736

一、生产者消息分区

Kafka的消息构造方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会生存在某一个分区中,而不会在多个分区中被生存多份。

分区的作用就是提供负载均衡的本领,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操纵也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地实行各自分区的读写哀求处理处罚。并且,我们还可以通过添加新的节点机器来增长团体系统的吞吐量。 
分区战略

所谓分区战略是决定生产者将消息发送到哪个分区的算法。Kafka为我们提供了默认的分区战略,同时它也支持自界说分区战略。 如果要自界说分区战略,需要显式地设置生产者端的参数partitioner.class。这个参数该怎么设定呢?方法很简朴,在编写生产者步伐时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。这个接口也很简朴,只界说了两个方法:partition()和close(),通常你只需要实现最告急的partition方法。我们来看看这个方法的方法签名:
   int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
  这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(好比当前Kafka集群共有多少主题、多少Broker等)。只要实现类界说好了partition方法,同时设置partitioner.class参数为实现类的FullQualified Name,那么生产者步伐就会按照代码逻辑对消息进行分区。
比力常见的分区战略:
1. 轮询战略


未指定partitioner.class参数,默认利用
2. 随机战略


要实现随机战略版的partition方法,很简朴,只需要两行代码即可:
  1. List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  2. return ThreadLocalRandom.current().nextInt(partitions.size());
复制代码
 先盘算出该主题总的分区数,然后随机地返回一个小于它的正整数。
3. 按消息键保序战略 


Kafka答应为每条消息界说消息键,简称为Key。这个Key的作用非常大,它可以是一个有着明白业务含义的字符串,好比客户代码、部门编号或是业务ID等;一旦消息被界说了Key,那么就可以包管同一个Key的所有消息都进入到雷同的分区内里,实现这个战略的partition方法同样简朴,只需要下面两行代码即可:
  1. List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  2. return Math.abs(key.hashCode()) % partitions.size();
复制代码
4. 基于地理位置的分区战略

这种战略一般只针对那些大规模的Kafka集群,特别是跨城市、跨国家甚至是跨大洲的集群。
根据Broker所在的IP地址实现定制化的分区战略。好比下面这段代码:
  1. List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  2. return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
复制代码
 二、Kafka的压缩

Kafka的消息层次都分为两层:消息集合(messageset)以及消息(message)。一个消息集合中包罗若干条日志项(record item),而日志项才是真正封装消息的地方。Kafka底层的消息日志由一系列消息集合日志项组成。Kafka通常不会直接操纵具体的一条条消息,它总是在消息集合这个层面上进行写入操纵。
在Kafka中,压缩大概发生在两个地方:生产者端和Broker端
生产者端

生产者步伐中设置compression.type参数即表现启用指定类型的压缩算法。好比下面这段步伐代码展示了怎样构建一个开启GZIP的Producer对象:
  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("acks", "all");
  4. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  5. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  6. // 开启GZIP压缩
  7. props.put("compression.type", "gzip");
  8. Producer<String, String> producer = new KafkaProducer<>(props);
复制代码
这样Producer启动后生产的每个消息集合都是经GZIP压缩过的,故而能很好地节省网络传输带宽以 及Kafka Broker端的磁盘占用。 
Broker端

1. Broker端指定了和Producer端不同的压缩算法。Broker端也有一个参数叫compression.type,默认值是producer,可以设置不同压缩算法。
2. Broker端发生了消息格式转换。为了兼容老版本的格式,Broker端 会对新版本消息实行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。
解压缩

解压缩发生在消耗者步伐中,也就是说Producer发送压缩消息到Broker后,Broker照单全收并原样生存起来。当Consumer步伐哀求这部分消息时,由Consumer自行解压缩还原成之前的消息。
Consumer怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka会将启用了哪种压缩算法封装进消息集合中,这样当Consumer读取到消息集合时,它自然就知道 这些消息利用的是哪种压缩算法。
Producer端压缩、Broker端保持、Consumer端解压缩
对于Kafka而言压缩算法对比:
        在吞吐量方面:LZ4 > Snappy > zstd和GZIP;
        而在压缩比方面,zstd > LZ4 > GZIP > Snappy。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

锦通

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表