zookeeper源码(04)leader选举流程

九天猎人  金牌会员 | 2023-12-8 17:41:33 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 891|帖子 891|积分 2673

在"zookeeper源码(03)集群启动流程"中介绍了leader选举的入口,本文将详细分析leader选举组件和流程。
leader选举流程(重要)


  • quorumPeer的start阶段使用startLeaderElection()方法启动选举
  • LOOKING状态,投自己一票
  • createElectionAlgorithm - 创建选举核心组件:QuorumCnxManager(管理连接)、FastLeaderElection(选举)等
  • quorumPeer的main loop根据当前状态执行不同流程
状态与流程:

  • LOOKING - 使用fastLeaderElection.lookForLeader选举

    • 递增选举epoch开启新一轮选举
    • 使用自己的serverId、zxid、currentEpoch初始化投票决议
    • 把选票发出去
    • 循环接收其他server的选票:

      • LOOKING选票:对比选举epoch、currentEpoch、zxid、serverId决定投给哪个server,若是超过半数节点同意该决议,则将该server确定为leader
      • FOLLOWING选票:对比选举epoch后将选票投给当前leader
      • LEADING选票:对比选举epoch后将选票投给当前leader


  • LEADING - 创建Leader对象执行lead逻辑

    • zkServer加载数据
    • 启动quorum监听
    • 根据各个follower的当前epoch确定新的epoch和zxid
    • 给follower同步数据
    • 启动zkServer
    • 每间隔tick验证多数follower同步状态

  • FOLLOWING - 创建Follower对象指定followLeader逻辑

    • connectToLeader - 连接leader服务器
    • registerWithLeader - 向leader发送当前epoch,等待leader发送新一轮的epoch
    • syncWithLeader - 接收leader同步的数据:txnlog、committedlog、snapshot
    • 保持通信处理来自leader的数据包

  • OBSERVING - 创建Observer对象执行observeLeader逻辑,基本与FOLLOWING相同
启动leader选举

QuorumPeer的startLeaderElection方法是启动选举的入口:
  1. public synchronized void startLeaderElection() {
  2.     try {
  3.         if (getPeerState() == ServerState.LOOKING) {
  4.             // 投自己一票,封装zxid和epoch
  5.             currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
  6.         }
  7.     } catch (IOException e) {
  8.         RuntimeException re = new RuntimeException(e.getMessage());
  9.         re.setStackTrace(e.getStackTrace());
  10.         throw re;
  11.     }
  12.     // electionType总是3
  13.     this.electionAlg = createElectionAlgorithm(electionType);
  14. }
  15. protected Election createElectionAlgorithm(int electionAlgorithm) {
  16.     Election le = null;
  17.     // TODO: use a factory rather than a switch
  18.     // 可以使用策略模式替换switch语句
  19.     switch (electionAlgorithm) {
  20.     case 1:
  21.         throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
  22.     case 2:
  23.         throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
  24.     case 3:
  25.         QuorumCnxManager qcm = createCnxnManager();
  26.         QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
  27.         // 关闭oldQcm
  28.         if (oldQcm != null) {
  29.             oldQcm.halt();
  30.         }
  31.         // 用来启动serverSocket监听
  32.         QuorumCnxManager.Listener listener = qcm.listener;
  33.         if (listener != null) {
  34.             listener.start();
  35.             FastLeaderElection fle = new FastLeaderElection(this, qcm);
  36.             fle.start();
  37.             le = fle;
  38.         }
  39.         break;
  40.     default:
  41.         assert false;
  42.     }
  43.     return le;
  44. }
  45. public QuorumCnxManager createCnxnManager() {
  46.     // socket超时设置使用,默认tickTime * syncLimit
  47.     // 按照zoo_sample.cfg文件配置是2000 * 5
  48.     int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
  49.     return new QuorumCnxManager(
  50.         this,
  51.         this.getMyId(),
  52.         this.getView(), // serverId->quorumServer
  53.         this.authServer,
  54.         this.authLearner,
  55.         timeout,
  56.         this.getQuorumListenOnAllIPs(), // 是否监听所有IP默认false
  57.         this.quorumCnxnThreadsSize, // 默认20
  58.         this.isQuorumSaslAuthEnabled());
  59. }
复制代码
QuorumCnxManager类

概述:
  1. This class implements a connection manager for leader election using TCP.
  2. It maintains one connection for every pair of servers. The tricky part is to guarantee that there is exactly one connection for every pair of servers that are operating correctly and that can communicate over the network. If two servers try to start a connection concurrently, then the connection manager uses a very simple tie-breaking mechanism to decide which connection to drop based on the IP addressed of the two parties.
  3. For every peer, the manager maintains a queue of messages to send. If the connection to any particular peer drops, then the sender thread puts the message back on the list. As this implementation currently uses a queue implementation to maintain messages to send to another peer, we add the message to the tail of the queue, thus changing the order of messages. Although this is not a problem for the leader election, it could be a problem when consolidating peer communication. This is to be verified, though.
