zookeeper源码(10)node增删改查及监听

打印 上一主题 下一主题

主题 840|帖子 840|积分 2522

本文将从leader处置惩罚器入手,详细分析node的增删改查流程及监听器原理。
回首数据读写流程

leader


  • ZookeeperServer.processPacket封装Request并提交给业务处置惩罚器
  • LeaderRequestProcessor做本地事务升级
  • PrepRequestProcessor做事务准备
  • ProposalRequestProcessor事务操纵发proposal给follower节点,持久化到log文件
  • CommitProcessor读请求直接转发给鄙俚处置惩罚器,事务操纵等待到了quorum状态转发给鄙俚处置惩罚器
  • ToBeAppliedRequestProcessor清算toBeApplied集
  • FinalRequestProcessor将事务写到ZKDatabase中,给客户端发相应
follower


  • 处置惩罚PROPOSAL:使用SyncRequestProcessor处置惩罚器持久化,之后SendAckRequestProcessor给leader发ack
  • 处置惩罚COMMIT:提交给CommitProcessor处置惩罚器,之后FinalRequestProcessor将事务写到ZKDatabase中
创建node

涉及create、create2、createContainer、createTTL等下令。
PrepRequestProcessor事务准备

反序列化请求参数
  1. switch (request.type) {
  2. case OpCode.createContainer:
  3. case OpCode.create:
  4. case OpCode.create2:
  5.     CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
  6.     pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
  7.     break;
  8. case OpCode.createTTL:
  9.     // 默认不支持ttl
  10.     CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
  11.     pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
  12.     break;
  13. // ...
复制代码
CreateRequest封装创建node的参数:
  1. public class CreateRequest implements Record {
  2.   private String path;
  3.   private byte[] data;
  4.   private java.util.List<org.apache.zookeeper.data.ACL> acl;
  5.   private int flags;
  6. }
复制代码
CreateTTLRequest封装创建node加ttl的参数:
  1. public class CreateTTLRequest implements Record {
  2.   private String path;
  3.   private byte[] data;
  4.   private java.util.List<org.apache.zookeeper.data.ACL> acl;
  5.   private int flags;
  6.   private long ttl;
  7. }
复制代码
事务准备
  1. protected void pRequest2Txn(int type, long zxid, Request request, Record record)
  2.         throws KeeperException, IOException, RequestProcessorException {
  3.     // ...
  4.     switch (type) {
  5.       case OpCode.create:
  6.       case OpCode.create2:
  7.       case OpCode.createTTL:
  8.       case OpCode.createContainer: {
  9.         pRequest2TxnCreate(type, request, record);
  10.         break;
  11.       }
  12.     // ...
  13.     }
  14. }
  15. private void pRequest2TxnCreate(
  16.         int type, Request request, Record record) throws IOException, KeeperException {
  17.     int flags;
  18.     String path;
  19.     List<ACL> acl;
  20.     byte[] data;
  21.     long ttl;
  22.     if (type == OpCode.createTTL) {
  23.         CreateTTLRequest createTtlRequest = (CreateTTLRequest) record;
  24.         // 给flags等参数赋值
  25.     } else {
  26.         CreateRequest createRequest = (CreateRequest) record;
  27.         // 给flags等参数赋值
  28.         ttl = -1;
  29.     }
  30.     // CreateMode:
  31.     // PERSISTENT, PERSISTENT_SEQUENTIAL, EPHEMERAL, EPHEMERAL_SEQUENTIAL,
  32.     // CONTAINER, PERSISTENT_WITH_TTL, PERSISTENT_SEQUENTIAL_WITH_TTL
  33.     CreateMode createMode = CreateMode.fromFlag(flags);
  34.     // 验证临时节点、ttl参数、检查session
  35.     // 默认不支持ttl
  36.     validateCreateRequest(path, createMode, request, ttl);
  37.     String parentPath = validatePathForCreate(path, request.sessionId); // 父节点path
  38.     List<ACL> listACL = fixupACL(path, request.authInfo, acl); // 请求携带的权限
  39.     ChangeRecord parentRecord = getRecordForPath(parentPath); // 得到父节点
  40.     // 验证CREATE权限
  41.     zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
  42.     int parentCVersion = parentRecord.stat.getCversion();
  43.     if (createMode.isSequential()) { // 顺序节点
  44.         // 例如/users/admin0000000001
  45.         path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
  46.     }
  47.     validatePath(path, request.sessionId);
  48.     // 略
  49.     boolean ephemeralParent =
  50.         EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
  51.     // 父节点不可以是临时节点
  52.     int newCversion = parentRecord.stat.getCversion() + 1; // 父节点的childVersion++
  53.     // 检查字节限额
  54.     zks.checkQuota(path, null, data, OpCode.create);
  55.     // 不同类型创建对应的Txn对象
  56.     if (type == OpCode.createContainer) {
  57.         request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
  58.     } else if (type == OpCode.createTTL) {
  59.         request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
  60.     } else {
  61.         request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
  62.     }
  63.     TxnHeader hdr = request.getHdr();
  64.     long ephemeralOwner = 0;
  65.     if (createMode.isContainer()) {
  66.         ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
  67.     } else if (createMode.isTTL()) {
  68.         ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
  69.     } else if (createMode.isEphemeral()) {
  70.         ephemeralOwner = request.sessionId; // 临时节点使用sessionId
  71.     }
  72.     // czxid(created),mzxid(modified),ctime,mtime,version,cversion(childVersion),
  73.     // aversion(aclVersion),ephemeralOwner,pzxid(lastModifiedChildren)
  74.     StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
  75.     // 父节点
  76.     parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
  77.     parentRecord.childCount++;
  78.     parentRecord.stat.setCversion(newCversion);
  79.     parentRecord.stat.setPzxid(request.getHdr().getZxid());
  80.     parentRecord.precalculatedDigest = precalculateDigest(
  81.             DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
  82.     addChangeRecord(parentRecord);
  83.     // 新增节点
  84.     ChangeRecord nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
  85.     nodeRecord.data = data;
  86.     nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
  87.     setTxnDigest(request, nodeRecord.precalculatedDigest);
  88.     addChangeRecord(nodeRecord);
  89. }
  90. protected void addChangeRecord(ChangeRecord c) {
  91.     synchronized (zks.outstandingChanges) {
  92.         zks.outstandingChanges.add(c);
  93.         zks.outstandingChangesForPath.put(c.path, c);
  94.     }
  95. }
复制代码
outstandingChanges生存未提交的事务变化,比如在天生顺序节点时需要使用cversion值,但是在事务提交到ZKDatabase之前,库里面的值是旧的,所以在上面的代码中,是从outstandingChanges查找节点,给cversion++后再天生顺序节点。
在事务提交之后,才会清算outstandingChanges集。
ProposalRequestProcessor发Proposal
  1. public void processRequest(Request request) throws RequestProcessorException {
  2.     if (request instanceof LearnerSyncRequest) { // sync命令流程,暂不分析
  3.         zks.getLeader().processSync((LearnerSyncRequest) request);
  4.     } else {
  5.         if (shouldForwardToNextProcessor(request)) {
  6.             nextProcessor.processRequest(request); // 提交给下游处理器
  7.         }
  8.         if (request.getHdr() != null) { // 事务操作需要发proposal并写磁盘
  9.             try {
  10.                 zks.getLeader().propose(request);
  11.             } catch (XidRolloverException e) {
  12.                 throw new RequestProcessorException(e.getMessage(), e);
  13.             }
  14.             // 把事务log写到文件中
  15.             // 之后通过AckRequestProcessor处理器给leader ack
  16.             syncProcessor.processRequest(request);
  17.         }
  18.     }
  19. }
复制代码
CommitProcessor提交事务
  1. public void processRequest(Request request) {
  2.     request.commitProcQueueStartTime = Time.currentElapsedTime();
  3.     queuedRequests.add(request); // 所有请求队列
  4.     if (needCommit(request)) { // 需要提交的请求进入到写队列
  5.         queuedWriteRequests.add(request);
  6.         numWriteQueuedRequests.incrementAndGet();
  7.     } else {
  8.         numReadQueuedRequests.incrementAndGet();
  9.     }
  10.     wakeup();
  11. }
复制代码
run方法对比queuedRequests、queuedWriteRequests、committedRequests这几个队列,将提交成功的请求或读请求转发给鄙俚的ToBeAppliedRequestProcessor处置惩罚器。
FinalRequestProcessor应用事务

该处置惩罚器位于处置惩罚器链的末了,负责将事务应用到ZKDatabase、查询数据、返反相应。
applyRequest

该方法将事务应用到ZKDatabase中:
  1. private ProcessTxnResult applyRequest(Request request) {
  2.     // 应用事务
  3.     ProcessTxnResult rc = zks.processTxn(request);
  4.     // closeSession
  5.     // metrics
  6.     return rc;
  7. }
复制代码
zks.processTxn负责处置惩罚session、处置惩罚事务、清算outstandingChanges集。重点看一下处置惩罚事务的步调。
processTxn
  1. public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
  2.     ProcessTxnResult rc = new ProcessTxnResult();
  3.     try {
  4.         rc.clientId = header.getClientId();
  5.         rc.cxid = header.getCxid();
  6.         rc.zxid = header.getZxid();
  7.         rc.type = header.getType();
  8.         rc.err = 0;
  9.         rc.multiResult = null;
  10.         switch (header.getType()) {
  11.         case OpCode.create:
  12.             CreateTxn createTxn = (CreateTxn) txn;
  13.             rc.path = createTxn.getPath();
  14.             createNode(
  15.                 createTxn.getPath(),
  16.                 createTxn.getData(),
  17.                 createTxn.getAcl(),
  18.                 createTxn.getEphemeral() ? header.getClientId() : 0,
  19.                 createTxn.getParentCVersion(),
  20.                 header.getZxid(),
  21.                 header.getTime(),
  22.                 null);
  23.             break;
  24.         case OpCode.create2:
  25.             CreateTxn create2Txn = (CreateTxn) txn;
  26.             rc.path = create2Txn.getPath();
  27.             Stat stat = new Stat();
  28.             createNode(
  29.                 create2Txn.getPath(),
  30.                 create2Txn.getData(),
  31.                 create2Txn.getAcl(),
  32.                 create2Txn.getEphemeral() ? header.getClientId() : 0,
  33.                 create2Txn.getParentCVersion(),
  34.                 header.getZxid(),
  35.                 header.getTime(),
  36.                 stat);
  37.             rc.stat = stat;
  38.             break;
  39.         case OpCode.createTTL:
  40.             CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
  41.             rc.path = createTtlTxn.getPath();
  42.             stat = new Stat();
  43.             createNode(
  44.                 createTtlTxn.getPath(),
  45.                 createTtlTxn.getData(),
  46.                 createTtlTxn.getAcl(),
  47.                 EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
  48.                 createTtlTxn.getParentCVersion(),
  49.                 header.getZxid(),
  50.                 header.getTime(),
  51.                 stat);
  52.             rc.stat = stat;
  53.             break;
  54.         case OpCode.createContainer:
  55.             CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
  56.             rc.path = createContainerTxn.getPath();
  57.             stat = new Stat();
  58.             createNode(
  59.                 createContainerTxn.getPath(),
  60.                 createContainerTxn.getData(),
  61.                 createContainerTxn.getAcl(),
  62.                 EphemeralType.CONTAINER_EPHEMERAL_OWNER,
  63.                 createContainerTxn.getParentCVersion(),
  64.                 header.getZxid(),
  65.                 header.getTime(),
  66.                 stat);
  67.             rc.stat = stat;
  68.             break;
  69.         // ...
  70.         }
  71.     }
  72.     // ...
  73. }
复制代码
createNode
  1. public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) throws NoNodeException, NodeExistsException {
  2.     int lastSlash = path.lastIndexOf('/');
  3.     String parentName = path.substring(0, lastSlash);
  4.     String childName = path.substring(lastSlash + 1);
  5.     StatPersisted stat = createStat(zxid, time, ephemeralOwner);
  6.     DataNode parent = nodes.get(parentName); // 父节点必须存在
  7.     if (parent == null) {
  8.         throw new NoNodeException();
  9.     }
  10.     synchronized (parent) {
  11.         Long acls = aclCache.convertAcls(acl);
  12.         Set<String> children = parent.getChildren();
  13.         if (children.contains(childName)) { // 节点不能存在
  14.             throw new NodeExistsException();
  15.         }
  16.         nodes.preChange(parentName, parent);
  17.         if (parentCVersion == -1) { // childVersion++
  18.             parentCVersion = parent.stat.getCversion();
  19.             parentCVersion++;
  20.         }
  21.         if (parentCVersion > parent.stat.getCversion()) {
  22.             parent.stat.setCversion(parentCVersion);
  23.             parent.stat.setPzxid(zxid);
  24.         }
  25.         // 创建node
  26.         DataNode child = new DataNode(data, acls, stat);
  27.         parent.addChild(childName);
  28.         nodes.postChange(parentName, parent);
  29.         nodeDataSize.addAndGet(getNodeSize(path, child.data));
  30.         nodes.put(path, child); // 维护NodeHashMap
  31.         // 处理临时节点
  32.         EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
  33.         if (ephemeralType == EphemeralType.CONTAINER) {
  34.             containers.add(path);
  35.         } else if (ephemeralType == EphemeralType.TTL) {
  36.             ttls.add(path);
  37.         } else if (ephemeralOwner != 0) {
  38.             HashSet<String> list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>());
  39.             synchronized (list) {
  40.                 list.add(path);
  41.             }
  42.         }
  43.         // 返回节点stat
  44.         if (outputStat != null) {
  45.             child.copyStat(outputStat);
  46.         }
  47.     }
  48.     // now check if its one of the zookeeper node child 略
  49.     // 触发NodeCreated监听
  50.     dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
  51.     // 触发父节点的NodeChildrenChanged监听
  52.     childWatches.triggerWatch(
  53.         parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
  54. }
