ZooKeeper Source Code (05): Data Storage


This article takes a detailed look at how ZooKeeper stores its data.
ZKDatabase

Maintains the ZooKeeper server's in-memory database, including sessions, the dataTree and the committedLog. It is bootstrapped by reading the transaction logs and snapshots from disk.
Key fields
// the data node tree
protected DataTree dataTree;
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
protected FileTxnSnapLog snapLog; // used to access the underlying data files
// zxid of the first and last entries in committedLog
protected long minCommittedLog, maxCommittedLog;
// maximum capacity of committedLog, 500 by default
public int commitLogCount;
// keeps the most recently committed requests, used for fast follower synchronization
protected Queue<Proposal> committedLog = new ArrayDeque<>();
protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
private volatile boolean initialized = false;
// txnlog counter
private AtomicInteger txnCount = new AtomicInteger(0);
Constructor
public ZKDatabase(FileTxnSnapLog snapLog) {
    dataTree = createDataTree();
    sessionsWithTimeouts = new ConcurrentHashMap<>();
    this.snapLog = snapLog;
    // initializes snapshotSizeFactor, default 0.33
    // initializes commitLogCount, default 500
}
public DataTree createDataTree() {
    return new DataTree();
}
Creating the DataTree object creates the /zookeeper/quota and /zookeeper/config nodes and creates the dataWatches and childWatches objects (instances of the WatchManager implementation).
Main methods
// returns the committedLog collection
public synchronized Collection<Proposal> getCommittedLog();
// returns the value of dataTree.lastProcessedZxid
public long getDataTreeLastProcessedZxid();
// returns dataTree.getSessions()
public Collection<Long> getSessions();
// returns the size of sessionsWithTimeouts
public long getSessionCount();
// loads the dataTree from disk and loads the txnLog into committedLog
public long loadDataBase() throws IOException;
// loads the txnLog from disk into committedLog
public long fastForwardDataBase() throws IOException;
// adds to committedLog via the addCommittedProposal(Request) method
private void addCommittedProposal(TxnHeader hdr, Record txn, TxnDigest digest);
// adds to committedLog
public void addCommittedProposal(Request request);
// loads Proposals from the txnLog
public Iterator<Proposal> getProposalsFromTxnLog(long startZxid, long sizeLimit);
// delegates to dataTree.removeCnxn(cnxn)
public void removeCnxn(ServerCnxn cnxn);
// delegates to dataTree.killSession(sessionId, zxid)
public void killSession(long sessionId, long zxid);
// delegates to dataTree.dumpEphemerals(pwriter)
public void dumpEphemerals(PrintWriter pwriter);
// delegates to dataTree.getEphemerals()
public Map<Long, Set<String>> getEphemerals();
// delegates to dataTree.getNodeCount()
public int getNodeCount();
// delegates to dataTree.getEphemerals(sessionId)
public Set<String> getEphemerals(long sessionId);
// assigns dataTree.lastProcessedZxid
public void setlastProcessedZxid(long zxid);
// delegates to dataTree.processTxn(hdr, txn, digest)
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn, TxnDigest digest);
// delegates to dataTree.statNode(path, serverCnxn)
public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException;
// delegates to dataTree.getNode(path)
public DataNode getNode(String path);
// delegates to dataTree.getData(path, stat, watcher)
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException;
// implemented via dataTree.setWatches
public void setWatches(long relativeZxid, List<String> dataWatches,
                       List<String> existWatches, List<String> childWatches,
                       List<String> persistentWatches, List<String> persistentRecursiveWatches,
                       Watcher watcher);
// delegates to dataTree.addWatch(basePath, watcher, mode)
public void addWatch(String basePath, Watcher watcher, int mode);
// delegates to dataTree.getChildren(path, stat, watcher)
public List<String> getChildren(
    String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException;
// delegates to dataTree.getAllChildrenNumber(path)
public int getAllChildrenNumber(String path) throws KeeperException.NoNodeException;
// Truncate the ZKDatabase to the specified zxid
public boolean truncateLog(long zxid) throws IOException;
// Deserialize a snapshot from an input archive
public void deserializeSnapshot(InputArchive ia) throws IOException;
// Deserialize a snapshot that contains a FileHeader from an input archive;
// used by the admin restore command
public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException;
// Serialize the snapshot
public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedException;
// saves data via snapLog.append(si) and increments txnCount
public boolean append(Request si) throws IOException;
// rolls the underlying txnLog via snapLog.rollLog()
public void rollLog() throws IOException;
// commits the underlying txnLog via snapLog.commit()
public void commit() throws IOException;
// initializes the /zookeeper/config data; covered in the cluster-startup article
public synchronized void initConfigInZKDatabase(QuorumVerifier qv);
// delegates to dataTree.containsWatcher(path, type, watcher)
public boolean containsWatcher(String path, WatcherType type, Watcher watcher);
// delegates to dataTree.removeWatch(path, type, watcher)
public boolean removeWatch(String path, WatcherType type, Watcher watcher);
loadDataBase method

Loads the dataTree from disk and loads the txnLog into the committedLog:
public long loadDataBase() throws IOException {
    long startTime = Time.currentElapsedTime();
    // 1. load the dataTree from the snapshot
    // 2. load the dataTree and committedLog from the txnLog via fastForwardFromEdits
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    // ... (omitted)
    return zxid;
}
fastForwardDataBase method

Loads the dataTree and the committedLog from the txnLog:
public long fastForwardDataBase() throws IOException {
    // commitProposalPlaybackListener calls addCommittedProposal to populate committedLog
    long zxid = snapLog.fastForwardFromEdits(
        dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    return zxid;
}
addCommittedProposal method
private void addCommittedProposal(TxnHeader hdr, Record txn, TxnDigest digest) {
    Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
    r.setTxnDigest(digest);
    addCommittedProposal(r);
}
public void addCommittedProposal(Request request) {
    WriteLock wl = logLock.writeLock();
    try {
        wl.lock();
        if (committedLog.size() > commitLogCount) {
            committedLog.remove();
            minCommittedLog = committedLog.peek().packet.getZxid();
        }
        if (committedLog.isEmpty()) {
            minCommittedLog = request.zxid;
            maxCommittedLog = request.zxid;
        }
        byte[] data = request.getSerializeData();
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        committedLog.add(p);
        maxCommittedLog = p.packet.getZxid();
    } finally {
        wl.unlock();
    }
}
getProposalsFromTxnLog method

Reads Proposals from the txnlog; only the packet field is populated:
public Iterator<Proposal> getProposalsFromTxnLog(long startZxid, long sizeLimit) {
    if (sizeLimit < 0) {
        return TxnLogProposalIterator.EMPTY_ITERATOR;
    }
    TxnIterator itr = null;
    try {
        // reads data from the txnLog files,
        // implemented by FileTxnIterator reading the file streams
        itr = snapLog.readTxnLog(startZxid, false);
        // If we cannot guarantee that this is strictly the starting txn
        // after a given zxid, we should fail.
        if ((itr.getHeader() != null) && (itr.getHeader().getZxid() > startZxid)) {
            itr.close();
            return TxnLogProposalIterator.EMPTY_ITERATOR;
        }
        if (sizeLimit > 0) {
            long txnSize = itr.getStorageSize();
            if (txnSize > sizeLimit) {
                itr.close();
                return TxnLogProposalIterator.EMPTY_ITERATOR;
            }
        }
    } catch (IOException e) {
        if (itr != null) {
            itr.close();
        }
        return TxnLogProposalIterator.EMPTY_ITERATOR;
    }
    return new TxnLogProposalIterator(itr);
}
truncateLog method

Truncates the txnlog to the given zxid and then reloads the DataTree from disk:
public boolean truncateLog(long zxid) throws IOException {
    clear();
    // truncate the log
    boolean truncated = snapLog.truncateLog(zxid);
    if (!truncated) {
        return false;
    }
    loadDataBase();
    return true;
}
deserializeSnapshot methods
public void deserializeSnapshot(InputArchive ia) throws IOException {
    clear();
    SerializeUtils.deserializeSnapshot(getDataTree(), ia, getSessionWithTimeOuts());
    initialized = true;
}
public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException {
    clear();
    // deserialize data tree
    final DataTree dataTree = getDataTree();
    FileSnap.deserialize(dataTree, getSessionWithTimeOuts(), ia);
    SnapStream.checkSealIntegrity(is, ia);
    // deserialize digest and check integrity
    if (dataTree.deserializeZxidDigest(ia, 0)) {
        SnapStream.checkSealIntegrity(is, ia);
    }
    // deserialize lastProcessedZxid and check integrity
    if (dataTree.deserializeLastProcessedZxid(ia)) {
        SnapStream.checkSealIntegrity(is, ia);
    }
    // compare the digest to find inconsistency
    if (dataTree.getDigestFromLoadedSnapshot() != null) {
        dataTree.compareSnapshotDigests(dataTree.lastProcessedZxid);
    }
    initialized = true;
}
serializeSnapshot method
public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedException {
    SerializeUtils.serializeSnapshot(getDataTree(), oa, getSessionWithTimeOuts());
}
DataTree

Maintains the tree data structure. It contains no network or client-connection code, so it can be tested in isolation.
It maintains two parallel data structures: a hashtable that maps full paths to DataNodes, and a tree of DataNodes. All path-based access goes through the hashtable; the tree of DataNodes is only traversed when serializing to disk.
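A minimal, self-contained sketch of this dual structure (illustrative only, not the real DataTree code; the class and method names here are invented):

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;

// A node holds its payload plus only the *names* of its children;
// a flat path -> node map gives O(1) lookup without walking the tree.
class MiniNode {
    byte[] data;
    final Set<String> children = new ConcurrentSkipListSet<>();
}

class MiniTree {
    private final Map<String, MiniNode> nodes = new ConcurrentHashMap<>();

    MiniTree() { nodes.put("/", new MiniNode()); }

    MiniNode get(String path) { return nodes.get(path); } // all access goes through the map

    void create(String path, byte[] data) {
        int slash = path.lastIndexOf('/');
        MiniNode parent = nodes.get(slash == 0 ? "/" : path.substring(0, slash));
        parent.children.add(path.substring(slash + 1));
        MiniNode child = new MiniNode();
        child.data = data;
        nodes.put(path, child);
    }

    // the parent/child links are only walked for a full dump, e.g. a snapshot
    void walk(String path, java.util.function.Consumer<String> visit) {
        visit.accept(path);
        for (String c : nodes.get(path).children) {
            walk("/".equals(path) ? "/" + c : path + "/" + c, visit);
        }
    }
}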
Key fields
// This map provides a fast lookup to the data nodes
private final NodeHashMap nodes;
// watcher managers
private IWatchManager dataWatches;
private IWatchManager childWatches;
// cached total size of paths and data for all DataNodes
private final AtomicLong nodeDataSize = new AtomicLong(0);
// This hashtable lists the paths of the ephemeral nodes of a session
private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<>();
// This set contains the paths of all container nodes
private final Set<String> containers = Collections.newSetFromMap(new ConcurrentHashMap<>());
// This set contains the paths of all ttl nodes
private final Set<String> ttls = Collections.newSetFromMap(new ConcurrentHashMap<>());
// This is a pointer to the root of the DataTree
private DataNode root = new DataNode(new byte[0], -1L, new StatPersisted());
// create a /zookeeper filesystem that is the proc filesystem of zookeeper
private final DataNode procDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
// create a /zookeeper/quota node for maintaining quota properties for zookeeper
private final DataNode quotaDataNode = new DataNode(new byte[0], -1L, new StatPersisted());
// the most recently processed zxid
public volatile long lastProcessedZxid = 0;

  • NodeHashMap - the NodeHashMapImpl implementation stores path -> DataNode entries in a ConcurrentHashMap
  • IWatchManager and Watcher - watcher management
  • DataNode - encapsulates a tree node's information, including its data, children, stat, etc.
Constructor
DataTree(DigestCalculator digestCalculator) {
    this.digestCalculator = digestCalculator;
    nodes = new NodeHashMapImpl(digestCalculator);
    // rather than fight it, let root have an alias
    nodes.put("", root); // "" -> root
    nodes.putWithoutDigest(rootZookeeper, root); // "/" -> root
    // add the proc node and quota node
    root.addChild(procChildZookeeper); // add the "zookeeper" child to the root
    nodes.put(procZookeeper, procDataNode); // "/zookeeper" -> procDataNode
    procDataNode.addChild(quotaChildZookeeper); // add the "quota" child
    nodes.put(quotaZookeeper, quotaDataNode); // "/zookeeper/quota" -> quotaDataNode
    addConfigNode(); // add the /zookeeper/config node
    nodeDataSize.set(approximateDataSize());
    try {
        // uses the WatchManager implementation
        dataWatches = WatchManagerFactory.createWatchManager();
        childWatches = WatchManagerFactory.createWatchManager();
    } catch (Exception e) {}
}
public void addConfigNode() {
    DataNode zookeeperZnode = nodes.get(procZookeeper); // look up the /zookeeper node
    if (zookeeperZnode != null) {
        zookeeperZnode.addChild(configChildZookeeper); // add the "config" child
    }
    nodes.put(configZookeeper, new DataNode(new byte[0], -1L, new StatPersisted()));
    try {
        // Reconfig node is access controlled by default (ZOOKEEPER-2014).
        setACL(configZookeeper, ZooDefs.Ids.READ_ACL_UNSAFE, -1);
    } catch (NoNodeException e) {}
}
Main methods
// Add a new node to the DataTree
public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner,
                       int parentCVersion, long zxid, long time, Stat outputStat);
// Remove path from the DataTree
public void deleteNode(String path, long zxid);
// sets the data of a node
public Stat setData(String path, byte[] data, int version, long zxid, long time);
// 1. reads the data of path
// 2. if watcher is not null, registers it via addWatch
public byte[] getData(String path, Stat stat, Watcher watcher);
// copies the stat via node.copyStat(stat)
public Stat statNode(String path, Watcher watcher);
// 1. copyStat into stat
// 2. addWatch
// 3. getChildren
public List<String> getChildren(String path, Stat stat, Watcher watcher);
// set and get ACLs
public Stat setACL(String path, List<ACL> acl, int version);
public List<ACL> getACL(String path, Stat stat);
public List<ACL> getACL(DataNode node);
// adds a Watcher
public void addWatch(String basePath, Watcher watcher, int mode);
// processes a transaction request
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn);
// kills a session; deletes the paths2DeleteLocal and paths2DeleteInTxn sets via deleteNodes
void killSession(
    long session, long zxid, Set<String> paths2DeleteLocal, List<String> paths2DeleteInTxn);
// iterates paths2Delete and calls deleteNode for each path
void deleteNodes(long session, long zxid, Iterable<String> paths2Delete);
// recursively counts the total number of nodes and bytes under path
private void getCounts(String path, Counts counts);
// serialization
void serializeNode(OutputArchive oa, StringBuilder path);
public void serializeNodeData(OutputArchive oa, String path, DataNode node);
public void serializeAcls(OutputArchive oa);
public void serializeNodes(OutputArchive oa);
public void serialize(OutputArchive oa, String tag);
// deserialization
public void deserialize(InputArchive ia, String tag);
// removes the watcher from dataWatches and childWatches
public void removeCnxn(Watcher watcher);
// triggers watches or registers them via addWatch
public void setWatches(long relativeZxid, List<String> dataWatches,
                       List<String> existWatches, List<String> childWatches,
                       List<String> persistentWatches, List<String> persistentRecursiveWatches,
                       Watcher watcher);
// sets a new cversion and zxid for path
public void setCversionPzxid(String path, int newCversion, long zxid);
// Add the digest to the historical list, and update the latest zxid digest
private void logZxidDigest(long zxid, long digest);
// serialize / deserialize lastProcessedZxidDigest
public boolean serializeZxidDigest(OutputArchive oa);
public boolean deserializeZxidDigest(InputArchive ia, long startZxidOfSnapshot);
// serialize / deserialize lastProcessedZxid
public boolean serializeLastProcessedZxid(final OutputArchive oa);
public boolean deserializeLastProcessedZxid(final InputArchive ia);
// Compares the actual tree's digest with that in the snapshot.
// Resets digestFromLoadedSnapshot after comparison.
public void compareSnapshotDigests(long zxid);
// Compares the digest of the tree with the digest present in transaction digest.
// If there is any error, logs and alerts the watchers.
public boolean compareDigest(TxnHeader header, Record txn, TxnDigest digest);
createNode method

processTxn uses this method to create nodes:
public void createNode(final String path, byte[] data, List<ACL> acl,
                       long ephemeralOwner, int parentCVersion, long zxid,
                       long time, Stat outputStat) throws NoNodeException, NodeExistsException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);
    StatPersisted stat = createStat(zxid, time, ephemeralOwner); // Create a node stat
    DataNode parent = nodes.get(parentName); // the parent node must exist
    synchronized (parent) {
        Long acls = aclCache.convertAcls(acl);
        Set<String> children = parent.getChildren(); // the node at path must not already exist
        nodes.preChange(parentName, parent); // removes the old digest
        if (parentCVersion == -1) {
            parentCVersion = parent.stat.getCversion();
            parentCVersion++; // increment the cversion
        }
        if (parentCVersion > parent.stat.getCversion()) {
            parent.stat.setCversion(parentCVersion); // the parent's cversion
            parent.stat.setPzxid(zxid); // the parent's pzxid
        }
        DataNode child = new DataNode(data, acls, stat);
        parent.addChild(childName); // add the child to the parent
        nodes.postChange(parentName, parent);
        nodeDataSize.addAndGet(getNodeSize(path, child.data));
        nodes.put(path, child); // maintain the NodeHashMap
        EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner); // usually VOID or NORMAL
        if (ephemeralType == EphemeralType.CONTAINER) {
            containers.add(path);
        } else if (ephemeralType == EphemeralType.TTL) {
            ttls.add(path);
        } else if (ephemeralOwner != 0) {
            // track the ephemeral node under its owner session
            HashSet<String> list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>());
            synchronized (list) {
                list.add(path);
            }
        }
        if (outputStat != null) {
            child.copyStat(outputStat); // copy the new node's stat into outputStat
        }
    }
    // ... (omitted)
    // trigger watches
    dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
    childWatches.triggerWatch(
        parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
}
deleteNode method
public void deleteNode(String path, long zxid) throws NoNodeException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);
    DataNode parent = nodes.get(parentName); // the parent node must exist
    synchronized (parent) {
        nodes.preChange(parentName, parent);
        parent.removeChild(childName); // remove the child name from the parent
        if (zxid > parent.stat.getPzxid()) {
            parent.stat.setPzxid(zxid);
        }
        nodes.postChange(parentName, parent);
    }
    DataNode node = nodes.get(path); // the node must exist
    nodes.remove(path); // remove it from the NodeHashMap
    synchronized (node) {
        aclCache.removeUsage(node.acl);
        nodeDataSize.addAndGet(-getNodeSize(path, node.data));
    }
    synchronized (parent) {
        long owner = node.stat.getEphemeralOwner();
        EphemeralType ephemeralType = EphemeralType.get(owner);
        if (ephemeralType == EphemeralType.CONTAINER) {
            containers.remove(path);
        } else if (ephemeralType == EphemeralType.TTL) {
            ttls.remove(path);
        } else if (owner != 0) { // remove the ephemeral node from its owner session
            Set<String> nodes = ephemerals.get(owner);
            if (nodes != null) {
                synchronized (nodes) {
                    nodes.remove(path);
                }
            }
        }
    }
    // ... (omitted)
    // trigger watches
    WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
    childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
    childWatches.triggerWatch(
        "".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
}
setData method
public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path); // the node must exist
    byte[] lastData;
    synchronized (n) {
        lastData = n.data;
        nodes.preChange(path, n);
        n.data = data; // assign the new data
        n.stat.setMtime(time); // modification time
        n.stat.setMzxid(zxid); // zxid of the modification
        n.stat.setVersion(version); // data version
        n.copyStat(s); // copy the stat out
        nodes.postChange(path, n);
    }
    // ... (omitted)
    // trigger watches
    dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
    return s;
}
setACL and other ACL methods
public Stat setACL(String path, List<ACL> acl, int version) throws NoNodeException {
    DataNode n = nodes.get(path);
    synchronized (n) {
        Stat stat = new Stat();
        aclCache.removeUsage(n.acl);
        nodes.preChange(path, n);
        n.stat.setAversion(version); // ACL version
        n.acl = aclCache.convertAcls(acl); // set the ACLs
        n.copyStat(stat);
        nodes.postChange(path, n);
        return stat;
    }
}
public List<ACL> getACL(String path, Stat stat) throws NoNodeException {
    DataNode n = nodes.get(path);
    synchronized (n) {
        if (stat != null) {
            n.copyStat(stat);
        }
        return new ArrayList<>(aclCache.convertLong(n.acl));
    }
}
public List<ACL> getACL(DataNode node) {
    synchronized (node) {
        return aclCache.convertLong(node.acl);
    }
}
addWatch method
public void addWatch(String basePath, Watcher watcher, int mode) {
    WatcherMode watcherMode = WatcherMode.fromZooDef(mode); // PERSISTENT_RECURSIVE or PERSISTENT
    dataWatches.addWatch(basePath, watcher, watcherMode); // always registered on dataWatches
    if (watcherMode != WatcherMode.PERSISTENT_RECURSIVE) {
        // a persistent-recursive data watch already covers children,
        // so childWatches is only used for the plain PERSISTENT mode
        childWatches.addWatch(basePath, watcher, watcherMode);
    }
}
processTxn method
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
    ProcessTxnResult rc = new ProcessTxnResult();
    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
        case OpCode.create: // create a node
            CreateTxn createTxn = (CreateTxn) txn;
            rc.path = createTxn.getPath();
            createNode(createTxn.getPath(), createTxn.getData(), createTxn.getAcl(),
                createTxn.getEphemeral() ? header.getClientId() : 0, createTxn.getParentCVersion(),
                header.getZxid(), header.getTime(), null);
            break;
        case OpCode.create2: // create a node and return its stat
            CreateTxn create2Txn = (CreateTxn) txn;
            rc.path = create2Txn.getPath();
            Stat stat = new Stat();
            createNode(create2Txn.getPath(), create2Txn.getData(), create2Txn.getAcl(),
                create2Txn.getEphemeral() ? header.getClientId() : 0, create2Txn.getParentCVersion(),
                header.getZxid(), header.getTime(), stat);
            rc.stat = stat;
            break;
        case OpCode.createTTL:
            CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
            rc.path = createTtlTxn.getPath();
            stat = new Stat();
            createNode(createTtlTxn.getPath(), createTtlTxn.getData(), createTtlTxn.getAcl(),
                EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()), // ttl
                createTtlTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
            rc.stat = stat;
            break;
        case OpCode.createContainer:
            CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
            rc.path = createContainerTxn.getPath();
            stat = new Stat();
            createNode(createContainerTxn.getPath(), createContainerTxn.getData(),
                createContainerTxn.getAcl(), EphemeralType.CONTAINER_EPHEMERAL_OWNER,
                createContainerTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat);
            rc.stat = stat;
            break;
        case OpCode.delete:
        case OpCode.deleteContainer:
            DeleteTxn deleteTxn = (DeleteTxn) txn;
            rc.path = deleteTxn.getPath();
            deleteNode(deleteTxn.getPath(), header.getZxid()); // delete the node
            break;
        case OpCode.reconfig:
        case OpCode.setData: // set node data
            SetDataTxn setDataTxn = (SetDataTxn) txn;
            rc.path = setDataTxn.getPath();
            rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(), setDataTxn.getVersion(),
                header.getZxid(), header.getTime());
            break;
        case OpCode.setACL: // set ACLs
            SetACLTxn setACLTxn = (SetACLTxn) txn;
            rc.path = setACLTxn.getPath();
            rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion());
            break;
        case OpCode.closeSession: // close a session
            long sessionId = header.getClientId();
            if (txn != null) {
                killSession(sessionId, header.getZxid(), ephemerals.remove(sessionId),
                        ((CloseSessionTxn) txn).getPaths2Delete());
            } else {
                killSession(sessionId, header.getZxid());
            }
            break;
        case OpCode.error:
            ErrorTxn errTxn = (ErrorTxn) txn;
            rc.err = errTxn.getErr();
            break;
        case OpCode.check:
            CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
            rc.path = checkTxn.getPath();
            break;
        case OpCode.multi:
            // process each sub-transaction in turn
            break;
        }
    } catch (KeeperException e) {
        rc.err = e.code().intValue();
    } catch (IOException e) {}
    //
    if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) {
        int lastSlash = rc.path.lastIndexOf('/');
        String parentName = rc.path.substring(0, lastSlash);
        CreateTxn cTxn = (CreateTxn) txn;
        try {
            setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid());
        } catch (NoNodeException e) {
            rc.err = e.code().intValue();
        }
    }
    //
    if (!isSubTxn) {
        if (rc.zxid > lastProcessedZxid) {
            lastProcessedZxid = rc.zxid; // advance lastProcessedZxid
        }
        // ... (omitted)
    }
    return rc;
}
serialize-related methods
void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
    String pathString = path.toString();
    DataNode node = getNode(pathString); // look up the node
    String[] children;
    DataNode nodeCopy;
    synchronized (node) {
        StatPersisted statCopy = new StatPersisted();
        copyStatPersisted(node.stat, statCopy);
        // we do not need to make a copy of node.data because the contents are never changed
        nodeCopy = new DataNode(node.data, node.acl, statCopy);
        children = node.getChildren().toArray(new String[0]);
    }
    serializeNodeData(oa, pathString, nodeCopy); // write the node to oa
    path.append('/');
    int off = path.length();
    // recurse over the children and write them to oa
    for (String child : children) {
        path.delete(off, Integer.MAX_VALUE);
        path.append(child);
        serializeNode(oa, path);
    }
}
// visible for test
public void serializeNodeData(OutputArchive oa, String path, DataNode node) throws IOException {
    oa.writeString(path, "path");
    oa.writeRecord(node, "node");
}
public void serializeAcls(OutputArchive oa) throws IOException {
    aclCache.serialize(oa);
}
// serializes the whole NodeHashMap
public void serializeNodes(OutputArchive oa) throws IOException {
    serializeNode(oa, new StringBuilder());
    // / marks end of stream
    // we need to check if clear had been called in between the snapshot.
    if (root != null) {
        oa.writeString("/", "path");
    }
}
// full serialization
public void serialize(OutputArchive oa, String tag) throws IOException {
    serializeAcls(oa);
    serializeNodes(oa);
}
deserialize-related methods
public void deserialize(InputArchive ia, String tag) throws IOException {
    aclCache.deserialize(ia);
    nodes.clear();
    pTrie.clear();
    nodeDataSize.set(0);
    String path = ia.readString("path");
    while (!"/".equals(path)) {
        DataNode node = new DataNode();
        ia.readRecord(node, "node");
        nodes.put(path, node);
        synchronized (node) {
            aclCache.addUsage(node.acl);
        }
        int lastSlash = path.lastIndexOf('/');
        if (lastSlash == -1) {
            root = node;
        } else {
            String parentPath = path.substring(0, lastSlash);
            DataNode parent = nodes.get(parentPath);
            if (parent == null) {
                throw new IOException(
                        "Invalid Datatree, unable to find parent " + parentPath + " of path " + path);
            }
            parent.addChild(path.substring(lastSlash + 1));
            long owner = node.stat.getEphemeralOwner();
            EphemeralType ephemeralType = EphemeralType.get(owner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (owner != 0) {
                HashSet<String> list = ephemerals.computeIfAbsent(owner, k -> new HashSet<>());
                list.add(path);
            }
        }
        path = ia.readString("path");
    }
    // have counted digest for root node with "", ignore here to avoid counting twice for root node
    nodes.putWithoutDigest("/", root);
    nodeDataSize.set(approximateDataSize());
    // we are done with deserializing the datatree update the quotas - create path trie
    // and also update the stat nodes
    setupQuota();
    aclCache.purgeUnused();
}
FileTxnSnapLog

The entry-point class for operating on the TxnLog and SnapShot.
The constructor creates the dataDir and snapDir directories, checks that the data directories are writable, and creates the txnLog and snapLog objects that access the data files.
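A minimal usage sketch, assuming the two-directory FileTxnSnapLog constructor and the restore signature listed below, and assuming PlayBackListener is a single-method callback taking (TxnHeader, Record, TxnDigest), as the addCommittedProposal overload shown earlier suggests; the paths are illustrative and error handling is omitted:

import java.io.File;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;

public class SnapLogDemo {
    public static void main(String[] args) throws Exception {
        File dataDir = new File("/tmp/zk/log");   // txnlog directory (illustrative path)
        File snapDir = new File("/tmp/zk/snap");  // snapshot directory (illustrative path)
        FileTxnSnapLog snapLog = new FileTxnSnapLog(dataDir, snapDir);

        DataTree dataTree = new DataTree();
        ConcurrentHashMap<Long, Integer> sessions = new ConcurrentHashMap<>();
        // restore replays the newest snapshot plus any newer txnlog entries;
        // the listener is invoked once for every replayed transaction
        long lastZxid = snapLog.restore(dataTree, sessions, (hdr, rec, digest) -> { });
        System.out.println("highest zxid restored: " + Long.toHexString(lastZxid));
        snapLog.close();
    }
}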
Main methods
// loads the database from the snapshots and transaction logs
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener);
// fast forward the server database to have the latest transactions in it
// This is the same as restore, but only reads from the transaction logs and not restores from a snapshot
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener);
// loads a TxnIterator starting at the given zxid via txnLog.read(zxid, fastForward)
public TxnIterator readTxnLog(long zxid, boolean fastForward);
// process the transaction on the datatree
public void processTransaction(TxnHeader hdr, DataTree dt, Map<Long, Integer> sessions, Record txn);
// gets the last logged zxid via txnLog.getLastLoggedZxid()
public long getLastLoggedZxid();
// saves the datatree and the sessions into a snapshot
public File save(
    DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, boolean syncSnap);
// truncates the txnLog to the given zxid
public boolean truncateLog(long zxid);
// loads the most recent snapshot file via snapLog.findMostRecentSnapshot()
public File findMostRecentSnapshot();
// loads the n most recent snapshot files via snapLog.findNRecentSnapshots(n)
public List<File> findNRecentSnapshots(int n);
// loads the n most recent valid snapshot files via snapLog.findNValidSnapshots(n)
public List<File> findNValidSnapshots(int n);
// Gets the snapshot and log files that may contain transactions newer than the given zxid.
// This includes the logs whose starting zxid is greater than the given zxid, plus the newest
// log whose starting zxid is less than the given zxid; that last log file may contain
// transactions beyond the given zxid.
public File[] getSnapshotLogs(long zxid);
// appends data via txnLog.append(si)
public boolean append(Request si);
// commits data via txnLog.commit()
public void commit();
restore method


  • Loads the dataTree from the snapshot
  • Loads the dataTree and the committedLog from the txnlog
  • If no dataTree data was loaded at all, saves the empty dataTree into a snapshot.0 file (see the sketch after this list)
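Putting these steps together, the flow looks roughly like the sketch below (a hypothetical simplification for illustration, not the actual source; the real method also handles the empty-snapshot trust check and various error cases):

// Hypothetical simplification of FileTxnSnapLog.restore.
public long restoreSketch(DataTree dt, ConcurrentHashMap<Long, Integer> sessions,
                          PlayBackListener listener) throws IOException {
    // 1. deserialize the newest valid snapshot into the DataTree (-1L if none exists)
    long deserializeResult = snapLog.deserialize(dt, sessions);

    // 2. replay txnlog entries newer than the snapshot; the listener lets
    //    ZKDatabase fill its committedLog while the transactions are replayed
    long highestZxid = fastForwardFromEdits(dt, sessions, listener);

    // 3. nothing usable on disk: persist the empty tree as snapshot.0
    if (deserializeResult == -1L) {
        save(dt, sessions, false);
    }
    return highestZxid;
}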
fastForwardFromEdits method

Loads the dataTree and the committedLog from the txnlog.
processTransaction method
public void processTransaction(TxnHeader hdr, DataTree dt, Map<Long, Integer> sessions,
        Record txn) throws KeeperException.NoNodeException {
    ProcessTxnResult rc;
    switch (hdr.getType()) {
    case OpCode.createSession:
        sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut());
        // give dataTree a chance to sync its lastProcessedZxid
        rc = dt.processTxn(hdr, txn);
        break;
    case OpCode.closeSession:
        sessions.remove(hdr.getClientId());
        rc = dt.processTxn(hdr, txn);
        break;
    default:
        rc = dt.processTxn(hdr, txn);
    }
}
save method
public File save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
        boolean syncSnap) throws IOException {
    long lastZxid = dataTree.lastProcessedZxid;
    // file name: snapshot.${lastZxid}
    File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
    try {
        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
        return snapshotFile;
    } catch (IOException e) {
        throw e;
    }
}
truncateLog method
public boolean truncateLog(long zxid) {
    try {
        // close the existing txnLog and snapLog
        close();
        // truncate it
        try (FileTxnLog truncLog = new FileTxnLog(dataDir)) {
            boolean truncated = truncLog.truncate(zxid);
            // re-open the txnLog and snapLog
            // I'd rather just close/reopen this object itself, however that
            // would have a big impact outside ZKDatabase as there are other
            // objects holding a reference to this object.
            txnLog = new FileTxnLog(dataDir);
            snapLog = new FileSnap(snapDir);
            return truncated;
        }
    } catch (IOException e) {
        return false;
    }
}
TxnLog interface and FileTxnLog implementation

txnlog

All transactional operations are persisted in txnlog files. A client write is first appended to the txnlog; once a quorum of followers has acknowledged it, it is committed to the dataTree. During ZKDatabase startup, if the txnlog's zxid is greater than the snapshot's zxid, the txnlog files are loaded and the transactions are replayed into the dataTree.
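In terms of the ZKDatabase methods listed earlier, that write path can be sketched roughly as follows (illustrative only; in the real server these steps are split across SyncRequestProcessor and the commit/final request processors, and the quorum wait happens between steps 2 and 3):

// Illustrative only: how a transactional request flows through the storage layer.
void persistAndApply(ZKDatabase zkDb, Request request) throws IOException {
    // 1. append the txn to the current log file (buffered, not yet durable)
    zkDb.append(request);

    // 2. flush/fsync the log; only after this may the server ACK the proposal
    zkDb.commit();

    // 3. once a quorum has acked, apply the txn to the in-memory DataTree
    //    and remember it in committedLog for fast follower synchronization
    zkDb.processTxn(request.getHdr(), request.getTxn(), request.getTxnDigest());
    zkDb.addCommittedProposal(request);
}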
TxnLog interface

Interface for reading transaction logs.
public interface TxnLog extends Closeable {
    // Setter for ServerStats to monitor fsync threshold exceed
    void setServerStats(ServerStats serverStats);
    // roll the current log being appended to
    void rollLog() throws IOException;
    // Append a request to the transaction log with a digest
    boolean append(Request request) throws IOException;
    // Start reading the transaction logs from a given zxid
    TxnIterator read(long zxid) throws IOException;
    // the last zxid of the logged transactions
    long getLastLoggedZxid() throws IOException;
    // truncate the log to get in sync with the leader
    boolean truncate(long zxid) throws IOException;
    // the dbid for this transaction log
    long getDbId() throws IOException;
    // commit the transaction and make sure they are persisted
    void commit() throws IOException;
    // return transaction log's elapsed sync time in milliseconds
    long getTxnLogSyncElapsedTime();
    void close() throws IOException;
    void setTotalLogSize(long size);
    long getTotalLogSize();
}
FileTxnLog implementation
This class implements the TxnLog interface. It provides APIs to access the txnlogs and add entries to them.
The format of a Transactional log is as follows:
   LogFile:
       FileHeader TxnList ZeroPad
   FileHeader: {
       magic 4bytes (ZKLG)
       version 4bytes
       dbid 8bytes
   }
   TxnList:
       Txn || Txn TxnList
   Txn:
       checksum Txnlen TxnHeader Record 0x42
   checksum: 8bytes Adler32 is currently used
     calculated across payload -- Txnlen, TxnHeader, Record and 0x42
   Txnlen:
       len 4bytes
   TxnHeader: {
       sessionid 8bytes
       cxid 4bytes
       zxid 8bytes
       time 8bytes
       type 4bytes
   }
   Record:
       See Jute definition file for details on the various record types
   ZeroPad:
       0 padded to EOF (filled during preallocation stage)
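As the layout shows, every entry is guarded by an 8-byte Adler32 checksum computed over the serialized transaction bytes (the crc.update(buf, ...) call in the append implementation below). A hedged sketch of how a reader could verify a single entry; it assumes the whole entry is already in memory and that the length prefix is a 4-byte int, which is a simplification of the Jute buffer encoding:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.Adler32;

// Reads one txn entry: 8-byte Adler32 checksum, length-prefixed payload,
// followed (not read here) by the 0x42 end-of-record byte.
static byte[] readTxnEntry(ByteBuffer buf) throws IOException {
    long storedCrc = buf.getLong();          // checksum written by append()
    int len = buf.getInt();                  // Txnlen
    byte[] payload = new byte[len];
    buf.get(payload);                        // serialized TxnHeader + Record (+ digest)

    Adler32 crc = new Adler32();
    crc.update(payload, 0, payload.length);  // same algorithm the writer uses
    if (crc.getValue() != storedCrc) {
        throw new IOException("CRC mismatch, txn entry is corrupt");
    }
    return payload;
}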
Main FileTxnLog method implementations
public synchronized boolean append(Request request) throws IOException {
    TxnHeader hdr = request.getHdr();
    if (hdr == null) { // not a transactional request
        return false;
    }
    if (hdr.getZxid() <= lastZxidSeen) {
        LOG.warn("...");
    } else {
        lastZxidSeen = hdr.getZxid();
    }
    if (logStream == null) {
        // create a new log.${hdr.zxid} file
        logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
        fos = new FileOutputStream(logFileWrite);
        logStream = new BufferedOutputStream(fos);
        oa = BinaryOutputArchive.getArchive(logStream);
        FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId); // file header
        long dataSize = oa.getDataSize();
        fhdr.serialize(oa, "fileheader"); // write the file header
        logStream.flush();
        // current file offset
        filePosition += oa.getDataSize() - dataSize;
        filePadding.setCurrentSize(filePosition);
        streamsToFlush.add(fos);
    }
    fileSize = filePadding.padFile(fos.getChannel(), filePosition);
    byte[] buf = request.getSerializeData();
    long dataSize = oa.getDataSize();
    Checksum crc = makeChecksumAlgorithm();
    crc.update(buf, 0, buf.length);
    oa.writeLong(crc.getValue(), "txnEntryCRC"); // checksum
    Util.writeTxnBytes(oa, buf); // writes len, hdr, txn, digest, 0x42
    unFlushedSize += oa.getDataSize() - dataSize; // track the number of unflushed bytes
    return true;
}
public long getLastLoggedZxid() {
    File[] files = getLogFiles(logDir.listFiles(), 0);
    long maxLog = files.length > 0 ?
        Util.getZxidFromName(files[files.length - 1].getName(), LOG_FILE_PREFIX) : -1;
    // the suffix of the newest log file is its starting zxid
    long zxid = maxLog;
    // scan that file to find the actual latest zxid
    try (FileTxnLog txn = new FileTxnLog(logDir); TxnIterator itr = txn.read(maxLog)) {
        while (true) {
            if (!itr.next()) {
                break;
            }
            TxnHeader hdr = itr.getHeader();
            zxid = hdr.getZxid();
        }
    } catch (IOException e) {
    }
    return zxid;
}
public synchronized void rollLog() throws IOException {
    if (logStream != null) {
        this.logStream.flush(); // flush the current file
        prevLogsRunningTotal += getCurrentLogSize();
        this.logStream = null; // reset state; the next append will create a new file
        oa = null;
        fileSize = 0;
        filePosition = 0;
        unFlushedSize = 0;
    }
}
public synchronized void commit() throws IOException {
    if (logStream != null) {
        logStream.flush(); // flush the stream
        filePosition += unFlushedSize;
        // If we have written more than we have previously preallocated,
        // we should override the fileSize by filePosition.
        if (filePosition > fileSize) {
            fileSize = filePosition;
        }
        unFlushedSize = 0;
    }
    for (FileOutputStream log : streamsToFlush) {
        log.flush(); // flush the file
        if (forceSync) {
            long startSyncNS = System.nanoTime();
            FileChannel channel = log.getChannel();
            channel.force(false);
            // ... (omitted)
        }
    }
    // close the finished file streams
    while (streamsToFlush.size() > 1) {
        streamsToFlush.poll().close();
    }
    // Roll the log file if we exceed the size limit
    if (txnLogSizeLimit > 0) { // default is -1, so this branch is normally skipped
        long logSize = getCurrentLogSize();
        if (logSize > txnLogSizeLimit) {
            rollLog();
        }
    }
}
// FileTxnIterator wraps the logFile and its input stream and reads txnLog
// entries from the file stream according to the format above
public TxnIterator read(long zxid) throws IOException {
    return read(zxid, true);
}
public TxnIterator read(long zxid, boolean fastForward) throws IOException {
    return new FileTxnIterator(logDir, zxid, fastForward);
}
// truncates the log files to the given zxid
public boolean truncate(long zxid) throws IOException {
    try (FileTxnIterator itr = new FileTxnIterator(this.logDir, zxid)) {
        PositionInputStream input = itr.inputStream;
        if (input == null) {
            throw new IOException("No log files found to truncate");
        }
        long pos = input.getPosition();
        // now, truncate at the current position
        RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
        raf.setLength(pos);
        raf.close(); // truncate the file containing zxid down to that position
        while (itr.goToNextLog()) { // delete all log files newer than zxid
            if (!itr.logFile.delete()) {
                // warning logged (omitted)
            }
        }
    }
    return true;
}
private static FileHeader readHeader(File file) throws IOException {
    InputStream is = null;
    try {
        is = new BufferedInputStream(new FileInputStream(file));
        InputArchive ia = BinaryInputArchive.getArchive(is);
        FileHeader hdr = new FileHeader();
        hdr.deserialize(ia, "fileheader"); // deserialize the header
        return hdr;
    } finally {
        if (is != null) {
            is.close();
        }
    }
}
FileTxnIterator class

This class implements the txnlog iterator interface, which is used for reading the transaction logs.
It keeps an internal List of the log files that are newer than the given zxid or contain it; during initialization it seeks to the requested zxid, so subsequent reads start from that zxid.
public FileTxnIterator(File logDir, long zxid, boolean fastForward) throws IOException {
    this.logDir = logDir;
    this.zxid = zxid;
    init();
    if (fastForward && hdr != null) {
        while (hdr.getZxid() < zxid) { // fast-forward to the requested zxid
            if (!next()) {
                break;
            }
        }
    }
}
void init() throws IOException {
    storedFiles = new ArrayList<>();
    // list the log files in descending order
    List<File> files = Util.sortDataDir(
        FileTxnLog.getLogFiles(logDir.listFiles(), 0),
        LOG_FILE_PREFIX,
        false);
    for (File f : files) {
        if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {
            storedFiles.add(f);
        } else if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {
            // add the last logfile that is less than the zxid
            storedFiles.add(f);
            break;
        }
    }
    goToNextLog(); // move to the next log file
    next(); // move to the next txn entry
}
SnapShot interface and FileSnap implementation

SnapShot interface

Snapshot interface for the persistence layer; implement this interface to provide snapshots.
public interface SnapShot {
    // deserialize a data tree from the last valid snapshot and return the last zxid that was deserialized
    long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;
    // persist the datatree and the sessions into a persistence storage
    void serialize(DataTree dt, Map<Long, Integer> sessions, File name, boolean fsync) throws IOException;
    // find the most recent snapshot file
    File findMostRecentSnapshot() throws IOException;
    // get information of the last saved/restored snapshot
    SnapshotInfo getLastSnapshotInfo();
    // free resources from this snapshot immediately
    void close() throws IOException;
}
FileSnap implementation

Responsible for storing, serializing and deserializing valid snapshots, and for providing access to them:
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
    // find valid snapshot files under snapDir, sorted descending, so the newest come first
    List<File> snapList = findNValidSnapshots(100);
    File snap = null;
    long snapZxid = -1;
    boolean foundValid = false;
    for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
        snap = snapList.get(i);
        snapZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) {
            InputArchive ia = BinaryInputArchive.getArchive(snapIS);
            deserialize(dt, sessions, ia); // deserialize the data into dt
            SnapStream.checkSealIntegrity(snapIS, ia);
            // Deserializing the zxid digest from the input
            // stream and update the digestFromLoadedSnapshot.
            // format: zxid digestVersion digest
            if (dt.deserializeZxidDigest(ia, snapZxid)) {
                SnapStream.checkSealIntegrity(snapIS, ia);
            }
            // deserialize lastProcessedZxid and check inconsistency
            // read from the lastZxid field
            if (dt.deserializeLastProcessedZxid(ia)) {
                SnapStream.checkSealIntegrity(snapIS, ia);
            }
            foundValid = true;
            break;
        } catch (IOException e) {}
    }
    // verify foundValid
    // the zxid this snapshot was taken at
    dt.lastProcessedZxid = snapZxid;
    lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);
    // compare the digest if this is not a fuzzy snapshot, we want to compare and find inconsistent asap
    if (dt.getDigestFromLoadedSnapshot() != null) {
        dt.compareSnapshotDigests(dt.lastProcessedZxid);
    }
    return dt.lastProcessedZxid;
}
public static void deserialize(
        DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException {
    FileHeader header = new FileHeader(); // magic, version, dbid
    header.deserialize(ia, "fileheader"); // parse the file header and verify the magic
    if (header.getMagic() != SNAP_MAGIC) {
        throw new IOException("mismatching magic headers");
    }
    // deserializes:
    // sessions:
    //   Count Session(s)
    //   Session {id, timeout}
    // nodes:
    //   AclCache PathNode(s)
    //   PathNode {path, node}
    //   node {data, acl, stat}
    SerializeUtils.deserializeSnapshot(dt, ia, sessions);
}
protected List<File> findNValidSnapshots(int n) {
    // list snapshot files under snapDir, sorted descending, newest first
    List<File> files = Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false);
    int count = 0;
    List<File> list = new ArrayList<>();
    for (File f : files) {
        try {
            if (SnapStream.isValidSnapshot(f)) { // validate the file
                list.add(f);
                count++;
                if (count == n) {
                    break;
                }
            }
        } catch (IOException e) {}
    }
    return list;
}
public List<File> findNRecentSnapshots(int n) throws IOException {
    // list snapshot files under snapDir, sorted descending, newest first
    List<File> files = Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false);
    int count = 0;
    List<File> list = new ArrayList<>();
    for (File f : files) {
        if (count == n) {
            break;
        }
        if (Util.getZxidFromName(f.getName(), SNAPSHOT_FILE_PREFIX) != -1) {
            count++;
            list.add(f);
        }
    }
    return list;
}
protected void serialize(
        DataTree dt, Map<Long, Integer> sessions, OutputArchive oa, FileHeader header) throws IOException {
    // header must not be null
    header.serialize(oa, "fileheader"); // serialize the file header
    SerializeUtils.serializeSnapshot(dt, oa, sessions); // serialize the dataTree
}
public synchronized void serialize(
        DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync) throws IOException {
    if (!close) {
        try (CheckedOutputStream snapOS = SnapStream.getOutputStream(snapShot, fsync)) {
            OutputArchive oa = BinaryOutputArchive.getArchive(snapOS);
            FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
            serialize(dt, sessions, oa, header);
            SnapStream.sealStream(snapOS, oa);
            // serialize the digest
            if (dt.serializeZxidDigest(oa)) {
                SnapStream.sealStream(snapOS, oa);
            }
            // serialize lastProcessedZxid
            if (dt.serializeLastProcessedZxid(oa)) {
                SnapStream.sealStream(snapOS, oa);
            }
            lastSnapshotInfo = new SnapshotInfo(
                Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX),
                snapShot.lastModified() / 1000);
        }
    } else {
        throw new IOException("FileSnap has already been closed");
    }
}
DatadirCleanupManager

Starting the purge task

Cleans up stale files: the newest snapRetainCount snapshot files and their corresponding txnlog files are kept, and all other stale files are deleted.
The purgeInterval parameter sets the purge period in hours; the default value 0 disables the cleanup.
public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        return;
    }
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        return;
    }
    timer = new Timer("PurgeTask", true);
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
    purgeTaskStatus = PurgeTaskStatus.STARTED;
}
ContainerManager

Responsible for cleaning up container nodes, and runs only on the leader. Once started, it periodically checks for container nodes (and TTL nodes) whose cversion is greater than 0 and that have no children, and tries to delete them. The outcome of the delete does not matter much: if the proposal fails or the container node is no longer empty, no harm is done.
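A rough sketch of that check loop (illustrative only; in the real ContainerManager the candidate paths come from the ZKDatabase/DataTree and the delete is submitted as a deleteContainer request to the leader's request processor; getDeletableContainers() and requestDelete() below are hypothetical stand-ins):

import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;

// Illustrative sketch of the periodic container/TTL purge described above.
class ContainerPurgeSketch {
    private final Timer timer = new Timer("ContainerManagerTask", true);

    void start(long checkIntervalMs) {
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                // candidates: container/TTL nodes with cversion > 0 and no children
                for (String path : getDeletableContainers()) {
                    try {
                        requestDelete(path); // proposes a deleteContainer txn
                    } catch (Exception ignored) {
                        // no harm done: the node may have children again or be gone already
                    }
                }
            }
        }, checkIntervalMs, checkIntervalMs);
    }

    Set<String> getDeletableContainers() { return Set.of(); } // hypothetical hook
    void requestDelete(String path) throws Exception { }      // hypothetical hook
}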
