[Raft共识算法] Dragonboat Log Replication 代码走读

打印 上一主题 下一主题

主题 1724|帖子 1724|积分 5172

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
Dragonboat Log Replication 代码走读

Dragonboat 是一个开源的高性能Go实现的Raft共识协议实现. 具有良好的性能和久经社区检验的鲁棒性, 机遇巧合, 接触到. 因此决定结合Raft博士论文走读其源码. 今天带来Raft中三大核心之一的日志复制Log Replication的代码走读.
Dragonboat Log Replication代码实现结构

![Dragonboat log replication](/Users/tanghangyun/Documents/Dragonboat log replication.png)
Dragonboat中的网络接口调用主要在node.go文件中实现, 作者提供了对网络接口的抽象, 可以自由实现底层的网络交互方法. 本次讨论仅涉及对这些网络接口的代用逻辑, 也就是工作流的讲解, 不涉及网络协议底层实现的逻辑讨论. 作者在protobuf中定义了msg.Tpye, 并通过路由函数将不同Type的msg路由到不同的Handler函数进行处理.
msg Type 及其路由处理函数解读

先介绍根据msg.Type 进行路由的路由函数
路由函数 initializeHandlerMap
  1. func (r *raft) Handle(m pb.Message) {
  2.         if !r.onMessageTermNotMatched(m) {
  3.                 r.doubleCheckTermMatched(m.Term)
  4.         r.handle(r, m)
  5.     } ...
  6. }
  7. func (r *raft) initializeHandlerMap() {
  8.         // candidate
  9. ...
  10.         // follower
  11.         r.handlers[follower][pb.Propose] = r.handleFollowerPropose
  12.         r.handlers[follower][pb.Replicate] = r.handleFollowerReplicate
  13.         r.handlers[follower][pb.Heartbeat] = r.handleFollowerHeartbeat
  14.         r.handlers[follower][pb.ReadIndex] = r.handleFollowerReadIndex
  15.         r.handlers[follower][pb.LeaderTransfer] = r.handleFollowerLeaderTransfer
  16.         r.handlers[follower][pb.ReadIndexResp] = r.handleFollowerReadIndexResp
  17.         r.handlers[follower][pb.InstallSnapshot] = r.handleFollowerInstallSnapshot
  18.         r.handlers[follower][pb.Election] = r.handleNodeElection
  19.         r.handlers[follower][pb.RequestVote] = r.handleNodeRequestVote
  20.         r.handlers[follower][pb.TimeoutNow] = r.handleFollowerTimeoutNow
  21.         r.handlers[follower][pb.ConfigChangeEvent] = r.handleNodeConfigChange
  22.         r.handlers[follower][pb.LocalTick] = r.handleLocalTick
  23.         r.handlers[follower][pb.SnapshotReceived] = r.handleRestoreRemote
  24.         // leader
  25.         r.handlers[leader][pb.LeaderHeartbeat] = r.handleLeaderHeartbeat
  26.         r.handlers[leader][pb.CheckQuorum] = r.handleLeaderCheckQuorum
  27.         r.handlers[leader][pb.Propose] = r.handleLeaderPropose
  28.         r.handlers[leader][pb.ReadIndex] = r.handleLeaderReadIndex
  29.         r.handlers[leader][pb.ReplicateResp] = lw(r, r.handleLeaderReplicateResp)
  30.         r.handlers[leader][pb.HeartbeatResp] = lw(r, r.handleLeaderHeartbeatResp)
  31.         r.handlers[leader][pb.SnapshotStatus] = lw(r, r.handleLeaderSnapshotStatus)
  32.         r.handlers[leader][pb.Unreachable] = lw(r, r.handleLeaderUnreachable)
  33.         r.handlers[leader][pb.LeaderTransfer] = r.handleLeaderTransfer
  34.         r.handlers[leader][pb.Election] = r.handleNodeElection
  35.         r.handlers[leader][pb.RequestVote] = r.handleNodeRequestVote
  36.         r.handlers[leader][pb.ConfigChangeEvent] = r.handleNodeConfigChange
  37.         r.handlers[leader][pb.LocalTick] = r.handleLocalTick
  38.         r.handlers[leader][pb.SnapshotReceived] = r.handleRestoreRemote
  39.         r.handlers[leader][pb.RateLimit] = r.handleLeaderRateLimit
  40.         // observer
  41. ...
  42.         // witness
  43. ...
  44. }
