ToB企服应用市场:ToB评测及商务社交产业平台
标题:
Kafka入门-分区及压缩
[打印本页]
作者:
锦通
时间:
2024-9-23 04:30
标题:
Kafka入门-分区及压缩
一、生产者消息分区
Kafka的消息构造方式实际上是三级结构:
主题-分区-消息
。主题下的每条消息只会生存在某一个分区中,而不会在多个分区中被生存多份。
分区的作用
就是提供
负载均衡
的本领,或者说对数据进行分区的主要原因,就是为了实现系统的
高伸缩性(Scalability
)。不同的分区能够被放置到不同节点的机器上,而数据的
读写操纵也都是针对分区这个粒度而进行的
,这样每个节点的机器都能独立地实行各自分区的读写哀求处理处罚。并且,我们还可以通过添加新的节点机器来增长团体系统的吞吐量。
分区战略
所谓分区战略是决定生产者将消息发送到哪个分区的算法
。Kafka为我们提供了
默认的分区战略
,同时它也支持
自界说分区战略
。 如果要自界说分区战略,需要显式地设置生产者端的参数
partitioner.class
。这个参数该怎么设定呢?方法很简朴,在编写生产者步伐时,你可以
编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口
。这个接口也很简朴,只界说了两个方法:
partition()和close()
,通常你只需要实现最告急的
partition
方法。我们来看看这个方法的方法签名:
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
这里的topic、key、keyBytes、value和valueBytes都属于
消息数据
,cluster则是集群信息(好比当前Kafka集群共有多少主题、多少Broker等)。只要实现类界说好了
partition
方法,同时设置
partitioner.class参数
为实现类的FullQualified Name,那么生产者步伐就会按照代码逻辑对消息进行分区。
比力常见的分区战略:
1. 轮询战略
未指定partitioner.class参数,默认利用
2. 随机战略
要实现随机战略版的partition方法,很简朴,只需要两行代码即可:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
复制代码
先盘算出该主题总的分区数,然后随机地返回一个小于它的正整数。
3. 按消息键保序战略
Kafka答应为每条消息界说消息键,简称为
Key
。这个Key的作用非常大,它可以是一个有着明白业务含义的字符串,好比客户代码、部门编号或是业务ID等;一旦消息被界说了Key,那么就可以包管
同一个Key的所有消息都进入到雷同的分区内里
,实现这个战略的partition方法同样简朴,只需要下面两行代码即可:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
复制代码
4. 基于地理位置的分区战略
这种战略一般只针对那些大规模的Kafka集群,特别是跨城市、跨国家甚至是跨大洲的集群。
根据Broker所在的IP地址实现定制化的分区战略。好比下面这段代码:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
复制代码
二、Kafka的压缩
Kafka的消息层次
都分为两层:
消息集合(messageset)以及消息(message)
。一个消息集合中包罗若干条
日志项(record item)
,而日志项才是真正
封装消息
的地方。Kafka底层的消息日志由一系列消息集合日志项组成。Kafka通常不会直接操纵具体的一条条消息,它总是在消息集合这个层面上进行写入操纵。
在Kafka中,压缩大概发生在两个地方:
生产者端和Broker端
。
生产者端
生产者步伐中设置
compression.type
参数即表现启用指定类型的压缩算法。好比下面这段步伐代码展示了怎样构建一个开启GZIP的Producer对象:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启GZIP压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
复制代码
这样Producer启动后生产的每个消息集合都是经GZIP压缩过的,故而能很好地节省网络传输带宽以 及Kafka Broker端的磁盘占用。
Broker端
1. Broker端指定了和Producer端不同的压缩算法。Broker端也有一个参数叫
compression.type
,默认值是producer,可以设置不同压缩算法。
2. Broker端发生了消息格式转换。为了兼容老版本的格式,Broker端 会对新版本消息实行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。
解压缩
解压缩发生在
消耗者步伐
中,也就是说Producer发送压缩消息到Broker后,Broker照单全收并原样生存起来。当
Consumer
步伐哀求这部分消息时,由Consumer自行
解压缩还原
成之前的消息。
Consumer怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就
在消息中
。Kafka会将启用了哪种压缩算法
封装进消息集合中
,这样当Consumer读取到消息集合时,它自然就知道 这些消息利用的是哪种压缩算法。
Producer端压缩、Broker端保持、Consumer端解压缩
。
对于Kafka而言压缩算法对比:
在
吞吐量方面
:LZ4 > Snappy > zstd和GZIP;
而在
压缩比
方面,zstd > LZ4 > GZIP > Snappy。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4