复制代码

  • 维护leader选举时server之间的tcp连接
  • 确保两个server之间存在一个连接,如果两个server同时建立连接,则始终保留id大的一方建立的连接
  • 使用队列缓存待发送的消息
主要字段
  1. // 用于执行QuorumConnectionReqThread和QuorumConnectionReceiverThread
  2. private ThreadPoolExecutor connectionExecutor;
  3. // 管理sid -> SendWorker/BlockingQueue/ByteBuffer
  4. final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
  5. final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
  6. final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
  7. // 接收队列
  8. public final BlockingQueue<Message> recvQueue;
复制代码
主要方法
  1. public void initiateConnection(final MultipleAddresses electionAddr, final Long sid);
  2. // 将initiateConnection方法放到了QuorumConnectionReqThread中然后提交给connectionExecutor异步执行
  3. public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid);
  4. private boolean startConnection(Socket sock, Long sid) throws IOException;
  5. public void receiveConnection(final Socket sock);
  6. // 将receiveConnection方法放到了QuorumConnectionReceiverThread中然后提交给connectionExecutor异步执行
  7. public void receiveConnectionAsync(final Socket sock);
  8. public void toSend(Long sid, ByteBuffer b);
  9. boolean connectOne(long sid, MultipleAddresses electionAddr);
  10. void connectOne(long sid);
  11. public void connectAll();
复制代码
其余工具方法不分析。
initiateConnection方法

创建Socket对象,如有必要则做ssl握手和认证,发送初始化数据包。如果自己id小则关闭连接,以确保两个server之间存在一个连接。
  1. public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {
  2.     Socket sock = null;
  3.     try {
  4.         // 创建Socket
  5.         if (self.isSslQuorum()) {
  6.             sock = self.getX509Util().createSSLSocket();
  7.         } else {
  8.             sock = SOCKET_FACTORY.get();
  9.         }
  10.         setSockOpts(sock); // socket设置例如timeout
  11.         // 连接目标peer
  12.         sock.connect(electionAddr.getReachableOrOne(), cnxTO);
  13.         // ssl握手
  14.         if (sock instanceof SSLSocket) {
  15.             SSLSocket sslSock = (SSLSocket) sock;
  16.             sslSock.startHandshake();
  17.         }
  18.     } catch (X509Exception e) {
  19.         closeSocket(sock);
  20.         return;
  21.     } catch (UnresolvedAddressException | IOException e) {
  22.         closeSocket(sock);
  23.         return;
  24.     }
  25.     try {
  26.         // 发连接初始化数据包、sasl认证
  27.         // 如果selfId小于对方,关闭连接
  28.         // 创建SendWorker、RecvWorker并启动
  29.         // 创建对应sid的发送队列
  30.         startConnection(sock, sid);
  31.     } catch (IOException e) {
  32.         closeSocket(sock);
  33.     }
  34. }
复制代码
startConnection方法


  • 发连接初始化数据包、sasl认证
  • 如果selfId小于对方,关闭连接
  • 创建SendWorker、RecvWorker并启动
  • 创建对应sid的发送队列
  1. private boolean startConnection(Socket sock, Long sid) throws IOException {
  2.     DataOutputStream dout = null;
  3.     DataInputStream din = null;
  4.     try {
  5.         // 输出流
  6.         BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
  7.         dout = new DataOutputStream(buf);
  8.         // 发协议版本、myid、address初始化数据包
  9.         long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
  10.         dout.writeLong(protocolVersion);
  11.         dout.writeLong(self.getMyId());
  12.         // now we send our election address. For the new protocol version, we can send multiple addresses.
  13.         Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
  14.                 ? self.getElectionAddress().getAllAddresses()
  15.                 : Arrays.asList(self.getElectionAddress().getOne());
  16.         String addr = addressesToSend.stream()
  17.                 .map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
  18.         byte[] addr_bytes = addr.getBytes();
  19.         dout.writeInt(addr_bytes.length);
  20.         dout.write(addr_bytes);
  21.         dout.flush();
  22.         din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
  23.     } catch (IOException e) {
  24.         closeSocket(sock);
  25.         return false;
  26.     }
  27.     // authenticate learner
  28.     QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
  29.     if (qps != null) {
  30.         authLearner.authenticate(sock, qps.hostname);
  31.     }
  32.     if (sid > self.getMyId()) { // If lost the challenge, then drop the new connection
  33.         closeSocket(sock);
  34.     } else {
  35.         // 创建SendWorker、RecvWorker
  36.         SendWorker sw = new SendWorker(sock, sid);
  37.         RecvWorker rw = new RecvWorker(sock, din, sid, sw);
  38.         sw.setRecv(rw);
  39.         SendWorker vsw = senderWorkerMap.get(sid);
  40.         if (vsw != null) {
  41.             vsw.finish();
  42.         }
  43.         senderWorkerMap.put(sid, sw);
  44.         // 创建发送队列
  45.         queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
  46.         sw.start();
  47.         rw.start();
  48.         return true;
  49.     }
  50.     return false;
  51. }