复制代码
重点需要关注的函数是        r.handlers[follower][pb.Propose] = r.handleFollowerPropose, r.handlers[follower][pb.Replicate] = r.handleFollowerReplicate,         r.handlers[leader][pb.Propose] = r.handleLeaderPropose,        r.handlers[leader][pb.ReplicateResp] = lw(r, r.handleLeaderReplicateResp)这四个函数. 分别对应Follower处理Proposal消息和Replicate消息; 以及Leader处理ProposalS和ReplicateResp消息. 接下来分别阅读上述四个函数. 以及上述四个函数后续的调用栈. 最终在本地调用栈结束于send函数. send函数十分简单仅仅将msgs添加到r.msgs领域中. 之后将有node扫描raft的msgs领域和logs领域中的缓存消息, 并发起网络交互.
send
  1. func (r *raft) send(m pb.Message) {
  2.    m.From = r.nodeID
  3.    m = r.finalizeMessageTerm(m)
  4.    r.msgs = append(r.msgs, m)
  5. }
复制代码
更新msg的任期以及原节点id信息, 后添加到raft的msgs领域.
handleFollowerPropose
  1. func (r *raft) handleFollowerPropose(m pb.Message) {
  2.         if r.leaderID == NoLeader {
  3.                 plog.Warningf("%s dropped proposal, no leader", r.describe())
  4.                 r.reportDroppedProposal(m)
  5.                 return
  6.         }
  7.         m.To = r.leaderID
  8.         // the message might be queued by the transport layer, this violates the
  9.         // requirement of the entryQueue.get() func. copy the m.Entries to its
  10.         // own space.
  11.         m.Entries = newEntrySlice(m.Entries)
  12.         r.send(m)
  13. }
复制代码
Follower接到客户端的proposal(提议) 后需要将提议转发给主节点, 因此更新完msg.To 目的节点信息后立刻转发. 调用send函数.
handleLeaderPropose 及其后续函数
  1. func (r *raft) handleLeaderPropose(m pb.Message) {
  2.         r.mustBeLeader()
  3.         if r.leaderTransfering() {
  4.                 plog.Warningf("%s dropped proposal, leader transferring", r.describe())
  5.                 r.reportDroppedProposal(m)
  6.                 return
  7.         }
  8.         for i, e := range m.Entries {
  9.                 if e.Type == pb.ConfigChangeEntry {
  10.                         if r.hasPendingConfigChange() {
  11.                                 plog.Warningf("%s dropped config change, pending change", r.describe())
  12.                                 r.reportDroppedConfigChange(m.Entries[i])
  13.                                 m.Entries[i] = pb.Entry{Type: pb.ApplicationEntry}
  14.                         }
  15.                         r.setPendingConfigChange()
  16.                 }
  17.         }
  18.         r.appendEntries(m.Entries)
  19.         r.broadcastReplicateMessage()
  20. }
复制代码
前18行代码都不是我们关注的重点: 大体进行一下在确认主节点完毕之后, 判断当前集群状态, 以及配置变更的操作. 最后两行的带吗引起我的的注意. 他们分别是        r.appendEntries(m.Entries)和        r.broadcastReplicateMessage().
  1. func (r *raft) appendEntries(entries []pb.Entry) {
  2.         lastIndex := r.log.lastIndex()
  3.         for i := range entries {
  4.                 entries[i].Term = r.term
  5.                 entries[i].Index = lastIndex + 1 + uint64(i)
  6.         }
  7.         r.log.append(entries)
  8.         r.remotes[r.nodeID].tryUpdate(r.log.lastIndex())
  9.         if r.isSingleNodeQuorum() {
  10.                 r.tryCommit()
  11.         }
  12. }