复制代码
返反相应
  1. case OpCode.create: {
  2.     lastOp = "CREA";
  3.     rsp = new CreateResponse(rc.path); // 创建Response
  4.     err = Code.get(rc.err); // processTxn的err
  5.     requestPathMetricsCollector.registerRequest(request.type, rc.path);
  6.     break;
  7. }
  8. case OpCode.create2:
  9. case OpCode.createTTL:
  10. case OpCode.createContainer: {
  11.     lastOp = "CREA";
  12.     rsp = new Create2Response(rc.path, rc.stat); // 创建Response
  13.     err = Code.get(rc.err); // processTxn的err
  14.     requestPathMetricsCollector.registerRequest(request.type, rc.path);
  15.     break;
  16. }
复制代码
最后会使用cnxn把相应返回给客户端:
  1. ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
  2. cnxn.sendResponse(hdr, rsp, "response");
复制代码
EphemeralType


  • VOID
  • NORMAL
  • CONTAINER
  • TTL
ephemeralOwner标识znode是暂时的,以及哪个会话创建了该节点。通过zookeeper.extendedTypesEnabled属性可以启用ttl节点等扩展功能。ephemeralOwner的"特殊位"用于体现启用了哪个功能,而ephemeral Owner的剩余位是特定于功能的。
当zookeeper.extendedTypesEnabled为true时,将启用扩展类型。扩展ephemeralOwner填充高8位(0xff00000000000000L),高8位之后的两个字节用于体现ephemeralOwner扩展特征,剩余5个字节由该功能指定,可用于所需的任何目的。
目前,唯一扩展功能是TTL节点,扩展特征值为0。对于TTL节点,ephemeralOwner具有0xff的高8位,接下来2个字节是0,后面的5个字节是以毫秒为单位的ttl值。因此ttl值为1毫秒的ephemeralOwner是0xff00000000000001。
要添加新的扩展功能:

  • 向枚举添加新名称
  • 在ttl之后,定义常量extended_BIT_xxxx,即0x0001
  • 通过静态初始值设定项向extendedFeatureMap添加映射