复制代码
receiveConnection方法

当server收到连接请求,如果change获胜(selfId大于对方),将关闭该连接,由自己去连接对方。
  1. public void receiveConnection(final Socket sock) {
  2.     DataInputStream din = null;
  3.     try {
  4.         // 输入流
  5.         din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
  6.         handleConnection(sock, din);
  7.     } catch (IOException e) {
  8.         closeSocket(sock);
  9.     }
  10. }
  11. private void handleConnection(Socket sock, DataInputStream din) throws IOException {
  12.     Long sid = null, protocolVersion = null;
  13.     MultipleAddresses electionAddr = null;
  14.     try {
  15.         protocolVersion = din.readLong();
  16.         if (protocolVersion >= 0) { // this is a server id and not a protocol version
  17.             sid = protocolVersion;
  18.         } else {
  19.             try {
  20.                 InitialMessage init = InitialMessage.parse(protocolVersion, din);
  21.                 sid = init.sid;
  22.                 if (!init.electionAddr.isEmpty()) {
  23.                     electionAddr = new MultipleAddresses(init.electionAddr,
  24.                             Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
  25.                 }
  26.             } catch (InitialMessage.InitialMessageException ex) {
  27.                 closeSocket(sock);
  28.                 return;
  29.             }
  30.         }
  31.         if (sid == QuorumPeer.OBSERVER_ID) {
  32.             // Choose identifier at random. We need a value to identify the connection.
  33.             sid = observerCounter.getAndDecrement();
  34.         }
  35.     } catch (IOException e) {
  36.         closeSocket(sock);
  37.         return;
  38.     }
  39.     // do authenticating learner
  40.     authServer.authenticate(sock, din);
  41.     // If wins the challenge, then close the new connection.
  42.     if (sid < self.getMyId()) { // 对方比自己id小,需要关闭当前连接,由自己去连接对方
  43.         SendWorker sw = senderWorkerMap.get(sid);
  44.         if (sw != null) {
  45.             sw.finish();
  46.         }
  47.         // 关闭连接
  48.         closeSocket(sock);
  49.         if (electionAddr != null) {
  50.             connectOne(sid, electionAddr); // 连接对方
  51.         } else {
  52.             connectOne(sid);
  53.         }
  54.     } else if (sid == self.getMyId()) {
  55.     } else { // 创建SendWorker、RecvWorker和发送队列
  56.         SendWorker sw = new SendWorker(sock, sid);
  57.         RecvWorker rw = new RecvWorker(sock, din, sid, sw);
  58.         sw.setRecv(rw);
  59.         SendWorker vsw = senderWorkerMap.get(sid);
  60.         if (vsw != null) {
  61.             vsw.finish();
  62.         }
  63.         senderWorkerMap.put(sid, sw);
  64.         queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
  65.         sw.start();
  66.         rw.start();
  67.     }
  68. }
复制代码
toSend方法

发消息。
  1. public void toSend(Long sid, ByteBuffer b) {
  2.     // 如果是给自己的消息,直接发给recvQueue
  3.     if (this.mySid == sid) {
  4.         b.position(0);
  5.         addToRecvQueue(new Message(b.duplicate(), sid));
  6.     } else {
  7.         // 将消息发给sid对应的发送队列
  8.         BlockingQueue<ByteBuffer> bq =
  9.             queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
  10.         addToSendQueue(bq, b);
  11.         // 检查是否建立了连接
  12.         connectOne(sid);
  13.     }
  14. }
复制代码
connectOne方法
  1. synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
  2.     // 已经建立过连接
  3.     if (senderWorkerMap.get(sid) != null) {
  4.         if (self.isMultiAddressEnabled() && electionAddr.size() > 1 &&
  5.             self.isMultiAddressReachabilityCheckEnabled()) {
  6.             // check是否可达
  7.             senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
  8.         }
  9.         return true;
  10.     }
  11.     // 异步建立新连接
  12.     return initiateConnectionAsync(electionAddr, sid);
  13. }
  14. synchronized void connectOne(long sid) {
  15.     if (senderWorkerMap.get(sid) != null) {
  16.         if (self.isMultiAddressEnabled() && self.isMultiAddressReachabilityCheckEnabled()) {
  17.             senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
  18.         }
  19.         return;
  20.     }
  21.     // 使用sid从lastCommittedView、lastProposedView中解析address之后在建立连接
  22.     synchronized (self.QV_LOCK) {
  23.         boolean knownId = false;
  24.         // Resolve hostname for the remote server before attempting to
  25.         // connect in case the underlying ip address has changed.
  26.         self.recreateSocketAddresses(sid);
  27.         Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
  28.         QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
  29.         Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
  30.         if (lastCommittedView.containsKey(sid)) {
  31.             knownId = true;
  32.             if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) {
  33.                 return;
  34.             }
  35.         }
  36.         if (lastSeenQV != null
  37.             && lastProposedView.containsKey(sid)
  38.             && (!knownId ||
  39.                 !lastProposedView.get(sid).electionAddr.equals(lastCommittedView.get(sid).electionAddr))) {
  40.             knownId = true;
  41.             if (connectOne(sid, lastProposedView.get(sid).electionAddr)) {
  42.                 return;
  43.             }
  44.         }
  45.     }
  46. }
