Dragonboat Log Replication Code Walkthrough
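Dragonboat is an open-source, high-performance implementation of the Raft consensus protocol written in Go. It performs well, and its robustness has been proven by the community. I came across it by chance and decided to read its source code alongside the Raft PhD thesis. This post walks through the code for log replication, one of Raft's three core parts (together with leader election and safety).
Structure of the Log Replication Implementation in Dragonboat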

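Dragonboat's network calls are mainly made in node.go. The author abstracts the network interface, so the underlying transport can be implemented however you like. This post covers only the logic that invokes these interfaces, i.e. the workflow; it does not discuss how the underlying network protocol is implemented. The author defines msg.Type in protobuf, and a routing function dispatches messages of each Type to a different handler function.
msg Types and Their Routing Handlers
First, the routing function that dispatches on msg.Type.
Routing function: initializeHandlerMap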
```go
func (r *raft) Handle(m pb.Message) {
	if !r.onMessageTermNotMatched(m) {
		r.doubleCheckTermMatched(m.Term)
		r.handle(r, m)
	} ...
}

func (r *raft) initializeHandlerMap() {
	// candidate
	...
	// follower
	r.handlers[follower][pb.Propose] = r.handleFollowerPropose
	r.handlers[follower][pb.Replicate] = r.handleFollowerReplicate
	r.handlers[follower][pb.Heartbeat] = r.handleFollowerHeartbeat
	r.handlers[follower][pb.ReadIndex] = r.handleFollowerReadIndex
	r.handlers[follower][pb.LeaderTransfer] = r.handleFollowerLeaderTransfer
	r.handlers[follower][pb.ReadIndexResp] = r.handleFollowerReadIndexResp
	r.handlers[follower][pb.InstallSnapshot] = r.handleFollowerInstallSnapshot
	r.handlers[follower][pb.Election] = r.handleNodeElection
	r.handlers[follower][pb.RequestVote] = r.handleNodeRequestVote
	r.handlers[follower][pb.TimeoutNow] = r.handleFollowerTimeoutNow
	r.handlers[follower][pb.ConfigChangeEvent] = r.handleNodeConfigChange
	r.handlers[follower][pb.LocalTick] = r.handleLocalTick
	r.handlers[follower][pb.SnapshotReceived] = r.handleRestoreRemote
	// leader
	r.handlers[leader][pb.LeaderHeartbeat] = r.handleLeaderHeartbeat
	r.handlers[leader][pb.CheckQuorum] = r.handleLeaderCheckQuorum
	r.handlers[leader][pb.Propose] = r.handleLeaderPropose
	r.handlers[leader][pb.ReadIndex] = r.handleLeaderReadIndex
	r.handlers[leader][pb.ReplicateResp] = lw(r, r.handleLeaderReplicateResp)
	r.handlers[leader][pb.HeartbeatResp] = lw(r, r.handleLeaderHeartbeatResp)
	r.handlers[leader][pb.SnapshotStatus] = lw(r, r.handleLeaderSnapshotStatus)
	r.handlers[leader][pb.Unreachable] = lw(r, r.handleLeaderUnreachable)
	r.handlers[leader][pb.LeaderTransfer] = r.handleLeaderTransfer
	r.handlers[leader][pb.Election] = r.handleNodeElection
	r.handlers[leader][pb.RequestVote] = r.handleNodeRequestVote
	r.handlers[leader][pb.ConfigChangeEvent] = r.handleNodeConfigChange
	r.handlers[leader][pb.LocalTick] = r.handleLocalTick
	r.handlers[leader][pb.SnapshotReceived] = r.handleRestoreRemote
	r.handlers[leader][pb.RateLimit] = r.handleLeaderRateLimit
	// observer
	...
	// witness
	...
}
```
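The four handlers to focus on are r.handlers[follower][pb.Propose] = r.handleFollowerPropose, r.handlers[follower][pb.Replicate] = r.handleFollowerReplicate, r.handlers[leader][pb.Propose] = r.handleLeaderPropose and r.handlers[leader][pb.ReplicateResp] = lw(r, r.handleLeaderReplicateResp). They cover the follower handling Propose and Replicate messages, and the leader handling Propose and ReplicateResp messages. Below we read these four functions and the call stacks that follow from them. Locally, these call stacks end in the send function, which is very simple: it only appends the message to the r.msgs field. The node later scans the messages buffered in raft's msgs field and in its log field, and performs the actual network I/O.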
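To make the routing idea concrete, here is a minimal sketch of a two-level handler table indexed by node role and message type, in the same shape as r.handlers. The types `role`, `msgType`, `message` and `node` are hypothetical stand-ins, not Dragonboat's types, and the sketch simply ignores role/type combinations that have no handler, which may not match what Dragonboat does.

```go
package main

import "fmt"

// role and msgType are hypothetical stand-ins for Dragonboat's raft
// states and pb.MessageType values.
type role int
type msgType int

const (
	follower role = iota
	leader
	numRoles
)

const (
	propose msgType = iota
	replicate
	replicateResp
	numMsgTypes
)

type message struct {
	Type msgType
	From uint64
}

type handlerFunc func(m message) string

// node holds a [role][msgType] table, mirroring the shape of r.handlers.
type node struct {
	state    role
	handlers [numRoles][numMsgTypes]handlerFunc
}

func newNode(state role) *node {
	n := &node{state: state}
	n.handlers[follower][propose] = func(m message) string { return "forward proposal to leader" }
	n.handlers[follower][replicate] = func(m message) string { return "append entries, reply ReplicateResp" }
	n.handlers[leader][propose] = func(m message) string { return "append locally, broadcast Replicate" }
	n.handlers[leader][replicateResp] = func(m message) string { return "update follower progress" }
	return n
}

// handle routes m by the node's current role and the message type.
// A nil slot means this role does not handle that type; the sketch ignores it.
func (n *node) handle(m message) string {
	h := n.handlers[n.state][m.Type]
	if h == nil {
		return "ignored"
	}
	return h(m)
}

func main() {
	f := newNode(follower)
	l := newNode(leader)
	fmt.Println(f.handle(message{Type: propose}))       // forward proposal to leader
	fmt.Println(l.handle(message{Type: propose}))       // append locally, broadcast Replicate
	fmt.Println(f.handle(message{Type: replicateResp})) // ignored
}
```

A role change then only means switching `state`; the table itself never changes, which keeps per-message dispatch to two array lookups.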
send
```go
func (r *raft) send(m pb.Message) {
	m.From = r.nodeID
	m = r.finalizeMessageTerm(m)
	r.msgs = append(r.msgs, m)
}
```
send fills in the sender (m.From = r.nodeID) and the message term via finalizeMessageTerm, then appends the message to raft's msgs field.
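This is an outbox pattern: the raft state machine never does I/O itself, it only appends to r.msgs, and the owning node later drains that slice and hands the messages to the transport. A minimal sketch, assuming hypothetical `raftCore`, `message` and `drainMessages` names; the term handling is simplified to "always stamp the current term", whereas Dragonboat's finalizeMessageTerm applies its own rules.

```go
package main

import "fmt"

type message struct {
	From, To, Term uint64
	Kind           string
}

// raftCore is a hypothetical, stripped-down stand-in for Dragonboat's raft struct.
type raftCore struct {
	nodeID uint64
	term   uint64
	msgs   []message // outbox: filled by send, drained by the node
}

// send stamps sender and term, then buffers the message; no I/O happens here.
// Simplification: the term is always overwritten.
func (r *raftCore) send(m message) {
	m.From = r.nodeID
	m.Term = r.term
	r.msgs = append(r.msgs, m)
}

// drainMessages is a hypothetical helper playing the node's role: it takes
// everything buffered so far and resets the outbox.
func (r *raftCore) drainMessages() []message {
	out := r.msgs
	r.msgs = nil
	return out
}

func main() {
	r := &raftCore{nodeID: 1, term: 5}
	r.send(message{To: 2, Kind: "Replicate"})
	r.send(message{To: 3, Kind: "Replicate"})
	for _, m := range r.drainMessages() {
		fmt.Printf("transport: %s %d->%d term %d\n", m.Kind, m.From, m.To, m.Term)
	}
	fmt.Println(len(r.msgs)) // 0
}
```

Keeping I/O out of the state machine makes the raft logic deterministic and easy to unit-test: a test can call a handler and then inspect the outbox.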
handleFollowerPropose
```go
func (r *raft) handleFollowerPropose(m pb.Message) {
	if r.leaderID == NoLeader {
		plog.Warningf("%s dropped proposal, no leader", r.describe())
		r.reportDroppedProposal(m)
		return
	}
	m.To = r.leaderID
	// the message might be queued by the transport layer, this violates the
	// requirement of the entryQueue.get() func. copy the m.Entries to its
	// own space.
	m.Entries = newEntrySlice(m.Entries)
	r.send(m)
}
```
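When a follower receives a proposal from a client, it has to forward it to the leader: it sets the destination msg.To to the leader ID, copies the entries into their own slice (see the comment in the code), and calls send. If no leader is currently known, the proposal is dropped and reported instead.
handleLeaderPropose and the functions it calls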
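A minimal sketch of the same decision, with hypothetical `follower`, `message` and `entry` types: drop when no leader is known, otherwise retarget the message and give it its own copy of the entries so that later reuse of the caller's slice cannot change what is queued.

```go
package main

import "fmt"

const noLeader uint64 = 0

type entry struct {
	Index, Term uint64
	Cmd         []byte
}

type message struct {
	To      uint64
	Entries []entry
}

// follower is a hypothetical stand-in holding only what this path needs.
type follower struct {
	leaderID uint64
	outbox   []message
	dropped  int
}

// handlePropose forwards a client proposal to the leader, or drops it when
// no leader is known.
func (f *follower) handlePropose(m message) {
	if f.leaderID == noLeader {
		f.dropped++
		return
	}
	m.To = f.leaderID
	// Shallow copy: the entry structs are copied, but each Cmd byte slice is
	// still shared with the caller.
	m.Entries = append([]entry(nil), m.Entries...)
	f.outbox = append(f.outbox, m)
}

func main() {
	f := &follower{}
	f.handlePropose(message{Entries: []entry{{Cmd: []byte("x")}}})
	fmt.Println(f.dropped, len(f.outbox)) // 1 0

	f.leaderID = 3
	ents := []entry{{Cmd: []byte("y")}}
	f.handlePropose(message{Entries: ents})
	ents[0].Term = 99 // the caller reuses its slice
	fmt.Println(f.outbox[0].To, f.outbox[0].Entries[0].Term) // 3 0
}
```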
```go
func (r *raft) handleLeaderPropose(m pb.Message) {
	r.mustBeLeader()
	if r.leaderTransfering() {
		plog.Warningf("%s dropped proposal, leader transferring", r.describe())
		r.reportDroppedProposal(m)
		return
	}
	for i, e := range m.Entries {
		if e.Type == pb.ConfigChangeEntry {
			if r.hasPendingConfigChange() {
				plog.Warningf("%s dropped config change, pending change", r.describe())
				r.reportDroppedConfigChange(m.Entries[i])
				m.Entries[i] = pb.Entry{Type: pb.ApplicationEntry}
			}
			r.setPendingConfigChange()
		}
	}
	r.appendEntries(m.Entries)
	r.broadcastReplicateMessage()
}
```
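The config-change loop enforces a one-pending-change rule: while a change is in flight, a new ConfigChangeEntry is not rejected as a whole proposal but overwritten in place with an empty application entry, so the proposal's other entries are still appended. A minimal sketch with hypothetical types; where Dragonboat calls reportDroppedConfigChange, this sketch only counts the drop.

```go
package main

import "fmt"

type entryType int

const (
	applicationEntry entryType = iota
	configChangeEntry
)

type entry struct {
	Type entryType
	Cmd  string
}

// leader is a hypothetical stand-in tracking only config-change state.
type leader struct {
	pendingConfigChange bool
	droppedConfigChange int
}

// filterConfigChanges keeps at most one config change in flight. Extra
// config-change entries become empty application entries in place.
func (l *leader) filterConfigChanges(ents []entry) {
	for i, e := range ents {
		if e.Type == configChangeEntry {
			if l.pendingConfigChange {
				l.droppedConfigChange++
				ents[i] = entry{Type: applicationEntry}
			}
			l.pendingConfigChange = true
		}
	}
}

func main() {
	l := &leader{}
	ents := []entry{
		{Type: configChangeEntry, Cmd: "add node 4"},
		{Type: applicationEntry, Cmd: "set x=1"},
		{Type: configChangeEntry, Cmd: "remove node 2"},
	}
	l.filterConfigChanges(ents)
	fmt.Println(l.droppedConfigChange)                               // 1
	fmt.Println(ents[2].Type == applicationEntry, ents[2].Cmd == "") // true true
}
```

Replacing instead of removing keeps the slice length, and therefore the index assignment done later by appendEntries, unchanged.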
The first 18 lines are not our focus: after asserting that this node is the leader, they check the cluster state (dropping the proposal while a leadership transfer is in progress) and handle config-change entries. The last two lines are what matter here: r.appendEntries(m.Entries) and r.broadcastReplicateMessage().

```go
func (r *raft) appendEntries(entries []pb.Entry) {
	lastIndex := r.log.lastIndex()
	for i := range entries {
		entries[i].Term = r.term
		entries[i].Index = lastIndex + 1 + uint64(i)
	}
	r.log.append(entries)
	r.remotes[r.nodeID].tryUpdate(r.log.lastIndex())
	if r.isSingleNodeQuorum() {
		r.tryCommit()
	}
}
```
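In isolation, appendEntries is index arithmetic: the i-th new entry gets index lastIndex+1+i and the leader's current term, the leader's own match index moves to the new last index, and a single-voter cluster can commit at once because the leader alone is a quorum. A sketch with a hypothetical in-memory `leaderLog` (no compaction); it sets `committed` directly where Dragonboat goes through tryCommit.

```go
package main

import "fmt"

type entry struct {
	Index, Term uint64
}

// leaderLog is a hypothetical toy log: entries[i].Index == i+1, no compaction.
type leaderLog struct {
	term      uint64
	entries   []entry
	selfMatch uint64 // the leader's own match index
	committed uint64
	clusterSz int
}

func (l *leaderLog) lastIndex() uint64 { return uint64(len(l.entries)) }

// appendEntries stamps term and consecutive indexes, appends, and advances
// the leader's own progress. With a single voter, the new tail is committed.
func (l *leaderLog) appendEntries(ents []entry) {
	last := l.lastIndex()
	for i := range ents {
		ents[i].Term = l.term
		ents[i].Index = last + 1 + uint64(i)
	}
	l.entries = append(l.entries, ents...)
	l.selfMatch = l.lastIndex()
	if l.clusterSz == 1 {
		l.committed = l.lastIndex()
	}
}

func main() {
	l := &leaderLog{term: 2, clusterSz: 3, entries: []entry{{1, 1}, {2, 1}}}
	l.appendEntries(make([]entry, 3))
	fmt.Println(l.entries[2], l.entries[4], l.selfMatch, l.committed) // {3 2} {5 2} 5 0
}
```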
appendEntries gives every entry the leader's current term and the next consecutive Index, then appends them to r.log. It also moves the leader's own progress to the new last index and, in a single-node cluster, tries to commit immediately.

```go
func (r *raft) broadcastReplicateMessage() {
	r.mustBeLeader()
	for nid := range r.observers {
		if nid == r.nodeID {
			plog.Panicf("%s observer is broadcasting Replicate msg", r.describe())
		}
	}
	for _, nid := range r.nodes() {
		if nid != r.nodeID {
			r.sendReplicateMessage(nid)
		}
	}
}
```
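The broadcast is a fan-out that skips the leader itself, and each peer gets its own message starting from that peer's `next` index, which is why the work is done per peer in sendReplicateMessage rather than with one shared message. A sketch with a hypothetical `broadcast` function and `replicate` type; `PrevIndex` plays the role of the m.LogIndex the follower checks.

```go
package main

import (
	"fmt"
	"sort"
)

type replicate struct {
	To, PrevIndex uint64
}

// broadcast builds one Replicate per peer (excluding self), each starting
// from that peer's next index. next maps node ID -> next index to send.
func broadcast(self uint64, next map[uint64]uint64) []replicate {
	ids := make([]uint64, 0, len(next))
	for id := range next {
		ids = append(ids, id)
	}
	// Sort for deterministic output; map iteration order is random in Go.
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	var out []replicate
	for _, id := range ids {
		if id == self {
			continue
		}
		// The entry just before next is the one the follower must already have.
		out = append(out, replicate{To: id, PrevIndex: next[id] - 1})
	}
	return out
}

func main() {
	next := map[uint64]uint64{1: 11, 2: 8, 3: 11}
	fmt.Println(broadcast(1, next)) // [{2 7} {3 10}]
}
```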
broadcastReplicateMessage first asserts that this node is the leader and is not an observer, then calls r.sendReplicateMessage(nid) for every node other than itself.

```go
func (r *raft) sendReplicateMessage(to uint64) {
	var rp *remote
	if v, ok := r.remotes[to]; ok {
		rp = v
	} else if v, ok := r.observers[to]; ok {
		rp = v
	} else {
		rp, ok = r.witnesses[to]
		if !ok {
			plog.Panicf("%s failed to get the remote instance", r.describe())
		}
	}
	if rp.isPaused() {
		return
	}
	m, err := r.makeReplicateMessage(to, rp.next, maxEntrySize)
	if err != nil {
		// log not available due to compaction, send snapshot
		if !rp.isActive() {
			plog.Warningf("%s, %s is not active, sending snapshot is skipped",
				r.describe(), NodeID(to))
			return
		}
		index := r.makeInstallSnapshotMessage(to, &m)
		plog.Infof("%s is sending snapshot (%d) to %s, r.Next %d, r.Match %d, %v",
			r.describe(), index, NodeID(to), rp.next, rp.match, err)
		rp.becomeSnapshot(index)
	} else if len(m.Entries) > 0 {
		lastIndex := m.Entries[len(m.Entries)-1].Index
		rp.progress(lastIndex)
	}
	r.send(m)
}
```
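sendReplicateMessage performs a series of state checks first: it returns early if the remote is paused; if makeReplicateMessage fails because the needed entries have been compacted, it builds an InstallSnapshot message instead (skipping it if the remote is not active); otherwise it advances the remote's progress to the last index being sent. The last line gives it away: in the end it calls the send method introduced at the start of this section.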
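The per-peer bookkeeping can be reduced to a small state machine over `next`: skip a paused peer, fall back to a snapshot when the entries at `next` have been compacted, and otherwise send a bounded batch and advance `next` optimistically past it. A hedged sketch: `peer`, `logView`, `action` and `sendTo` are hypothetical, only two states are modelled, and the isActive check and any other pause conditions of Dragonboat's remote type are omitted.

```go
package main

import "fmt"

type peerState int

const (
	stateReplicate peerState = iota // streaming entries
	stateSnapshot                   // paused until the snapshot is handled
)

// peer is a hypothetical stand-in for per-remote progress.
type peer struct {
	next  uint64 // next index to send
	match uint64 // highest index known replicated (updated on ReplicateResp, not shown)
	state peerState
}

// logView is a hypothetical summary of the leader's log.
type logView struct {
	firstIndex uint64 // entries below this were compacted
	lastIndex  uint64
	snapIndex  uint64 // index covered by the latest snapshot
}

type action struct {
	kind        string // "none", "replicate" or "snapshot"
	first, last uint64 // entry range for "replicate"; empty when first > last
	snapshot    uint64 // snapshot index for "snapshot"
}

// sendTo picks what to send to p and updates p's progress.
// maxEntries must be >= 1.
func sendTo(p *peer, lg logView, maxEntries uint64) action {
	if p.state == stateSnapshot { // paused: send nothing
		return action{kind: "none"}
	}
	if p.next < lg.firstIndex { // needed entries compacted: fall back to a snapshot
		p.state = stateSnapshot
		return action{kind: "snapshot", snapshot: lg.snapIndex}
	}
	last := p.next + maxEntries - 1
	if last > lg.lastIndex {
		last = lg.lastIndex
	}
	a := action{kind: "replicate", first: p.next, last: last}
	if last >= p.next { // non-empty batch: advance next optimistically
		p.next = last + 1
	}
	return a
}

func main() {
	lg := logView{firstIndex: 50, lastIndex: 120, snapIndex: 49}
	fast := &peer{next: 100, match: 99}
	a := sendTo(fast, lg, 16)
	fmt.Println(a, fast.next) // {replicate 100 115 0} 116
	slow := &peer{next: 10, match: 9}
	a = sendTo(slow, lg, 16)
	fmt.Println(a, slow.state == stateSnapshot) // {snapshot 0 0 49} true
	fmt.Println(sendTo(slow, lg, 16).kind)      // none
}
```

Advancing `next` before the follower acknowledges lets the leader pipeline several batches; a rejection later pulls `next` back.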
handleFollowerReplicate
```go
func (r *raft) handleFollowerReplicate(m pb.Message) {
	r.leaderIsAvailable()
	r.setLeaderID(m.From)
	r.handleReplicateMessage(m)
}
```
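The first two lines deal with the leader: they record that the leader is alive and remember the sender as the current leader. The last line calls r.handleReplicateMessage(m) to process the Replicate message.
handleReplicateMessage covers both replication (appending the leader's entries) and commit (advancing the local commit index to min(lastIdx, m.Commit)). It has three branches: if m.LogIndex is below the local committed index, the message is stale and the follower simply replies with its committed index; if the local entry at m.LogIndex has term m.LogTerm, the entries are appended via tryAppend and the commit index is advanced; otherwise the follower rejects the message and returns its last index in Hint so the leader can back off.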
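To see the three branches side by side, here is a toy follower log in the same shape. It is a sketch under simplifying assumptions: an in-memory slice with no compaction and hypothetical type names; its tryAppend only illustrates the usual Raft conflict rule (skip entries already present with the same term, truncate at the first conflicting index, append the rest) and is not Dragonboat's entryLog code.

```go
package main

import "fmt"

type entry struct{ Index, Term uint64 }

// LogIndex/LogTerm identify the entry immediately preceding Entries.
type replicateMsg struct {
	LogIndex, LogTerm, Commit uint64
	Entries                   []entry
}

type resp struct {
	LogIndex, Hint uint64
	Reject         bool
}

// followerLog is a toy log: ents[i].Index == i+1, no compaction.
type followerLog struct {
	ents      []entry
	committed uint64
}

func (l *followerLog) lastIndex() uint64 { return uint64(len(l.ents)) }

func (l *followerLog) term(i uint64) uint64 {
	if i == 0 || i > l.lastIndex() {
		return 0
	}
	return l.ents[i-1].Term
}

func (l *followerLog) matchTerm(i, t uint64) bool {
	return i <= l.lastIndex() && l.term(i) == t
}

// tryAppend skips entries already present with the same term, truncates at
// the first conflicting index, and appends the remainder.
func (l *followerLog) tryAppend(ents []entry) {
	for i, e := range ents {
		if e.Index <= l.lastIndex() && l.term(e.Index) == e.Term {
			continue
		}
		if e.Index <= l.committed {
			panic("conflict with a committed entry")
		}
		l.ents = append(l.ents[:e.Index-1], ents[i:]...)
		return
	}
}

func minU64(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

func (l *followerLog) handleReplicate(m replicateMsg) resp {
	if m.LogIndex < l.committed { // stale: already committed past this point
		return resp{LogIndex: l.committed}
	}
	if l.matchTerm(m.LogIndex, m.LogTerm) {
		l.tryAppend(m.Entries)
		lastIdx := m.LogIndex + uint64(len(m.Entries))
		if c := minU64(lastIdx, m.Commit); c > l.committed { // never move back
			l.committed = c
		}
		return resp{LogIndex: lastIdx}
	}
	return resp{Reject: true, LogIndex: m.LogIndex, Hint: l.lastIndex()}
}

func main() {
	l := &followerLog{ents: []entry{{1, 1}, {2, 1}, {3, 2}}, committed: 1}
	// A term-3 leader overwrites the uncommitted entry 3 and adds entry 4.
	r := l.handleReplicate(replicateMsg{LogIndex: 2, LogTerm: 1, Commit: 4,
		Entries: []entry{{3, 3}, {4, 3}}})
	fmt.Println(r, l.ents, l.committed) // {4 0 false} [{1 1} {2 1} {3 3} {4 3}] 4
	// The preceding entry (9, 3) does not exist locally: reject with a hint.
	fmt.Println(l.handleReplicate(replicateMsg{LogIndex: 9, LogTerm: 3})) // {9 4 true}
}
```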
[code]func (r *raft) handleReplicateMessage(m pb.Message) {
	resp := pb.Message{
		To:   m.From,
		Type: pb.ReplicateResp,
	}
	if m.LogIndex < r.log.committed {
		resp.LogIndex = r.log.committed
		r.send(resp)
		return
	}
	if r.log.matchTerm(m.LogIndex, m.LogTerm) {
		r.log.tryAppend(m.LogIndex, m.Entries)
		lastIdx := m.LogIndex + uint64(len(m.Entries))
		r.log.commitTo(min(lastIdx, m.Commit))
		resp.LogIndex = lastIdx
	} else {
		plog.Debugf("%s rejected Replicate index %d term %d from %s",
			r.describe(), m.LogIndex, m.Term, NodeID(m.From))
		resp.Reject = true
		resp.LogIndex = m.LogIndex
		resp.Hint = r.log.lastIndex()
		if r.events != nil {
			info := server.ReplicationInfo{
				ClusterID: r.clusterID,
				NodeID:    r.nodeID,
				Index:     m.LogIndex,
				Term:      m.LogTerm,
				From:      m.From,
			}
			r.events.ReplicationRejected(info)
		}
	}
	r.send(resp)
}

func (l *entryLog) tryAppend(index uint64, ents []pb.Entry) bool {
	conflictIndex := l.getConflictIndex(ents)
	if conflictIndex != 0 {
		if conflictIndex |