注意:从技能上讲,容器节点也是扩展类型,但由于它是在该功能之前实现的,因此被特殊体现。根据定义,只有高位集(0x8000000000000000L)的暂时全部者是容器节点(无论是否启用扩展类型)。
ttl节点


  • 默认不开启,使用
  • Added in 3.5.3
  • 创建PERSISTENT或PERSISTENT_SEQUENTIAL节点时,可以设置以毫秒为单位的ttl。如果znode没有在ttl内修改,而且没有子节点,它将在将来的某个时候成为服务器删除的候选节点
container节点


  • Added in 3.5.3
  • 当container节点的最后一个子节点被删除时,该container节点将成为服务器在未来某个时候删除的候选节点
Stat类

封装节点属性,字段如下:

  • czxid The zxid of the change that caused this znode to be created.
  • mzxid The zxid of the change that last modified this znode.
  • pzxid The zxid of the change that last modified children of this znode.
  • ctime The time in milliseconds from epoch when this znode was created.
  • mtime The time in milliseconds from epoch when this znode was last modified.
  • version The number of changes to the data of this znode.
  • cversion The number of changes to the children of this znode.
  • aversion The number of changes to the ACL of this znode.
  • ephemeralOwner The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
  • dataLength The length of the data field of this znode.
  • numChildren The number of children of this znode.