复制代码
connectAll方法

Try to establish a connection with each server if one doesn't exist.
  1. public void connectAll() {
  2.     long sid;
  3.     for (Enumeration<Long> en = queueSendMap.keys(); en.hasMoreElements(); ) {
  4.         sid = en.nextElement();
  5.         connectOne(sid);
  6.     }
  7. }
复制代码
Listener类

用来启动serverSocket监听,一个线程类,在run方法启动监听:
  1. public void run() {
  2.     if (!shutdown) {
  3.         Set<InetSocketAddress> addresses;
  4.         // 获取需要监听的地址
  5.         if (self.getQuorumListenOnAllIPs()) {
  6.             addresses = self.getElectionAddress().getWildcardAddresses();
  7.         } else {
  8.             addresses = self.getElectionAddress().getAllAddresses();
  9.         }
  10.         // 用于阻塞等待
  11.         CountDownLatch latch = new CountDownLatch(addresses.size());
  12.         // 为每一个监听地址创建ListenerHandler
  13.         listenerHandlers = addresses.stream().map(address ->
  14.                         new ListenerHandler(address,self.shouldUsePortUnification(),
  15.                                             self.isSslQuorum(), latch))
  16.                 .collect(Collectors.toList());
  17.         final ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
  18.         try {
  19.             // 启动ListenerHandler
  20.             listenerHandlers.forEach(executor::submit);
  21.         } finally {
  22.             executor.shutdown();
  23.         }
  24.         try {
  25.             // 阻塞等待,ListenerHandler结束之后会countdown
  26.             latch.await();
  27.         } catch (InterruptedException ie) {
  28.         } finally {
  29.             // Clean up for shutdown 略
  30.         }
  31.     }
  32.     // 略
  33. }
复制代码
ListenerHandler run方法:
  1. public void run() {
  2.     try {
  3.         // 接受连接
  4.         acceptConnections();
  5.         try {
  6.             close();
  7.         } catch (IOException e) {}
  8.     } catch (Exception e) {
  9.     } finally {
  10.         latch.countDown();
  11.     }
  12. }
  13. private void acceptConnections() {
  14.     int numRetries = 0;
  15.     Socket client = null;
  16.     while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
  17.         try {
  18.             // 创建ServerSocket并bind端口
  19.             serverSocket = createNewServerSocket();
  20.             while (!shutdown) {
  21.                 try {
  22.                     // 接受客户端Socket
  23.                     client = serverSocket.accept();
  24.                     setSockOpts(client); // socket设置如timeout
  25.                     // 使用receiveConnection处理新的连接
  26.                     if (quorumSaslAuthEnabled) {
  27.                         receiveConnectionAsync(client);
  28.                     } else {
  29.                         receiveConnection(client);
  30.                     }
  31.                     numRetries = 0;
  32.                 } catch (SocketTimeoutException e) {}
  33.             }
  34.         } catch (IOException e) {
  35.             // 略
  36.         }
  37.     }
  38.     // 略
  39. }
复制代码
QuorumConnectionReqThread类

用于异步连接其他peer服务,run方法调用initiateConnection方法建立连接。
QuorumConnectionReceiverThread类

用于异步接受连接,run方法调用receiveConnection方法处理新建立的连接。
SendWorker类

Thread to send messages. Instance waits on a queue, and send a message as soon as there is one available. If connection breaks, then opens a new one.
用来发送消息的线程:

  • 封装sid、socket、连接输出流
  • 从发送队列取消息,通过输出流发送
RecvWorker类

