zookeeper源码(06)ZooKeeperServer及子类

  金牌会员 | 2024-4-13 00:32:03 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 881|帖子 881|积分 2643

ZooKeeperServer

实现了单机版zookeeper服务端功能,子类实现了更加丰富的分布式集群功能:
  1. ZooKeeperServer
  2.   |-- QuorumZooKeeperServer
  3.     |-- LeaderZooKeeperServer
  4.     |-- LearnerZooKeeperServer
  5.       |-- FollowerZooKeeperServer
  6.       |-- ObserverZooKeeperServer
  7.   |-- ReadOnlyZooKeeperServer
复制代码
主要字段
  1. // tickTime参数默认值
  2. public static final int DEFAULT_TICK_TIME = 3000;
  3. protected int tickTime = DEFAULT_TICK_TIME;
  4. // 默认tickTime * 2
  5. protected int minSessionTimeout = -1;
  6. // 默认tickTime * 20
  7. protected int maxSessionTimeout = -1;
  8. // 会话跟踪
  9. protected SessionTracker sessionTracker;
  10. // 存储组件
  11. private FileTxnSnapLog txnLogFactory = null;
  12. private ZKDatabase zkDb;
  13. // 缓存数据
  14. private ResponseCache readResponseCache;
  15. private ResponseCache getChildrenResponseCache;
  16. // zxid会在启动阶段设置为最新lastZxid
  17. private final AtomicLong hzxid = new AtomicLong(0);
  18. // 请求处理器链入口
  19. protected RequestProcessor firstProcessor;
  20. // 缓存变化的数据
  21. final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
  22. final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>();
  23. protected ServerCnxnFactory serverCnxnFactory;
  24. protected ServerCnxnFactory secureServerCnxnFactory;
  25. // 大请求判断使用的参数
  26. private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
  27. private volatile int largeRequestThreshold = -1;
复制代码
主要方法

方法定义
  1. // 通过zkDb从dataTree中删除Watcher监听器
  2. void removeCnxn(ServerCnxn cnxn);
  3. // 创建zkDb(为null时)并loadData加载数据
  4. public void startdata() throws IOException, InterruptedException;
  5. // 加载数据、清理session、生成快照
  6. public void loadData() throws IOException, InterruptedException;
  7. // 保存zkDb当前快照
  8. public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere,
  9.                                       boolean fastForwardFromEdits) throws IOException;
  10. // 从指定的输入流解析数据,生成新的zkDb和SessionTracker
  11. public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException;
  12. // 使用zkDb.truncateLog(zxid)截断zxid之后的事务日志(而非删除快照数据)
  13. public void truncateLog(long zxid) throws IOException;
  14. // 通过zkDb获取dataTree.lastProcessedZxid的值
  15. public long getLastProcessedZxid();
  16. // 提交closeSession类型的Request来关闭会话
  17. private void close(long sessionId);
  18. // 使用zkDb杀掉会话
  19. protected void killSession(long sessionId, long zxid);
  20. // 启动组件
  21. private void startupWithServerState(State state);
  22. // 创建RequestProcessor用来处理请求
  23. protected void setupRequestProcessors();
  24. // 创建SessionTracker
  25. protected void createSessionTracker();
  26. // 为指定的session生成一个密码
  27. byte[] generatePasswd(long id);
  28. // 验证session密码
  29. protected boolean checkPasswd(long sessionId, byte[] passwd);
  30. // 使用sessionTracker创建session、生成密码、提交一个createSession请求
  31. long createSession(ServerCnxn cnxn, byte[] passwd, int timeout);
  32. // 为指定的session绑定owner
  33. public void setOwner(long id, Object owner) throws SessionExpiredException;
  34. // 验证session之后使用finishSessionInit方法确定继续通信或者断开连接
  35. protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException;
  36. public void finishSessionInit(ServerCnxn cnxn, boolean valid);
  37. // checkPasswd->revalidateSession->finishSessionInit
  38. public void reopenSession(ServerCnxn cnxn, long sessionId,
  39.                           byte[] passwd, int sessionTimeout) throws IOException;
  40. // 把请求提交给requestThrottler之后再陆续调用submitRequestNow处理
  41. public void enqueueRequest(Request si);
  42. // 使用firstProcessor处理请求
  43. public void submitRequestNow(Request si);
  44. // 处理连接请求,网络IO层调用
  45. public void processConnectRequest(
  46.     ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException;
  47. // 处理业务请求,网络IO层调用
  48. public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException;
  49. // sasl认证
  50. private void processSasl(
  51.     RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException;
  52. // 处理transaction
  53. public ProcessTxnResult processTxn(TxnHeader hdr, Record txn);
  54. public ProcessTxnResult processTxn(Request request);
  55. private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn);
  56. private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest);
  57. // Grant or deny authorization to an operation on a node
  58. public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids,
  59.                      String path, List<ACL> setAcls) throws KeeperException.NoAuthException;
  60. // Check a path whether exceeded the quota
  61. public void checkQuota(String path, byte[] lastData, byte[] data,
  62.                        int type) throws KeeperException.QuotaExceededException;
  63. private void checkQuota(String lastPrefix, long bytesDiff, long countDiff,
  64.                         String namespace) throws KeeperException.QuotaExceededException;
  65. // 获取父节点的path
  66. private String parentPath(String path) throws KeeperException.BadArgumentsException;
  67. // 从Request获取有效的path
  68. private String effectiveACLPath(
  69.     Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException;
  70. // 根据Request获取需要的权限类型
  71. private int effectiveACLPerms(Request request);
  72. // 检查写权限
  73. public boolean authWriteRequest(Request request);
复制代码
loadData方法

加载数据、清理session、生成快照:
  1. public void loadData() throws IOException, InterruptedException {
  2.     // 初始化zxid
  3.     if (zkDb.isInitialized()) {
  4.         setZxid(zkDb.getDataTreeLastProcessedZxid());
  5.     } else {
  6.         setZxid(zkDb.loadDataBase());
  7.     }
  8.     // 使用killSession方法杀死过期会话
  9.     zkDb.getSessions().stream()
  10.                     .filter(session -> zkDb.getSessionWithTimeOuts().get(session) == null)
  11.                     .forEach(session -> killSession(session, zkDb.getDataTreeLastProcessedZxid()));
  12.     // 保存快照
  13.     // txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap)
  14.     takeSnapshot();
  15. }
复制代码
killSession方法
  1. protected void killSession(long sessionId, long zxid) {
  2.     // 需要清理临时节点
  3.     zkDb.killSession(sessionId, zxid);
  4.     if (sessionTracker != null) {
  5.         // 删除会话跟踪信息
  6.         sessionTracker.removeSession(sessionId);
  7.     }
  8. }
复制代码
startupWithServerState方法
  1. private void startupWithServerState(State state) {
  2.     if (sessionTracker == null) {
  3.         createSessionTracker();
  4.     }
  5.     startSessionTracker();
  6.     // 创建RequestProcessor用于处理请求
  7.     setupRequestProcessors();
  8.     // 这是一个限流的组件,不做分析
  9.     startRequestThrottler();
  10.     registerJMX();
  11.     startJvmPauseMonitor();
  12.     registerMetrics();
  13.     setState(state);
  14.     requestPathMetricsCollector.start();
  15.     localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
  16.     notifyAll();
  17. }
复制代码
setupRequestProcessors方法(重要)
  1. protected void setupRequestProcessors() {
  2.     RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  3.     RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
  4.     ((SyncRequestProcessor) syncProcessor).start();
  5.     firstProcessor = new PrepRequestProcessor(this, syncProcessor);
  6.     ((PrepRequestProcessor) firstProcessor).start();
  7. }
复制代码
RequestProcessor接口:以处理器链方式处理事务,请求总是按顺序处理。standaloneServer、follower和leader有不同的处理器链。请求通过processRequest方法传递给其他RequestProcessor对象,通常情况总是由单个线程调用。当调用shutdown时,RequestProcessor还应关闭与其关联的其他RequestProcessor对象。
FinalRequestProcessor类:处理与请求相关的事务,并提供查询服务,给客户端发响应,位于RequestProcessor链末尾。
SyncRequestProcessor类:将请求记录到磁盘,对请求进行批处理,以有效地执行IO操作。在日志同步到磁盘之前,请求不会传递给下一个RequestProcessor对象。SyncRequestProcessor用于3种不同的情况:

  • Leader - 将请求同步到磁盘,并将其转发给AckRequestProcessor,后者将ack发送回leader自己
  • Follower - 将请求同步到磁盘,并将其转发给SendAckRequestProcessor,后者将ack发送给leader
  • Observer - 将已提交的请求(以INFORM数据包形式接收)同步到磁盘。Observer不向leader发送ack,因此nextProcessor为null
PrepRequestProcessor类:通常位于RequestProcessor链开头,为更新请求关联的事务做设置。
createSessionTracker方法
  1. protected void createSessionTracker() {
  2.     sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime,
  3.                                             createSessionTrackerServerId, getZooKeeperServerListener());
  4. }
复制代码
不同的子类使用了不同的SessionTracker实现类:

  • LeaderZooKeeperServer - LeaderSessionTracker
  • LearnerZooKeeperServer - LearnerSessionTracker
createSession方法
  1. long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
  2.     if (passwd == null) {
  3.         passwd = new byte[0];
  4.     }
  5.     // 创建一个session
  6.     long sessionId = sessionTracker.createSession(timeout);
  7.     // 生成session密码
  8.     Random r = new Random(sessionId ^ superSecret);
  9.     r.nextBytes(passwd);
  10.     // 提交createSession请求,该请求会被RequestProcessor处理
  11.     CreateSessionTxn txn = new CreateSessionTxn(timeout);
  12.     cnxn.setSessionId(sessionId);
  13.     Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
  14.     submitRequest(si);
  15.     return sessionId;
  16. }
复制代码
submitRequestNow方法
  1. public void submitRequestNow(Request si) {
  2.     try {
  3.         touch(si.cnxn);
  4.         boolean validpacket = Request.isValid(si.type);
  5.         if (validpacket) {
  6.             setLocalSessionFlag(si);
  7.             // 使用firstProcessor处理请求
  8.             firstProcessor.processRequest(si);
  9.             if (si.cnxn != null) {
  10.                 incInProcess();
  11.             }
  12.         } else {
  13.             // Update request accounting/throttling limits
  14.             requestFinished(si);
  15.             new UnimplementedRequestProcessor().processRequest(si);
  16.         }
  17.     } catch (MissingSessionException e) {
  18.         // Update request accounting/throttling limits
  19.         requestFinished(si);
  20.     } catch (RequestProcessorException e) {
  21.         // Update request accounting/throttling limits
  22.         requestFinished(si);
  23.     }
  24. }
复制代码
processConnectRequest方法
  1. public void processConnectRequest(
  2.         ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {
  3.     long sessionId = request.getSessionId();
  4.     // 略
  5.     if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
  6.         // zxid参数有误
  7.         throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
  8.     }
  9.     int sessionTimeout = request.getTimeOut();
  10.     byte[] passwd = request.getPasswd();
  11.     int minSessionTimeout = getMinSessionTimeout();
  12.     if (sessionTimeout < minSessionTimeout) {
  13.         sessionTimeout = minSessionTimeout;
  14.     }
  15.     int maxSessionTimeout = getMaxSessionTimeout();
  16.     if (sessionTimeout > maxSessionTimeout) {
  17.         sessionTimeout = maxSessionTimeout;
  18.     }
  19.     cnxn.setSessionTimeout(sessionTimeout);
  20.     // We don't want to receive any packets until we are sure that the session is setup
  21.     cnxn.disableRecv();
  22.     if (sessionId == 0) {
  23.         // 创建session
  24.         long id = createSession(cnxn, passwd, sessionTimeout);
  25.     } else {
  26.         validateSession(cnxn, sessionId); // do nothing
  27.         // 关闭旧的ServerCnxn
  28.         if (serverCnxnFactory != null) {
  29.             serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
  30.         }
  31.         if (secureServerCnxnFactory != null) {
  32.             secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
  33.         }
  34.         cnxn.setSessionId(sessionId);
  35.         // 开启新session
  36.         reopenSession(cnxn, sessionId, passwd, sessionTimeout);
  37.     }
  38. }
复制代码
processPacket方法
  1. public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
  2.     cnxn.incrOutstandingAndCheckThrottle(h);
  3.     if (h.getType() == OpCode.auth) {
  4.         AuthPacket authPacket = request.readRecord(AuthPacket::new);
  5.         String scheme = authPacket.getScheme();
  6.         ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
  7.         Code authReturn = KeeperException.Code.AUTHFAILED;
  8.         // 认证、继续通信或者关闭连接,略
  9.         return;
  10.     } else if (h.getType() == OpCode.sasl) {
  11.         processSasl(request, cnxn, h);
  12.     } else {
  13.         if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
  14.             return;
  15.         } else {
  16.             Request si = new Request(
  17.                 cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
  18.             int length = request.limit();
  19.             if (isLargeRequest(length)) { // 判断large请求
  20.                 checkRequestSizeWhenMessageReceived(length);
  21.                 si.setLargeRequestSize(length);
  22.             }
  23.             si.setOwner(ServerCnxn.me);
  24.             // 提交请求等待firstProcessor处理
  25.             submitRequest(si);
  26.         }
  27.     }
  28. }
复制代码
processTxn相关方法
  1. // entry point for quorum/Learner.java
  2. public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
  3.     processTxnForSessionEvents(null, hdr, txn);
  4.     return processTxnInDB(hdr, txn, null);
  5. }
  6. // entry point for FinalRequestProcessor.java
  7. public ProcessTxnResult processTxn(Request request) {
  8.     TxnHeader hdr = request.getHdr();
  9.     processTxnForSessionEvents(request, hdr, request.getTxn());
  10.     final boolean writeRequest = (hdr != null);
  11.     final boolean quorumRequest = request.isQuorum();
  12.     // return fast w/o synchronization when we get a read
  13.     if (!writeRequest && !quorumRequest) {
  14.         return new ProcessTxnResult();
  15.     }
  16.     synchronized (outstandingChanges) {
  17.         ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
  18.         // request.hdr is set for write requests, which are the only ones
  19.         // that add to outstandingChanges.
  20.         if (writeRequest) {
  21.             long zxid = hdr.getZxid();
  22.             while (!outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= zxid) {
  23.                 ChangeRecord cr = outstandingChanges.remove();
  24.                 ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
  25.                 if (outstandingChangesForPath.get(cr.path) == cr) {
  26.                     outstandingChangesForPath.remove(cr.path);
  27.                 }
  28.             }
  29.         }
  30.         // do not add non quorum packets to the queue.
  31.         if (quorumRequest) {
  32.             getZKDatabase().addCommittedProposal(request);
  33.         }
  34.         return rc;
  35.     }
  36. }
  37. private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
  38.     int opCode = (request == null) ? hdr.getType() : request.type;
  39.     long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;
  40.     if (opCode == OpCode.createSession) {
  41.         if (hdr != null && txn instanceof CreateSessionTxn) {
  42.             CreateSessionTxn cst = (CreateSessionTxn) txn;
  43.             // Add the session to the local session map or global one in zkDB.
  44.             sessionTracker.commitSession(sessionId, cst.getTimeOut());
  45.         }
  46.     } else if (opCode == OpCode.closeSession) {
  47.         sessionTracker.removeSession(sessionId);
  48.     }
  49. }
  50. private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
  51.     if (hdr == null) {
  52.         return new ProcessTxnResult();
  53.     } else {
  54.         return getZKDatabase().processTxn(hdr, txn, digest);
  55.     }
  56. }
复制代码
LeaderZooKeeperServer

实现类概述

LeaderZooKeeperServer是集群模式下leader节点使用的ZooKeeperServer实现类:

  • 继承QuorumZooKeeperServer
  • 使用的RequestProcessor与父类不同:
    1. Just like the standard ZooKeeperServer. We just replace the request processors: PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor
    复制代码
  • 使用LeaderSessionTracker做会话追踪
  • 与learner节点通信
处理器链


  • FinalRequestProcessor - 处理与请求相关的事务,并提供查询服务,给客户端发响应,位于RequestProcessor链末尾
  • ToBeAppliedRequestProcessor - 维护toBeApplied列表
  • CommitProcessor - 等待commit完成之后调用下游RequestProcessor处理器
  • ProposalRequestProcessor - 发起proposal并将Request转发给内部的SyncRequestProcessor和AckRequestProcessor
    1. // 构建处理器链
    2. protected void setupRequestProcessors() {
    3.     RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    4.     RequestProcessor toBeAppliedProcessor =
    5.         new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    6.     commitProcessor = new CommitProcessor(
    7.         toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
    8.     commitProcessor.start();
    9.     ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
    10.     proposalProcessor.initialize();
    11.     prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    12.     prepRequestProcessor.start();
    13.     firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
    14.     setupContainerManager(); // 启动ContainerManager用于删除ttl节点和container节点
    15. }
    复制代码
  • PrepRequestProcessor - 通常位于RequestProcessor链开头,为更新请求关联的事务做设置
  • LeaderRequestProcessor - 负责执行本地会话升级,只有直接提交给leader的Request才能通过这个处理器
LearnerZooKeeperServer

Learner基类:

  • 使用LearnerSessionTracker做会话追踪
  • 使用CommitProcessor、SyncRequestProcessor做处理器链
FollowerZooKeeperServer

实现类概述

与ZooKeeperServer类似,只是处理器链不同:
FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
使用SyncRequestProcessor来记录leader的提案。
处理器链

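注意:下面这段代码实际是ProposalRequestProcessor的构造方法(属于Leader的处理器链,参数类型为LeaderZooKeeperServer),并不是FollowerZooKeeperServer的setupRequestProcessors方法。它说明ProposalRequestProcessor内部维护了SyncRequestProcessor和AckRequestProcessor: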
  1. public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
  2.     this.zks = zks;
  3.     this.nextProcessor = nextProcessor;
  4.     // 内部有维护SyncRequestProcessor和AckRequestProcessor
  5.     AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
  6.     syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
  7.     forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean(
  8.             FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
  9. }
复制代码

  • FinalRequestProcessor
  • CommitProcessor
  • FollowerRequestProcessor - 将数据更新请求转发给Leader
  • SyncRequestProcessor
  • SendAckRequestProcessor - 给leader发ACK
ObserverZooKeeperServer

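Observer类型节点的ZooKeeperServer实现。
注意:下面这段setupRequestProcessors代码使用的是FollowerRequestProcessor和SendAckRequestProcessor(getFollower()),实际是FollowerZooKeeperServer的实现(处理器链为FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor,另有SyncRequestProcessor -> SendAckRequestProcessor)。ObserverZooKeeperServer的区别在于:firstProcessor使用ObserverRequestProcessor,并且其SyncRequestProcessor不向leader发送ack(nextProcessor为null):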
  1. protected void setupRequestProcessors() {
  2.     RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  3.     commitProcessor = new CommitProcessor(
  4.         finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
  5.     commitProcessor.start();
  6.     firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
  7.     ((FollowerRequestProcessor) firstProcessor).start();
  8.     syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
  9.     syncProcessor.start();
  10. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表