删除node

涉及delete、deleteContainer等下令。
PrepRequestProcessor事务准备

反序列化请求参数
  1. private void pRequestHelper(Request request) {
  2.     try {
  3.         switch (request.type) {
  4.         // ...
  5.         case OpCode.deleteContainer:
  6.             DeleteContainerRequest deleteContainerRequest =
  7.                 request.readRequestRecord(DeleteContainerRequest::new);
  8.             pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
  9.             break;
  10.         case OpCode.delete:
  11.             DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
  12.             pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
  13.             break;
  14.         }
  15.         // ...
  16.     }
  17. }
复制代码
DeleteContainerRequest类:
  1. public class DeleteContainerRequest implements Record {
  2.     private String path;
  3. }
复制代码
DeleteRequest类:
  1. public class DeleteRequest implements Record {
  2.   private String path;
  3.   private int version;
  4. }
复制代码
事务准备
  1. protected void pRequest2Txn(int type, long zxid, Request request,
  2.              Record record) throws KeeperException, IOException, RequestProcessorException {
  3.     // 略
  4.     switch (type) {
  5.     // 略
  6.     case OpCode.deleteContainer: {
  7.         DeleteContainerRequest txn = (DeleteContainerRequest) record;
  8.         String path = txn.getPath();
  9.         String parentPath = getParentPathAndValidate(path);
  10.         ChangeRecord nodeRecord = getRecordForPath(path); // 获取待删除节点
  11.         if (nodeRecord.childCount > 0) { // 有子节点不允许删除
  12.             throw new KeeperException.NotEmptyException(path);
  13.         }
  14.         if (EphemeralType.get(nodeRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL) {
  15.             throw new KeeperException.BadVersionException(path);
  16.         }
  17.         ChangeRecord parentRecord = getRecordForPath(parentPath); // 获取父节点
  18.         request.setTxn(new DeleteTxn(path));
  19.         // addChangeRecord 略
  20.         break;
  21.     }
  22.     case OpCode.delete:
  23.         zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
  24.         DeleteRequest deleteRequest = (DeleteRequest) record;
  25.         String path = deleteRequest.getPath();
  26.         String parentPath = getParentPathAndValidate(path);
  27.         ChangeRecord parentRecord = getRecordForPath(parentPath); // 获取父节点
  28.         // 检查DELETE权限
  29.         zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo, path, null);
  30.         ChangeRecord nodeRecord = getRecordForPath(path); // 获取待删除节点
  31.         checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path); // 检查version
  32.         if (nodeRecord.childCount > 0) { // 有子节点不允许删除
  33.             throw new KeeperException.NotEmptyException(path);
  34.         }
  35.         request.setTxn(new DeleteTxn(path));
  36.         // addChangeRecord 略
  37.         break;
  38.     }
  39. }
