- // In-memory tree of data nodes
- protected DataTree dataTree;
- // Session id -> session timeout
- protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
- protected FileTxnSnapLog snapLog; // used to access the underlying snapshot / txn-log files
- // zxids of the first and last proposals currently held in committedLog
- protected long minCommittedLog, maxCommittedLog;
- // Size bound for committedLog (default 500); addCommittedProposal evicts the oldest entry once the size exceeds it
- public int commitLogCount;
- // The most recently committed proposals, kept in memory to speed up follower sync
- protected Queue<Proposal> committedLog = new ArrayDeque<>();
- // Its write lock is held by addCommittedProposal while committedLog and min/maxCommittedLog are updated
- protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
- private volatile boolean initialized = false;
- // Number of transactions appended to the txn log
- private AtomicInteger txnCount = new AtomicInteger(0);
- // Creates the database around the given snapshot/txn-log accessor, with a fresh data tree
- // and an empty session map.
- public ZKDatabase(FileTxnSnapLog snapLog) {
- dataTree = createDataTree();
- sessionsWithTimeouts = new ConcurrentHashMap<>();
- this.snapLog = snapLog;
- // (elided in this excerpt) initialize snapshotSizeFactor, default 0.33
- // (elided in this excerpt) initialize commitLogCount, default 500
- }
- // Factory hook for the tree backing this database; always returns a brand-new DataTree.
- public DataTree createDataTree() {
- final DataTree freshTree = new DataTree();
- return freshTree;
- }
- // Returns the committedLog collection
- public synchronized Collection<Proposal> getCommittedLog();
- // Returns dataTree.lastProcessedZxid
- public long getDataTreeLastProcessedZxid();
- // Returns the session ids from dataTree.getSessions()
- public Collection<Long> getSessions();
- // Returns the size of sessionsWithTimeouts
- public long getSessionCount();
- // Loads dataTree from disk and replays the txnLog into committedLog
- public long loadDataBase() throws IOException;
- // Replays the txnLog from disk into committedLog (no snapshot load)
- public long fastForwardDataBase() throws IOException;
- // Builds a Request and adds it to committedLog via addCommittedProposal(Request)
- private void addCommittedProposal(TxnHeader hdr, Record txn, TxnDigest digest);
- // Adds a proposal to committedLog
- public void addCommittedProposal(Request request);
- // Reads Proposals from the txnLog
- public Iterator<Proposal> getProposalsFromTxnLog(long startZxid, long sizeLimit);
- // Delegates to dataTree.removeCnxn(cnxn)
- public void removeCnxn(ServerCnxn cnxn);
- // Delegates to dataTree.killSession(sessionId, zxid)
- public void killSession(long sessionId, long zxid);
- // Delegates to dataTree.dumpEphemerals(pwriter)
- public void dumpEphemerals(PrintWriter pwriter);
- // Delegates to dataTree.getEphemerals()
- public Map<Long, Set<String>> getEphemerals();
- // Delegates to dataTree.getNodeCount()
- public int getNodeCount();
- // Delegates to dataTree.getEphemerals(sessionId)
- public Set<String> getEphemerals(long sessionId);
- // Sets dataTree.lastProcessedZxid
- public void setlastProcessedZxid(long zxid);
- // Delegates to dataTree.processTxn(hdr, txn, digest)
- public ProcessTxnResult processTxn(TxnHeader hdr, Record txn, TxnDigest digest);
- // Delegates to dataTree.statNode(path, serverCnxn)
- public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException;
- // Delegates to dataTree.getNode(path)
- public DataNode getNode(String path);
- // Delegates to dataTree.getData(path, stat, watcher)
- public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException;
- // Implemented with dataTree.setWatches
- public void setWatches(long relativeZxid, List<String> dataWatches,
- List<String> existWatches, List<String> childWatches,
- List<String> persistentWatches, List<String> persistentRecursiveWatches,
- Watcher watcher);
- // Delegates to dataTree.addWatch(basePath, watcher, mode)
- public void addWatch(String basePath, Watcher watcher, int mode);
- // Delegates to dataTree.getChildren(path, stat, watcher)
- public List<String> getChildren(
- String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException;
- // Delegates to dataTree.getAllChildrenNumber(path)
- public int getAllChildrenNumber(String path) throws KeeperException.NoNodeException;
- // Truncate the ZKDatabase to the specified zxid
- public boolean truncateLog(long zxid) throws IOException;
- // Deserialize a snapshot from an input archive
- public void deserializeSnapshot(InputArchive ia) throws IOException;
- // Deserialize a snapshot that contains FileHeader from an input archive
- // It is used by the admin restore command
- public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException;
- // Serialize the snapshot
- public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedException;
- // Appends via snapLog.append(si) and increments txnCount
- public boolean append(Request si) throws IOException;
- // Rolls the underlying txnLog via snapLog.rollLog()
- public void rollLog() throws IOException;
- // Commits the underlying txnLog via snapLog.commit()
- public void commit() throws IOException;
- // Initializes the /zookeeper/config data (covered earlier, in the cluster-startup walkthrough)
- public synchronized void initConfigInZKDatabase(QuorumVerifier qv);
- // Delegates to dataTree.containsWatcher(path, type, watcher)
- public boolean containsWatcher(String path, WatcherType type, Watcher watcher);
- // Delegates to dataTree.removeWatch(path, type, watcher)
- public boolean removeWatch(String path, WatcherType type, Watcher watcher);
从磁盘加载dataTree并把txnLog加载到committedLog中:- public long loadDataBase() throws IOException {
- // Loads the database from disk, marks it initialized and returns the zxid reported by snapLog.restore.
- long startTime = Time.currentElapsedTime();
- // 1. Load dataTree from the snapshot
- // 2. Replay the txnLog with fastForwardFromEdits into dataTree and, through
- //    commitProposalPlaybackListener, into committedLog
- long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
- initialized = true;
- // (elided in this excerpt; startTime is presumably used by the omitted code)
- return zxid;
- }
从txnLog加载dataTree和committedlog集:
- // Replays only the transaction log (no snapshot) into dataTree; every replayed txn is
- // also handed to commitProposalPlaybackListener, which adds it to committedLog.
- // Marks the database initialized and returns the zxid from fastForwardFromEdits.
- public long fastForwardDataBase() throws IOException {
- final long lastReplayedZxid =
- snapLog.fastForwardFromEdits(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
- initialized = true;
- return lastReplayedZxid;
- }
- private void addCommittedProposal(TxnHeader hdr, Record txn, TxnDigest digest) {
- Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
- r.setTxnDigest(digest);
- addCommittedProposal(r);
- }
- // Appends a committed request to committedLog as a Leader.PROPOSAL packet and keeps
- // minCommittedLog / maxCommittedLog in sync. When committedLog already holds more than
- // commitLogCount entries, the oldest one is evicted first.
- public void addCommittedProposal(Request request) {
- WriteLock wl = logLock.writeLock();
- // Acquire before entering try: if lock() were inside the try and failed,
- // the finally block would unlock() a lock this thread does not hold.
- wl.lock();
- try {
- if (committedLog.size() > commitLogCount) {
- committedLog.remove(); // evict the oldest proposal
- minCommittedLog = committedLog.peek().packet.getZxid();
- }
- if (committedLog.isEmpty()) {
- // first entry: both bounds are this request's zxid
- minCommittedLog = request.zxid;
- maxCommittedLog = request.zxid;
- }
- byte[] data = request.getSerializeData();
- QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
- Proposal p = new Proposal();
- p.packet = pp;
- p.request = request;
- committedLog.add(p);
- maxCommittedLog = p.packet.getZxid();
- } finally {
- wl.unlock();
- }
- }
从txnlog获取Proposal,只填充packet字段:
- // Returns an iterator over proposals read from the txn log, starting at startZxid.
- // An empty iterator is returned when sizeLimit is negative, when the first txn found is
- // already past startZxid (continuity from startZxid cannot be guaranteed), when sizeLimit
- // is positive and the txn log's storage size exceeds it, or when reading fails.
- public Iterator<Proposal> getProposalsFromTxnLog(long startZxid, long sizeLimit) {
- if (sizeLimit < 0) {
- return TxnLogProposalIterator.EMPTY_ITERATOR;
- }
- TxnIterator itr = null;
- try {
- // read from the txnLog files; underneath, FileTxnIterator reads the file streams
- itr = snapLog.readTxnLog(startZxid, false);
- // If we cannot guarantee that this is strictly the starting txn
- // after a given zxid, we should fail.
- if ((itr.getHeader() != null) && (itr.getHeader().getZxid() > startZxid)) {
- closeTxnIteratorQuietly(itr);
- return TxnLogProposalIterator.EMPTY_ITERATOR;
- }
- // a sizeLimit of 0 disables the size check
- if (sizeLimit > 0 && itr.getStorageSize() > sizeLimit) {
- closeTxnIteratorQuietly(itr);
- return TxnLogProposalIterator.EMPTY_ITERATOR;
- }
- } catch (IOException e) {
- // itr is still null when readTxnLog itself threw, so only close it if it was opened
- closeTxnIteratorQuietly(itr);
- return TxnLogProposalIterator.EMPTY_ITERATOR;
- }
- return new TxnLogProposalIterator(itr);
- }
- // Closes itr if it is non-null. Close failures are ignored because every caller is
- // already abandoning the iterator and returning an empty result.
- private static void closeTxnIteratorQuietly(TxnIterator itr) {
- if (itr == null) {
- return;
- }
- try {
- itr.close();
- } catch (IOException ignored) {
- // best effort: nothing more can be done with an iterator that is being discarded
- }
- }
把txnlog数据truncate到指定的zxid位置,然后重新加载DataTree数据:
- // Calls clear(), truncates the txn log to zxid and, only if truncation succeeded,
- // reloads the database from disk. Returns whether the log was truncated.
- public boolean truncateLog(long zxid) throws IOException {
- clear();
- final boolean truncated = snapLog.truncateLog(zxid);
- if (truncated) {
- loadDataBase();
- }
- return truncated;
- }
- // Calls clear(), then fills the data tree and session timeouts from the snapshot in ia,
- // and finally marks the database as initialized.
- public void deserializeSnapshot(InputArchive ia) throws IOException {
- clear();
- final DataTree target = getDataTree();
- SerializeUtils.deserializeSnapshot(target, ia, getSessionWithTimeOuts());
- initialized = true;
- }
- public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException {
- // Loads a snapshot that carries a FileHeader (used by the admin restore command): clears
- // the database, reads each section and checks the stream seal after it, then marks the
- // database initialized.
- clear();
- // deserialize data tree
- final DataTree dataTree = getDataTree();
- FileSnap.deserialize(dataTree, getSessionWithTimeOuts(), ia);
- SnapStream.checkSealIntegrity(is, ia);
- // deserialize digest and check integrity
- if (dataTree.deserializeZxidDigest(ia, 0)) {
- SnapStream.checkSealIntegrity(is, ia);
- }
- // deserialize lastProcessedZxid and check integrity
- if (dataTree.deserializeLastProcessedZxid(ia)) {
- SnapStream.checkSealIntegrity(is, ia);
- }
- // compare the digest to find inconsistency
- if (dataTree.getDigestFromLoadedSnapshot() != null) {
- dataTree.compareSnapshotDigests(dataTree.lastProcessedZxid);
- }
- initialized = true;
- }
- // Writes the data tree together with the session-timeout map to oa via SerializeUtils.
- public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedException {
- final DataTree source = getDataTree();
- SerializeUtils.serializeSnapshot(source, oa, getSessionWithTimeOuts());
- }
- // This map provides a fast lookup to the data nodes
- private final NodeHashMap nodes;
- // Watch managers: one for data watches, one for child watches
- private IWatchManager dataWatches;
- private IWatchManager childWatches;
- // cached total size of paths and data for all DataNodes
- private final AtomicLong nodeDataSize = new AtomicLong(0);
- // This hashtable lists the paths of the ephemeral nodes of a session
- private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<>();
- // This set contains the paths of all container nodes
- private final Set<String> containers = Collections.newSetFromMap(new ConcurrentHashMap<>());
- // This set contains the paths of all ttl nodes
- private final Set<String> ttls = Collections.newSetFromMap(new ConcurrentHashMap<>());
- // This is a pointer to the root of the DataTree
- private DataNode root = new DataNode(new byte[0], -1L, new StatPersisted());
- // create a /zookeeper filesystem that is the proc filesystem of zookeeper
- private final DataNode procDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
- // create a /zookeeper/quota node for maintaining quota properties for zookeeper
- private final DataNode quotaDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
- // zxid of the most recently processed transaction (advanced by processTxn for non-sub txns)
- public volatile long lastProcessedZxid = 0;
- NodeHashMap - NodeHashMapImpl实现类使用ConcurrentHashMap保存path -> DataNode数据
- IWatchManager和Watcher - 监听器管理
- DataNode - 封装树节点信息,包括data、children、stat等
- // Builds the initial tree (root aliased as "" and "/", plus /zookeeper, /zookeeper/quota
- // and /zookeeper/config) and creates the data and child watch managers.
- // @throws IllegalStateException if a watch manager cannot be created
- DataTree(DigestCalculator digestCalculator) {
- this.digestCalculator = digestCalculator;
- nodes = new NodeHashMapImpl(digestCalculator);
- // rather than fight it, let root have an alias
- nodes.put("", root); // "" -> root
- nodes.putWithoutDigest(rootZookeeper, root); // "/" -> root
- // add the proc node and quota node
- root.addChild(procChildZookeeper); // add the "zookeeper" child under root
- nodes.put(procZookeeper, procDataNode); // "/zookeeper" -> procDataNode
- procDataNode.addChild(quotaChildZookeeper); // add the "quota" child under /zookeeper
- nodes.put(quotaZookeeper, quotaDataNode); // "/zookeeper/quota" -> quotaDataNode
- addConfigNode(); // add the /zookeeper/config node
- nodeDataSize.set(approximateDataSize());
- try {
- // watch manager implementations come from WatchManagerFactory
- dataWatches = WatchManagerFactory.createWatchManager();
- childWatches = WatchManagerFactory.createWatchManager();
- } catch (Exception e) {
- // Swallowing this would leave dataWatches/childWatches null and every later
- // triggerWatch/addWatch would fail with an NPE; fail construction instead.
- throw new IllegalStateException("Unexpected exception when creating WatchManager", e);
- }
- }
- public void addConfigNode() {
- DataNode zookeeperZnode = nodes.get(procZookeeper); // 找到/zookeeper节点
- if (zookeeperZnode != null) {
- zookeeperZnode.addChild(configChildZookeeper); // 添加config子节点
- }
- nodes.put(configZookeeper, new DataNode(new byte[0], -1L, new StatPersisted()));
- try {
- // Reconfig node is access controlled by default (ZOOKEEPER-2014).
- setACL(configZookeeper, ZooDefs.Ids.READ_ACL_UNSAFE, -1);
- } catch (NoNodeException e) {}
- }
- // Add a new node to the DataTree
- public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner,
- int parentCVersion, long zxid, long time, Stat outputStat);
- // Remove path from the DataTree
- public void deleteNode(String path, long zxid);
- // Sets the data of a node
- public Stat setData(String path, byte[] data, int version, long zxid, long time);
- // 1. Reads the data of path
- // 2. Registers watcher as a watch when it is non-null
- public byte[] getData(String path, Stat stat, Watcher watcher);
- // Copies the node's stat out via node.copyStat(stat)
- public Stat statNode(String path, Watcher watcher);
- // 1. Copies the node's stat into stat
- // 2. addWatch
- // 3. Returns the children
- public List<String> getChildren(String path, Stat stat, Watcher watcher);
- // Set / get ACLs
- public Stat setACL(String path, List<ACL> acl, int version);
- public List<ACL> getACL(String path, Stat stat);
- public List<ACL> getACL(DataNode node);
- // Adds a watcher
- public void addWatch(String basePath, Watcher watcher, int mode);
- // Applies a transaction to the tree
- public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn);
- // Kills a session; deleteNodes removes the paths in paths2DeleteLocal and paths2DeleteInTxn
- void killSession(
- long session, long zxid, Set<String> paths2DeleteLocal, List<String> paths2DeleteInTxn);
- // Calls deleteNode for every path in paths2Delete
- void deleteNodes(long session, long zxid, Iterable<String> paths2Delete);
- // Recursively computes the total node count and byte count under path
- private void getCounts(String path, Counts counts);
- // Serialization
- void serializeNode(OutputArchive oa, StringBuilder path);
- public void serializeNodeData(OutputArchive oa, String path, DataNode node);
- public void serializeAcls(OutputArchive oa);
- public void serializeNodes(OutputArchive oa);
- public void serialize(OutputArchive oa, String tag);
- // Deserialization
- public void deserialize(InputArchive ia, String tag);
- // Removes watcher from dataWatches and childWatches
- public void removeCnxn(Watcher watcher);
- // Triggers the given watches or registers them (addWatch)
- public void setWatches(long relativeZxid, List<String> dataWatches,
- List<String> existWatches, List<String> childWatches,
- List<String> persistentWatches, List<String> persistentRecursiveWatches,
- Watcher watcher);
- // Sets a new cversion and pzxid on path
- public void setCversionPzxid(String path, int newCversion, long zxid);
- // Add the digest to the historical list, and update the latest zxid digest
- private void logZxidDigest(long zxid, long digest);
- // Serialize / deserialize lastProcessedZxidDigest
- public boolean serializeZxidDigest(OutputArchive oa);
- public boolean deserializeZxidDigest(InputArchive ia, long startZxidOfSnapshot);
- // Serialize / deserialize lastProcessedZxid
- public boolean serializeLastProcessedZxid(final OutputArchive oa);
- public boolean deserializeLastProcessedZxid(final InputArchive ia);
- // Compares the actual tree's digest with that in the snapshot.
- // Resets digestFromLoadedSnapshot after comparison.
- public void compareSnapshotDigests(long zxid);
- // Compares the digest of the tree with the digest present in transaction digest.
- // If there is any error, logs and alerts the watchers.
- public boolean compareDigest(TxnHeader header, Record txn, TxnDigest digest);
processTxn中会使用该方法创建节点:- public void createNode(final String path, byte[] data, List<ACL> acl,
- long ephemeralOwner, int parentCVersion, long zxid,
- long time, Stat outputStat) throws NoNodeException, NodeExistsException {
- // Creates the node at path under its parent, bumps the parent's cversion/pzxid,
- // indexes the node as container / TTL / ephemeral as appropriate and fires the
- // NodeCreated and NodeChildrenChanged watches. Existence checks are elided in this excerpt.
- int lastSlash = path.lastIndexOf('/');
- String parentName = path.substring(0, lastSlash);
- String childName = path.substring(lastSlash + 1);
- StatPersisted stat = createStat(zxid, time, ephemeralOwner); // Create a node stat
- DataNode parent = nodes.get(parentName); // the parent must exist (check elided in this excerpt)
- synchronized (parent) {
- Long acls = aclCache.convertAcls(acl);
- Set<String> children = parent.getChildren(); // path must not exist yet (check elided in this excerpt)
- nodes.preChange(parentName, parent); // runs removeDigest
- if (parentCVersion == -1) {
- // -1: derive the new cversion from the parent's current one
- parentCVersion = parent.stat.getCversion();
- parentCVersion++; // bump the child version
- }
- if (parentCVersion > parent.stat.getCversion()) {
- parent.stat.setCversion(parentCVersion); // parent's child version
- parent.stat.setPzxid(zxid); // parent's pzxid becomes this txn's zxid
- }
- DataNode child = new DataNode(data, acls, stat);
- parent.addChild(childName); // link the new node under its parent
- nodes.postChange(parentName, parent);
- nodeDataSize.addAndGet(getNodeSize(path, child.data));
- nodes.put(path, child); // index it in the NodeHashMap
- EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);// usually VOID|NORMAL
- if (ephemeralType == EphemeralType.CONTAINER) {
- containers.add(path);
- } else if (ephemeralType == EphemeralType.TTL) {
- ttls.add(path);
- } else if (ephemeralOwner != 0) {
- // track the ephemeral node under its owning session
- HashSet<String> list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>());
- synchronized (list) {
- list.add(path);
- }
- }
- if (outputStat != null) {
- child.copyStat(outputStat); // copy the new node's stat into outputStat
- }
- }
- // (elided in this excerpt)
- // fire watches
- dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
- childWatches.triggerWatch(
- parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
- }
- public void deleteNode(String path, long zxid) throws NoNodeException {
- // Unlinks path from its parent and the node map, releases its ACL usage and size
- // accounting, drops it from the container / TTL / ephemeral indexes and fires the
- // NodeDeleted and NodeChildrenChanged watches. Existence checks are elided in this excerpt.
- int lastSlash = path.lastIndexOf('/');
- String parentName = path.substring(0, lastSlash);
- String childName = path.substring(lastSlash + 1);
- DataNode parent = nodes.get(parentName); // the parent must exist (check elided in this excerpt)
- synchronized (parent) {
- nodes.preChange(parentName, parent);
- parent.removeChild(childName); // unlink the child
- if (zxid > parent.stat.getPzxid()) {
- parent.stat.setPzxid(zxid); // pzxid only moves forward
- }
- nodes.postChange(parentName, parent);
- }
- DataNode node = nodes.get(path); // the node must exist (check elided in this excerpt)
- nodes.remove(path); // remove it from the NodeHashMap
- synchronized (node) {
- aclCache.removeUsage(node.acl);
- nodeDataSize.addAndGet(-getNodeSize(path, node.data));
- }
- synchronized (parent) {
- long owner = node.stat.getEphemeralOwner();
- EphemeralType ephemeralType = EphemeralType.get(owner);
- if (ephemeralType == EphemeralType.CONTAINER) {
- containers.remove(path);
- } else if (ephemeralType == EphemeralType.TTL) {
- ttls.remove(path);
- } else if (owner != 0) { // remove it from the owning session's ephemeral set
- Set<String> nodes = ephemerals.get(owner); // NOTE: this local shadows the nodes field
- if (nodes != null) {
- synchronized (nodes) {
- nodes.remove(path);
- }
- }
- }
- }
- // (elided in this excerpt)
- // fire watches; processed is passed to the child-watch trigger, presumably so a
- // watcher already notified of NodeDeleted is not notified twice (confirm)
- WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
- childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
- childWatches.triggerWatch(
- "".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
- }
- public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
- // Replaces the data of path, updates mtime / mzxid / version, fires NodeDataChanged and
- // returns a copy of the updated stat. The existence check is elided in this excerpt.
- Stat s = new Stat();
- DataNode n = nodes.get(path); // the node must exist (check elided in this excerpt)
- byte[] lastData;
- synchronized (n) {
- lastData = n.data; // presumably used by the elided code below
- nodes.preChange(path, n);
- n.data = data; // new data
- n.stat.setMtime(time); // modification time
- n.stat.setMzxid(zxid); // zxid of this modification
- n.stat.setVersion(version); // data version
- n.copyStat(s); // capture the updated stat
- nodes.postChange(path, n);
- }
- // (elided in this excerpt)
- // fire watches
- dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
- return s;
- }
- // Replaces the ACL of path, stores version as the node's ACL version (aversion) and
- // returns a copy of the node's updated stat.
- // @throws NoNodeException if path does not exist
- public Stat setACL(String path, List<ACL> acl, int version) throws NoNodeException {
- DataNode n = nodes.get(path);
- if (n == null) {
- // honour the declared contract instead of failing with an NPE on synchronized (n)
- throw new NoNodeException();
- }
- synchronized (n) {
- Stat stat = new Stat();
- aclCache.removeUsage(n.acl); // drop the usage of the old ACL in the cache
- nodes.preChange(path, n);
- n.stat.setAversion(version); // ACL version
- n.acl = aclCache.convertAcls(acl); // store the new ACL
- n.copyStat(stat);
- nodes.postChange(path, n);
- return stat;
- }
- }
- // Returns a copy of path's ACL list; when stat is non-null the node's stat is copied into it.
- // @throws NoNodeException if path does not exist
- public List<ACL> getACL(String path, Stat stat) throws NoNodeException {
- DataNode n = nodes.get(path);
- if (n == null) {
- // honour the declared contract instead of failing with an NPE on synchronized (n)
- throw new NoNodeException();
- }
- synchronized (n) {
- if (stat != null) {
- n.copyStat(stat);
- }
- return new ArrayList<>(aclCache.convertLong(n.acl)); // defensive copy for the caller
- }
- }
- // Resolves node's ACL through the ACL cache while holding the node's monitor.
- public List<ACL> getACL(DataNode node) {
- final List<ACL> resolved;
- synchronized (node) {
- resolved = aclCache.convertLong(node.acl);
- }
- return resolved;
- }
- // Registers watcher on basePath with the mode given by WatcherMode.fromZooDef(mode).
- // The watch is always added to dataWatches; it is also added to childWatches unless
- // the mode is PERSISTENT_RECURSIVE.
- public void addWatch(String basePath, Watcher watcher, int mode) {
- WatcherMode watcherMode = WatcherMode.fromZooDef(mode); // PERSISTENT_RECURSIVE or PERSISTENT
- dataWatches.addWatch(basePath, watcher, watcherMode); // always registered as a data watch
- if (watcherMode != WatcherMode.PERSISTENT_RECURSIVE) {
- // non-recursive (PERSISTENT) watches additionally go to childWatches, which is
- // the manager fired with NodeChildrenChanged
- childWatches.addWatch(basePath, watcher, watcherMode);
- }
- }
- public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
- // Applies one transaction to the tree and reports the outcome in a ProcessTxnResult.
- // KeeperExceptions are recorded in rc.err instead of being thrown. Unless isSubTxn is
- // set, lastProcessedZxid is advanced to the txn's zxid.
- ProcessTxnResult rc = new ProcessTxnResult();
- try {
- rc.clientId = header.getClientId();
- rc.cxid = header.getCxid();
- rc.zxid = header.getZxid();
- rc.type = header.getType();
- rc.err = 0;
- rc.multiResult = null;
- switch (header.getType()) {
- case OpCode.create: // create a node
- CreateTxn createTxn = (CreateTxn) txn;
- rc.path = createTxn.getPath();
- createNode(createTxn.getPath(), createTxn.getData(), createTxn.getAcl(),
- createTxn.getEphemeral() ? header.getClientId() : 0, createTxn.getParentCVersion(),
- header.getZxid(), header.getTime(), null);
- break;
- case OpCode.create2: // create a node and return its stat
- CreateTxn create2Txn = (CreateTxn) txn;
- rc.path = create2Txn.getPath();
- Stat stat = new Stat();
- createNode(create2Txn.getPath(), create2Txn.getData(), create2Txn.getAcl(),
- create2Txn.getEphemeral() ? header.getClientId() : 0, create2Txn.getParentCVersion(),
- header.getZxid(), header.getTime(), stat);
- rc.stat = stat;
- break;
- case OpCode.createTTL:
- CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
- rc.path = createTtlTxn.getPath();
- stat = new Stat();
- createNode(createTtlTxn.getPath(), createTtlTxn.getData(), createTtlTxn.getAcl(),
- EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()), // ttl encoded as the ephemeral owner
- createTtlTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
- rc.stat = stat;
- break;
- case OpCode.createContainer:
- CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
- rc.path = createContainerTxn.getPath();
- stat = new Stat();
- createNode(createContainerTxn.getPath(), createContainerTxn.getData(),
- createContainerTxn.getAcl(), EphemeralType.CONTAINER_EPHEMERAL_OWNER,
- createContainerTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
- rc.stat = stat;
- break;
- case OpCode.delete:
- case OpCode.deleteContainer:
- DeleteTxn deleteTxn = (DeleteTxn) txn;
- rc.path = deleteTxn.getPath();
- deleteNode(deleteTxn.getPath(), header.getZxid()); // delete the node
- break;
- case OpCode.reconfig:
- case OpCode.setData: // set node data (reconfig is applied the same way)
- SetDataTxn setDataTxn = (SetDataTxn) txn;
- rc.path = setDataTxn.getPath();
- rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(), setDataTxn.getVersion(),
- header.getZxid(), header.getTime());
- break;
- case OpCode.setACL: // set the ACL
- SetACLTxn setACLTxn = (SetACLTxn) txn;
- rc.path = setACLTxn.getPath();
- rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion());
- break;
- case OpCode.closeSession: // close the session
- long sessionId = header.getClientId();
- if (txn != null) {
- // paths come from both the local ephemerals index and the txn itself
- killSession(sessionId, header.getZxid(), ephemerals.remove(sessionId),
- ((CloseSessionTxn) txn).getPaths2Delete());
- } else {
- killSession(sessionId, header.getZxid());
- }
- break;
- case OpCode.error:
- ErrorTxn errTxn = (ErrorTxn) txn;
- rc.err = errTxn.getErr();
- break;
- case OpCode.check:
- CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
- rc.path = checkTxn.getPath();
- break;
- case OpCode.multi:
- // (elided in this excerpt) iterate over and apply each sub-txn
- break;
- }
- } catch (KeeperException e) {
- rc.err = e.code().intValue();
- } catch (IOException e) {} // NOTE(review): IOException is silently ignored in this excerpt
- // A plain create that failed with NODEEXISTS still sets the parent's cversion/pzxid
- if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) {
- int lastSlash = rc.path.lastIndexOf('/');
- String parentName = rc.path.substring(0, lastSlash);
- CreateTxn cTxn = (CreateTxn) txn;
- try {
- setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid());
- } catch (NoNodeException e) {
- rc.err = e.code().intValue();
- }
- }
- // Only top-level txns (not sub-txns) advance lastProcessedZxid
- if (!isSubTxn) {
- if (rc.zxid > lastProcessedZxid) {
- lastProcessedZxid = rc.zxid; // record the latest processed zxid
- }
- // (elided in this excerpt)
- }
- return rc;
- }
- // Writes the node at path and then, recursively, all of its descendants to oa (parents
- // before children). path is used as a scratch buffer and is appended to by this call.
- void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
- String pathString = path.toString();
- DataNode node = getNode(pathString); // look up the node
- if (node == null) {
- // The child list below is copied under the parent's monitor, so a child can be
- // gone by the time it is visited; skip it rather than NPE on synchronized (node).
- return;
- }
- String[] children;
- DataNode nodeCopy;
- synchronized (node) {
- StatPersisted statCopy = new StatPersisted();
- copyStatPersisted(node.stat, statCopy);
- // we do not need to make a copy of node.data because the contents are never changed
- nodeCopy = new DataNode(node.data, node.acl, statCopy);
- children = node.getChildren().toArray(new String[0]);
- }
- serializeNodeData(oa, pathString, nodeCopy); // write this node to oa
- path.append('/');
- int off = path.length();
- // write each child subtree, reusing the path buffer
- for (String child : children) {
- path.delete(off, Integer.MAX_VALUE);
- path.append(child);
- serializeNode(oa, path);
- }
- }
- // visible for test
- // Writes one (path, node) record to oa: the path string followed by the node record.
- public void serializeNodeData(OutputArchive oa, String path, DataNode node) throws IOException {
- oa.writeString(path, "path");
- oa.writeRecord(node, "node");
- }
- // Writes the ACL cache to oa.
- public void serializeAcls(OutputArchive oa) throws IOException {
- aclCache.serialize(oa);
- }
- // Serializes the whole tree (the NodeHashMap contents), starting from the root path ""
- public void serializeNodes(OutputArchive oa) throws IOException {
- serializeNode(oa, new StringBuilder());
- // / marks end of stream
- // we need to check if clear had been called in between the snapshot.
- if (root != null) {
- oa.writeString("/", "path");
- }
- }
- // Full serialization: the ACL cache first, then every node; deserialize reads them back
- // in the same order. tag is not used here.
- public void serialize(OutputArchive oa, String tag) throws IOException {
- serializeAcls(oa);
- serializeNodes(oa);
- }
- public void deserialize(InputArchive ia, String tag) throws IOException {
- // Rebuilds the tree from ia: reads the ACL cache, then (path, node) records until the
- // "/" end marker, linking each node to its parent and re-indexing container / TTL /
- // ephemeral nodes; finally recomputes the data size, sets up quotas and purges unused ACLs.
- aclCache.deserialize(ia);
- nodes.clear();
- pTrie.clear();
- nodeDataSize.set(0);
- String path = ia.readString("path");
- while (!"/".equals(path)) { // "/" is the end-of-stream marker written by serializeNodes
- DataNode node = new DataNode();
- ia.readRecord(node, "node");
- nodes.put(path, node);
- synchronized (node) {
- aclCache.addUsage(node.acl);
- }
- int lastSlash = path.lastIndexOf('/');
- if (lastSlash == -1) {
- root = node; // the root is written under the empty path ""
- } else {
- String parentPath = path.substring(0, lastSlash);
- DataNode parent = nodes.get(parentPath);
- if (parent == null) {
- // parents are written before their children, so a missing parent means a corrupt stream
- throw new IOException(
- "Invalid Datatree, unable to find parent " + parentPath + " of path " + path);
- }
- parent.addChild(path.substring(lastSlash + 1));
- long owner = node.stat.getEphemeralOwner();
- EphemeralType ephemeralType = EphemeralType.get(owner);
- if (ephemeralType == EphemeralType.CONTAINER) {
- containers.add(path);
- } else if (ephemeralType == EphemeralType.TTL) {
- ttls.add(path);
- } else if (owner != 0) {
- HashSet<String> list = ephemerals.computeIfAbsent(owner, k -> new HashSet<>());
- list.add(path);
- }
- }
- path = ia.readString("path");
- }
- // have counted digest for root node with "", ignore here to avoid counting twice for root node
- nodes.putWithoutDigest("/", root);
- nodeDataSize.set(approximateDataSize());
- // we are done with deserializing the datatree update the quotas - create path trie
- // and also update the stat nodes
- setupQuota();
- aclCache.purgeUnused();
- }
- // Loads the database from snapshots and transaction logs
- public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener);
- // fast forward the server database to have the latest transactions in it
- // This is the same as restore, but only reads from the transaction logs and not restores from a snapshot
- public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener);
- // Returns a TxnIterator starting at zxid, via txnLog.read(zxid, fastForward)
- public TxnIterator readTxnLog(long zxid, boolean fastForward);
- // process the transaction on the datatree
- public void processTransaction(TxnHeader hdr, DataTree dt, Map<Long, Integer> sessions, Record txn);
- // Returns the last logged zxid, via txnLog.getLastLoggedZxid()
- public long getLastLoggedZxid();
- // Saves dataTree and sessions to a snapshot
- public File save(
- DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, boolean syncSnap);
- // Truncates the txnLog to the given zxid
- public boolean truncateLog(long zxid);
- // Finds the most recent snapshot file, via snaplog.findMostRecentSnapshot()
- public File findMostRecentSnapshot();
- // Finds the n most recent snapshot files, via snaplog.findNRecentSnapshots(n)
- public List<File> findNRecentSnapshots(int n);
- // Finds n valid snapshot files, via snaplog.findNValidSnapshots(n)
- public List<File> findNValidSnapshots(int n);
- // Returns the log files that may contain transactions newer than the given zxid:
- // logs whose starting zxid is greater than zxid, plus the newest log whose starting
- // zxid is less than zxid. That latter log file may contain transactions beyond zxid.
- public File[] getSnapshotLogs(long zxid);
- // Appends data via txnLog.append(si)
- public boolean append(Request si);
- // Commits data via txnLog.commit()
- public void commit();
- restore的处理流程:1. 从snapshot加载dataTree数据
- 2. 从txnlog回放事务,继续加载dataTree和committedlog数据
- 3. 如果没有加载到任何dataTree数据,将空的dataTree保存到snapshot.0文件中
- public void processTransaction(TxnHeader hdr, DataTree dt, Map<Long, Integer> sessions,
- Record txn) throws KeeperException.NoNodeException {
- // Applies one replayed txn to dt. For createSession / closeSession the session's
- // timeout entry is first added to / removed from sessions.
- ProcessTxnResult rc; // presumably inspected by code elided from this excerpt
- switch (hdr.getType()) {
- case OpCode.createSession:
- sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut());
- // give dataTree a chance to sync its lastProcessedZxid
- rc = dt.processTxn(hdr, txn);
- break;
- case OpCode.closeSession:
- sessions.remove(hdr.getClientId());
- rc = dt.processTxn(hdr, txn);
- break;
- default:
- rc = dt.processTxn(hdr, txn);
- }
- }
- // Writes dataTree and sessionsWithTimeouts to snapDir/snapshot.${lastProcessedZxid} and
- // returns that file.
- // @throws IOException if serialization fails; a zero-length file left behind by the failed
- //         write is deleted before the exception is rethrown
- public File save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
- boolean syncSnap) throws IOException {
- long lastZxid = dataTree.lastProcessedZxid;
- // file name: snapshot.${lastZxid}
- File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
- try {
- snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
- return snapshotFile;
- } catch (IOException e) {
- // A failed write (e.g. on a full disk) can leave an empty snapshot file; repeated
- // failures would pile up such files, which then count as "recent snapshots" and can
- // cause valid ones to be purged. Remove it (best effort) before propagating the error.
- if (snapshotFile.length() == 0) {
- snapshotFile.delete();
- }
- throw e;
- }
- }
- // Closes the current txn/snap logs, truncates the txn log in dataDir to zxid, and reopens
- // fresh FileTxnLog / FileSnap instances. Returns whether truncation happened; any
- // IOException is reported as false.
- public boolean truncateLog(long zxid) {
- try {
- // close the existing txnLog and snapLog
- close();
- final boolean truncated;
- try (FileTxnLog truncator = new FileTxnLog(dataDir)) {
- truncated = truncator.truncate(zxid);
- // Re-open the logs in place instead of replacing this object: objects outside
- // ZKDatabase hold references to this instance.
- txnLog = new FileTxnLog(dataDir);
- snapLog = new FileSnap(snapDir);
- }
- return truncated;
- } catch (IOException e) {
- return false;
- }
- }
Interface for reading transaction logs.- public interface TxnLog extends Closeable {
- // Despite the one-line summary, this interface also writes the log:
- // append, rollLog, commit and truncate all modify it.
- // Setter for ServerStats to monitor fsync threshold exceed
- void setServerStats(ServerStats serverStats);
- // roll the current log being appended to
- void rollLog() throws IOException;
- // Append a request to the transaction log with a digest
- boolean append(Request request) throws IOException;
- // Start reading the transaction logs from a given zxid
- TxnIterator read(long zxid) throws IOException;
- // the last zxid of the logged transactions
- long getLastLoggedZxid() throws IOException;
- // truncate the log to get in sync with the leader
- boolean truncate(long zxid) throws IOException;
- // the dbid for this transaction log
- long getDbId() throws IOException;
- // commit the transaction and make sure they are persisted
- void commit() throws IOException;
- // return transaction log's elapsed sync time in milliseconds
- long getTxnLogSyncElapsedTime();
- // close the transaction log
- void close() throws IOException;
- // set / get the total log size (presumably the combined size of the txn log files; confirm)
- void setTotalLogSize(long size);
- long getTotalLogSize();
- }
FileTxnLog实现类
- This class implements the TxnLog interface. It provides APIs to access the txnlogs and add entries to them.
- The format of a Transactional log is as follows:
- LogFile:
- FileHeader TxnList ZeroPad
- FileHeader: {
- magic 4bytes (ZKLG)
- version 4bytes
- dbid 8bytes
- }
- TxnList:
- Txn || Txn TxnList
- Txn:
- checksum Txnlen TxnHeader Record 0x42
- checksum: 8bytes Adler32 is currently used
- calculated across payload -- Txnlen, TxnHeader, Record and 0x42
- Txnlen:
- len 4bytes
- TxnHeader: {
- sessionid 8bytes
- cxid 4bytes
- zxid 8bytes
- time 8bytes
- type 4bytes
- }
- Record:
- See Jute definition file for details on the various record types
- ZeroPad:
- 0 padded to EOF (filled during preallocation stage)
- // Appends request's serialized txn to the current log file, first opening a new
- // log.${zxid} file (and writing its FileHeader) when none is open. Returns false for a
- // request without a TxnHeader. Writes are buffered; commit() flushes them.
- // @throws IOException on I/O failure or when the request serializes to no bytes
- public synchronized boolean append(Request request) throws IOException {
- TxnHeader hdr = request.getHdr();
- if (hdr == null) { // not a transactional request
- return false;
- }
- if (hdr.getZxid() <= lastZxidSeen) {
- LOG.warn("...");
- } else {
- lastZxidSeen = hdr.getZxid();
- }
- if (logStream == null) {
- // open a new log.${hdr.zxid} file
- logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
- fos = new FileOutputStream(logFileWrite);
- logStream = new BufferedOutputStream(fos);
- oa = BinaryOutputArchive.getArchive(logStream);
- FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId); // file header
- long dataSize = oa.getDataSize();
- fhdr.serialize(oa, "fileheader"); // write the file header
- // flush so the header reaches the file before padFile() preallocates below
- logStream.flush();
- // advance the file offset past the header
- filePosition += oa.getDataSize() - dataSize;
- filePadding.setCurrentSize(filePosition);
- streamsToFlush.add(fos);
- }
- fileSize = filePadding.padFile(fos.getChannel(), filePosition);
- byte[] buf = request.getSerializeData();
- if (buf == null || buf.length == 0) {
- // null would NPE in crc.update below; an empty payload would be written as a
- // zero-length entry, so reject both explicitly
- throw new IOException("Faulty serialization for header and txn");
- }
- long dataSize = oa.getDataSize();
- Checksum crc = makeChecksumAlgorithm();
- crc.update(buf, 0, buf.length);
- oa.writeLong(crc.getValue(), "txnEntryCRC"); // checksum
- Util.writeTxnBytes(oa, buf); // writes len, hdr, txn, digest, 0x42
- unFlushedSize += oa.getDataSize() - dataSize; // bytes written but not yet flushed
- return true;
- }
- /**
- * Returns the zxid of the last transaction found in the transaction logs.
- * <p>A log file's name only carries the zxid it starts at, so the last file returned by
- * getLogFiles (presumably the newest) is read to its end to find the highest logged zxid.
- * @return the last logged zxid; the file-name zxid if the file cannot be read further;
- *         starts from -1 when there are no log files
- */
- public long getLastLoggedZxid() {
- File[] files = getLogFiles(logDir.listFiles(), 0);
- long maxLog = files.length > 0
- ? Util.getZxidFromName(files[files.length - 1].getName(), LOG_FILE_PREFIX)
- : -1;
- // Start from the newest file's name-derived zxid, then read forward through its entries.
- long zxid = maxLog;
- try (FileTxnLog txn = new FileTxnLog(logDir); TxnIterator itr = txn.read(maxLog)) {
- while (itr.next()) {
- zxid = itr.getHeader().getZxid();
- }
- } catch (IOException e) {
- // Best effort: keep the zxid reached so far, but do not hide the failure.
- LOG.warn("Unexpected exception", e);
- }
- return zxid;
- }
- /**
- * Finishes the current log file: flushes its buffered data, adds getCurrentLogSize()
- * to prevLogsRunningTotal and resets the per-file write state, so that the next
- * append() opens a new log file. Does nothing if no log file is open.
- * @throws IOException if flushing the current stream fails
- */
- public synchronized void rollLog() throws IOException {
- if (logStream == null) {
- return; // nothing appended since the last roll
- }
- logStream.flush();
- prevLogsRunningTotal += getCurrentLogSize();
- // With no stream open, the next append() creates a fresh log.<zxid> file.
- logStream = null;
- oa = null;
- fileSize = 0;
- filePosition = 0;
- unFlushedSize = 0;
- }
- /**
- * Flushes everything appended so far and, when forceSync is set, forces it to disk.
- * <p>Steps: flush the buffered log stream and advance filePosition by the unflushed bytes;
- * flush (and optionally sync) every pending FileOutputStream; close all pending streams
- * except one; finally roll the log if txnLogSizeLimit is positive and exceeded.
- * @throws IOException if flushing, syncing, closing or rolling fails
- */
- public synchronized void commit() throws IOException {
- if (logStream != null) {
- logStream.flush(); // push buffered bytes down to the file stream
- filePosition += unFlushedSize;
- // If we have written more than we have previously preallocated,
- // we should override the fileSize by filePosition.
- if (filePosition > fileSize) {
- fileSize = filePosition;
- }
- unFlushedSize = 0;
- }
- for (FileOutputStream log : streamsToFlush) {
- log.flush(); // flush each pending file stream
- if (forceSync) {
- long startSyncNS = System.nanoTime();
- FileChannel channel = log.getChannel();
- // force(false): sync file content; metadata is not required to be written
- channel.force(false);
- // (omitted in this excerpt; startSyncNS presumably feeds sync-time tracking)
- }
- }
- // Close all pending streams but one. NOTE(review): assuming streamsToFlush is FIFO, the one
- // left open is the most recently added stream, i.e. the file still being appended to.
- while (streamsToFlush.size() > 1) {
- streamsToFlush.poll().close();
- }
- // Roll the log file if we exceed the size limit
- if (txnLogSizeLimit > 0) { // default is -1 (per the original note), so normally skipped
- long logSize = getCurrentLogSize();
- if (logSize > txnLogSizeLimit) {
- rollLog();
- }
- }
- }
- /**
- * Opens an iterator over the transaction logs starting at the given zxid.
- * <p>The returned FileTxnIterator wraps the current log file and its input stream and
- * decodes txnlog entries from it.
- * @param zxid the zxid to start reading from
- * @param fastForward when true, entries whose zxid is below zxid are skipped
- * @return a FileTxnIterator over logDir
- * @throws IOException if the log files cannot be opened or read
- */
- public TxnIterator read(long zxid, boolean fastForward) throws IOException {
- final TxnIterator iterator = new FileTxnIterator(logDir, zxid, fastForward);
- return iterator;
- }
- /**
- * Same as read(zxid, true): the iterator is fast-forwarded to zxid.
- */
- public TxnIterator read(long zxid) throws IOException {
- final boolean fastForward = true;
- return read(zxid, fastForward);
- }
- /**
- * Truncates the transaction logs at the given zxid.
- * <p>An iterator is positioned at zxid; the log file it is reading is cut at the
- * iterator's current read position, and the remaining log files it visits afterwards
- * (those after zxid) are deleted.
- * @param zxid the zxid to truncate at
- * @return true once truncation has been performed
- * @throws IOException if no log files exist or the file cannot be truncated
- */
- public boolean truncate(long zxid) throws IOException {
- try (FileTxnIterator itr = new FileTxnIterator(this.logDir, zxid)) {
- PositionInputStream input = itr.inputStream;
- if (input == null) {
- throw new IOException("No log files found to truncate");
- }
- long pos = input.getPosition();
- // Truncate at the current position; try-with-resources closes the file even if setLength fails.
- try (RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw")) {
- raf.setLength(pos);
- }
- // Delete every later log file; report failures instead of silently ignoring them.
- while (itr.goToNextLog()) {
- if (!itr.logFile.delete()) {
- LOG.warn("Unable to delete log file " + itr.logFile);
- }
- }
- }
- return true;
- }
- /**
- * Reads the FileHeader (magic, version, dbid) at the start of a log file.
- * @param file the log file to read
- * @return the deserialized header
- * @throws IOException if the file cannot be opened, read or closed
- */
- private static FileHeader readHeader(File file) throws IOException {
- // try-with-resources closes the stream on every path (the excerpt never closed it).
- try (InputStream is = new BufferedInputStream(new FileInputStream(file))) {
- InputArchive ia = BinaryInputArchive.getArchive(is);
- FileHeader hdr = new FileHeader();
- hdr.deserialize(ia, "fileheader");
- return hdr;
- }
- }
This class (FileTxnIterator) implements the TxnIterator interface, which is used for reading the transaction logs.
内部使用List保存着比指定zxid大或者含有指定zxid数据的log文件,初始化阶段会定位到参数zxid指定的位置,这样在后续访问时就可以从参数指定的zxid开始读取数据了。
- /**
- * Creates an iterator over the log files in logDir, starting at zxid.
- * @param logDir directory containing the transaction log files
- * @param zxid the zxid to start reading from
- * @param fastForward when true, entries with a zxid below zxid are skipped, so the
- *        iterator ends up at the first entry at or after zxid (or runs out of entries)
- * @throws IOException if the log files cannot be opened or read
- */
- public FileTxnIterator(File logDir, long zxid, boolean fastForward) throws IOException {
- this.logDir = logDir;
- this.zxid = zxid;
- init();
- if (!fastForward || hdr == null) {
- return;
- }
- // Advance until the current header reaches zxid or next() reports no more entries.
- while (hdr.getZxid() < zxid && next()) {
- // next() already moved hdr forward; nothing else to do per step
- }
- }
- /**
- * Collects the log files needed to read from zxid and moves to the first entry.
- * <p>Log files are walked newest first. Every file whose starting zxid (taken from its
- * name) is >= zxid is kept, plus the first file that starts below zxid, because that
- * file may contain zxid itself; older files are not needed.
- * @throws IOException if the first log file cannot be opened or read
- */
- void init() throws IOException {
- storedFiles = new ArrayList<>();
- // Newest first (descending), passing the prefix like the other sortDataDir calls do.
- List<File> files = Util.sortDataDir(
- FileTxnLog.getLogFiles(logDir.listFiles(), 0),
- LOG_FILE_PREFIX,
- false);
- for (File f : files) {
- long fileStartZxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
- storedFiles.add(f);
- if (fileStartZxid < zxid) {
- // the last logfile that starts before zxid: it may contain zxid, older ones cannot
- break;
- }
- }
- goToNextLog(); // move to the next stored log file
- next(); // advance to the next log entry
- }
snapshot interface for the persistence layer. implement this interface for implementing snapshots.- public interface SnapShot {
- /**
- * Deserializes a data tree from the last valid snapshot.
- * @param dt the data tree to populate
- * @param sessions map that receives session id -> timeout entries
- * @return the last zxid that was deserialized
- */
- long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;
- /**
- * Persists the data tree and the sessions into a persistence storage.
- * @param name the snapshot file to write
- * @param fsync presumably whether the written snapshot should be synced to disk -- confirm per implementation
- */
- void serialize(DataTree dt, Map<Long, Integer> sessions, File name, boolean fsync) throws IOException;
- /** Finds the most recent snapshot file. */
- File findMostRecentSnapshot() throws IOException;
- /** Returns information about the last saved or restored snapshot. */
- SnapshotInfo getLastSnapshotInfo();
- /** Frees the resources held by this snapshot immediately. */
- void close() throws IOException;
- }
负责存储、序列化和反序列化正确的快照。并提供对快照的访问:
- /**
- * Deserializes the most recent usable snapshot in snapDir into dt and sessions.
- * <p>Up to 100 candidate snapshots are tried newest first; a snapshot whose reading or
- * integrity check throws an IOException is skipped in favour of the next older one.
- * @param dt the data tree to populate
- * @param sessions map that receives session id -> timeout entries
- * @return the zxid of the loaded snapshot, or -1 if snapDir holds no snapshot candidates
- * @throws IOException if candidates exist but none of them could be loaded
- */
- public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
- // Candidate snapshots in snapDir, newest first.
- List<File> snapList = findNValidSnapshots(100);
- if (snapList.isEmpty()) {
- // Nothing to restore (previously this reached snap.lastModified() with snap == null).
- return -1L;
- }
- File snap = null;
- long snapZxid = -1;
- boolean foundValid = false;
- IOException lastFailure = null;
- for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
- snap = snapList.get(i);
- snapZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
- try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) {
- InputArchive ia = BinaryInputArchive.getArchive(snapIS);
- deserialize(dt, sessions, ia); // header, sessions and nodes into dt / sessions
- SnapStream.checkSealIntegrity(snapIS, ia);
- // Deserializing the zxid digest from the input
- // stream and update the digestFromLoadedSnapshot.
- // format: zxid digestVersion digest
- if (dt.deserializeZxidDigest(ia, snapZxid)) {
- SnapStream.checkSealIntegrity(snapIS, ia);
- }
- // deserialize lastProcessedZxid and check inconsistency
- if (dt.deserializeLastProcessedZxid(ia)) {
- SnapStream.checkSealIntegrity(snapIS, ia);
- }
- foundValid = true;
- break;
- } catch (IOException e) {
- // Remember why this snapshot failed and fall back to the next older one.
- lastFailure = e;
- }
- }
- // Without this check, a failed load would return the zxid of an unusable snapshot.
- if (!foundValid) {
- throw new IOException("Not able to find valid snapshots in " + snapDir, lastFailure);
- }
- // The loaded snapshot's zxid (from its file name) is the last processed zxid.
- dt.lastProcessedZxid = snapZxid;
- lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);
- // compare the digest if this is not a fuzzy snapshot, we want to compare and find inconsistent asap
- if (dt.getDigestFromLoadedSnapshot() != null) {
- dt.compareSnapshotDigests(dt.lastProcessedZxid);
- }
- return dt.lastProcessedZxid;
- }
- /**
- * Reads one snapshot stream: the file header, then the sessions and the data tree.
- * @param dt the data tree to populate
- * @param sessions map that receives session id -> timeout entries
- * @param ia archive positioned at the start of the snapshot
- * @throws IOException if the header magic is not SNAP_MAGIC or reading fails
- */
- public static void deserialize(
- DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException {
- // Header layout: magic, version, dbid. Reject anything that is not a snapshot.
- final FileHeader fileHeader = new FileHeader();
- fileHeader.deserialize(ia, "fileheader");
- final boolean isSnapshot = fileHeader.getMagic() == SNAP_MAGIC;
- if (!isSnapshot) {
- throw new IOException("mismatching magic headers");
- }
- // Remaining layout, read by SerializeUtils.deserializeSnapshot:
- // sessions: Count Session(s), with Session = {id, timeout}
- // nodes: AclCache PathNode(s), with PathNode = {path, node} and node = {data, acl, stat}
- SerializeUtils.deserializeSnapshot(dt, ia, sessions);
- }
- /**
- * Returns snapshot files from snapDir that pass SnapStream.isValidSnapshot, newest first.
- * Files whose validity check throws an IOException are skipped.
- * @param n stop once this many files are collected; note that n <= 0 never matches,
- *        so then every valid file is returned
- */
- protected List<File> findNValidSnapshots(int n) {
- List<File> valid = new ArrayList<>();
- // Snapshot files in snapDir, sorted newest first.
- for (File candidate : Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false)) {
- try {
- if (!SnapStream.isValidSnapshot(candidate)) {
- continue;
- }
- valid.add(candidate);
- if (valid.size() == n) {
- break;
- }
- } catch (IOException ignored) {
- // unreadable file: treat it as invalid and keep looking
- }
- }
- return valid;
- }
- /**
- * Returns the most recent snapshot files in snapDir, newest first, without validating
- * their contents. Only files whose name yields a zxid (not -1) are counted.
- * @param n number of files to return; 0 yields an empty list, a negative n returns all
- */
- public List<File> findNRecentSnapshots(int n) throws IOException {
- List<File> recent = new ArrayList<>();
- for (File candidate : Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false)) {
- if (recent.size() == n) {
- break;
- }
- boolean hasZxid = Util.getZxidFromName(candidate.getName(), SNAPSHOT_FILE_PREFIX) != -1;
- if (hasZxid) {
- recent.add(candidate);
- }
- }
- return recent;
- }
- /**
- * Writes the snapshot file header followed by the sessions and the data tree.
- * @param dt the data tree to write
- * @param sessions session id -> timeout entries to write
- * @param oa the archive to write to
- * @param header the file header (magic, version, dbid); must not be null
- * @throws IllegalStateException if header is null
- * @throws IOException if writing fails
- */
- protected void serialize(
- DataTree dt, Map<Long, Integer> sessions, OutputArchive oa, FileHeader header) throws IOException {
- // Fail fast with a clear message instead of an NPE from header.serialize().
- if (header == null) {
- throw new IllegalStateException("Snapshot's not open for writing: uninitialized header");
- }
- header.serialize(oa, "fileheader");
- SerializeUtils.serializeSnapshot(dt, oa, sessions);
- }
- /**
- * Writes dt and sessions into the snapshot file snapShot and records it as the last
- * snapshot. Stream layout: header + sessions + data tree, seal; then the zxid digest and
- * lastProcessedZxid sections, each followed by a seal when its serialize call returns true.
- * @param fsync passed through to SnapStream.getOutputStream
- * @throws IOException if this FileSnap has been closed or writing fails
- */
- public synchronized void serialize(
- DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync) throws IOException {
- if (close) {
- throw new IOException("FileSnap has already been closed");
- }
- try (CheckedOutputStream snapOS = SnapStream.getOutputStream(snapShot, fsync)) {
- OutputArchive oa = BinaryOutputArchive.getArchive(snapOS);
- serialize(dt, sessions, oa, new FileHeader(SNAP_MAGIC, VERSION, dbId));
- SnapStream.sealStream(snapOS, oa);
- if (dt.serializeZxidDigest(oa)) {
- SnapStream.sealStream(snapOS, oa);
- }
- if (dt.serializeLastProcessedZxid(oa)) {
- SnapStream.sealStream(snapOS, oa);
- }
- lastSnapshotInfo = new SnapshotInfo(
- Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX),
- snapShot.lastModified() / 1000);
- }
- }
DatadirCleanupManager
purgeInterval参数指定执行周期(小时),默认0不开启清理功能。- public void start() {
- if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
- return;
- }
- // Don't schedule the purge task with zero or negative purge interval.
- if (purgeInterval <= 0) {
- return;
- }
- timer = new Timer("PurgeTask", true);
- TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
- timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
- purgeTaskStatus = PurgeTaskStatus.STARTED;
- }
ContainerManager