复制代码
在appendEntries中更新每个entry的term和Index信息. 并将这些entries添加到r.log中.
  1. func (r *raft) broadcastReplicateMessage() {
  2.         r.mustBeLeader()
  3.         for nid := range r.observers {
  4.                 if nid == r.nodeID {
  5.                         plog.Panicf("%s observer is broadcasting Replicate msg", r.describe())
  6.                 }
  7.         }
  8.         for _, nid := range r.nodes() {
  9.                 if nid != r.nodeID {
  10.                         r.sendReplicateMessage(nid)
  11.                 }
  12.         }
  13. }
复制代码
在broadcastReplicateMessage方法中, 检查完leader之后, 调用r.sendReplicateMessage(nid)来实现消息的发送.
  1. func (r *raft) sendReplicateMessage(to uint64) {
  2.         var rp *remote
  3.         if v, ok := r.remotes[to]; ok {
  4.                 rp = v
  5.         } else if v, ok := r.observers[to]; ok {
  6.                 rp = v
  7.         } else {
  8.                 rp, ok = r.witnesses[to]
  9.                 if !ok {
  10.                         plog.Panicf("%s failed to get the remote instance", r.describe())
  11.                 }
  12.         }
  13.         if rp.isPaused() {
  14.                 return
  15.         }
  16.         m, err := r.makeReplicateMessage(to, rp.next, maxEntrySize)
  17.         if err != nil {
  18.                 // log not available due to compaction, send snapshot
  19.                 if !rp.isActive() {
  20.                         plog.Warningf("%s, %s is not active, sending snapshot is skipped",
  21.                                 r.describe(), NodeID(to))
  22.                         return
  23.                 }
  24.                 index := r.makeInstallSnapshotMessage(to, &m)
  25.                 plog.Infof("%s is sending snapshot (%d) to %s, r.Next %d, r.Match %d, %v",
  26.                         r.describe(), index, NodeID(to), rp.next, rp.match, err)
  27.                 rp.becomeSnapshot(index)
  28.         } else if len(m.Entries) > 0 {
  29.                 lastIndex := m.Entries[len(m.Entries)-1].Index
  30.                 rp.progress(lastIndex)
  31.         }
  32.         r.send(m)
  33. }
复制代码
该消息发送函数进行了一系列状态检查和判断之后, 最后一行语句点明主旨. 还是调用本段开始所述的send方法.
handleFollowerReplicate
  1. func (r *raft) handleFollowerReplicate(m pb.Message) {
  2.         r.leaderIsAvailable()
  3.         r.setLeaderID(m.From)
  4.         r.handleReplicateMessage(m)
  5. }
复制代码
前两行判断leader的信息. 最后一行调用r.handleReplicateMessage(m)方法处理Replicate信息.
在处理Replicate msg的过程中, 根据comitted信息的不同将有两种逻辑, 分别对应日志的复制和日志的提交.
[code]func (r *raft) handleReplicateMessage(m pb.Message) {        resp := pb.Message{                To:   m.From,                Type: pb.ReplicateResp,        }        if m.LogIndex < r.log.committed {                resp.LogIndex = r.log.committed                r.send(resp)                return        }        if r.log.matchTerm(m.LogIndex, m.LogTerm) {                r.log.tryAppend(m.LogIndex, m.Entries)                lastIdx := m.LogIndex + uint64(len(m.Entries))                r.log.commitTo(min(lastIdx, m.Commit))                resp.LogIndex = lastIdx        } else {                plog.Debugf("%s rejected Replicate index %d term %d from %s",                        r.describe(), m.LogIndex, m.Term, NodeID(m.From))                resp.Reject = true                resp.LogIndex = m.LogIndex                resp.Hint = r.log.lastIndex()                if r.events != nil {                        info := server.ReplicationInfo{                                ClusterID: r.clusterID,                                NodeID:    r.nodeID,                                Index:     m.LogIndex,                                Term:      m.LogTerm,                                From:      m.From,                        }                        r.events.ReplicationRejected(info)                }        }        r.send(resp)}func (l *entryLog) tryAppend(index uint64, ents []pb.Entry) bool {        conflictIndex := l.getConflictIndex(ents)        if conflictIndex != 0 {                if conflictIndex
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

曂沅仴駦

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表