Thread to receive messages. Instance waits on a socket read. If the channel breaks, then removes itself from the pool of receivers.
用来读取消息的线程:
  1. public void run() {
  2.     threadCnt.incrementAndGet();
  3.     try {
  4.         while (running && !shutdown && sock != null) {
  5.             // 读取消息长度
  6.             int length = din.readInt();
  7.             if (length <= 0 || length > PACKETMAXSIZE) {
  8.                 throw new IOException("Received packet with invalid packet: " + length);
  9.             }
  10.             // 读取数据
  11.             final byte[] msgArray = new byte[length];
  12.             din.readFully(msgArray, 0, length);
  13.             // 保存到接收队列
  14.             addToRecvQueue(new Message(ByteBuffer.wrap(msgArray), sid));
  15.         }
  16.     } catch (Exception e) {
  17.     } finally {
  18.         sw.finish();
  19.         closeSocket(sock);
  20.     }
  21. }
复制代码
FastLeaderElection类

文档说明:
  1. Implementation of leader election using TCP. It uses an object of the class QuorumCnxManager to manage connections. Otherwise, the algorithm is push-based as with the other UDP implementations. There are a few parameters that can be tuned to change its behavior. First, finalizeWait determines the amount of time to wait until deciding upon a leader. This is part of the leader election algorithm.
复制代码

  • 使用tcp实现leader选举,基于推送模式
  • 使用QuorumCnxManager对象管理连接
构造方法
  1. public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
  2.     this.stop = false;
  3.     this.manager = manager;
  4.     starter(self, manager);
  5. }
  6. private void starter(QuorumPeer self, QuorumCnxManager manager) {
  7.     this.self = self;
  8.     proposedLeader = -1;
  9.     proposedZxid = -1;
  10.     sendqueue = new LinkedBlockingQueue<>();
  11.     recvqueue = new LinkedBlockingQueue<>();
  12.     // 用来启动WorkerSender和WorkerReceiver
  13.     this.messenger = new Messenger(manager);
  14. }
复制代码
主要字段
  1. // 在leader最终确定之前尝试拉取变化选票的时长
  2. static final int finalizeWait = 200;
  3. // 投票箱,用于保存一轮选举的结果、统计选举结果
  4. private SyncedLearnerTracker leadingVoteSet;
  5. // 发送队列
  6. LinkedBlockingQueue<ToSend> sendqueue;
  7. // 接收队列
  8. LinkedBlockingQueue<Notification> recvqueue;
  9. // 用来启动WorkerSender和WorkerReceiver
  10. Messenger messenger;
  11. // 决议leaderId
  12. long proposedLeader;
  13. // 决议zxid
  14. long proposedZxid;
  15. // 决议epoch
  16. long proposedEpoch;
复制代码
start方法启动选举
  1. public void start() {
  2.     this.messenger.start(); // 会启动WorkerSender和WorkerReceiver两个线程
  3. }
复制代码
Messenger类

WorkerSender线程


  • 从sendqueue取ToSend消息
  • 通过QuorumCnxManager的toSend方法发送消息
WorkerReceiver线程


  • 通过QuorumCnxManager的pollRecvQueue取接收的消息
  • 封装Notification对象,推送到recvqueue队列
主要方法
  1. // 创建发送消息
  2. static ByteBuffer buildMsg(
  3.     int state, long leader, long zxid, long electionEpoch, long epoch, byte[] configData);
  4. // 给所有节点发Notification投票
  5. private void sendNotifications();
  6. // 对比serverId、zxid、currentEpoch决定将票投给哪个server
  7. protected boolean totalOrderPredicate(
  8.     long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch);
  9. // 给定一个Vote集,返回SyncedLearnerTracker对象,用来确定是否有足够的选票确定选举结束
  10. protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote);
  11. // 如果有leader当选,并且有足够的选票,必须检查该leader是否投票并确认其处于领先地位
  12. // 需要进行这种检查,以避免peers一次又一次地选举一个已经崩溃且不再领先的peer
  13. protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch);
  14. // 更新proposedLeader、proposedZxid、proposedEpoch
  15. // 确定leader或者为下一轮投票做准备
  16. synchronized void updateProposal(long leader, long zxid, long epoch);
  17. // 使用当前proposedLeader、proposedZxid、proposedEpoch创建Vote(选票)
  18. public synchronized Vote getVote();
  19. // 通过zkDb获取lastLoggedZxid
  20. private long getInitLastLoggedZxid();
  21. // 获取currentEpoch
  22. private long getPeerEpoch();
  23. // 根据参数proposedLeader更新peer状态
  24. // 如果已经是leader会使用voteSet更新leadingVoteSet
  25. private void setPeerState(long proposedLeader, SyncedLearnerTracker voteSet);
  26. // 启动一轮leader选举
  27. // 当状态变为LOOKING该方法就会被调用,会给其他peer发投票notification
  28. public Vote lookForLeader() throws InterruptedException;
  29. // 收到FOLLOWING状态notification
  30. private Vote receivedFollowingNotification(
  31.     Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
  32.     SyncedLearnerTracker voteSet, Notification n);
  33. // 收到LEADING状态notification
  34. private Vote receivedLeadingNotification(
  35.     Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
  36.     SyncedLearnerTracker voteSet, Notification n);