复制代码
FinalRequestProcessor应用事务

processTxn
  1. public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
  2.     ProcessTxnResult rc = new ProcessTxnResult();
  3.     try {
  4.         rc.clientId = header.getClientId();
  5.         rc.cxid = header.getCxid();
  6.         rc.zxid = header.getZxid();
  7.         rc.type = header.getType();
  8.         rc.err = 0;
  9.         rc.multiResult = null;
  10.         switch (header.getType()) {
  11.         // ...
  12.         case OpCode.delete:
  13.         case OpCode.deleteContainer:
  14.             DeleteTxn deleteTxn = (DeleteTxn) txn;
  15.             rc.path = deleteTxn.getPath();
  16.             deleteNode(deleteTxn.getPath(), header.getZxid());
  17.             break;
  18.         }
  19.         // ...
  20.     }
  21. }
复制代码
deleteNode
  1. public void deleteNode(String path, long zxid) throws NoNodeException {
  2.     int lastSlash = path.lastIndexOf('/');
  3.     String parentName = path.substring(0, lastSlash);
  4.     String childName = path.substring(lastSlash + 1);
  5.     DataNode parent = nodes.get(parentName);
  6.     if (parent == null) { // 获取父节点且必须存在
  7.         throw new NoNodeException();
  8.     }
  9.     synchronized (parent) {
  10.         nodes.preChange(parentName, parent);
  11.         parent.removeChild(childName);
  12.         if (zxid > parent.stat.getPzxid()) {
  13.             parent.stat.setPzxid(zxid); // The zxid of the change that last modified children of this znode
  14.         }
  15.         nodes.postChange(parentName, parent);
  16.     }
  17.     DataNode node = nodes.get(path); // 获取删除节点
  18.     if (node == null) {
  19.         throw new NoNodeException();
  20.     }
  21.     nodes.remove(path); // 从NodeHashMap删除
  22.     synchronized (node) { // 移除权限
  23.         aclCache.removeUsage(node.acl);
  24.         nodeDataSize.addAndGet(-getNodeSize(path, node.data));
  25.     }
  26.     // 移除临时节点、container、ttl等缓存
  27.     synchronized (parent) {
  28.         long owner = node.stat.getEphemeralOwner();
  29.         EphemeralType ephemeralType = EphemeralType.get(owner);
  30.         if (ephemeralType == EphemeralType.CONTAINER) {
  31.             containers.remove(path);
  32.         } else if (ephemeralType == EphemeralType.TTL) {
  33.             ttls.remove(path);
  34.         } else if (owner != 0) {
  35.             Set<String> nodes = ephemerals.get(owner);
  36.             if (nodes != null) {
  37.                 synchronized (nodes) {
  38.                     nodes.remove(path);
  39.                 }
  40.             }
  41.         }
  42.     }
  43.     // 略
  44.     // 触发NodeDeleted监听
  45.     WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
  46.     childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
  47.     // 触发父节点的NodeChildrenChanged监听
  48.     childWatches.triggerWatch(
  49.         "".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
  50. }
复制代码
设置node数据

PrepRequestProcessor事务准备

反序列化请求参数
  1. private void pRequestHelper(Request request) {
  2.     try {
  3.         switch (request.type) {
  4.         // ...
  5.         case OpCode.setData:
  6.             SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
  7.             pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
  8.             break;
  9.         // other case
  10.         }
  11.     }
  12.     // ...
  13. }
复制代码
SetDataRequest类:
  1. public class SetDataRequest implements Record {
  2.   private String path;
  3.   private byte[] data;
  4.   private int version;
  5. }
复制代码
事务准备
  1. protected void pRequest2Txn(int type, long zxid, Request request,
  2.           Record record) throws KeeperException, IOException, RequestProcessorException {
  3.     // 略
  4.     switch (type) {
  5.     // ...
  6.     case OpCode.setData:
  7.         zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
  8.         SetDataRequest setDataRequest = (SetDataRequest) record;
  9.         path = setDataRequest.getPath();
  10.         validatePath(path, request.sessionId);
  11.         nodeRecord = getRecordForPath(path); // 获取节点对象
  12.         // 检查权限
  13.         zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);
  14.         // 检查字节限额
  15.         zks.checkQuota(path, nodeRecord.data, setDataRequest.getData(), OpCode.setData);
  16.         // version++
  17.         int newVersion = checkAndIncVersion(
  18.             nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
  19.         // 创建SetDataTxn
  20.         request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
  21.         // addChangeRecord
  22.         break;
  23.     // other case
  24.     }
  25. }
复制代码
FinalRequestProcessor应用事务

processTxn
  1. public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
  2.     ProcessTxnResult rc = new ProcessTxnResult();
  3.     try {
  4.         rc.clientId = header.getClientId();
  5.         rc.cxid = header.getCxid();
  6.         rc.zxid = header.getZxid();
  7.         rc.type = header.getType();
  8.         rc.err = 0;
  9.         rc.multiResult = null;
  10.         switch (header.getType()) {
  11.         // other case
  12.         case OpCode.setData:
  13.             SetDataTxn setDataTxn = (SetDataTxn) txn;
  14.             rc.path = setDataTxn.getPath();
  15.             rc.stat = setData(
  16.                 setDataTxn.getPath(),
  17.                 setDataTxn.getData(),
  18.                 setDataTxn.getVersion(),
  19.                 header.getZxid(),
  20.                 header.getTime());
  21.             break;
  22.         // other case
  23.         }
  24.     }
  25.     // ...
  26. }
复制代码
setData
  1. public Stat setData(String path, byte[] data, int version,
  2.                     long zxid, long time) throws NoNodeException {
  3.     Stat s = new Stat();
  4.     DataNode n = nodes.get(path);
  5.     if (n == null) { // 检查节点存在
  6.         throw new NoNodeException();
  7.     }
  8.     byte[] lastData;
  9.     synchronized (n) {
  10.         lastData = n.data;
  11.         nodes.preChange(path, n);
  12.         n.data = data; // 节点数据
  13.         n.stat.setMtime(time); // 修改时间
  14.         n.stat.setMzxid(zxid); // 修改zxid
  15.         n.stat.setVersion(version); // 版本
  16.         n.copyStat(s);
  17.         nodes.postChange(path, n);
  18.     }
  19.     // 略
  20.     // 触发NodeDataChanged监听
  21.     dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
  22.     return s;
  23. }
复制代码
查询node数据

PrepRequestProcessor验证session

经过该处置惩罚器时,只做session验证。
之后的ProposalRequestProcessor、CommitProcessor、ToBeAppliedRequestProcessor都是直接通过,不做事务处置惩罚,直接交给FinalRequestProcessor处置惩罚器查询数据、发送相应。
FinalRequestProcessor查询数据

使用handleGetDataRequest方法查询数据:
  1. private Record handleGetDataRequest(
  2.         Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
  3.     GetDataRequest getDataRequest = (GetDataRequest) request;
  4.     String path = getDataRequest.getPath();
  5.     DataNode n = zks.getZKDatabase().getNode(path);
  6.     if (n == null) {
  7.         throw new KeeperException.NoNodeException();
  8.     }
  9.     // 检查权限
  10.     zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
  11.     Stat stat = new Stat();
  12.     // 查询数据
  13.     // 如果watcher参数不为null会给path添加一个监听器
  14.     byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
  15.     return new GetDataResponse(b, stat);
  16. }
