在zookeeper中,follower也可以接收客户端连接,处理客户端请求,本文将分析follower处理客户端请求的流程:
follower接收转发客户端请求
网络层接收客户端数据包
leader、follower都会启动ServerCnxnFactory组件,用来接收客户端连接、读取客户端数据包、将客户端数据包转发给zk应用层。
在"zookeeper源码(08)请求处理及数据读写流程"一文中已经介绍,ServerCnxn在读取到客户端数据包之后,会调用zookeeperServer的processConnectRequest或processPacket方法:
- processConnectRequest方法:创建session
- processPacket方法:处理业务请求
processConnectRequest创建session
- 会使用sessionTracker生成sessionId、创建session对象
- 生成一个密码
- 提交一个createSession类型Request并提交给业务处理器
- long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
- // 生成sessionId、创建session对象
- long sessionId = sessionTracker.createSession(timeout);
- // 生成密码
- Random r = new Random(sessionId ^ superSecret);
- r.nextBytes(passwd);
- // 提交createSession类型Request
- CreateSessionTxn txn = new CreateSessionTxn(timeout);
- cnxn.setSessionId(sessionId);
- Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
- submitRequest(si);
- return sessionId;
- }
复制代码 processPacket处理业务请求
- 封装Request
- 验证largeRequest
- 提交业务层处理器
- Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
- int length = request.limit();
- if (isLargeRequest(length)) {
- // checkRequestSize will throw IOException if request is rejected
- checkRequestSizeWhenMessageReceived(length);
- si.setLargeRequestSize(length);
- }
- si.setOwner(ServerCnxn.me);
- submitRequest(si);
复制代码 FollowerRequestProcessor处理器
在follower端,客户端请求会由FollowerRequestProcessor处理:
- 把请求提交下游CommitProcessor处理器
- 写请求转发给leader处理
- 读请求经过CommitProcessor直接转发给FinalRequestProcessor处理器,直接查询数据返回给客户端
- public void run() {
- try {
- while (!finished) {
- Request request = queuedRequests.take();
- // Screen quorum requests against ACLs first 略
- // 转发给CommitProcessor处理器
- // 提交到queuedRequests队列
- // 写请求还会提交到queuedWriteRequests队列
- maybeSendRequestToNextProcessor(request);
- // ...
- // 写请求需要转发给leader处理
- switch (request.type) {
- case OpCode.sync:
- zks.pendingSyncs.add(request); // 待同步命令
- zks.getFollower().request(request);
- break;
- case OpCode.create:
- case OpCode.create2:
- case OpCode.createTTL:
- case OpCode.createContainer:
- case OpCode.delete:
- case OpCode.deleteContainer:
- case OpCode.setData:
- case OpCode.reconfig:
- case OpCode.setACL:
- case OpCode.multi:
- case OpCode.check:
- zks.getFollower().request(request);
- break;
- case OpCode.createSession:
- case OpCode.closeSession:
- if (!request.isLocalSession()) {
- zks.getFollower().request(request);
- }
- break;
- }
- }
- } catch (Exception e) {
- handleException(this.getName(), e);
- }
- }
复制代码 转发leader
- zks.getFollower().request(request);
复制代码 Learner转发请求:- void request(Request request) throws IOException {
- // 略
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream oa = new DataOutputStream(baos);
- oa.writeLong(request.sessionId); // sessionId
- oa.writeInt(request.cxid); // 客户端xid
- oa.writeInt(request.type); // 业务类型
- byte[] payload = request.readRequestBytes(); // 请求体
- if (payload != null) {
- oa.write(payload);
- }
- oa.close();
- // 封装REQUEST数据包
- QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
- writePacket(qp, true); // 通过网络发给leader服务器
- }
复制代码 leader处理follower请求
LearnerHandler接收REQUEST请求
- case Leader.REQUEST:
- bb = ByteBuffer.wrap(qp.getData());
- sessionId = bb.getLong(); // 解析请求信息
- cxid = bb.getInt();
- type = bb.getInt();
- bb = bb.slice();
- Request si;
- if (type == OpCode.sync) {
- si = new LearnerSyncRequest(
- this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
- } else {
- si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
- }
- si.setOwner(this); // 用来判断请求来自follower
- learnerMaster.submitLearnerRequest(si); // 提交给业务处理器
- requestsReceived.incrementAndGet();
复制代码 submitLearnerRequest提交业务处理器:- public void submitLearnerRequest(Request si) {
- zk.submitLearnerRequest(si);
- }
复制代码 LeaderZooKeeperServer提交业务处理器:- public void submitLearnerRequest(Request request) {
- // 提交给PrepRequestProcessor处理器
- prepRequestProcessor.processRequest(request);
- }
复制代码 从此处开始走leader处理写请求流程。
leader处理写请求流程回顾
- PrepRequestProcessor - 做事务设置
- ProposalRequestProcessor - 发起proposal,将Request转发给SyncRequestProcessor写事务log、本地ack
- CommitProcessor - 读请求直接调用下游处理器,写请求需要等待足够的ack之后commit再调用下游RequestProcessor处理器
- ToBeAppliedRequestProcessor - 维护toBeApplied列表
- FinalRequestProcessor - 把事务应用到ZKDatabase,提供查询功能,返回响应
follower处理leader数据
在follower中,Follower使用processPacket方法处理来自leader的数据包,此处看一下PROPOSAL和COMMIT的逻辑。
PROPOSAL数据包
- fzk.logRequest(hdr, txn, digest);
复制代码 logRequest会使用syncProcessor将事务写入到txnlog文件,之后调用SendAckRequestProcessor处理器给leader发ack数据包。
leader收到超过半数的ack之后会发COMMIT数据包让各个节点将事务应用到ZKDatabase中。
COMMIT数据包
- fzk.commit(qp.getZxid());
复制代码 CommitProcessor处理器会将其提交到committedRequests队列,之后客户端Request会继续向下游FinalRequestProcessor处理器传递。
FinalRequestProcessor处理器
- 把事务应用到ZKDatabase中
- 提供查询功能
- 给客户端返回响应
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |