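This article starts from the leader's request processors and walks through how znodes are created, deleted, updated, and read, and how watchers work.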
Recap: the data read/write flow
leader
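- ZooKeeperServer.processPacket wraps the incoming packet in a Request and submits it to the request processor chain
- LeaderRequestProcessor performs local session upgrades
- PrepRequestProcessor prepares the transaction
- ProposalRequestProcessor sends a proposal to the followers for write operations and persists it to the log file
- CommitProcessor forwards read requests to the next processor directly; write operations wait until a quorum has acknowledged them and are then forwarded
- ToBeAppliedRequestProcessor cleans up the toBeApplied collection
- FinalRequestProcessor applies the transaction to ZKDatabase and sends the response to the client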
follower
- Handling PROPOSAL: SyncRequestProcessor persists the transaction, then SendAckRequestProcessor sends an ACK to the leader
- Handling COMMIT: the request is handed to CommitProcessor, and FinalRequestProcessor then applies the transaction to ZKDatabase
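The leader-side chain above is a chain of responsibility: each processor does its part and then hands the request to the next one. The sketch below is a hypothetical, synchronous model of that wiring only. The `Stage` and `NamedStage` names are invented; this is not ZooKeeper's `RequestProcessor` API, and it leaves out the queues and threads the real processors use, as well as the side branch from ProposalRequestProcessor to SyncRequestProcessor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, synchronous model of a processor chain; not ZooKeeper's RequestProcessor API.
final class ProcessorChainSketch {

    // Each stage does its own work, then hands the request to its successor.
    interface Stage {
        void process(String request, List<String> trace);
    }

    static final class NamedStage implements Stage {
        private final String name;
        private final Stage next; // null for the last stage

        NamedStage(String name, Stage next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void process(String request, List<String> trace) {
            trace.add(name + ":" + request); // stand-in for the stage's real work
            if (next != null) {
                next.process(request, trace);
            }
        }
    }

    // Built back to front so that every stage can be given its successor.
    static Stage leaderChain() {
        Stage fin = new NamedStage("Final", null);
        Stage toBeApplied = new NamedStage("ToBeApplied", fin);
        Stage commit = new NamedStage("Commit", toBeApplied);
        Stage proposal = new NamedStage("Proposal", commit);
        Stage prep = new NamedStage("Prep", proposal);
        return new NamedStage("LeaderRequest", prep);
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        leaderChain().process("create /a", trace);
        trace.forEach(System.out::println);
    }
}
```

Building the chain from the last stage backwards is what lets each stage hold a reference to its successor, much like the `nextProcessor.processRequest(request)` call in ProposalRequestProcessor later in this article.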
Creating a node
This involves the create, create2, createContainer, and createTTL commands.
PrepRequestProcessor: transaction preparation
Deserializing the request parameters
- switch (request.type) {
- case OpCode.createContainer:
- case OpCode.create:
- case OpCode.create2:
- CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
- pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
- break;
- case OpCode.createTTL:
- // TTL nodes are not supported by default
- CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
- pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
- break;
- // ...
CreateRequest wraps the parameters for creating a node:
- public class CreateRequest implements Record {
- private String path;
- private byte[] data;
- private java.util.List<org.apache.zookeeper.data.ACL> acl;
- private int flags;
- }
CreateTTLRequest wraps the parameters for creating a node with a TTL:
- public class CreateTTLRequest implements Record {
- private String path;
- private byte[] data;
- private java.util.List<org.apache.zookeeper.data.ACL> acl;
- private int flags;
- private long ttl;
- }
Transaction preparation
- protected void pRequest2Txn(int type, long zxid, Request request, Record record)
- throws KeeperException, IOException, RequestProcessorException {
- // ...
- switch (type) {
- case OpCode.create:
- case OpCode.create2:
- case OpCode.createTTL:
- case OpCode.createContainer: {
- pRequest2TxnCreate(type, request, record);
- break;
- }
- // ...
- }
- }
- private void pRequest2TxnCreate(
- int type, Request request, Record record) throws IOException, KeeperException {
- int flags;
- String path;
- List<ACL> acl;
- byte[] data;
- long ttl;
- if (type == OpCode.createTTL) {
- CreateTTLRequest createTtlRequest = (CreateTTLRequest) record;
- // assign flags, path, acl, data and ttl from the request
- } else {
- CreateRequest createRequest = (CreateRequest) record;
- // assign flags, path, acl and data from the request
- ttl = -1;
- }
- // CreateMode:
- // PERSISTENT, PERSISTENT_SEQUENTIAL, EPHEMERAL, EPHEMERAL_SEQUENTIAL,
- // CONTAINER, PERSISTENT_WITH_TTL, PERSISTENT_SEQUENTIAL_WITH_TTL
- CreateMode createMode = CreateMode.fromFlag(flags);
- // validate the ephemeral and TTL parameters and check the session
- // TTL is not supported by default
- validateCreateRequest(path, createMode, request, ttl);
- String parentPath = validatePathForCreate(path, request.sessionId); // parent path
- List<ACL> listACL = fixupACL(path, request.authInfo, acl); // ACL carried by the request
- ChangeRecord parentRecord = getRecordForPath(parentPath); // parent's record (outstandingChanges first, then the database)
- // check CREATE permission on the parent
- zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
- int parentCVersion = parentRecord.stat.getCversion();
- if (createMode.isSequential()) { // sequential node
- // e.g. /users/admin0000000001
- path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
- }
- validatePath(path, request.sessionId);
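- // ... (omitted)
- boolean ephemeralParent =
- EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
- // the parent must not be an ephemeral node
- if (ephemeralParent) {
- throw new KeeperException.NoChildrenForEphemeralsException(path);
- }
- int newCversion = parentRecord.stat.getCversion() + 1; // the parent's new cversion (child version)
- // check the byte quota
- zks.checkQuota(path, null, data, OpCode.create);
- // create the Txn object that matches the request type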
- if (type == OpCode.createContainer) {
- request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
- } else if (type == OpCode.createTTL) {
- request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
- } else {
- request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
- }
- TxnHeader hdr = request.getHdr();
- long ephemeralOwner = 0;
- if (createMode.isContainer()) {
- ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
- } else if (createMode.isTTL()) {
- ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
- } else if (createMode.isEphemeral()) {
- ephemeralOwner = request.sessionId; // an ephemeral node is owned by the creating session
- }
- // czxid(created),mzxid(modified),ctime,mtime,version,cversion(childVersion),
- // aversion(aclVersion),ephemeralOwner,pzxid(lastModifiedChildren)
- StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
- // record the change to the parent
- parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
- parentRecord.childCount++;
- parentRecord.stat.setCversion(newCversion);
- parentRecord.stat.setPzxid(request.getHdr().getZxid());
- parentRecord.precalculatedDigest = precalculateDigest(
- DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
- addChangeRecord(parentRecord);
- // record the new node
- ChangeRecord nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
- nodeRecord.data = data;
- nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
- setTxnDigest(request, nodeRecord.precalculatedDigest);
- addChangeRecord(nodeRecord);
- }
- protected void addChangeRecord(ChangeRecord c) {
- synchronized (zks.outstandingChanges) {
- zks.outstandingChanges.add(c);
- zks.outstandingChangesForPath.put(c.path, c);
- }
- }
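outstandingChanges holds transaction changes that have been prepared but not yet committed. Generating a sequential node name, for example, needs the parent's cversion, but until the transaction is applied to ZKDatabase the database still holds the old value. That is why the code above fetches the parent through getRecordForPath, which checks outstandingChanges first, uses its cversion as the sequence suffix, and records cversion + 1 as a new outstanding change, so the next create under the same parent gets the next number.
Entries are removed from outstandingChanges only after the transaction has been committed.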
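A minimal sketch of why the pending record matters, assuming only the parent's cversion is tracked; the class and method names are hypothetical and the two maps stand in for the database and outstandingChanges. Two sequential creates prepared before either is committed must get different suffixes, which only works if the second one sees the first one's outstanding cversion + 1.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: only the parent's cversion is modeled.
final class OutstandingChangesSketch {
    // parent path -> cversion already applied to the database
    private final Map<String, Integer> committed = new HashMap<>();
    // parent path -> cversion including prepared but not yet committed changes
    private final Map<String, Integer> outstanding = new HashMap<>();

    OutstandingChangesSketch(String parent, int cversion) {
        committed.put(parent, cversion);
    }

    // Like getRecordForPath: prefer the pending change, else the committed state.
    private int currentCversion(String parent) {
        Integer pending = outstanding.get(parent);
        return pending != null ? pending : committed.get(parent);
    }

    // Suffix = current cversion; then record cversion + 1 as an outstanding change.
    String prepareSequentialCreate(String parent, String pathPrefix) {
        int cversion = currentCversion(parent);
        String path = pathPrefix + String.format(Locale.ENGLISH, "%010d", cversion);
        outstanding.put(parent, cversion + 1);
        return path;
    }

    // Applying the transactions publishes the pending values and clears them.
    void commitAll() {
        committed.putAll(outstanding);
        outstanding.clear();
    }

    int committedCversion(String parent) {
        return committed.get(parent);
    }

    public static void main(String[] args) {
        OutstandingChangesSketch s = new OutstandingChangesSketch("/users", 0);
        System.out.println(s.prepareSequentialCreate("/users", "/users/admin"));
        System.out.println(s.prepareSequentialCreate("/users", "/users/admin"));
        s.commitAll();
        System.out.println(s.committedCversion("/users"));
    }
}
```

Running `main` prints `/users/admin0000000000`, then `/users/admin0000000001`, then `2`. Without the outstanding lookup, both prepares would read the committed cversion 0 and produce the same name.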
ProposalRequestProcessor: sending the proposal
- public void processRequest(Request request) throws RequestProcessorException {
- if (request instanceof LearnerSyncRequest) { // sync command flow, not covered here
- zks.getLeader().processSync((LearnerSyncRequest) request);
- } else {
- if (shouldForwardToNextProcessor(request)) {
- nextProcessor.processRequest(request); // hand off to the next processor
- }
- if (request.getHdr() != null) { // write operations need a proposal and must be written to disk
- try {
- zks.getLeader().propose(request);
- } catch (XidRolloverException e) {
- throw new RequestProcessorException(e.getMessage(), e);
- }
- // write the transaction to the log file;
- // AckRequestProcessor then records the leader's own ACK
- syncProcessor.processRequest(request);
- }
- }
- }
CommitProcessor: committing the transaction
- public void processRequest(Request request) {
- request.commitProcQueueStartTime = Time.currentElapsedTime();
- queuedRequests.add(request); // queue of all requests
- if (needCommit(request)) { // requests that need a commit also go into the write queue
- queuedWriteRequests.add(request);
- numWriteQueuedRequests.incrementAndGet();
- } else {
- numReadQueuedRequests.incrementAndGet();
- }
- wakeup();
- }
The run method compares the queuedRequests, queuedWriteRequests, and committedRequests queues and forwards committed write requests and read requests to the next processor, ToBeAppliedRequestProcessor.
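The interplay of these queues can be illustrated with a deliberately simplified model: requests leave in arrival order, reads go straight through, and a write (together with everything queued behind it) waits until it is committed. This is a hypothetical single-session sketch with invented names; the real CommitProcessor tracks ordering per session rather than across all requests, and hands work to worker threads.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified single-session model; the real CommitProcessor orders per session and uses worker threads.
final class CommitQueueSketch {
    static final class Req {
        final long id;
        final boolean write;

        Req(long id, boolean write) {
            this.id = id;
            this.write = write;
        }

        @Override
        public String toString() {
            return (write ? "write#" : "read#") + id;
        }
    }

    private final Deque<Req> queued = new ArrayDeque<>(); // plays the role of queuedRequests
    private final Set<Long> committed = new HashSet<>();  // ids of writes that reached quorum

    void submit(Req r) {
        queued.addLast(r);
    }

    // Called when the leader learns that a write has been committed by a quorum.
    void commit(long id) {
        committed.add(id);
    }

    // Forward requests in order; an uncommitted write blocks everything behind it.
    List<Req> drain() {
        List<Req> out = new ArrayList<>();
        while (!queued.isEmpty()) {
            Req head = queued.peekFirst();
            if (head.write && !committed.remove(head.id)) {
                break;
            }
            out.add(queued.pollFirst());
        }
        return out;
    }

    public static void main(String[] args) {
        CommitQueueSketch q = new CommitQueueSketch();
        q.submit(new Req(1, false));
        q.submit(new Req(2, true));
        q.submit(new Req(3, false));
        System.out.println(q.drain());
        q.commit(2);
        System.out.println(q.drain());
    }
}
```

With a read, a write, and a read queued, the first `drain()` returns only the first read; after `commit(2)` the write and the read behind it are released together.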
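FinalRequestProcessor: applying the transaction
This processor sits at the end of the processor chain and is responsible for applying transactions to ZKDatabase, querying data, and returning responses.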
applyRequest
This method applies the transaction to ZKDatabase:
- private ProcessTxnResult applyRequest(Request request) {
- // apply the transaction
- ProcessTxnResult rc = zks.processTxn(request);
- // closeSession
- // metrics
- return rc;
- }
zks.processTxn handles session-related operations, applies the transaction, and cleans up outstandingChanges. Here we focus on the transaction step.
processTxn
- public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
- ProcessTxnResult rc = new ProcessTxnResult();
- try {
- rc.clientId = header.getClientId();
- rc.cxid = header.getCxid();
- rc.zxid = header.getZxid();
- rc.type = header.getType();
- rc.err = 0;
- rc.multiResult = null;
- switch (header.getType()) {
- case OpCode.create:
- CreateTxn createTxn = (CreateTxn) txn;
- rc.path = createTxn.getPath();
- createNode(
- createTxn.getPath(),
- createTxn.getData(),
- createTxn.getAcl(),
- createTxn.getEphemeral() ? header.getClientId() : 0,
- createTxn.getParentCVersion(),
- header.getZxid(),
- header.getTime(),
- null);
- break;
- case OpCode.create2:
- CreateTxn create2Txn = (CreateTxn) txn;
- rc.path = create2Txn.getPath();
- Stat stat = new Stat();
- createNode(
- create2Txn.getPath(),
- create2Txn.getData(),
- create2Txn.getAcl(),
- create2Txn.getEphemeral() ? header.getClientId() : 0,
- create2Txn.getParentCVersion(),
- header.getZxid(),
- header.getTime(),
- stat);
- rc.stat = stat;
- break;
- case OpCode.createTTL:
- CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
- rc.path = createTtlTxn.getPath();
- stat = new Stat();
- createNode(
- createTtlTxn.getPath(),
- createTtlTxn.getData(),
- createTtlTxn.getAcl(),
- EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
- createTtlTxn.getParentCVersion(),
- header.getZxid(),
- header.getTime(),
- stat);
- rc.stat = stat;
- break;
- case OpCode.createContainer:
- CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
- rc.path = createContainerTxn.getPath();
- stat = new Stat();
- createNode(
- createContainerTxn.getPath(),
- createContainerTxn.getData(),
- createContainerTxn.getAcl(),
- EphemeralType.CONTAINER_EPHEMERAL_OWNER,
- createContainerTxn.getParentCVersion(),
- header.getZxid(),
- header.getTime(),
- stat);
- rc.stat = stat;
- break;
- // ...
- }
- }
- // ...
- }
createNode
- public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) throws NoNodeException, NodeExistsException {
- int lastSlash = path.lastIndexOf('/');
- String parentName = path.substring(0, lastSlash);
- String childName = path.substring(lastSlash + 1);
- StatPersisted stat = createStat(zxid, time, ephemeralOwner);
- DataNode parent = nodes.get(parentName); // the parent must exist
- if (parent == null) {
- throw new NoNodeException();
- }
- synchronized (parent) {
- Long acls = aclCache.convertAcls(acl);
- Set<String> children = parent.getChildren();
- if (children.contains(childName)) { // the node must not already exist
- throw new NodeExistsException();
- }
- nodes.preChange(parentName, parent);
- if (parentCVersion == -1) { // childVersion++
- parentCVersion = parent.stat.getCversion();
- parentCVersion++;
- }
- if (parentCVersion > parent.stat.getCversion()) {
- parent.stat.setCversion(parentCVersion);
- parent.stat.setPzxid(zxid);
- }
- // create the node
- DataNode child = new DataNode(data, acls, stat);
- parent.addChild(childName);
- nodes.postChange(parentName, parent);
- nodeDataSize.addAndGet(getNodeSize(path, child.data));
- nodes.put(path, child); // register it in the NodeHashMap
- // track container, TTL, and ephemeral nodes
- EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
- if (ephemeralType == EphemeralType.CONTAINER) {
- containers.add(path);
- } else if (ephemeralType == EphemeralType.TTL) {
- ttls.add(path);
- } else if (ephemeralOwner != 0) {
- HashSet<String> list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>());
- synchronized (list) {
- list.add(path);
- }
- }
- // copy the node's stat to the caller
- if (outputStat != null) {
- child.copyStat(outputStat);
- }
- }
- // now check if its one of the zookeeper node child (omitted)
- // fire NodeCreated watches
- dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
- // fire NodeChildrenChanged watches on the parent
- childWatches.triggerWatch(
- parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
- }
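createNode (and deleteNode below) split the path on the last `/`. For a top-level node such as `/users`, the parent name comes out as the empty string, which is why the watch code maps `""` to `"/"` before firing NodeChildrenChanged. A small sketch of that arithmetic, with hypothetical method names and assuming well-formed absolute paths:

```java
// Sketch of the path arithmetic in createNode/deleteNode; method names are hypothetical.
final class PathSplitSketch {
    static String parentName(String path) {
        return path.substring(0, path.lastIndexOf('/'));
    }

    static String childName(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    // Path whose NodeChildrenChanged watches fire when `path` is created or deleted.
    static String childWatchTarget(String path) {
        String parent = parentName(path);
        return parent.equals("") ? "/" : parent;
    }

    public static void main(String[] args) {
        for (String p : new String[] {"/users/admin", "/users"}) {
            System.out.println(p + " -> parent='" + parentName(p) + "', child='" + childName(p)
                    + "', childWatchTarget='" + childWatchTarget(p) + "'");
        }
    }
}
```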
Returning the response
- case OpCode.create: {
- lastOp = "CREA";
- rsp = new CreateResponse(rc.path); // build the response
- err = Code.get(rc.err); // error code from processTxn
- requestPathMetricsCollector.registerRequest(request.type, rc.path);
- break;
- }
- case OpCode.create2:
- case OpCode.createTTL:
- case OpCode.createContainer: {
- lastOp = "CREA";
- rsp = new Create2Response(rc.path, rc.stat); // build the response
- err = Code.get(rc.err); // error code from processTxn
- requestPathMetricsCollector.registerRequest(request.type, rc.path);
- break;
- }
Finally, the response is sent back to the client through cnxn:
- ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
- cnxn.sendResponse(hdr, rsp, "response");
EphemeralType
- VOID
- NORMAL
- CONTAINER
- TTL
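ephemeralOwner indicates whether a znode is ephemeral and, if so, which session created it. Setting the zookeeper.extendedTypesEnabled property enables extended features such as TTL nodes. "Special bits" of ephemeralOwner indicate which feature is in use, and the remaining bits are feature-specific.
When zookeeper.extendedTypesEnabled is true, extended types are enabled. An extended ephemeralOwner has all of its high 8 bits set (0xff00000000000000L); the two bytes after the high byte identify the extended feature, and the remaining 5 bytes are defined by that feature and can be used for whatever it needs.
Currently the only extended feature is TTL nodes, with feature value 0. For a TTL node, ephemeralOwner has a high byte of 0xff, the next 2 bytes are 0, and the last 5 bytes hold the TTL in milliseconds. So the ephemeralOwner for a TTL of 1 millisecond is 0xff00000000000001.
To add a new extended feature:
- Add a new name to the enum
- Define a constant EXTENDED_BIT_xxxx that follows TTL's, i.e. 0x0001
- Add a mapping to extendedFeatureMap in the static initializer
Note: technically, container nodes are also an extended type, but because they were implemented before this feature they are special-cased. By definition, only an ephemeralOwner with just the high bit set (0x8000000000000000L) denotes a container node, whether or not extended types are enabled.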
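The layout described above can be checked with a little bit arithmetic. This is a sketch under that stated layout, not ZooKeeper's EphemeralType code: the constant and method names are hypothetical, and `classify` assumes extended types are enabled.

```java
// Sketch of the ephemeralOwner bit layout; names are hypothetical, not EphemeralType's.
final class EphemeralOwnerSketch {
    static final long EXTENDED_MASK = 0xff00000000000000L;   // high byte all ones
    static final long FEATURE_MASK = 0x00ffff0000000000L;    // next two bytes: feature id
    static final long VALUE_MASK = 0x000000ffffffffffL;      // low five bytes: feature value
    static final int FEATURE_SHIFT = 40;
    static final long FEATURE_TTL = 0x0000L;                 // TTL is feature 0
    static final long CONTAINER_OWNER = 0x8000000000000000L; // only the high bit set

    enum Kind { VOID, NORMAL, CONTAINER, TTL }

    static long ttlToOwner(long ttlMillis) {
        if (ttlMillis <= 0 || ttlMillis > VALUE_MASK) {
            throw new IllegalArgumentException("ttl out of range: " + ttlMillis);
        }
        return EXTENDED_MASK | (FEATURE_TTL << FEATURE_SHIFT) | ttlMillis;
    }

    static long ownerToTtl(long owner) {
        return owner & VALUE_MASK;
    }

    // Assumes extended types are enabled.
    static Kind classify(long owner) {
        if (owner == 0) {
            return Kind.VOID; // not ephemeral
        }
        if (owner == CONTAINER_OWNER) {
            return Kind.CONTAINER;
        }
        if ((owner & EXTENDED_MASK) == EXTENDED_MASK
                && ((owner & FEATURE_MASK) >>> FEATURE_SHIFT) == FEATURE_TTL) {
            return Kind.TTL;
        }
        return Kind.NORMAL; // an ordinary session id
    }

    public static void main(String[] args) {
        System.out.println(Long.toHexString(ttlToOwner(1)));
        System.out.println(classify(ttlToOwner(60_000)) + " " + ownerToTtl(ttlToOwner(60_000)));
    }
}
```

The first line printed is `ff00000000000001`, matching the 1 millisecond example in the text.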
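TTL nodes
- Disabled by default; enable them with the zookeeper.extendedTypesEnabled property
- Added in 3.5.3
- When creating a PERSISTENT or PERSISTENT_SEQUENTIAL node, you can set a TTL in milliseconds. If the znode is not modified within the TTL and has no children, it becomes a candidate for deletion by the server at some point in the future
Container nodes
- Added in 3.5.3
- When the last child of a container node is deleted, the container becomes a candidate for deletion by the server at some point in the future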
The Stat class
It holds a node's metadata. Its fields are:
- czxid The zxid of the change that caused this znode to be created.
- mzxid The zxid of the change that last modified this znode.
- pzxid The zxid of the change that last modified children of this znode.
- ctime The time in milliseconds from epoch when this znode was created.
- mtime The time in milliseconds from epoch when this znode was last modified.
- version The number of changes to the data of this znode.
- cversion The number of changes to the children of this znode.
- aversion The number of changes to the ACL of this znode.
- ephemeralOwner The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
- dataLength The length of the data field of this znode.
- numChildren The number of children of this znode.
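To see how some of these fields move, here is a hypothetical sketch that follows the createNode and setData code in this article: a new node starts with czxid = mzxid = the creating zxid and version 0, creating a child bumps the parent's cversion and pzxid, and setData bumps version and mzxid while czxid stays fixed. It assumes pzxid also starts at the creating zxid, as the createStat call suggests; the class is illustrative, not ZooKeeper's Stat.

```java
// Hypothetical sketch: a few Stat fields and how create/setData move them.
final class StatBookkeepingSketch {
    static final class Stat {
        long czxid;      // zxid of the create
        long mzxid;      // zxid of the last data change
        long pzxid;      // zxid of the last change to the children
        int version;     // number of data changes
        int cversion;    // number of changes to the children
        int numChildren; // derived from the child set in ZooKeeper; a counter here
    }

    // New node: created and last modified by the same zxid; counters start at 0.
    static Stat newStat(long zxid) {
        Stat s = new Stat();
        s.czxid = zxid;
        s.mzxid = zxid;
        s.pzxid = zxid; // assumption: pzxid starts at the creating zxid
        return s;
    }

    // Creating a child: parent cversion + 1, parent pzxid = zxid, one more child.
    static Stat createChild(Stat parent, long zxid) {
        parent.cversion++;
        parent.pzxid = zxid;
        parent.numChildren++;
        return newStat(zxid);
    }

    // setData: version + 1 and mzxid = zxid; czxid never changes.
    static void setData(Stat node, long zxid) {
        node.version++;
        node.mzxid = zxid;
    }

    public static void main(String[] args) {
        Stat parent = newStat(1);
        Stat child = createChild(parent, 5);
        setData(child, 9);
        System.out.println("parent cversion=" + parent.cversion + " pzxid=" + parent.pzxid);
        System.out.println("child czxid=" + child.czxid + " mzxid=" + child.mzxid + " version=" + child.version);
    }
}
```

Note that creating a child changes the parent's pzxid but not its mzxid, because the parent's own data was not modified.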
Deleting a node
This involves the delete and deleteContainer commands.
PrepRequestProcessor: transaction preparation
Deserializing the request parameters
- private void pRequestHelper(Request request) {
- try {
- switch (request.type) {
- // ...
- case OpCode.deleteContainer:
- DeleteContainerRequest deleteContainerRequest =
- request.readRequestRecord(DeleteContainerRequest::new);
- pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
- break;
- case OpCode.delete:
- DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
- pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
- break;
- }
- // ...
- }
- }
The DeleteContainerRequest class:
- public class DeleteContainerRequest implements Record {
- private String path;
- }
The DeleteRequest class:
- public class DeleteRequest implements Record {
- private String path;
- private int version;
- }
Transaction preparation
- protected void pRequest2Txn(int type, long zxid, Request request,
- Record record) throws KeeperException, IOException, RequestProcessorException {
- // ... (omitted)
- switch (type) {
- // ... (other cases omitted)
- case OpCode.deleteContainer: {
- DeleteContainerRequest txn = (DeleteContainerRequest) record;
- String path = txn.getPath();
- String parentPath = getParentPathAndValidate(path);
- ChangeRecord nodeRecord = getRecordForPath(path); // record of the node to delete
- if (nodeRecord.childCount > 0) { // a node with children cannot be deleted
- throw new KeeperException.NotEmptyException(path);
- }
- if (EphemeralType.get(nodeRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL) {
- throw new KeeperException.BadVersionException(path);
- }
- ChangeRecord parentRecord = getRecordForPath(parentPath); // parent's record
- request.setTxn(new DeleteTxn(path));
- // addChangeRecord (omitted)
- break;
- }
- case OpCode.delete:
- zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
- DeleteRequest deleteRequest = (DeleteRequest) record;
- String path = deleteRequest.getPath();
- String parentPath = getParentPathAndValidate(path);
- ChangeRecord parentRecord = getRecordForPath(parentPath); // parent's record
- // check DELETE permission on the parent
- zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo, path, null);
- ChangeRecord nodeRecord = getRecordForPath(path); // record of the node to delete
- checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path); // check the expected version (-1 matches any)
- if (nodeRecord.childCount > 0) { // a node with children cannot be deleted
- throw new KeeperException.NotEmptyException(path);
- }
- request.setTxn(new DeleteTxn(path));
- // addChangeRecord (omitted)
- break;
- }
- }
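checkAndIncVersion is used here and again by setData. A minimal sketch of the check, assuming the usual ZooKeeper convention that an expected version of -1 matches any version; the nested exception class is a local stand-in for KeeperException.BadVersionException, and the class name is hypothetical.

```java
// Sketch of the expected-version check; BadVersionException stands in for KeeperException.BadVersionException.
final class VersionCheckSketch {
    static final class BadVersionException extends RuntimeException {
        BadVersionException(String path) {
            super("BadVersion for " + path);
        }
    }

    // expectedVersion == -1 matches any version; otherwise it must equal the current one.
    // Returns the version the node will have after the change.
    static int checkAndIncVersion(int currentVersion, int expectedVersion, String path) {
        if (expectedVersion != -1 && expectedVersion != currentVersion) {
            throw new BadVersionException(path);
        }
        return currentVersion + 1;
    }

    public static void main(String[] args) {
        System.out.println(checkAndIncVersion(3, 3, "/a"));  // exact match
        System.out.println(checkAndIncVersion(3, -1, "/a")); // unconditional
        try {
            checkAndIncVersion(3, 2, "/a");                  // stale version
        } catch (BadVersionException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

delete only needs the check and ignores the returned value, while setData stores the returned value as the node's new version.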
FinalRequestProcessor: applying the transaction
processTxn
- public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
- ProcessTxnResult rc = new ProcessTxnResult();
- try {
- rc.clientId = header.getClientId();
- rc.cxid = header.getCxid();
- rc.zxid = header.getZxid();
- rc.type = header.getType();
- rc.err = 0;
- rc.multiResult = null;
- switch (header.getType()) {
- // ...
- case OpCode.delete:
- case OpCode.deleteContainer:
- DeleteTxn deleteTxn = (DeleteTxn) txn;
- rc.path = deleteTxn.getPath();
- deleteNode(deleteTxn.getPath(), header.getZxid());
- break;
- }
- // ...
- }
- }
deleteNode
- public void deleteNode(String path, long zxid) throws NoNodeException {
- int lastSlash = path.lastIndexOf('/');
- String parentName = path.substring(0, lastSlash);
- String childName = path.substring(lastSlash + 1);
- DataNode parent = nodes.get(parentName);
- if (parent == null) { // the parent must exist
- throw new NoNodeException();
- }
- synchronized (parent) {
- nodes.preChange(parentName, parent);
- parent.removeChild(childName);
- if (zxid > parent.stat.getPzxid()) {
- parent.stat.setPzxid(zxid); // The zxid of the change that last modified children of this znode
- }
- nodes.postChange(parentName, parent);
- }
- DataNode node = nodes.get(path); // the node being deleted
- if (node == null) {
- throw new NoNodeException();
- }
- nodes.remove(path); // remove it from the NodeHashMap
- synchronized (node) { // release its ACL reference and data size
- aclCache.removeUsage(node.acl);
- nodeDataSize.addAndGet(-getNodeSize(path, node.data));
- }
- // remove it from the container, TTL, and ephemeral indexes
- synchronized (parent) {
- long owner = node.stat.getEphemeralOwner();
- EphemeralType ephemeralType = EphemeralType.get(owner);
- if (ephemeralType == EphemeralType.CONTAINER) {
- containers.remove(path);
- } else if (ephemeralType == EphemeralType.TTL) {
- ttls.remove(path);
- } else if (owner != 0) {
- Set<String> nodes = ephemerals.get(owner);
- if (nodes != null) {
- synchronized (nodes) {
- nodes.remove(path);
- }
- }
- }
- }
- // ... (omitted)
- // fire NodeDeleted watches
- WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
- childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
- // fire NodeChildrenChanged watches on the parent
- childWatches.triggerWatch(
- "".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
- }
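The ephemerals map used by createNode and deleteNode is an index from session id (the ephemeralOwner) to the paths that session owns, which is what lets the server find a session's ephemeral nodes when the session ends. A hypothetical sketch of just that index; the class and method names are invented, and container and TTL nodes are assumed to be filtered out before these methods are called, as the code above does.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the session -> ephemeral paths index.
final class EphemeralIndexSketch {
    private final Map<Long, Set<String>> ephemerals = new HashMap<>();

    // createNode: a plain ephemeral node is filed under its owner's session id.
    void onCreate(long owner, String path) {
        ephemerals.computeIfAbsent(owner, k -> new HashSet<>()).add(path);
    }

    // deleteNode: remove the path from its owner's set, if the set exists.
    void onDelete(long owner, String path) {
        Set<String> paths = ephemerals.get(owner);
        if (paths != null) {
            paths.remove(path);
        }
    }

    // Paths that would need deleting when session `owner` ends (sorted copy).
    Set<String> pathsOwnedBy(long owner) {
        return new TreeSet<>(ephemerals.getOrDefault(owner, Collections.emptySet()));
    }

    public static void main(String[] args) {
        EphemeralIndexSketch idx = new EphemeralIndexSketch();
        idx.onCreate(42L, "/locks/a");
        idx.onCreate(42L, "/locks/b");
        idx.onDelete(42L, "/locks/a");
        System.out.println(idx.pathsOwnedBy(42L));
    }
}
```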
Setting node data
PrepRequestProcessor: transaction preparation
Deserializing the request parameters
- private void pRequestHelper(Request request) {
- try {
- switch (request.type) {
- // ...
- case OpCode.setData:
- SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
- pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
- break;
- // other case
- }
- }
- // ...
- }
The SetDataRequest class:
- public class SetDataRequest implements Record {
- private String path;
- private byte[] data;
- private int version;
- }
Transaction preparation
- protected void pRequest2Txn(int type, long zxid, Request request,
- Record record) throws KeeperException, IOException, RequestProcessorException {
- // ... (omitted)
- switch (type) {
- // ...
- case OpCode.setData:
- zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
- SetDataRequest setDataRequest = (SetDataRequest) record;
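- path = setDataRequest.getPath(); // path and nodeRecord are declared in an earlier (omitted) case of this switch
- validatePath(path, request.sessionId);
- nodeRecord = getRecordForPath(path); // the node's record
- // check WRITE permission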
- zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);
- // check the byte quota
- zks.checkQuota(path, nodeRecord.data, setDataRequest.getData(), OpCode.setData);
- // version++
- int newVersion = checkAndIncVersion(
- nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
- // create the SetDataTxn
- request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
- // addChangeRecord
- break;
- // other case
- }
- }
FinalRequestProcessor: applying the transaction
processTxn
- public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
- ProcessTxnResult rc = new ProcessTxnResult();
- try {
- rc.clientId = header.getClientId();
- rc.cxid = header.getCxid();
- rc.zxid = header.getZxid();
- rc.type = header.getType();
- rc.err = 0;
- rc.multiResult = null;
- switch (header.getType()) {
- // other case
- case OpCode.setData:
- SetDataTxn setDataTxn = (SetDataTxn) txn;
- rc.path = setDataTxn.getPath();
- rc.stat = setData(
- setDataTxn.getPath(),
- setDataTxn.getData(),
- setDataTxn.getVersion(),
- header.getZxid(),
- header.getTime());
- break;
- // other case
- }
- }
- // ...
- }
setData
- public Stat setData(String path, byte[] data, int version,
- long zxid, long time) throws NoNodeException {
- Stat s = new Stat();
- DataNode n = nodes.get(path);
- if (n == null) { // the node must exist
- throw new NoNodeException();
- }
- byte[] lastData;
- synchronized (n) {
- lastData = n.data;
- nodes.preChange(path, n);
- n.data = data; // new data
- n.stat.setMtime(time); // modification time
- n.stat.setMzxid(zxid); // zxid of this modification
- n.stat.setVersion(version); // new data version
- n.copyStat(s);
- nodes.postChange(path, n);
- }
- // ... (omitted)
- // fire NodeDataChanged watches
- dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
- return s;
- }
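Reading node data
PrepRequestProcessor: session validation
For a read request, this processor only validates the session.
The following ProposalRequestProcessor, CommitProcessor, and ToBeAppliedRequestProcessor simply pass the request through without any transaction handling, and FinalRequestProcessor queries the data and sends the response.
FinalRequestProcessor: querying the data
The data is read by handleGetDataRequest:
- private Record handleGetDataRequest(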
- Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
- GetDataRequest getDataRequest = (GetDataRequest) request;
- String path = getDataRequest.getPath();
- DataNode n = zks.getZKDatabase().getNode(path);
- if (n == null) {
- throw new KeeperException.NoNodeException();
- }
- // check READ permission
- zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
- Stat stat = new Stat();
- // read the data;
- // if the request's watch flag is set, cnxn is passed as the watcher and registered on path
- byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
- return new GetDataResponse(b, stat);
- }
The GetDataRequest class:
- public class GetDataRequest implements Record {
- private String path;
- private boolean watch;
- }
Node watches
The addWatch command
- case OpCode.addWatch: {
- lastOp = "ADDW";
- AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
- // the watcher is eventually registered through DataTree.addWatch
- // cnxn is a ServerCnxn, which implements the Watcher interface
- zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
- rsp = new ErrorResponse(0);
- break;
- }
DataTree.addWatch
- public void addWatch(String basePath, Watcher watcher, int mode) {
- // PERSISTENT|PERSISTENT_RECURSIVE
- WatcherMode watcherMode = WatcherMode.fromZooDef(mode);
- // dataWatches and childWatches are both WatchManager objects
- dataWatches.addWatch(basePath, watcher, watcherMode);
- if (watcherMode != WatcherMode.PERSISTENT_RECURSIVE) {
- childWatches.addWatch(basePath, watcher, watcherMode);
- }
- }
The WatcherMode enum
- public enum WatcherMode {
- STANDARD(false, false),
- PERSISTENT(true, false), // persistent=0
- PERSISTENT_RECURSIVE(true, true), // persistentRecursive=1
- ;
- }
PERSISTENT and PERSISTENT_RECURSIVE were added in version 3.6.0.
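A PERSISTENT_RECURSIVE watch registered on a base path is meant to fire for events on that path and on every descendant. One way to find the matching recursive watches is to walk from the event path up to `/` and look each ancestor up in the watch table. The sketch below shows only that walk; the class and method names are hypothetical, and it assumes well-formed absolute paths.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: candidate paths for recursive-watch matching.
final class RecursiveWatchSketch {
    // "/a/b/c" -> ["/a/b/c", "/a/b", "/a", "/"]
    static List<String> selfAndAncestors(String path) {
        List<String> out = new ArrayList<>();
        String p = path;
        while (true) {
            out.add(p);
            if (p.equals("/")) {
                break;
            }
            int slash = p.lastIndexOf('/');
            p = (slash == 0) ? "/" : p.substring(0, slash);
        }
        return out;
    }

    // A recursive watch on `base` covers `eventPath` if base is the path itself or one of its ancestors.
    static boolean covers(String base, String eventPath) {
        return selfAndAncestors(eventPath).contains(base);
    }

    public static void main(String[] args) {
        System.out.println(selfAndAncestors("/a/b/c"));
        System.out.println(covers("/a", "/a/b/c"));
        System.out.println(covers("/a", "/ab"));
    }
}
```

Walking whole path components rather than testing `startsWith` avoids treating `/ab` as a child of `/a`.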
The Watcher interface
Implementations must implement the process method:
- void process(WatchedEvent event);
WatchedEvent represents a watch event:
- public class WatchedEvent {
- // connection state with the ZooKeeper server, e.g. SyncConnected
- private final KeeperState keeperState;
- // NodeCreated|NodeDeleted|NodeDataChanged|NodeChildrenChanged, etc.
- private final EventType eventType;
- private final String path;
- private final long zxid;
- }
Key implementations:
- NIOServerCnxn
- NettyServerCnxn
The WatchManager class
This class manages watches. It allows watches to be associated with a string and removes watchers and their watches in addition to managing triggers.
Core fields:
- // path -> set of Watchers
- private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
- // Watcher -> (path -> WatchStats: STANDARD, PERSISTENT, PERSISTENT_RECURSIVE or a combination)
- private final Map<Watcher, Map<String, WatchStats>> watch2Paths = new HashMap<>();
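A hypothetical sketch of the per-(path, watcher) mode bookkeeping: each entry holds an OR of mode flags, using 1 = STANDARD, 2 = PERSISTENT, 4 = PERSISTENT_RECURSIVE as in the WatchStats class. When a watch fires, the one-shot STANDARD bit is cleared while persistent bits stay registered. Watchers are plain string ids here, and recursive matching and the reverse watch2Paths index are left out; this is not WatchManager's actual code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical watch table: path -> (watcher id -> OR-ed mode flags).
final class WatchTableSketch {
    static final int STANDARD = 1;
    static final int PERSISTENT = 2;
    static final int PERSISTENT_RECURSIVE = 4;

    private final Map<String, Map<String, Integer>> watchTable = new HashMap<>();

    // Registering a second mode for the same (path, watcher) ORs the flags together.
    void addWatch(String path, String watcher, int modeFlag) {
        watchTable.computeIfAbsent(path, k -> new HashMap<>())
                .merge(watcher, modeFlag, (a, b) -> a | b);
    }

    // Fires every watcher registered on exactly this path (no recursive matching), sorted by id.
    List<String> trigger(String path) {
        Map<String, Integer> watchers = watchTable.get(path);
        if (watchers == null) {
            return Collections.emptyList();
        }
        List<String> fired = new ArrayList<>(new TreeSet<>(watchers.keySet()));
        // STANDARD is one-shot: clear that bit, then drop entries with no mode left.
        watchers.replaceAll((w, flags) -> flags & ~STANDARD);
        watchers.values().removeIf(flags -> flags == 0);
        if (watchers.isEmpty()) {
            watchTable.remove(path);
        }
        return fired;
    }

    int flagsOf(String path, String watcher) {
        return watchTable.getOrDefault(path, Collections.emptyMap()).getOrDefault(watcher, 0);
    }

    public static void main(String[] args) {
        WatchTableSketch table = new WatchTableSketch();
        table.addWatch("/a", "c1", STANDARD);
        table.addWatch("/a", "c2", PERSISTENT);
        System.out.println(table.trigger("/a"));
        System.out.println(table.trigger("/a"));
    }
}
```

In `main`, the first trigger fires both `c1` and `c2`; the second fires only `c2`, because `c1`'s standard watch was consumed.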
The WatchStats class
- public final class WatchStats {
- private static final WatchStats[] WATCH_STATS = new WatchStats[] {
- new WatchStats(0), // NONE
- new WatchStats(1), // STANDARD
- new WatchStats(2), // PERSISTENT
- new WatchStats(3), // STANDARD + PERSISTENT
- new WatchStats(4), // PERSISTENT_RECURSIVE
- new WatchStats(5), // STANDARD + PERSISTENT_RECURSIVE
- new WatchStats(6), // PERSISTENT + PERSISTENT_RECURSIVE
- new WatchStats(7), // STANDARD + PERSISTENT + PERSISTENT_RECURSIVE
- };
- public static final WatchStats NONE = WATCH_STATS[0];
- private final int flags;
- private WatchStats(int flags) {
- this.flags = flags;
- }
- private static int modeToFlag(WatcherMode mode) {
- // mode = STANDARD; return 1 |