复制代码
GetDataRequest类:
  1. public class GetDataRequest implements Record {
  2.   private String path;
  3.   private boolean watch;
  4. }
复制代码
节点监听

addWatch下令
  1. case OpCode.addWatch: {
  2.     lastOp = "ADDW";
  3.     AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
  4.     // 最终使用DataTree的addWatch方法注册监听器
  5.     // cnxn是ServerCnxn对象,实现了Watcher接口
  6.     zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
  7.     rsp = new ErrorResponse(0);
  8.     break;
  9. }
复制代码
DataTree的addWatch方法
  1. public void addWatch(String basePath, Watcher watcher, int mode) {
  2.     // PERSISTENT|PERSISTENT_RECURSIVE
  3.     WatcherMode watcherMode = WatcherMode.fromZooDef(mode);
  4.     // dataWatches和childWatches是WatchManager类型对象
  5.     dataWatches.addWatch(basePath, watcher, watcherMode);
  6.     if (watcherMode != WatcherMode.PERSISTENT_RECURSIVE) {
  7.         childWatches.addWatch(basePath, watcher, watcherMode);
  8.     }
  9. }
复制代码
WatcherMode枚举
  1. public enum WatcherMode {
  2.     STANDARD(false, false),
  3.     PERSISTENT(true, false), // persistent=0
  4.     PERSISTENT_RECURSIVE(true, true), // persistentRecursive=1
  5.     ;
  6. }
