马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
3.1 数据可靠性与一致性
Producer 端可靠性策略
Kafka 通过acks参数控制消息确认机制,不同设置实用于不同场景:
- acks=0:Producer 发送消息后不等待 Broker 确认,立即返回。这种模式吞吐量最高,但大概丢失消息(如网络故障导致消息未到达 Broker)。实用于对可靠性要求不高的场景,如日志
收集。
- acks=1(默认):Producer 发送消息后,等待 Leader 副本接收乐成即返回。若 Follower 副本未同步时 Leader 宕机,大概导致消息丢失。实用于对可靠性有一定要求,但可接受少量数据丢失的场景。
- acks=all:Producer 发送消息后,等待 ISR 集合中所有副本都接收乐成才返回。这种模式保证消息不丢失,但耽误较高。实用于对可靠性要求极高的场景,如金融生意业务。
幂等性生产者
开启幂等性(enable.idempotence=true)可制止消息重复发送:- props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
复制代码 幂等性生产者通过为每个消息分配唯一 ID(PID)和序列号(Sequence Number),确保 Broker 不会重复写入雷同消息。
Consumer 端 Exactly-Once 语义
Kafka 提供三种消费语义:
- At-Most-Once(最多一次):消费失败后不重试,大概导致消息丢失。
- At-Least-Once(至少一次):消费失败后重试,大概导致消息重复消费。
- Exactly-Once(精确一次):通过事务或幂等性保证每条消息仅被消费一次。
实现 Exactly-Once 语义的关键是将消息消费与 Offset 提交作为原子操作:- // 配置事务
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
- producer.initTransactions();
- try {
- producer.beginTransaction();
- // 消费消息
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records) {
- // 处理消息
- process(record);
- // 发送结果到输出Topic
- producer.send(new ProducerRecord<>("output_topic", record.key(), processResult));
- }
- // 提交消费位移和生产的消息
- producer.sendOffsetsToTransaction(currentOffsets, "test_group");
- producer.commitTransaction();
- } catch (Exception e) {
- producer.abortTransaction();
- }
复制代码 3.2 分区分配与负载平衡
Consumer Group 分区分配策略
Kafka 提供三种内置分区分配策略:
- RangeAssignor(默认):按 Topic 的分区 ID 排序,依次分配给 Consumer。例如,Topic 有 5 个分区,Consumer Group 有 2 个 Consumer,则 Consumer1 分配分区 0-2,Consumer2 分配分区 3-4。大概导致分配不均。
- RoundRobin:将所有 Topic 的所有分区按顺序轮询分配给 Consumer。实用于消费多个 Topic 的场景,分配更均匀。
- StickyAssignor:在 Rebalance 时只管保持原有分配关系,减少分区移动。例如,新增 Consumer 时,仅将部分分区从其他 Consumer 转移给新 Consumer。
通过partition.assignment.strategy参数配置分配策略:- props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
- Collections.singletonList(StickyAssignor.class.getName()));
复制代码 动态 Rebalance 机制
当 Consumer 加入或退出 Group、Topic 分区数变更时,会触发 Rebalance:
Coordinator 选举:Group 中第一个启动的 Consumer 向任意 Broker 发送哀求,获取 Group Coordinator(负责管理该 Group 的 Broker)。
- 成员注册:所有 Consumer 向 Coordinator 注册,Coordinator 收集所有成员信息。
- 分区分配:Coordinator 选择一种分配策略,盘算每个 Consumer 应分配的分区。
- 分配结果通知:Coordinator 将分配结果发送给所有 Consumer。
自定义分区分配策略
实现org.apache.kafka.clients.consumer.PartitionAssignor接口,可根据业务需求自定义分区分配逻辑。例如,按消息类型将特定分区分配给指定 Consumer:- public class CustomPartitionAssignor implements PartitionAssignor {
- @Override
- public Subscription subscription(Set<String> topics) {
- return new Subscription(new ArrayList<>(), Collections.singletonMap("custom_config", "value"));
- }
- @Override
- public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
- // 自定义分区分配逻辑
- }
-
- // 其他接口方法实现
- }
复制代码 3.3 监控 与运维
内置监控 指标
Kafka 通过 JMX(Java Management Extensions)暴露大量监控 指标,重要分为三类:
- Broker 级别:如MessagesInPerSec(每秒接收消息数)、BytesInPerSec(每秒接收字节数)、FailedProduceRequestsPerSec(每秒失败的生产哀求数)。
- Topic 级别:如PartitionCount(分区数)、UnderReplicatedPartitions(欠复制分区数)。
- Consumer 级别:如ConsumerLag(消费滞后量)、RecordsConsumedPerSec(每秒消费记录数)。
常用监控工具
- # prometheus.yml配置示例
- scrape_configs:
- - job_name: 'kafka'
- static_configs:
- - targets: ['kafka-broker-1:9100', 'kafka-broker-2:9100'] # JMX Exporter端口
复制代码
- Kafka Manager:开源的 Kafka 集群管理工具,支持 Topic 创建、分区管理、集群状态监控等功能。
- Prometheus + Grafana:企业级监控方案,通过 Prometheus 收罗 Kafka 指标,Grafana 可视化展示。需配置 JMX Exporter 作为中央件:
- Confluent Control Center:Confluent 提供的贸易监控工具,支持 Kafka 集群、Schema Registry、Kafka Connect 等组件的全方位监控。
运维下令与故障排查
1.查看 Consumer Group 消费滞后量:- /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
- --describe --group test_group
复制代码 2.手动重置消费位移:- /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
- --group test_group --topic test_topic --reset-offsets --to-earliest --execute
复制代码 3.修复副本同步问题:- /opt/kafka/bin/kafka-replica-verification.sh --bootstrap-server localhost:9092 \
- --topic test_topic --include-offline-partitions
复制代码 4.常见故障排查:
- 生产者无法毗连 Broker:检查网络连通性、防火墙配置、bootstrap.servers参数。
- 消费者消费滞后:检查 Consumer 性能、Topic 分区数、消息处置惩罚逻辑。
- Broker 磁盘空间不足:清理逾期日志
、增长磁盘容量、调解log.retention.hours参数。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
|