复制代码
buildMsg方法
  1. static ByteBuffer buildMsg(int state, long leader, long zxid,
  2.                            long electionEpoch, long epoch, byte[] configData) {
  3.     byte[] requestBytes = new byte[44 + configData.length];
  4.     ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
  5.     requestBuffer.clear();
  6.     requestBuffer.putInt(state); // 当前状态
  7.     requestBuffer.putLong(leader); // 投票的leaderId
  8.     requestBuffer.putLong(zxid); // zxid
  9.     requestBuffer.putLong(electionEpoch); // 选举epoch
  10.     requestBuffer.putLong(epoch); // 数据epoch
  11.     requestBuffer.putInt(Notification.CURRENTVERSION); // 0x2
  12.     requestBuffer.putInt(configData.length); // 数据长度
  13.     requestBuffer.put(configData); // quorumVerifier数据
  14.     return requestBuffer;
  15. }
复制代码
totalOrderPredicate方法

对比serverId、zxid、currentEpoch决定将票投给哪个server:
  1. protected boolean totalOrderPredicate(
  2.     long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
  3.     if (self.getQuorumVerifier().getWeight(newId) == 0) {
  4.         return false;
  5.     }
  6.     /*
  7.      * Return true if one of the following three cases hold:
  8.      * 1- New epoch is higher
  9.      * 2- New epoch is the same as current epoch, but new zxid is higher
  10.      * 3- New epoch is the same as current epoch, new zxid is the same
  11.      *  as current zxid, but server id is higher.
  12.      */
  13.     return ((newEpoch > curEpoch)
  14.             || ((newEpoch == curEpoch)
  15.                 && ((newZxid > curZxid)
  16.                     || ((newZxid == curZxid)
  17.                         && (newId > curId)))));
  18. }
复制代码
getVoteTracker方法

给定一个Vote集,返回SyncedLearnerTracker对象,用来确定是否有足够的选票宣布选举结束:
  1. protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote) {
  2.     SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
  3.     voteSet.addQuorumVerifier(self.getQuorumVerifier());
  4.     if (self.getLastSeenQuorumVerifier() != null
  5.         && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
  6.         voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
  7.     }
  8.     // 比对其他server响应的选票和本地的选票,决定是否将选票sid放入ack集
  9.     for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
  10.         if (vote.equals(entry.getValue())) {
  11.             voteSet.addAck(entry.getKey()); // key是sid
  12.         }
  13.     }
  14.     return voteSet;
  15. }
复制代码
checkLeader方法
  1. protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionEpoch) {
  2.     boolean predicate = true;
  3.     if (leader != self.getMyId()) {
  4.         if (votes.get(leader) == null) { // leader服务器必须投票,否则次轮投票也无效
  5.             predicate = false;
  6.         } else if (votes.get(leader).getState() != ServerState.LEADING) {
  7.             // leader服务器的状态必须是LEADING,否则次轮投票也无效
  8.             predicate = false;
  9.         }
  10.     } else if (logicalclock.get() != electionEpoch) { // 选举epoch必须一致
  11.         predicate = false;
  12.     }
  13.     return predicate;
  14. }
复制代码
lookForLeader方法

