马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
Raft Leader 推选实现文档
目次
1. 概述
1.1 目标
本文档具体分析白 LingRaft-Lite 模块中 Raft Leader 推选功能的实现,包罗涉及的类、实现细节、测试方法等,便于开发者明白和复现。
1.2 功能范围
- 节点状态管理(Follower、Candidate、Leader)
- 推选超时检测
- 投票哀求与相应处理处罚
- 多数派推选机制
- 心跳维护 Leader 职位
- 网络分区处理处罚
1.3 Raft 算法参考
本实现基于 Raft 论文(Diego Ongaro 和 John Ousterhout, 2014)的 Leader 推选部分,具体参考:
- Section 5.1: Leader Election
- Section 5.2: Leader Election - RequestVote RPC
- Section 5.4.1: Election Safety Property
2. 焦点概念
2.1 节点状态
Raft 节点有三种状态:
状态分析职责FOLLOWER从节点相应 Leader 的 RPC 哀求(AppendEntries、RequestVote)CANDIDATE候选节点发起推选,向其他节点哀求投票LEADER主节点处理处罚客户端哀求,向 Follower 复制日志 ,发送心跳2.2 任期 (Term)
界说:
- 时间被分成多个任期,每个任期以推选开始
- 任期号是单调递增的整数
- 每次推选都进入新任期
用途:
- 辨认逾期的信息(旧任期的投票、心跳等)
- 防止脑裂(分裂投票)
实现:- private volatile long currentTerm = 0; // 当前任期号
复制代码 2.3 推选超时 (Election Timeout)
界说:
- Follower 在收到有用心跳或投票哀求之前期待的时间
- 超时后转为 Candidate 并发起推选
随机化:
- 为了克制多个节点同时超时导致平票推选,超时时间随机化
- 通常在 150ms ~ 300ms 之间
实现:- // 配置随机范围
- config.setElectionTimeoutRandomRange(Range.of(150, 300));
- // 计算随机超时时间
- int randomTimeout = raftConfig.getElectionTimeoutMs();
复制代码 2.4 多数派 (Majority)
界说:
- 高出半数的节点数:N/2 + 1
- 3 节点集群须要 2 票
- 5 节点集群须要 3 票
紧张性:
- 包管推选效果的唯一性
- 两个多数派一定有交集,确保只有一个 Leader
实现:- public VoteCounter(long term, int totalNodes) {
- this.majorityCount = totalNodes / 2 + 1;
- }
复制代码 2.5 投票规则
节点投票给候选人的条件:
- 候选人的任期 >= 当前任期
- 假如任期雷同,candidate 的日志
至少和当前节点一样新
日志 比力规则:
- 假如 candidateLastLogTerm > lastLogTerm,投票
- 假如 candidateLastLogTerm == lastLogTerm 且 candidateLastLogIndex >= lastLogIndex,投票
- 否则,拒绝投票
3. 涉及的类及其职责
3.1 焦点类
类名路径职责RaftNodeImplcom.ling.raft.core.RaftNodeImpl节点状态管理、推选发起、投票处理处罚、心跳发送ConsensusModuleImplcom.ling.raft.core.ConsensusModuleImpl投票哀求和相应的具体实现逻辑VoteCountercom.ling.raft.core.VoteCounter投票计数器,统计和判定多数派ElectionTaskcom.ling.raft.core.task.ElectionTask推选超时检测使命HeartbeatTaskcom.ling.raft.core.task.HeartbeatTaskLeader 心跳发送使命ServerStatusEnumcom.ling.raft.enums.ServerStatusEnum节点状态摆列VoteRequestcom.ling.raft.model.dto.VoteRequest投票哀求 RPCVoteResponsecom.ling.raft.model.dto.VoteResponse投票相应 RPCThreeNodeElectionTestcom.ling.raft.example.leader.ThreeNodeElectionTest完备测试步调3.2 类关系图
- ┌─────────────────────┐
- │ RaftNodeImpl │
- │ (节点主类) │
- └──────────┬──────────┘
- │ 持有引用
- ├─────────────────┐
- ▼ ▼
- ┌─────────────────────┐ ┌─────────────────────┐
- │ ConsensusModuleImpl │ │ VoteCounter │
- │ (投票逻辑) │ │ (投票计数) │
- └─────────────────────┘ └─────────────────────┘
- │ │
- ├─────────────────────┤
- ▼ ▼
- ┌─────────────────────┐ ┌─────────────────────┐
- │ ElectionTask │ │ HeartbeatTask │
- │ (选举超时检测) │ │ (心跳任务) │
- └─────────────────────┘ └─────────────────────┘
复制代码 3.3 关键字段分析
RaftNodeImpl
- // 节点状态
- private volatile ServerStatusEnum nodeStatus = ServerStatusEnum.FOLLOWER;
- // 持久化状态
- private volatile long currentTerm = 0; // 当前任期
- private volatile String votedFor = null; // 本轮任期投票给的候选人
- // 选举相关
- private ScheduledExecutorService electionExecutor;
- private ScheduledFuture<?> electionFuture;
- private VoteCounter currentVoteCounter;
- private final Random random = new Random();
- // 心跳相关
- private ScheduledExecutorService heartbeatExecutor;
- private ScheduledFuture<?> heartbeatFuture;
- // 时间记录
- private volatile long prevElectionTime = 0; // 上次选举时间
- private volatile long preHeartBeatTime = 0; // 上次收到心跳时间
复制代码 ConsensusModuleImpl
- public final RaftNodeImpl node; // 持有 RaftNodeImpl 的引用
- public final ReentrantLock voteLock = new ReentrantLock(); // 投票锁
- public final ReentrantLock appendEntriesLock = new ReentrantLock(); // 追加条目锁
复制代码 VoteCounter
- private final long term; // 当前选举任期
- private final Set<String> votesReceived; // 已投票的节点ID集合
- private final int majorityCount; // 需要获得的多数派票数
- private volatile boolean votedForSelf; // 是否已投票给自己
复制代码 4. 实现细节
4.1 节点状态与转换
4.1.1 状态摆列
类名:ServerStatusEnum
界说:- public enum ServerStatusEnum {
- LEADER("LEADER", "主节点"),
- CANDIDATE("CANDIDATE", "候选节点"),
- FOLLOWER("FOLLOWER", "从节点");
- }
复制代码 4.1.2 状态转换图
- +-------------------------+
- | 初始化 |
- +-------------------------+
- |
- ▼
- +-------------------------+
- | FOLLOWER | <------------+
- | (等待心跳或投票) | |
- +-------------------------+ |
- | |
- | 选举超时 | 收到更高任期的
- | | AppendEntries 或
- ▼ | RequestVote
- +-------------------------+ |
- | CANDIDATE | |
- | (发起选举) | |
- +-------------------------+ |
- | |
- | 获得多数派 |
- | |
- ▼ |
- +-------------------------+ |
- | LEADER | --------------+
- | (处理客户端请求) | 发现更高任期
- +-------------------------+
复制代码 投票规则详解:
- 任期查抄
- candidate 的任期 < 当前任期 → 拒绝
- 任期更新
- candidate 的任期 > 当前任期 → 更新任期,转为 Follower
- 唯一投票
- 本轮任期已投票给其他人 → 拒绝
- 已投票给该 candidate → 担当(幂等性)
- 日志完备性
- candidate 的日志 >= 自己的日志 → 担当
- 否则 → 拒绝
4.3.3 日志比力逻辑
方法:isLogUpToDate(candidateLastLogIndex, candidateLastLogTerm)
实现位置:ConsensusModuleImpl.java:337-350- public void becomeFollower(long newTerm) {
- // 检查任期
- if (newTerm < currentTerm) {
- log.warn("Cannot become Follower with smaller term: {} < {}",
- newTerm, currentTerm);
- return;
- }
- ServerStatusEnum oldStatus = nodeStatus;
- // 更新状态
- nodeStatus = ServerStatusEnum.FOLLOWER;
- currentTerm = newTerm;
- votedFor = null; // 重置投票记录
- currentVoteCounter = null; // 清空投票计数器
- // 停止心跳(如果之前是 Leader)
- cancelHeartbeatTimer();
- // 重置选举定时器
- resetElectionTimer();
- log.info("State changed: {} -> FOLLOWER, term: {}", oldStatus, currentTerm);
- }
复制代码 示例:- public void becomeCandidate() {
- ServerStatusEnum oldStatus = nodeStatus;
- // 增加任期号(重要!)
- currentTerm++;
- nodeStatus = ServerStatusEnum.CANDIDATE;
- votedFor = currentNodeConfig.getServerId(); // 投票给自己
- log.info("State changed: {} -> CANDIDATE, new term: {}", oldStatus, currentTerm);
- // 重置选举定时器
- resetElectionTimer();
- // 发起投票请求
- startElection();
- }
复制代码 4.3.4 投票锁
目标:防止并发投票哀求导致状态差异等
实现:- public void becomeLeader() {
- // 只有 Candidate 才能成为 Leader
- if (nodeStatus != ServerStatusEnum.CANDIDATE) {
- log.warn("Only CANDIDATE can become LEADER, current: {}", nodeStatus);
- return;
- }
- ServerStatusEnum oldStatus = nodeStatus;
- nodeStatus = ServerStatusEnum.LEADER;
- // 初始化 Leader 状态(nextIndex、matchIndex)
- initializeLeaderState();
- // 取消选举定时器(Leader 不需要选举)
- cancelElectionTimer();
- log.info("========================================");
- log.info("State changed: {} -> LEADER, term: {}", oldStatus, currentTerm);
- log.info("========================================");
- // 立即发送心跳并开始心跳定时器
- sendHeartbeats();
- startHeartbeatTimer();
- }
复制代码 掩护的资源:
- currentTerm
- votedFor
- nodeStatus
4.4 推选发起流程
4.4.1 开始推选
方法:startElection()
实现位置:RaftNodeImpl.java:266-289- @Override
- public void run() {
- try {
- // Leader 不需要选举
- if (node.getNodeStatus() == ServerStatusEnum.LEADER) {
- log.debug("Current node is LEADER, skip election");
- return;
- }
- // 检查是否超时
- long currentTime = System.currentTimeMillis();
- int electionTimeoutMs = node.getRaftConfig().getElectionTimeoutMs();
- long timeElapsed = currentTime - node.getPrevElectionTime();
- if (timeElapsed < electionTimeoutMs) {
- // 未超时,重新设置定时器
- node.resetElectionTimer();
- return;
- }
- // 选举超时,开始新一轮选举
- log.info("========================================");
- log.info("ELECTION TIMEOUT DETECTED!");
- log.info("Time elapsed: {}ms, Timeout: {}ms", timeElapsed, electionTimeoutMs);
- log.info("Current term: {}, Status: {}", node.getCurrentTerm(), node.getNodeStatus());
- log.info("Converting to CANDIDATE and starting new election...");
- log.info("========================================");
- node.becomeCandidate();
- } catch (Exception e) {
- log.error("Error in election task", e);
- if (node.getIsRunning().get()) {
- node.resetElectionTimer();
- }
- }
- }
复制代码 流程:
- 创建投票计数器
- 投票给自己
- 单机模式直接成为 Leader
- 多机模式并发发送投票哀求
- 查抄推选效果
4.4.2 发送投票哀求
方法:sendVoteRequest(targetNode)
实现位置:RaftNodeImpl.java:294-316- public void resetElectionTimer() {
- if (!isRunning.get()) {
- return;
- }
- // 取消旧的定时任务
- cancelElectionTimer();
- // 计算随机超时时间
- int randomTimeout = raftConfig.getElectionTimeoutMs();
- // 更新超时时间戳
- prevElectionTime = System.currentTimeMillis();
- // 设置新的定时任务
- electionFuture = electionExecutor.schedule(
- new ElectionTask(this),
- randomTimeout,
- TimeUnit.MILLISECONDS
- );
- log.debug("Election timer reset, timeout: {}ms", randomTimeout);
- }
复制代码 特点:
- 并发发送到全部其他节点
- 使用线程池异步发送
- 超时设置为 3000ms
- 失败不重试(期待下一次推选)
4.4.3 投票计数器
类名:VoteCounter
实现位置:com.ling.raft.core.VoteCounter.java
焦点方法:- RaftConfig config = new RaftConfig(currentNode, allNodes);
- config.setElectionTimeout(2); // 基础倍数
- config.setElectionTimeoutRandomRange(Range.of(150, 300)); // 随机范围
复制代码 数据布局:
- 使用 ConcurrentHashMap.newKeySet() 存储投票节点 ID
- 包管线程安全
- 自动去重(不会重复计票)
4.5 投票相应处理处罚
4.5.1 处理处罚投票相应
方法:handleVoteResponse(response, voterId)
实现位置:RaftNodeImpl.java:322-361- // RaftConfig 内部实现
- public int getElectionTimeoutMs() {
- if (electionTimeoutRandomRange == null) {
- return electionTimeout * 1000;
- }
- // 在随机范围内选择一个值
- int min = electionTimeoutRandomRange.getMin();
- int max = electionTimeoutRandomRange.getMax();
- Random random = new Random();
- return min + random.nextInt(max - min + 1);
- }
复制代码 处理处罚逻辑:
- 状态查抄
- 任期查抄
- 相应任期 > 当前任期 → 发现更高任期,转为 Follower
- 相应任期 < 当前任期 → 忽略旧相应
- 投票统计
- 投票乐成 → 纪录投票,查抄是否得到多数派
- 投票失败 → 纪录日志
4.5.2 查抄推选效果
方法:checkElectionResult()
实现位置:RaftNodeImpl.java:367-373- public class VoteRequest {
- private long term; // candidate 的任期号
- private String candidateId; // candidate 的节点 ID
- private long lastLogIndex; // candidate 最后一条日志的索引
- private long lastLogTerm; // candidate 最后一条日志的任期号
- }
复制代码 调用时机:
- 投票给本死后(单机模式)
- 收到每个投票相应后
- 全部投票哀求发送后(初始查抄)
4.6 心跳机制
4.6.1 心跳使命
类名:HeartbeatTask
实现位置:com.ling.raft.core.task.HeartbeatTask.java- public class VoteResponse {
- private long term; // 当前任期(用于更新 candidate 的任期)
- private boolean voteGranted; // 是否投票
- }
复制代码 4.6.2 发送心跳
方法:sendHeartbeats()
实现位置:RaftNodeImpl.java:407-413- @Override
- public VoteResponse requestVote(VoteRequest voteRequest) {
- voteLock.lock();
- try {
- long currentTerm = node.getCurrentTerm();
- String votedFor = node.getVotedFor();
- String candidateId = voteRequest.getCandidateId();
- log.info("Received vote request from candidate: {}, Term: {}, CurrentTerm: {}, VotedFor: {}",
- candidateId, voteRequest.getTerm(), currentTerm, votedFor);
- // 1. 任期检查
- if (voteRequest.getTerm() < currentTerm) {
- log.info("Rejected: candidate term {} < current term {}",
- voteRequest.getTerm(), currentTerm);
- return new VoteResponse(currentTerm, false);
- }
- // 2. 任期更大,更新并转为 Follower
- if (voteRequest.getTerm() > currentTerm) {
- log.info("Higher term received: {} -> {}, becoming FOLLOWER",
- currentTerm, voteRequest.getTerm());
- node.becomeFollower(voteRequest.getTerm());
- currentTerm = node.getCurrentTerm();
- votedFor = node.getVotedFor();
- }
- // 3. 检查是否已投票给其他人
- if (votedFor != null && !votedFor.equals(candidateId)) {
- log.info("Already voted for {}, rejecting {}", votedFor, candidateId);
- return new VoteResponse(currentTerm, false);
- }
- // 4. 检查日志是否至少一样新
- if (isLogUpToDate(voteRequest.getLastLogIndex(), voteRequest.getLastLogTerm())) {
- log.info("Voting for candidate: {}", candidateId);
- node.setVotedFor(candidateId);
- node.setPrevElectionTime(System.currentTimeMillis()); // 重置超时
- return new VoteResponse(currentTerm, true);
- } else {
- log.info("Candidate log not up to date");
- return new VoteResponse(currentTerm, false);
- }
- } finally {
- voteLock.unlock();
- }
- }
复制代码 4.6.3 单次心跳发送
方法:sendHeartbeat(targetNode)
实现位置:RaftNodeImpl.java:418-436- private boolean isLogUpToDate(long candidateLastLogIndex, long candidateLastLogTerm) {
- long lastLogTerm = getLastLogTerm();
- long lastLogIndex = getLastLogIndex();
- // 优先比较任期:candidate 的任期更大 → 更新
- if (candidateLastLogTerm > lastLogTerm) {
- return true;
- }
- // 任期相同,比较索引:candidate 的索引 >= 自己的索引 → 更新
- if (candidateLastLogTerm == lastLogTerm && candidateLastLogIndex >= lastLogIndex) {
- return true;
- }
- // 其他情况 → 不更新
- return false;
- }
复制代码 心跳特点:
- entries 为空列表
- 只包罗 term、leaderId 等元数据
- 用于维护 Leader 职位,防止 Follower 发起新推选
4.6.4 心跳定时器
方法:startHeartbeatTimer()
实现位置:RaftNodeImpl.java:380-391- 情况 1: candidate 任期更大
- candidate: term=3, index=5
- current: term=2, index=5
- → 投票 (任期更大)
- 情况 2: 任期相同,索引更大或相等
- candidate: term=2, index=5
- current: term=2, index=4
- → 投票 (索引更大)
- 情况 3: 任期相同,索引更小
- candidate: term=2, index=4
- current: term=2, index=5
- → 不投票 (日志落后)
- 情况 4: 任期更小
- candidate: term=1, index=10
- current: term=2, index=5
- → 不投票 (任期更小)
复制代码 设置示例:- public final ReentrantLock voteLock = new ReentrantLock();
- @Override
- public VoteResponse requestVote(VoteRequest voteRequest) {
- voteLock.lock();
- try {
- // 投票逻辑
- ...
- } finally {
- voteLock.unlock();
- }
- }
复制代码 4.6.5 心跳相应处理处罚
方法:handleHeartbeatResponse(response, nodeId)
实现位置:RaftNodeImpl.java:441-448- private void startElection() {
- int totalNodes = raftConfig.getRaftNodeConfigList().size();
- currentVoteCounter = new VoteCounter(currentTerm, totalNodes);
- // 投票给自己
- currentVoteCounter.voteForSelf(currentNodeConfig.getServerId());
- log.info("Starting election for term: {}, voted for self, votes: {}/{}",
- currentTerm, currentVoteCounter.getVoteCount(), currentVoteCounter.getMajorityCount());
- // 单机模式直接成为 Leader
- if (totalNodes == 1) {
- log.info("Single node mode, becoming leader immediately");
- becomeLeader();
- return;
- }
- // 发送投票请求给所有其他节点
- List<RaftNodeConfig> otherNodes = getOtherNodes();
- for (RaftNodeConfig nodeConfig : otherNodes) {
- electionExecutor.execute(() -> sendVoteRequest(nodeConfig));
- }
- // 检查是否已获得多数派(可能只有自己一票的情况)
- checkElectionResult();
- }
复制代码 处理处罚逻辑:
- 查抄相应中的任期
- 发现更高任期 → 立刻转为 Follower
- 克制网络分区导致的脑裂
4.7 安全性包管
4.7.1 推选安全性
目标:任期内最多一个 Leader
实现:
- 任期单调递增
- private void sendVoteRequest(RaftNodeConfig targetNode) {
- try {
- // 构建 VoteRequest
- VoteRequest request = VoteRequest.builder()
- .term(currentTerm)
- .candidateId(currentNodeConfig.getServerId())
- .lastLogIndex(getLastLogIndex())
- .lastLogTerm(getLastLogTerm())
- .build();
- request.setAddress(targetNode.getIp() + ":" + targetNode.getPort());
- request.setCmd(Request.REQUEST_VOTE);
- log.debug("Sending VoteRequest to {} for term {}", targetNode.getServerId(), currentTerm);
- // 发送 RPC 请求
- VoteResponse response = rpcClient.send(request, RPC_TIMEOUT_MS);
- // 处理响应
- if (response != null) {
- handleVoteResponse(response, targetNode.getServerId());
- }
- } catch (Exception e) {
- log.debug("Failed to send vote request to {}: {}", targetNode.getServerId(), e.getMessage());
- }
- }
复制代码 - 只投一次票
- // 记录投票
- public synchronized boolean recordVote(String nodeId) {
- return votesReceived.add(nodeId);
- }
- // 投票给自己
- public synchronized void voteForSelf(String selfId) {
- if (!votedForSelf) {
- votesReceived.add(selfId);
- votedForSelf = true;
- }
- }
- // 检查是否获得多数派
- public boolean hasMajority() {
- return votesReceived.size() >= majorityCount;
- }
- // 获取当前票数
- public int getVoteCount() {
- return votesReceived.size();
- }
复制代码 - 多数派束缚
- private void handleVoteResponse(VoteResponse response, String voterId) {
- // 使用同步块确保原子性
- synchronized (this) {
- // 如果不是 Candidate,忽略
- if (nodeStatus != ServerStatusEnum.CANDIDATE) {
- log.debug("Not a candidate anymore (status: {}), ignoring vote from {}",
- nodeStatus, voterId);
- return;
- }
- // 如果收到更高任期,转为 Follower
- if (response.getTerm() > currentTerm) {
- log.info("Received higher term {} from {}, stepping down",
- response.getTerm(), voterId);
- becomeFollower(response.getTerm());
- return;
- }
- // 忽略旧任期的响应
- if (response.getTerm() < currentTerm) {
- log.debug("Received stale vote response from {} for old term {}",
- voterId, response.getTerm());
- return;
- }
- // 统计投票
- if (response.isVoteGranted()) {
- boolean isNewVote = currentVoteCounter.recordVote(voterId);
- if (isNewVote) {
- log.info("Received vote from {} for term {}, total votes: {}/{}",
- voterId, currentTerm, currentVoteCounter.getVoteCount(),
- currentVoteCounter.getMajorityCount());
- // 检查选举结果
- checkElectionResult();
- }
- } else {
- log.debug("Vote denied by {} for term {}", voterId, currentTerm);
- }
- }
- }
复制代码 4.7.2 任期更新规则
规则:发现更高任期 → 更新任期,转为 Follower
实现位置:
- ConsensusModuleImpl.requestVote() 第 63-68 行
- ConsensusModuleImpl.appendEntries() 第 128-134 行
- RaftNodeImpl.handleVoteResponse() 第 333-337 行
- RaftNodeImpl.handleHeartbeatResponse() 第 443-447 行
示例:- private void checkElectionResult() {
- if (currentVoteCounter != null && currentVoteCounter.hasMajority()) {
- log.info("Majority votes received ({}/{}), becoming LEADER",
- currentVoteCounter.getVoteCount(), currentVoteCounter.getMajorityCount());
- becomeLeader();
- }
- }
复制代码 4.7.3 日志完备性查抄
目标:只投票给日志至少和自己一样新的候选人
实现:isLogUpToDate(candidateLastLogIndex, candidateLastLogTerm)
规则:
- candidate 任期 > 自己任期 → 投票
- 任期雷同,candidate 索引 >= 自己索引 → 投票
- 否则 → 拒绝
紧张性:
- 包管新 Leader 包罗全部已提交的日志
- 防止日志丢失或覆盖
4.7.4 脑裂防备
场景:网络分区,两个 Leader 同时存在
防备机制:
- 多数派束缚
- Leader 须要多数派支持
- 分区后的少数派无法得到富足票数
- 心跳超时
- 少数派 Follower 收不到心跳
- 推选超时后发起推选
- 多数派选出新 Leader
- 任期递增
- 新 Leader 使用更高任期
- 旧 Leader 的心跳被拒绝
示例:- @Override
- public void run() {
- try {
- // 只有 Leader 才发送心跳
- if (node.getNodeStatus() != ServerStatusEnum.LEADER) {
- log.debug("Current node is not LEADER, skip heartbeat");
- return;
- }
- log.debug("Sending heartbeats to all nodes, term: {}", node.getCurrentTerm());
- // 发送心跳给所有节点
- node.sendHeartbeats();
- } catch (Exception e) {
- log.error("Error in heartbeat task", e);
- }
- }
复制代码 5. 测试指南
5.1 测试步调
文件位置:- public void sendHeartbeats() {
- List<RaftNodeConfig> otherNodes = getOtherNodes();
- for (RaftNodeConfig nodeConfig : otherNodes) {
- heartbeatExecutor.execute(() -> sendHeartbeat(nodeConfig));
- }
- }
复制代码 运行方式:- private void sendHeartbeat(RaftNodeConfig targetNode) {
- try {
- // 构建心跳请求(entries 为空)
- AppendEntriesRequest request = AppendEntriesRequest.builder()
- .term(currentTerm)
- .leaderId(currentNodeConfig.getServerId())
- .entries(new ArrayList<>()) // 空列表表示心跳
- .build();
- request.setAddress(targetNode.getIp() + ":" + targetNode.getPort());
- request.setCmd(Request.APPEND_ENTRIES);
- // 发送请求
- AppendEntriesResponse response = rpcClient.send(request, RPC_TIMEOUT_MS);
- // 处理响应
- if (response != null) {
- handleHeartbeatResponse(response, targetNode.getServerId());
- }
- } catch (Exception e) {
- log.debug("Failed to send heartbeat to {}: {}", targetNode.getServerId(), e.getMessage());
- }
- }
复制代码 脚本运行:- private void startHeartbeatTimer() {
- int heartbeatInterval = raftConfig.getHeartbeatIntervalMs();
- heartbeatFuture = heartbeatExecutor.scheduleAtFixedRate(
- new HeartbeatTask(this),
- 0, // 立即开始
- heartbeatInterval, // 间隔
- TimeUnit.MILLISECONDS
- );
- log.debug("Heartbeat timer started, interval: {}ms", heartbeatInterval);
- }
复制代码 5.2 测试功能
5.2.1 根本测试场景
场景 1:正常推选- config.setHeartbeatInterval(1); // 每 1 秒发送一次心跳
复制代码 场景 2:Leader 故障- private void handleHeartbeatResponse(AppendEntriesResponse response, String nodeId) {
- // 如果响应的任期更大,转为 Follower
- if (response.getTerm() > currentTerm) {
- log.info("Received higher term {} from {} in heartbeat response, stepping down",
- response.getTerm(), nodeId);
- becomeFollower(response.getTerm());
- }
- }
复制代码 场景 3:节点规复- public void becomeCandidate() {
- currentTerm++; // 每次选举增加任期
- }
复制代码 5.2.2 交互式下令
下令分析示例status检察全部节点状态statusleader表现当前 Leader 信息leaderkill 模仿节点故障kill node1revive 规复节点revive node1log 控制日志级别log debugstop克制全部节点并退出stop5.2.3 日志级别控制
控制方式:- // ConsensusModuleImpl.requestVote()
- if (votedFor != null && !votedFor.equals(candidateId)) {
- return new VoteResponse(currentTerm, false);
- }
复制代码 日志级别分析:
- silent/error - 仅错误信息
- warn - 告诫及以上
- info - 信息及以上(默认)
- debug - 调试信息(全部日志)
- election - 仅推选干系日志
- heartbeat - 仅心跳干系日志
5.3 预期输出
正常推选
- // VoteCounter
- public boolean hasMajority() {
- return votesReceived.size() >= majorityCount; // N/2 + 1
- }
复制代码 Leader 故障规复
- // 在 requestVote 中
- if (voteRequest.getTerm() > currentTerm) {
- node.becomeFollower(voteRequest.getTerm());
- currentTerm = node.getCurrentTerm();
- }
复制代码 5.4 完备测试流程
步调 1:启动并验证推选
- 初始状态:5 节点(node1-5),Leader=node1
- 网络分区:
- - 分区 A: node1, node2 (2 节点)
- - 分区 B: node3, node4, node5 (3 节点)
- 分区 A:
- - node1 仍是 Leader
- - node2 收不到心跳,超时后转为 Candidate
- - 只有 1 票(自己),无法获得多数派(需要 3 票)
- - 无法选出新 Leader
- 分区 B:
- - node3 超时后发起选举
- - 获得自己 + node4 + node5 的票(3 票)
- - 成为新 Leader(term=2)
- 网络恢复后:
- - node1 发送心跳(term=1)
- - 其他节点拒绝(term=2 > term=1)
- - node1 收到更高任期,转为 Follower
复制代码 步调 2:验证心跳
- LingRaft-Lite-Core/LingRafte-Lite-CopyLog/src/main/java/com/ling/raft/example/leader/ThreeNodeElectionTest.java
复制代码 步调 3:模仿 Leader 故障
- # 直接运行 main 方法
- java -cp <classpath> com.ling.raft.example.leader.ThreeNodeElectionTest
复制代码 步调 4:规复旧 Leader
- cd LingRaft-Lite-Core/LingRafte-Lite-CopyLog/src/main/java/com/ling/raft/example/leader
- start-cluster.bat
复制代码 步调 5:多次故障测试
- [STEP 1] Starting 3 nodes...
- ✓ node1 started on port 8081
- ✓ node2 started on port 8082
- ✓ node3 started on port 8083
- ✓ All nodes started!
- [STEP 2] Waiting for leader election...
- [Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
- [Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
- ----------------------------------------
- ✓ Leader elected!
复制代码 6. 使用示例
6.1 根本使用
- raft> kill node1
- ✓ node1 stopped
- ! Leader killed, waiting for new election...
- [Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
- [Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
复制代码 6.2 监控 推选状态
- raft> revive node1
- ✓ node1 revived
- ✓ Status: FOLLOWER
- ✓ Election timer: active
- [Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
复制代码 6.3 手动触发推选
- raft> log silent
- ✓ Log level set to ERROR (silent mode)
- raft> log info
- ✓ Log level set to INFO
- raft> log debug
- ✓ Log level set to DEBUG (verbose mode)
- raft> log election
- ✓ Showing election logs only
- raft> log heartbeat
- ✓ Showing heartbeat logs only
复制代码 6.4 查询投票信息
- ╔════════════════════════════════════════════════════════════╗
- ║ Raft Leader Election Test - 3 Nodes ║
- ╚════════════════════════════════════════════════════════════╝
- [STEP 1] Starting 3 nodes...
- ✓ node1 started on port 8081
- ✓ node2 started on port 8082
- ✓ node3 started on port 8083
- ✓ All nodes started!
- [STEP 2] Waiting for leader election...
- [Cluster] node1:F(t1) node2:F(t1) node3:F(t1)
- [Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
- [Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
- ----------------------------------------
- ✓ Leader elected!
- ┌────────────────────────────────────────────────────────────┐
- │ Cluster Status │
- ├────────────┬──────────────┬─────────┬─────────┬────────────┤
- │ Node │ Status │ Term │ Log │ Voted For │
- ├────────────┼──────────────┼─────────┼─────────┼────────────┤
- │ node1 │ LEADER │ 1 │ 0 │ - │
- │ node2 │ FOLLOWER │ 1 │ 0 │ node1 │
- │ node3 │ FOLLOWER │ 1 │ 0 │ node1 │
- └────────────┴──────────────┴─────────┴─────────┴────────────┘
复制代码 7. 常见题目
7.1 为什么推选超时须要随机化?
缘故原由:
- 假如全部节点使用固定的超时时间,大概同时超时
- 同时超时的节点会同时发起推选
- 导致平票(split vote),须要重新推选
- 随机化可以克制多个节点同时超时
示例:- raft> kill node1
- Killing node1...
- ✓ node1 stopped
- ! Leader killed, waiting for new election...
- [Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
- [Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
- raft> leader
- ┌────────────────────────────────────────────────────────────┐
- │ Leader Info │
- ├────────────────────────────────────────────────────────────┤
- │ Node ID: node2 │
- │ Address: 127.0.0.1:8082 │
- │ Term: 2 │
- └────────────────────────────────────────────────────────────┘
- raft> revive node1
- Reviving node1...
- ✓ node1 revived
- ✓ Status: FOLLOWER
- ✓ Election timer: active
- [Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
复制代码 代码实现:- # 运行测试程序
- java com.ling.raft.example.leader.ThreeNodeElectionTest
- # 观察选举过程
- [Cluster] node1:F(t1) node2:F(t1) node3:F(t1)
- [Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
- [Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
- # 查看当前状态
- raft> status
复制代码 7.2 为什么收到投票哀求后要重置超时?
缘故原由:
- 收到投票哀求表现至少有一个其他节点是活泼的
- 重置超时可以镌汰不须要的推选
- 克制频仍切换状态
代码实现:- # 等待几秒,观察心跳
- [Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
- [Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
- [Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
- # 启用心跳日志观察
- raft> log heartbeat
- ✓ Showing heartbeat logs only
复制代码 7.3 为什么 Candidate 要增长任期?
缘故原由:
- 克制使用旧任期发起新的推选
- 区分差异轮的推选
- 包管任期单调递增
代码实现:- # 杀死 Leader
- raft> kill node1
- ✓ node1 stopped
- ! Leader killed, waiting for new election...
- # 观察新选举
- [Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
- [Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
复制代码 7.4 怎样处理处罚网络分区?
Raft 的包管:
- 旧 Leader 无法得到多数派,无法提交新日志
- 新 Leader 会在多数派分区推选产生
- 网络规复后,旧 Leader 会转为 Follower
代码表现:- # 恢复节点
- raft> revive node1
- ✓ node1 revived
- # 观察恢复过程
- [Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
复制代码 7.5 为什么心跳隔断通常远小于推选超时?
缘故原由:
- 心跳隔断短(如 100ms),推选超时长(如 200-300ms)
- 确保 Follower 在超时前收到心跳
- 克制不须要的推选
设置示例:- # 持续故障恢复
- raft> kill node2
- raft> kill node3
- raft> revive node2
- raft> revive node3
复制代码 发起设置:- // 1. 创建节点配置
- RaftNodeConfig node1 = new RaftNodeConfig("node1", "127.0.0.1", 8081);
- RaftNodeConfig node2 = new RaftNodeConfig("node2", "127.0.0.1", 8082);
- RaftNodeConfig node3 = new RaftNodeConfig("node3", "127.0.0.1", 8083);
- List<RaftNodeConfig> allNodes = Arrays.asList(node1, node2, node3);
- // 2. 创建 Raft 配置
- RaftConfig config1 = new RaftConfig(node1, allNodes);
- config1.setElectionTimeout(2); // 基础超时倍数
- config1.setElectionTimeoutRandomRange(Range.of(150, 300)); // 随机范围
- config1.setHeartbeatInterval(1); // 心跳间隔 1 秒
- // 3. 创建 RPC 组件
- DefaultRpcServer rpcServer1 = new DefaultRpcServer(node1.getPort(), null);
- DefaultRpcClient rpcClient1 = new DefaultRpcClient();
- // 4. 创建并初始化 Raft 节点
- RaftNodeImpl raftNode1 = new RaftNodeImpl(config1, rpcServer1, rpcClient1);
- rpcServer1.setRaftNode(raftNode1);
- raftNode1.init();
- // 5. 等待选举
- Thread.sleep(2000);
- // 6. 检查节点状态
- if (raftNode1.getNodeStatus() == ServerStatusEnum.LEADER) {
- System.out.println("Node1 is Leader, term: " + raftNode1.getCurrentTerm());
- }
复制代码 7.6 怎样克制平票(split vote)?
平票场景:- // 创建监控
线程 - Thread monitor = new Thread(() -> {
- while (true) {
- System.out.printf("Node1: %s(t%d) ",
- raftNode1.getNodeStatus(),
- raftNode1.getCurrentTerm());
- System.out.printf("Node2: %s(t%d) ",
- raftNode2.getNodeStatus(),
- raftNode2.getCurrentTerm());
- System.out.printf("Node3: %s(t%d)\n",
- raftNode3.getNodeStatus(),
- raftNode3.getCurrentTerm());
- Thread.sleep(3000);
- }
- });
- monitor.setDaemon(true);
- monitor.start();
复制代码 克制方法:
- 随机化超时(已实现)
- 预投票(Pre-vote)(未实现)
- 先扣问其他节点是否乐意投票
- 假如多数派同意,再真正发起推选
- 快速重试(未实现)
当前实现:
7.7 为什么单机模式直接成为 Leader?
缘故原由:
- 单机集群不须要推选
- 只有一个节点,自己就是多数派
- 进步启动速率
代码实现:- // 停止 Leader 的心跳定时器
- raftNode1.cancelHeartbeatTimer();
- // 模拟 Follower 超时
- raftNode2.resetElectionTimer(); // 重置超时
- // 等待超时后,node2 会自动发起选举
复制代码 7.8 怎样调优推选参数?
参数发起:
参数保举值分析心跳隔断50ms - 100ms越短越快,但网络开销大推选超时最小150ms - 200ms应该 > 心跳隔断推选超时最大300ms - 400ms应该是心跳隔断的 3-5 倍RPC 超时2000ms - 3000ms应该 > 推选超时调优示例:- // 获取当前投票信息
- String votedFor = raftNode1.getVotedFor();
- long currentTerm = raftNode1.getCurrentTerm();
- ServerStatusEnum status = raftNode1.getNodeStatus();
- System.out.println("Node1 - Status: " + status + ", Term: " + currentTerm + ", VotedFor: " + votedFor);
- // 如果是 Candidate,查看投票计数器
- if (status == ServerStatusEnum.CANDIDATE) {
- VoteCounter counter = raftNode1.getCurrentVoteCounter();
- System.out.println("Votes: " + counter.getVoteCount() + "/" + counter.getMajorityCount());
- }
复制代码 附录
A. 术语表
术语分析Term任期号,单调递增,用于辨认 LeaderElection Timeout推选超时时间,随机化克制平票Heartbeat心跳,Leader 定期发送维持职位Majority多数派,高出半数的节点(N/2 + 1)Split Vote平票推选,没有节点得到多数派Candidate候选节点,发起推选的节点Leader主节点,处理处罚客户端哀求Follower从节点,相应 Leader 的哀求B. 参考资料
C. 干系文件
文件路径RaftNodeImplcom.ling.raft.core.RaftNodeImplConsensusModuleImplcom.ling.raft.core.ConsensusModuleImplVoteCountercom.ling.raft.core.VoteCounterElectionTaskcom.ling.raft.core.task.ElectionTaskHeartbeatTaskcom.ling.raft.core.task.HeartbeatTaskServerStatusEnumcom.ling.raft.enums.ServerStatusEnumVoteRequestcom.ling.raft.model.dto.VoteRequestVoteResponsecom.ling.raft.model.dto.VoteResponseThreeNodeElectionTestcom.ling.raft.example.leader.ThreeNodeElectionTest
免责声明:如果侵犯了您的权益,请联系站长及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金. |