复制代码
PERSISTENT和PERSISTENT_RECURSIVE是3.6.0版本新增的特性。
Watcher接口

实现类需要实现process方法:
  1. void process(WatchedEvent event);
复制代码
WatchedEvent代表一个监听变乱:
  1. public class WatchedEvent {
  2.     // 当前zk服务器的状态
  3.     private final KeeperState keeperState;
  4.     // NodeCreated|NodeDeleted|NodeDataChanged|NodeChildrenChanged等
  5.     private final EventType eventType;
  6.     private final String path;
  7.     private final long zxid;
  8. }
复制代码
重要的实现类:

  • NIOServerCnxn
  • NettyServerCnxn
WatchManager类

This class manages watches. It allows watches to be associated with a string and removes watchers and their watches in addition to managing triggers.
核心字段:
  1. // path -> Watcher集
  2. private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
  3. // Watcher -> path->WatchStats(PERSISTENT|STANDARD + PERSISTENT|PERSISTENT_RECURSIVE等)
  4. private final Map<Watcher, Map<String, WatchStats>> watch2Paths = new HashMap<>();
复制代码
WatchStats类

[code]public final class WatchStats {    private static final WatchStats[] WATCH_STATS = new WatchStats[] {            new WatchStats(0), // NONE            new WatchStats(1), // STANDARD            new WatchStats(2), // PERSISTENT            new WatchStats(3), // STANDARD + PERSISTENT            new WatchStats(4), // PERSISTENT_RECURSIVE            new WatchStats(5), // STANDARD + PERSISTENT_RECURSIVE            new WatchStats(6), // PERSISTENT + PERSISTENT_RECURSIVE            new WatchStats(7), // STANDARD + PERSISTENT + PERSISTENT_RECURSIVE    };    public static final WatchStats NONE = WATCH_STATS[0];    private final int flags;    private WatchStats(int flags) {        this.flags = flags;    }    private static int modeToFlag(WatcherMode mode) {        // mode = STANDARD; return 1
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

祗疼妳一个

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表