启动一轮leader选举,当状态变为LOOKING该方法就会被调用,会给其他peer发投票notification通知:
  1. public Vote lookForLeader() throws InterruptedException {
  2.     // 略
  3.     try {
  4.         // 存储当前选举周期的sid -> vote选票数据
  5.         Map<Long, Vote> recvset = new HashMap<>();
  6.         // 存储之前选举周期的sid -> vote选票数据
  7.         Map<Long, Vote> outofelection = new HashMap<>();
  8.         int notTimeout = minNotificationInterval;
  9.         synchronized (this) {
  10.             logicalclock.incrementAndGet(); // 递增选举epoch开始新一轮选举
  11.             // 初始化选举"决议",最开始都是投票给自己
  12.             updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
  13.         }
  14.         // 给所有节点发通知
  15.         sendNotifications();
  16.         // 投票箱
  17.         SyncedLearnerTracker voteSet = null;
  18.         // 正常情况下直到选出leader才会退出
  19.         while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
  20.             Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
  21.             if (n == null) {
  22.                 // 重发或者重连
  23.                 if (manager.haveDelivered()) {
  24.                     sendNotifications();
  25.                 } else {
  26.                     manager.connectAll();
  27.                 }
  28.                 notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);
  29.                 // 略
  30.             } else if (validVoter(n.sid) && validVoter(n.leader)) {
  31.                 switch (n.state) {
  32.                 case LOOKING:
  33.                     // 略
  34.                     // 对方的选举epoch比自己大
  35.                     if (n.electionEpoch > logicalclock.get()) {
  36.                         logicalclock.set(n.electionEpoch); // 同步为新的epoch
  37.                         recvset.clear(); // 清空投票集
  38.                         // 比对选票,如果对方赢了,则使用对方的选票更新到本地
  39.                         if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
  40.                                                 getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
  41.                             updateProposal(n.leader, n.zxid, n.peerEpoch);
  42.                         } else {
  43.                             updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
  44.                         }
  45.                         // 把最新的选票发出去
  46.                         sendNotifications();
  47.                     } else if (n.electionEpoch < logicalclock.get()) {
  48.                         // 对方的选举epoch比自己小
  49.                         break;
  50.                     } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
  51.                                                    proposedLeader, proposedZxid, proposedEpoch)) {
  52.                         updateProposal(n.leader, n.zxid, n.peerEpoch);
  53.                         sendNotifications();
  54.                     }
  55.                     // 保存到选票集
  56.                     recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
  57.                     // 创建投票箱
  58.                     voteSet = getVoteTracker(
  59.                         recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
  60.                     // 判断acks>half表示已经选举出了leader
  61.                     if (voteSet.hasAllQuorums()) {
  62.                         // 等待拉取变化的选票
  63.                         while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
  64.                             if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
  65.                                                     proposedLeader, proposedZxid, proposedEpoch)) {
  66.                                 recvqueue.put(n);
  67.                                 break;
  68.                             }
  69.                         }
  70.                         // 设置peer状态
  71.                         if (n == null) {
  72.                             setPeerState(proposedLeader, voteSet);
  73.                             Vote endVote = new Vote(
  74.                                 proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
  75.                             leaveInstance(endVote);
  76.                             return endVote;
  77.                         }
  78.                     }
  79.                     break;
  80.                 case OBSERVING:
  81.                     break;
  82.                 case FOLLOWING:
  83.                     // 收到FOLLOWING通知
  84.                     Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
  85.                     if (resultFN == null) {
  86.                         break;
  87.                     } else {
  88.                         return resultFN;
  89.                     }
  90.                 case LEADING:
  91.                     // 收到LEADING通知
  92.                     Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
  93.                     if (resultLN == null) {
  94.                         break;
  95.                     } else {
  96.                         return resultLN;
  97.                     }
  98.                 default:
  99.                     break;
  100.                 }
  101.             } else {
  102.                 // 略
  103.             }
  104.         }
  105.         return null;
  106.     } finally {
  107.         // 略
  108.     }
  109. }
复制代码
receivedFollowingNotification方法

收到FOLLOWING状态notification。
  1. private Vote receivedFollowingNotification(
  2.     Map<Long, Vote> recvset, Map<Long, Vote> outofelection, SyncedLearnerTracker voteSet, Notification n) {
  3.     // 也会将选票投给当前leader
  4.     // 之后会进行quorum验证和leaderCheck验证
  5.     if (n.electionEpoch == logicalclock.get()) {
  6.         // 创建投票箱
  7.         recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
  8.         voteSet = getVoteTracker(
  9.             recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
  10.         // acks>half和leaderCheck
  11.         if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
  12.             // 更新节点状态
  13.             setPeerState(n.leader, voteSet);
  14.             Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
  15.             leaveInstance(endVote);
  16.             return endVote;
  17.         }
  18.     }
  19.     // 当本节点较晚进入集群,集群已经有了leader时,会进入下面逻辑
  20.     // 与前面的代码基本相同
  21.     outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
  22.     voteSet = getVoteTracker(
  23.         outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
  24.     if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
  25.         synchronized (this) {
  26.             logicalclock.set(n.electionEpoch);
  27.             setPeerState(n.leader, voteSet);
  28.         }
  29.         Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
  30.         leaveInstance(endVote);
  31.         return endVote;
  32.     }
  33.     return null;
  34. }
复制代码
receivedLeadingNotification方法

收到LEADING状态notification。
  1. private Vote receivedLeadingNotification(Map<Long, Vote> recvset, Map<Long, Vote> outofelection,
  2.                                          SyncedLearnerTracker voteSet, Notification n) {
  3.     Vote result = receivedFollowingNotification(recvset, outofelection, voteSet, n);
  4.     if (result == null) {
  5.         if (self.getQuorumVerifier().getNeedOracle() && !self.getQuorumVerifier().askOracle()) {
  6.             // 略
  7.         } else {
  8.             return null;
  9.         }
  10.     } else {
  11.         return result;
  12.     }
  13. }
