点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(正在更新…)
章节内容
上节我们完成了如下内容:
- 斲丧组测试,斲丧者变动对斲丧的影响
- 斲丧者的心跳机制
- 斲丧者的相关配置参数
主题和分区
- Topic:Kafka用于分类管理消息的逻辑单元,雷同于MySQL的数据库
- Partition:是Kafka下数据存储的基本单元,这个是物理上的概念,同一个Topic的数据,会被分散的存储到多个Partition中,这些Partition可以在同一台机器上,也可以在多台机器上。优势在于可以举行程度扩展,通常Partition的数量是BrokerServer数量的整数倍
- ConsumerGroup,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的本领。保证一个斲丧组获取到特定主题的全部消息。在消息组内部,多少个斲丧者斲丧主题分区的消息,斲丧组可以保证一个主题的每个分区只被斲丧组中的一个斲丧者斲丧。
- Consumer 采用 PULL 模式从 Broker 中读取数据,采用PULL模式 Consumer可以自行控制斲丧的速率。
反序列化
- Kafka的Broker中全部的消息都是字节数组,斲丧者获取到消息之后,需要先对消息举行反序列化处置惩罚,然后才气交由给用户步伐斲丧。
- 斲丧者的反序列化器包括Key和Value。
自界说反序列化
如果要实现自界说的反序列化器,需要实现 Deserializer 接口:
- public class UserDeserializer implements Deserializer<User> {
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- Deserializer.super.configure(configs, isKey);
- }
- @Override
- public User deserialize(String topic, byte[] data) {
- ByteBuffer buffer = ByteBuffer.allocate(data.length);
- buffer.put(data);
- buffer.flip();
- int userId = buffer.getInt();
- int usernameLen = buffer.getInt();
- String username = new String(data, 8, usernameLen);
- int passwordLen = buffer.getInt();
- String password = new String(data, 8 + usernameLen, passwordLen);
- int age = buffer.getInt();
- User user = new User();
- user.setUserId(userId);
- user.setUsername(username);
- user.setPassword(password);
- user.setAge(age);
- return user;
- }
- @Override
- public User deserialize(String topic, Headers headers, byte[] data) {
- return Deserializer.super.deserialize(topic, headers, data);
- }
- @Override
- public void close() {
- Deserializer.super.close();
- }
- }
复制代码 斲丧者拦截器
斲丧者在拉取了分区消息之后,要起首颠末反序列化器对Key和Value举行反序列化操纵。
斲丧端界说消息拦截器,要实现 ConsumerInterceptor接口:
- 一个可插拔的接口,答应拦截、更改斲丧者吸收到的消息,主要的用例在于将第三方组件引入斲丧者应用步伐,用于定制监控、日记处置惩罚等
- 该接口的实现类通过configure方法获取斲丧者配置的属性,如果斲丧者配置中没有指定ClientID,还可以获取KafkaConsumer天生的ClientID,获取这个配置跟其他拦截器是共享的,需要保证不会在各个拦截器之间产生冲突。
- ConsumerInterceptor方法抛出异常会被捕捉,但不会向下传播,如果配置了错误的参数类型,斲丧者不会抛出异常而是记录下来。
- ConsumerInterceptor回调发生在KafkaConsumer.poll()方法的同一个线程
- public class ConsumerInterceptor01 implements ConsumerInterceptor<String, String> {
- @Override
- public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
- System.out.println("=== 消费者拦截器 01 onConsume ===");
- return records;
- }
- @Override
- public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
- System.out.println("=== 消费者拦截器 01 onCommit ===");
- }
- @Override
- public void close() {
- }
- @Override
- public void configure(Map<String, ?> configs) {
- System.out.println("消费者设置的参数");
- configs.forEach((k, v) -> {
- System.out.println(k + ", " + v);
- });
- }
- }
复制代码 位移提交
相关概念
- Consumer 需要向Kafka记录本身的位移数据,这个汇报过程称为:提交位移(Committing Offsets)
- Consumer 需要为分配给它的每个分区提交各自的位移数据
- 位移提交的由Consumer端负责的,Kafka只负责保管,存到 __consumer_offsets 中
- 位移提交:自动提交和手动提交
- 位移提交:同步提交和异步提交
自动提交
Kafka Consumer后台提交
- 开启自动提交 enable.auto.commit=true
- 配置启动提交隔断:auto.commit.interval.ms,默认是5秒
位移次序
自动提交位移的次序:
- 配置 enable.auto.commit=true
- Kafka会保证在开始调用poll方法时,提交上次poll返回的全部消息的
- 因此自动提交不会出现消息丢失,但是会重复斲丧
重复斲丧
重复斲丧的场景:
- Consumer设置5秒提交offset
- 假设提交offset后3秒发生了Rebalance
- Rebalance之后全部的Consumer从上一次提交的Offset的地方继承斲丧
- 因为Rebalance发生前3秒的内的提交就丢失了
异步提交
- 使用 KafkaConsumer#commitSync,会提交全部poll返回的最新Offset
- 该方法为同步操纵 等待直到 offset 被乐成提交才返回
- 手动同步提交可以控制offset提交的时机和频率
位移管理
Kafka中,斲丧者根据消息的位移次序斲丧消息,斲丧者的位移由斲丧者者管理,Kafka提供了斲丧者的API,让斲丧者自行管理位移。
重平衡
重平衡可以说是Kafka中诟病最厉害的一部门。
重平衡是一个协议,它规定了如何让斲丧者组下的全部斲丧者来分配Topic中每一个分区。
比如一个Topic中有100个分区,一个斲丧组内有20个斲丧者,在协调者的控制下可以让每一个斲丧者能分配到5个分区,这个分配过程就是重平衡。
重平衡的出发条件主要有三个:
- 斲丧者组内成员发生变动,这个变动包括了增长和减少斲丧者,比如斲丧者宕机退出斲丧组。
- 主题的分区数发生变革,Kafka目前只能增长分区数,当增长的时候就会触发重平衡
- 订阅的主题发生变革,当斲丧组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会重平衡
为什么说重平衡让人诟病呢?因为重平衡过程中,斲丧者无法从Kafka斲丧消息,对Kafka的TPS影响极大,而如果Kafka集群内节点较多,比如数百个,重平衡耗时会好久。
避免重平衡
要完全避免重平衡做不到,但是要只管避免重平衡。
在分布式体系中,由于网络问题没有吸收到心跳,此时不确认是挂了还是负载没过来还是网络壅闭。
- session.timeout.ms 规定超时时间是多久
- heartbeat.interval.ms 规放心跳的频率 越高越不容易误判 但是会斲丧更多资源
- max.poll.interval.ms 斲丧者poll数据后,需要处置惩罚在举行拉取,如果两次拉取时间凌驾隔断就会被剔除,默认是5分钟。
这里给出一些保举参数的配置:
- session.timeout.ms 设置为6秒
- heaertbeat.interval.ms 设置2秒
- max.poll.interval.ms 保举斲丧者处置惩罚消息最长耗时再加1分钟
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |