zookeeper源码(09)follower处理客户端请求

打印 上一主题 下一主题

主题 921|帖子 921|积分 2763

在zookeeper中,follower也可以接收客户端连接,处理客户端请求,本文将分析follower处理客户端请求的流程:

  • 读请求处理
  • 写请求转发与响应
follower接收转发客户端请求

网络层接收客户端数据包

leader、follower都会启动ServerCnxnFactory组件,用来接收客户端连接、读取客户端数据包、将客户端数据包转发给zk应用层。
在"zookeeper源码(08)请求处理及数据读写流程"一文中已经介绍,ServerCnxn在读取到客户端数据包之后,会调用zookeeperServer的processConnectRequest或processPacket方法:

  • processConnectRequest方法:创建session
  • processPacket方法:处理业务请求
processConnectRequest创建session


  • 会使用sessionTracker生成sessionId、创建session对象
  • 生成一个密码
  • 提交一个createSession类型Request并提交给业务处理器
  1. long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
  2.     // 生成sessionId、创建session对象
  3.     long sessionId = sessionTracker.createSession(timeout);
  4.     // 生成密码
  5.     Random r = new Random(sessionId ^ superSecret);
  6.     r.nextBytes(passwd);
  7.     // 提交createSession类型Request
  8.     CreateSessionTxn txn = new CreateSessionTxn(timeout);
  9.     cnxn.setSessionId(sessionId);
  10.     Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
  11.     submitRequest(si);
  12.     return sessionId;
  13. }
复制代码
processPacket处理业务请求


  • 封装Request
  • 验证largeRequest
  • 提交业务层处理器
  1. Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
  2. int length = request.limit();
  3. if (isLargeRequest(length)) {
  4.     // checkRequestSize will throw IOException if request is rejected
  5.     checkRequestSizeWhenMessageReceived(length);
  6.     si.setLargeRequestSize(length);
  7. }
  8. si.setOwner(ServerCnxn.me);
  9. submitRequest(si);
复制代码
FollowerRequestProcessor处理器

在follower端,客户端请求会由FollowerRequestProcessor处理:

  • 把请求提交下游CommitProcessor处理器
  • 写请求转发给leader处理
  • 读请求经过CommitProcessor直接转发给FinalRequestProcessor处理器,直接查询数据返回给客户端
  1. public void run() {
  2.     try {
  3.         while (!finished) {
  4.             Request request = queuedRequests.take();
  5.             // Screen quorum requests against ACLs first 略
  6.             // 转发给CommitProcessor处理器
  7.             // 提交到queuedRequests队列
  8.             // 写请求还会提交到queuedWriteRequests队列
  9.             maybeSendRequestToNextProcessor(request);
  10.             // ...
  11.             // 写请求需要转发给leader处理
  12.             switch (request.type) {
  13.             case OpCode.sync:
  14.                 zks.pendingSyncs.add(request); // 待同步命令
  15.                 zks.getFollower().request(request);
  16.                 break;
  17.             case OpCode.create:
  18.             case OpCode.create2:
  19.             case OpCode.createTTL:
  20.             case OpCode.createContainer:
  21.             case OpCode.delete:
  22.             case OpCode.deleteContainer:
  23.             case OpCode.setData:
  24.             case OpCode.reconfig:
  25.             case OpCode.setACL:
  26.             case OpCode.multi:
  27.             case OpCode.check:
  28.                 zks.getFollower().request(request);
  29.                 break;
  30.             case OpCode.createSession:
  31.             case OpCode.closeSession:
  32.                 if (!request.isLocalSession()) {
  33.                     zks.getFollower().request(request);
  34.                 }
  35.                 break;
  36.             }
  37.         }
  38.     } catch (Exception e) {
  39.         handleException(this.getName(), e);
  40.     }
  41. }
复制代码
转发leader
  1. zks.getFollower().request(request);
复制代码
Learner转发请求:
  1. void request(Request request) throws IOException {
  2.     // 略
  3.     ByteArrayOutputStream baos = new ByteArrayOutputStream();
  4.     DataOutputStream oa = new DataOutputStream(baos);
  5.     oa.writeLong(request.sessionId); // sessionId
  6.     oa.writeInt(request.cxid); // 客户端xid
  7.     oa.writeInt(request.type); // 业务类型
  8.     byte[] payload = request.readRequestBytes(); // 请求体
  9.     if (payload != null) {
  10.         oa.write(payload);
  11.     }
  12.     oa.close();
  13.     // 封装REQUEST数据包
  14.     QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
  15.     writePacket(qp, true); // 通过网络发给leader服务器
  16. }
复制代码
leader处理follower请求

LearnerHandler接收REQUEST请求
  1. case Leader.REQUEST:
  2.     bb = ByteBuffer.wrap(qp.getData());
  3.     sessionId = bb.getLong(); // 解析请求信息
  4.     cxid = bb.getInt();
  5.     type = bb.getInt();
  6.     bb = bb.slice();
  7.     Request si;
  8.     if (type == OpCode.sync) {
  9.         si = new LearnerSyncRequest(
  10.             this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
  11.     } else {
  12.         si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
  13.     }
  14.     si.setOwner(this); // 用来判断请求来自follower
  15.     learnerMaster.submitLearnerRequest(si); // 提交给业务处理器
  16.     requestsReceived.incrementAndGet();
复制代码
submitLearnerRequest提交业务处理器:
  1. public void submitLearnerRequest(Request si) {
  2.     zk.submitLearnerRequest(si);
  3. }
复制代码
LeaderZooKeeperServer提交业务处理器:
  1. public void submitLearnerRequest(Request request) {
  2.     // 提交给PrepRequestProcessor处理器
  3.     prepRequestProcessor.processRequest(request);
  4. }
复制代码
从此处开始走leader处理写请求流程。
leader处理写请求流程回顾


  • PrepRequestProcessor - 做事务设置
  • ProposalRequestProcessor - 发起proposal,将Request转发给SyncRequestProcessor写事务log、本地ack
  • CommitProcessor - 读请求直接调用下游处理器,写请求需要等待足够的ack之后commit再调用下游RequestProcessor处理器
  • ToBeAppliedRequestProcessor - 维护toBeApplied列表
  • FinalRequestProcessor - 把事务应用到ZKDatabase,提供查询功能,返回响应
follower处理leader数据

在follower中,Follower使用processPacket方法处理来自leader的数据包,此处看一下PROPOSAL和COMMIT的逻辑。
PROPOSAL数据包
  1. fzk.logRequest(hdr, txn, digest);
复制代码
logRequest会使用syncProcessor将事务写入到txnlog文件,之后调用SendAckRequestProcessor处理器给leader发ack数据包。
leader收到超过半数的ack之后会发COMMIT数据包让各个节点将事务应用到ZKDatabase中。
COMMIT数据包
  1. fzk.commit(qp.getZxid());
复制代码
CommitProcessor处理器会将其提交到committedRequests队列,之后客户端Request会继续向下游FinalRequestProcessor处理器传递。
FinalRequestProcessor处理器


  • 把事务应用到ZKDatabase中
  • 提供查询功能
  • 给客户端返回响应

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

麻花痒

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表