复制代码
QuorumPeer类

管理quorum协议,服务器可能处于以下三种状态:

  • Leader选举 - 每个服务器将选出一个leader,最初都会选自己
  • Follower节点 - 将与Leader同步并复制所有事务
  • Leader节点 - 处理请求并将其转发给Follower节点,大多数Follower节点必须同步,该请求才能被提交
run方法main loop

run方法main loop判断当前peer状态,执行选举、lead、follow等逻辑:
  1. public void run() {
  2.     // 略
  3.     try {
  4.         // Main loop
  5.         while (running) {
  6.             switch (getPeerState()) {
  7.             case LOOKING:
  8.                 ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
  9.                 if (Boolean.getBoolean("readonlymode.enabled")) {
  10.                     // 略
  11.                 } else {
  12.                     try {
  13.                         reconfigFlagClear();
  14.                         if (shuttingDownLE) {
  15.                             shuttingDownLE = false;
  16.                             startLeaderElection();
  17.                         }
  18.                         setCurrentVote(makeLEStrategy().lookForLeader());
  19.                     } catch (Exception e) {
  20.                         setPeerState(ServerState.LOOKING);
  21.                     }
  22.                 }
  23.                 break;
  24.             case OBSERVING:
  25.                 try {
  26.                     setObserver(makeObserver(logFactory));
  27.                     observer.observeLeader();
  28.                 } catch (Exception e) {
  29.                 } finally {
  30.                     observer.shutdown();
  31.                     setObserver(null);
  32.                     updateServerState();
  33.                     // Add delay jitter before we switch to LOOKING
  34.                     // state to reduce the load of ObserverMaster
  35.                     if (isRunning()) {
  36.                         Observer.waitForObserverElectionDelay();
  37.                     }
  38.                 }
  39.                 break;
  40.             case FOLLOWING:
  41.                 try {
  42.                     setFollower(makeFollower(logFactory));
  43.                     follower.followLeader();
  44.                 } catch (Exception e) {
  45.                 } finally {
  46.                     follower.shutdown();
  47.                     setFollower(null);
  48.                     updateServerState();
  49.                 }
  50.                 break;
  51.             case LEADING:
  52.                 try {
  53.                     setLeader(makeLeader(logFactory));
  54.                     leader.lead();
  55.                     setLeader(null);
  56.                 } catch (Exception e) {
  57.                 } finally {
  58.                     if (leader != null) {
  59.                         leader.shutdown("Forcing shutdown");
  60.                         setLeader(null);
  61.                     }
  62.                     updateServerState();
  63.                 }
  64.                 break;
  65.             }
  66.         }
  67.     } finally {
  68.         // 略
  69.     }
  70. }
复制代码
LOOKING分支
  1. try {
  2.     reconfigFlagClear();
  3.     if (shuttingDownLE) {
  4.         shuttingDownLE = false;
  5.         startLeaderElection();
  6.     }
  7.     // 使用FastLeaderElection选举
  8.     setCurrentVote(makeLEStrategy().lookForLeader());
  9. } catch (Exception e) {
  10.     setPeerState(ServerState.LOOKING); // 重置为LOOKING状态
  11. }
复制代码
FOLLOWING分支
  1. try {
  2.     setFollower(makeFollower(logFactory));
  3.     follower.followLeader(); // 启动follower
  4. } catch (Exception e) {
  5. } finally {
  6.     follower.shutdown();
  7.     setFollower(null);
  8.     updateServerState(); // 更新服务状态
  9. }
复制代码
创建Follower对象:
  1. protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
  2.     return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
  3. }
复制代码
LEADING分支
  1. try {
  2.     setLeader(makeLeader(logFactory));
  3.     leader.lead(); // 启动leader
  4.     setLeader(null);
  5. } catch (Exception e) {
  6. } finally {
  7.     if (leader != null) {
  8.         leader.shutdown("Forcing shutdown");
  9.         setLeader(null);
  10.     }
  11.     updateServerState(); // 更新服务状态
  12. }
复制代码
创建Leader对象:
  1. protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
  2.     return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
  3. }
复制代码
OBSERVING分支
  1. try {
  2.     setObserver(makeObserver(logFactory));
  3.     observer.observeLeader();
  4. } catch (Exception e) {
  5. } finally {
  6.     observer.shutdown();
  7.     setObserver(null);
  8.     updateServerState();
  9.     // Add delay jitter before we switch to LOOKING
  10.     // state to reduce the load of ObserverMaster
  11.     if (isRunning()) {
  12.         Observer.waitForObserverElectionDelay();
  13.     }
  14. }
复制代码
创建Observer对象:
  1. protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
  2.     return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
  3. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

九天猎人

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表