zk源码—4.会话的实现原理二

打印 上一主题 下一主题

主题 1660|帖子 1660|积分 4980

大纲
1.创建会话
(1)客户端的会话状态
(2)服务端的会话创建
(3)会话ID的初始化实现
(4)设置的会话超时时间没见效的缘故原由
2.分桶计谋和会话管理
(1)分桶计谋和过期队列
(2)会话激活
(3)会话超时查抄
(4)会话清理

2.分桶计谋和会话管理
(1)分桶计谋和过期队列
(2)会话激活
(3)会话超时查抄
(4)会话清理

zk作为分布式系统的焦点组件,经常要处理惩罚大量的会话请求。zk之所以能快速响应大量客户端利用,与它自身的会话管理计谋密不可分。

(1)分桶计谋和过期队列
一.会话管理中的心跳消息和过期时间
二.分桶计谋的原理
三.分桶计谋的过期队列和bucket

一.会话管理中的心跳消息和过期时间
在zk中为了保持会话的存活状态,客户端要向服务端周期性发送心跳信息。客户端的心跳信息可以是一个PING请求,也可以是一个普通的业务请求。

zk服务端收到请求后,便会更新会话的过期时间,来保持会话的存活状态。因此zk的会话管理,最重要的工作就是管理会话的过期时间。

zk服务端的会话管理是由SessionTracker负责的,会话管理器SessionTracker接纳了分桶计谋来管理会话的过期时间。

二.分桶计谋的原理
会话管理器SessionTracker会按照差别的时间隔断对会话进行划分,超时时间相近的会话将会被放在同一个隔断区间中。

具体的划分原则就是:每个会话的最近过期时间点ExpirationTime,ExpirationTime是指会话最近的过期时间点。


对于一个新会话创建完毕后,zk服务端都会计算其ExpirationTime,会话管理器SessionTracker会每隔ExpirationInterval进行会话超时查抄。
  1. //CurrentTime是指当前时间,单位毫秒
  2. //SessionTimeout指会话的超时时间,单位毫秒
  3. //SessionTrackerImpl会每隔ExpirationInterval进行会话超时检查
  4. ExpirationTime = CurrentTime + SessionTimeout
  5. ExpirationTime = (ExpirationTime / ExpirationInterval + 1) * ExpirationInterval
复制代码
这种方式避免了对每一个会话进行查抄。接纳分批次的方式管理会话,可以低落会话管理的难度。由于每次小批量的处理惩罚会话过期可以提高会话处理惩罚的服从。

三.分桶计谋的过期队列和bucket
zk服务端全部会话过期的相干利用都是围绕过期队列来进行的,可以说zk服务端底层就是通过这个过期队列来管理会话过期的。过期队列就是ExpiryQueue范例的sessionExpiryQueue。
  1. public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
  2.     private final ExpiryQueue<SessionImpl> sessionExpiryQueue;//过期队列
  3.     private final AtomicLong nextSessionId = new AtomicLong();//当前生成的会话ID
  4.     ConcurrentHashMap<Long, SessionImpl> sessionsById;//根据会话ID来管理具体的会话实体
  5.     ConcurrentMap<Long, Integer> sessionsWithTimeout;//根据不同的会话ID管理每个会话的超时时间
  6.     ...
  7. }
复制代码
什么是bucket:
SessionTracker的过期队列是ExpiryQueue范例的,ExpiryQueue范例的过期队列会由若干个bucket构成。每个bucket是以expirationInterval为单位进行时间区间划分的。每个bucket中会存放一些在某一时间点内过期的会话。

怎样实现过期队列:
在zk中会使用ExpiryQueue类来实现一个会话过期队列。ExpiryQueue类中有两个HashMap:elemMap和一个expiryMap。elemMap中存放会话对象SessionImpl及其对应的最近过期时间点,expiryMap中存放的就是过期队列。

expiryMap的key就是bucket的时间划分,即会话的最近过期时间点。expiryMap的value就是bucket中存放的某一时间内过期的会话集合。所以bucket可以理解为一个Set会话对象集合。expiryMap是线程安全的HaspMap,可根据差别的过期时间区间存放会话。expiryMap过期队列中的一个过期时间点就对应一个bucket。

ExpiryQueue中也实现了remove()、update()、poll()等队列的利用方法。超时查抄的定时使命一开始会获取最近的会话过期时间点看看当前是否已经到达,然后从过期队列中poll出bucket时会更新下一次的最近的会话过期时间点。
  1. public class ExpiryQueue<E> {
  2.     //存放会话对象SessionImpl及其对应的最近的过期时间点
  3.     private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>();
  4.     //存放过期队列,bucket可以理解为一个Set<SessionImpl>会话对象集合
  5.     private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>();
  6.     //最近的一批会话的过期时间点
  7.     private final AtomicLong nextExpirationTime = new AtomicLong();
  8.     //将会话划分到一个个bucket的时间间隔,也是超时检查线程定时检查时间间隔
  9.     private final int expirationInterval;
  10.     public ExpiryQueue(int expirationInterval) {
  11.         this.expirationInterval = expirationInterval;
  12.         nextExpirationTime.set(roundToNextInterval(Time.currentElapsedTime()));
  13.     }
  14.    
  15.     private long roundToNextInterval(long time) {
  16.         return (time / expirationInterval + 1) * expirationInterval;
  17.     }
  18.    
  19.     public long getWaitTime() {
  20.         long now = Time.currentElapsedTime();
  21.         long expirationTime = nextExpirationTime.get();
  22.         return now < expirationTime ? (expirationTime - now) : 0L;
  23.     }
  24.    
  25.     public Long update(E elem, int timeout) {
  26.         Long prevExpiryTime = elemMap.get(elem);
  27.         long now = Time.currentElapsedTime();
  28.         Long newExpiryTime = roundToNextInterval(now + timeout);
  29.         if (newExpiryTime.equals(prevExpiryTime)) {
  30.             // No change, so nothing to update
  31.             return null;
  32.         }
  33.         // First add the elem to the new expiry time bucket in expiryMap.
  34.         Set<E> set = expiryMap.get(newExpiryTime);
  35.         if (set == null) {
  36.             // Construct a ConcurrentHashSet using a ConcurrentHashMap
  37.             set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
  38.             // Put the new set in the map, but only if another thread hasn't beaten us to it
  39.             Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
  40.             if (existingSet != null) {
  41.                 set = existingSet;
  42.             }
  43.         }
  44.         set.add(elem);
  45.         // Map the elem to the new expiry time. If a different previous mapping was present, clean up the previous expiry bucket.
  46.         prevExpiryTime = elemMap.put(elem, newExpiryTime);
  47.         if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
  48.             Set<E> prevSet = expiryMap.get(prevExpiryTime);
  49.             if (prevSet != null) {
  50.                 prevSet.remove(elem);
  51.             }
  52.         }
  53.         return newExpiryTime;
  54.     }
  55.    
  56.     public Set<E> poll() {
  57.         long now = Time.currentElapsedTime();
  58.         long expirationTime = nextExpirationTime.get();
  59.         if (now < expirationTime) {
  60.             return Collections.emptySet();
  61.         }
  62.         Set<E> set = null;
  63.         long newExpirationTime = expirationTime + expirationInterval;
  64.         //设置最近的会话过期时间点
  65.         if (nextExpirationTime.compareAndSet(expirationTime, newExpirationTime)) {
  66.             set = expiryMap.remove(expirationTime);
  67.         }
  68.         if (set == null) {
  69.             return Collections.emptySet();
  70.         }
  71.         return set;
  72.     }
  73.    
  74.     public Long remove(E elem) {
  75.         Long expiryTime = elemMap.remove(elem);
  76.         if (expiryTime != null) {
  77.             Set<E> set = expiryMap.get(expiryTime);
  78.             if (set != null) {
  79.                 set.remove(elem);
  80.             }
  81.         }
  82.         return expiryTime;
  83.     }
  84.     ...
  85. }
复制代码
(2)会话激活
一.查抄该会话是否已经被关闭
二.计算该会话新的过期时间点newExpiryTime
三.将该会话添加到新的过期时间点对应的bucket中
四.将该会话从旧的过期时间点对应的bucket中移除

为了保持客户端会话的有用性,客户端要不停发送PING请求进行心跳检测。服务端要不停接收客户端的这个心跳检测,并重新激活对应的客户端会话。这个重新激活会话的过程由SessionTracker的touchSession()方法实现。

服务端处理惩罚PING请求的重要流程如下:
  1. public class NIOServerCnxnFactory extends ServerCnxnFactory {
  2.     private ExpiryQueue<NIOServerCnxn> cnxnExpiryQueue;
  3.     ...
  4.     public void start() {
  5.         stopped = false;
  6.         if (workerPool == null) {
  7.             workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
  8.         }
  9.         for (SelectorThread thread : selectorThreads) {
  10.             if (thread.getState() == Thread.State.NEW) {
  11.                 thread.start();
  12.             }
  13.         }
  14.         if (acceptThread.getState() == Thread.State.NEW) {
  15.             acceptThread.start();
  16.         }
  17.         if (expirerThread.getState() == Thread.State.NEW) {
  18.             expirerThread.start();
  19.         }
  20.     }
  21.    
  22.     class SelectorThread extends AbstractSelectThread {
  23.         @Override
  24.         public void run() {
  25.             ...
  26.             while (!stopped) {
  27.                 select();
  28.                 ...
  29.             }
  30.             ...
  31.         }
  32.         
  33.         private void select() {
  34.             selector.select();
  35.             Set<SelectionKey> selected = selector.selectedKeys();
  36.             ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
  37.             Collections.shuffle(selectedList);
  38.             Iterator<SelectionKey> selectedKeys = selectedList.iterator();
  39.             while (!stopped && selectedKeys.hasNext()) {
  40.                 SelectionKey key = selectedKeys.next();
  41.                 selected.remove(key);
  42.                 ...
  43.                 if (key.isReadable() || key.isWritable()) {
  44.                     //服务端从客户端读数据(读取请求) + 服务端向客户端写数据(发送响应)
  45.                     handleIO(key);
  46.                 }
  47.                 ...
  48.             }
  49.         }
  50.         
  51.         private void handleIO(SelectionKey key) {
  52.             IOWorkRequest workRequest = new IOWorkRequest(this, key);
  53.             NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
  54.             cnxn.disableSelectable();
  55.             key.interestOps(0);
  56.             //激活连接:添加连接到连接过期队列
  57.             touchCnxn(cnxn);
  58.             //通过工作线程池来处理请求
  59.             workerPool.schedule(workRequest);
  60.         }
  61.         ...
  62.     }
  63.    
  64.     public void touchCnxn(NIOServerCnxn cnxn) {
  65.         cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout());
  66.     }
  67.     ...
  68. }
  69. public class WorkerService {
  70.     ...
  71.     public void schedule(WorkRequest workRequest) {
  72.         schedule(workRequest, 0);
  73.     }
  74.    
  75.     public void schedule(WorkRequest workRequest, long id) {
  76.         ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);
  77.         int size = workers.size();
  78.         if (size > 0) {
  79.             int workerNum = ((int) (id % size) + size) % size;
  80.             ExecutorService worker = workers.get(workerNum);
  81.             worker.execute(scheduledWorkRequest);
  82.         } else {
  83.             scheduledWorkRequest.run();
  84.         }
  85.     }
  86.    
  87.     private class ScheduledWorkRequest implements Runnable {
  88.         private final WorkRequest workRequest;
  89.         
  90.         ScheduledWorkRequest(WorkRequest workRequest) {
  91.             this.workRequest = workRequest;
  92.         }
  93.         
  94.         @Override
  95.         public void run() {
  96.             ...
  97.             workRequest.doWork();
  98.         }
  99.     }
  100.     ...
  101. }
  102. public class NIOServerCnxnFactory extends ServerCnxnFactory {
  103.     private class IOWorkRequest extends WorkerService.WorkRequest {
  104.         private final NIOServerCnxn cnxn;
  105.         public void doWork() throws InterruptedException {
  106.             ...
  107.             if (key.isReadable() || key.isWritable()) {
  108.                 cnxn.doIO(key);
  109.                 ...
  110.             }
  111.             ...
  112.         }
  113.         ...
  114.     }
  115.     ...
  116. }
  117. public class NIOServerCnxn extends ServerCnxn {
  118.     private final ZooKeeperServer zkServer;
  119.    
  120.     void doIO(SelectionKey k) throws InterruptedException {
  121.         ...
  122.         if (k.isReadable()) {
  123.             ...
  124.             readPayload();
  125.             ...
  126.         }
  127.         ...
  128.     }
  129.    
  130.     private void readPayload() throws IOException, InterruptedException {
  131.         ...
  132.         readRequest();
  133.         ...
  134.     }
  135.    
  136.     private void readRequest() throws IOException {
  137.         //处理输入流
  138.         zkServer.processPacket(this, incomingBuffer);
  139.     }
  140.     ...
  141. }
  142. public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
  143.     ...
  144.     public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
  145.         InputStream bais = new ByteBufferInputStream(incomingBuffer);
  146.         BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
  147.         RequestHeader h = new RequestHeader();
  148.         h.deserialize(bia, "header");
  149.         incomingBuffer = incomingBuffer.slice();
  150.         ...
  151.         Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
  152.         ...
  153.         submitRequest(si);
  154.         ...
  155.     }
  156.    
  157.     public void submitRequest(Request si) {
  158.         ...
  159.         //激活会话
  160.         touch(si.cnxn);
  161.         firstProcessor.processRequest(si);
  162.         ...
  163.     }
  164.    
  165.     void touch(ServerCnxn cnxn) throws MissingSessionException {
  166.         if (cnxn == null) {
  167.             return;
  168.         }
  169.         long id = cnxn.getSessionId();
  170.         int to = cnxn.getSessionTimeout();
  171.         //激活会话
  172.         if (!sessionTracker.touchSession(id, to)) {
  173.             throw new MissingSessionException("No session with sessionid 0x" + Long.toHexString(id) + " exists, probably expired and removed");
  174.         }
  175.     }
  176.     ...
  177. }
复制代码
由于ZooKeeperServer的submitRequest()方法会调用touch()方法激活会话,所以只要客户端有请求发送到服务端,服务端就会进行一次会话激活。

执行SessionTracker的touchSession()方法进行会话激活的重要流程如下:

一.查抄该会话是否已经被关闭
如果该会话已经被关闭,则返回,不消激活会话。

二.计算该会话新的过期时间点newExpiryTime
调用ExpiryQueue的roundToNextInterval()方法计算会话新的过期时间点。通过总时间除以隔断时间然后向上取整再乘以隔断时间来计算新的过期时间点。

三.将该会话添加到新的过期时间点对应的bucket中
从过期队列expiryMap获取新的过期时间点对应的bucket,然后添加该会话到新的过期时间点对应的bucket中。

四.将该会话从旧的过期时间点对应的bucket中移除
从elemMap中获取该会话旧的过期时间点,然后将该会话从旧的过期时间点对应的bucket中移除。
  1. public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
  2.     ...
  3.     ConcurrentHashMap<Long, SessionImpl> sessionsById;//根据会话ID来管理具体的会话实体
  4.    
  5.     synchronized public boolean touchSession(long sessionId, int timeout) {
  6.         SessionImpl s = sessionsById.get(sessionId);
  7.         if (s == null) {
  8.             logTraceTouchInvalidSession(sessionId, timeout);
  9.             return false;
  10.         }
  11.         //1.检查会话是否已经被关闭
  12.         if (s.isClosing()) {
  13.             logTraceTouchClosingSession(sessionId, timeout);
  14.             return false;
  15.         }
  16.         //激活会话
  17.         updateSessionExpiry(s, timeout);
  18.         return true;
  19.     }
  20.    
  21.     private void updateSessionExpiry(SessionImpl s, int timeout) {
  22.         logTraceTouchSession(s.sessionId, timeout, "");
  23.         //激活会话
  24.         sessionExpiryQueue.update(s, timeout);
  25.     }
  26.     ...
  27. }
  28. public class ExpiryQueue<E> {
  29.     //存放会话对象SessionImpl及其对应的最近的过期时间点
  30.     private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>();
  31.     //存放过期队列,bucket可以理解为一个Set<SessionImpl>会话对象集合
  32.     private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>();
  33.     //最近的一批会话的过期时间点
  34.     private final AtomicLong nextExpirationTime = new AtomicLong();
  35.     //将会话划分到一个个bucket的时间间隔,也是超时检查线程的定时检查时间间隔
  36.     private final int expirationInterval;
  37.     ...
  38.     private long roundToNextInterval(long time) {
  39.         //通过向上取整来进行计算新的过期时间点
  40.         return (time / expirationInterval + 1) * expirationInterval;
  41.     }
  42.     ...
  43.     public Long update(E elem, int timeout) {
  44.         Long prevExpiryTime = elemMap.get(elem);
  45.         long now = Time.currentElapsedTime();
  46.         //2.计算该会话新的过期时间点newExpiryTime
  47.         Long newExpiryTime = roundToNextInterval(now + timeout);
  48.         if (newExpiryTime.equals(prevExpiryTime)) {
  49.             // No change, so nothing to update
  50.             return null;
  51.         }
  52.       
  53.         //3.从过期队列expiryMap获取新的过期时间点对应的bucket
  54.         //First add the elem to the new expiry time bucket in expiryMap.
  55.         Set<E> set = expiryMap.get(newExpiryTime);
  56.         if (set == null) {
  57.             // Construct a ConcurrentHashSet using a ConcurrentHashMap
  58.             set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
  59.             // Put the new set in the map, but only if another thread hasn't beaten us to it
  60.             Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
  61.             if (existingSet != null) {
  62.                 set = existingSet;
  63.             }
  64.         }
  65.         //将会话添加到新的过期时间点对应的bucket中
  66.         set.add(elem);
  67.       
  68.         //4.从elemMap中获取该会话旧的过期时间点
  69.         //Map the elem to the new expiry time. If a different previous mapping was present, clean up the previous expiry bucket.
  70.         prevExpiryTime = elemMap.put(elem, newExpiryTime);
  71.         //然后将该会话从旧的过期时间点对应的bucket中移除
  72.         if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
  73.             Set<E> prevSet = expiryMap.get(prevExpiryTime);
  74.             if (prevSet != null) {
  75.                 prevSet.remove(elem);
  76.             }
  77.         }
  78.         return newExpiryTime;
  79.     }
  80.     ...
  81. }
复制代码
(3)会话超时查抄
SessionTracker中会有一个线程专门进行会话超时查抄,该线程会依次对bucket会话桶中剩下的会话进行清理,超时查抄线程的定时查抄时间隔断实在就是expirationInterval。

当一个会话被激活时,SessionTracker会将其从上一个bucket会话桶迁移到下一个bucket会话桶。所以超时查抄线程的使命就是查抄bucket会话桶中没被迁移的会话。

超时查抄线程是怎样进行定时查抄的:
由于会话分桶计谋会将expirationInterval的倍数作为会话最近过期时间点,所以超时查抄线程只要在expirationInterval倍数的时间点进行查抄即可。如许既提高了服从,而且由于是批量清理,因此性能也非常好。这也是zk要通过分桶计谋来管理客户端会话的最重要缘故原由。一个zk集群的客户端会话可能会非常多,逐个依次查抄会非常耗费时间。
  1. public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
  2.     private final ExpiryQueue<SessionImpl> sessionExpiryQueue;//会话过期队列
  3.     private final SessionExpirer expirer;
  4.     ...
  5.     //超时检查线程
  6.     @Override
  7.     public void run() {
  8.         try {
  9.             while (running) {
  10.                 //获取会话过期队列中最近的过期时间和当前时间之差
  11.                 long waitTime = sessionExpiryQueue.getWaitTime();
  12.                 if (waitTime > 0) {
  13.                     //时间未到则进行睡眠
  14.                     Thread.sleep(waitTime);
  15.                     continue;
  16.                 }
  17.                 for (SessionImpl s : sessionExpiryQueue.poll()) {
  18.                     //设置过期的会话状态为已关闭
  19.                     setSessionClosing(s.sessionId);
  20.                     //对会话进行过期处理
  21.                     expirer.expire(s);
  22.                 }
  23.             }
  24.         } catch (InterruptedException e) {
  25.             handleException(this.getName(), e);
  26.         }
  27.         LOG.info("SessionTrackerImpl exited loop!");
  28.     }
  29.     ...
  30. }
  31. public class ExpiryQueue<E> {
  32.     private final AtomicLong nextExpirationTime = new AtomicLong();
  33.     ...
  34.     public long getWaitTime() {
  35.         //当前时间
  36.         long now = Time.currentElapsedTime();
  37.         //获取最近的过期时间点
  38.         long expirationTime = nextExpirationTime.get();
  39.         return now < expirationTime ? (expirationTime - now) : 0L;
  40.     }
  41.    
  42.     public Set<E> poll() {
  43.         long now = Time.currentElapsedTime();
  44.         //获取最近的过期时间点
  45.         long expirationTime = nextExpirationTime.get();
  46.         if (now < expirationTime) {
  47.             return Collections.emptySet();
  48.         }
  49.         Set<E> set = null;
  50.         //根据expirationInterval计算最新的最近过期时间点
  51.         long newExpirationTime = expirationTime + expirationInterval;
  52.         //重置bucket桶中最近的过期时间点
  53.         if (nextExpirationTime.compareAndSet(expirationTime, newExpirationTime)) {
  54.             //移出过期队列
  55.             set = expiryMap.remove(expirationTime);
  56.         }
  57.         if (set == null) {
  58.             return Collections.emptySet();
  59.         }
  60.         return set;
  61.     }
  62.     ...
  63. }
复制代码
(4)会话清理
当SessionTracker的会话超时查抄线程遍历出一些已经过期的会话时,就要进行会话清理了,会话清理的步骤如下:

一.标记会话状态为已关闭
二.发起关闭会话请求
三.网络临时节点
四.添加临时节点的删除请求到事务变动队列
五.删除临时节点
六.移除会话
七.关闭NIOServerCnxn

一.标记会话状态为已关闭
SessionTracker的setSessionClosing()方法会标记会话状态为已关闭,这是由于整个会话清理过程须要一段时间,为了包管在会话清理期间不再处理惩罚来自该会话对应的客户端的请求,SessionTracker会首先将该会话的isClosing属性标记为true。

二.发起关闭会话请求
ZooKeeperServer的expire()方法和close()方法会发起关闭会话请求,为了使对该会话的关闭利用在整个服务端集群中都见效,zk使用提交"关闭会话"请求的方式,将请求交给PrepRequestProcessor处理惩罚。

三.网络临时节点
PrepRequestProcessor的pRequest2Txn()方法会网络须要清理的临时节点。在zk中,一旦某个会话失效,那么和该会话相干的临时节点也要被清撤除。因此须要首先将服务器上全部和该会话相干的临时节点找出来。

zk的内存数据库会为每个会话都生存一份由该会话维护的临时节点集合。因此在会话清理阶段,只需根据当前即将关闭的会话的sessionID,便可以从zk的内存数据库中获取到该会话的临时节点列表。

四.添加临时节点的删除请求到事务变动队列
将临时节点的删除请求添加到事务变动队列outstandingChanges中。完成该会话相干的临时节点网络后,zk会将这些临时节点逐个转换成节点删除请求,添加到事务变动队列中。

五.删除临时节点
FinalRequestProcessor的processRequest()方法触发删除临时节点。当网络完全部须要删除的临时节点,以及创建了对应的节点删除请求后,便会在FinalRequestProcessor的processRequest()方法中,通过调用ZooKeeperServer的processTxn()方法,调用到ZKDatabase的processTxn()方法,最后调用DataTree的killSession()方法,从而最终删除内存数据库中该会话的全部临时节点。

六.移除会话
在FinalRequestProcessor的processRequest()方法中,会通过调用ZooKeeperServer的processTxn()方法,调用到SessionTracker的removeSession()方法将会话从SessionTracker移除。即从sessionsById、sessionsWithTimeout、sessionExpiryQueue中移除会话。

七.关闭NIOServerCnxn
在FinalRequestProcessor的processRequest()方法中,最后会调用FinalRequestProcessor的closeSession()方法,从NIOServerCnxnFactory的sessionMap中将该会话进行移除。
  1. public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
  2.     private final ExpiryQueue<SessionImpl> sessionExpiryQueue;//会话过期队列
  3.     private final SessionExpirer expirer;
  4.     ...
  5.     //超时检查线程
  6.     @Override
  7.     public void run() {
  8.         while (running) {
  9.             //获取会话过期队列中最近的过期时间和当前时间之差
  10.             long waitTime = sessionExpiryQueue.getWaitTime();
  11.             if (waitTime > 0) {
  12.                 //时间未到则进行睡眠
  13.                 Thread.sleep(waitTime);
  14.                 continue;
  15.             }
  16.             for (SessionImpl s : sessionExpiryQueue.poll()) {
  17.                 //1.设置过期的会话状态为已关闭
  18.                 setSessionClosing(s.sessionId);
  19.                 //2.对会话进行过期处理,ZooKeeperServer实现了SessionExpirer接口
  20.                 expirer.expire(s);
  21.             }
  22.         }
  23.     }
  24.    
  25.     synchronized public void setSessionClosing(long sessionId) {
  26.         SessionImpl s = sessionsById.get(sessionId);
  27.         s.isClosing = true;
  28.     }
  29.    
  30.     //6.移除会话
  31.     synchronized public void removeSession(long sessionId) {
  32.         SessionImpl s = sessionsById.remove(sessionId);
  33.         sessionsWithTimeout.remove(sessionId);
  34.         sessionExpiryQueue.remove(s);
  35.     }
  36.     ...
  37. }
  38. public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
  39.     public synchronized void startup() {
  40.         if (sessionTracker == null) {
  41.             createSessionTracker();//创建会话管理器
  42.         }
  43.         startSessionTracker();//启动会话管理器的超时检查线程
  44.         setupRequestProcessors();//初始化请求处理链
  45.         registerJMX();
  46.         setState(State.RUNNING);
  47.         notifyAll();
  48.     }
  49.    
  50.     protected void setupRequestProcessors() {
  51.         RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  52.         RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
  53.         ((SyncRequestProcessor)syncProcessor).start();
  54.         firstProcessor = new PrepRequestProcessor(this, syncProcessor);
  55.         ((PrepRequestProcessor)firstProcessor).start();
  56.     }
  57.     ...
  58.     public void expire(Session session) {
  59.         long sessionId = session.getSessionId();
  60.         //2.发起关闭会话请求
  61.         close(sessionId);
  62.     }
  63.    
  64.     private void close(long sessionId) {
  65.         Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
  66.         setLocalSessionFlag(si);
  67.         //2.以提交"关闭会话"请求的方式,发起关闭会话请求
  68.         submitRequest(si);
  69.     }
  70.    
  71.     public void submitRequest(Request si) {
  72.         ...
  73.         touch(si.cnxn);
  74.         //2.首先由PrepRequestProcessor请求处理器的processRequest方法进行处理
  75.         firstProcessor.processRequest(si);
  76.         ...
  77.     }
  78.    
  79.     public ProcessTxnResult processTxn(Request request) {
  80.         return processTxn(request, request.getHdr(), request.getTxn());
  81.     }
  82.    
  83.     private ProcessTxnResult processTxn(Request request, TxnHeader hdr, Record txn) {
  84.         ...
  85.         //5.ZKDatabase.processTxn方法会根据opCode.closeSession来删除临时节点
  86.         rc = getZKDatabase().processTxn(hdr, txn);
  87.         ...
  88.         if (opCode == OpCode.createSession) {
  89.             ...
  90.         } else if (opCode == OpCode.closeSession) {
  91.             //6.移除会话
  92.             sessionTracker.removeSession(sessionId);
  93.         }
  94.         return rc;
  95.     }
  96.     ...
  97. }
  98. public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
  99.     LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
  100.     private final RequestProcessor nextProcessor;
  101.     ZooKeeperServer zks;
  102.    
  103.     public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
  104.         super("ProcessThread(sid:" + zks.getServerId() + " cport:" + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
  105.         this.nextProcessor = nextProcessor;
  106.         this.zks = zks;
  107.     }
  108.     ...
  109.     public void processRequest(Request request) {
  110.         submittedRequests.add(request);
  111.     }
  112.    
  113.     @Override
  114.     public void run() {
  115.         while (true) {
  116.             Request request = submittedRequests.take();
  117.             pRequest(request);
  118.         }
  119.     }
  120.    
  121.     protected void pRequest(Request request) throws RequestProcessorException {
  122.         ...
  123.         case OpCode.closeSession:
  124.             pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
  125.             break;
  126.         ...
  127.         //交给下一个请求处理器处理
  128.         nextProcessor.processRequest(request);
  129.     }
  130.    
  131.     protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) {
  132.         //将请求标记为事务请求
  133.         request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
  134.         ...
  135.         case OpCode.closeSession:
  136.             //3.收集需要清理的临时节点
  137.             Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
  138.             synchronized (zks.outstandingChanges) {
  139.                 ...
  140.                 for (String path2Delete : es) {
  141.                     //4.将临时节点的删除请求添加到事务变更队列outstandingChanges中
  142.                     addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null));
  143.                 }
  144.                 zks.sessionTracker.setSessionClosing(request.sessionId);
  145.             }
  146.             break;
  147.         ...
  148.     }
  149.    
  150.     private void addChangeRecord(ChangeRecord c) {
  151.         //4.将临时节点的删除请求添加到事务变更队列outstandingChanges中
  152.         synchronized (zks.outstandingChanges) {
  153.             zks.outstandingChanges.add(c);
  154.             zks.outstandingChangesForPath.put(c.path, c);
  155.         }
  156.     }
  157.     ...
  158. }
  159. public class FinalRequestProcessor implements RequestProcessor {
  160.     ZooKeeperServer zks;
  161.    
  162.     public void processRequest(Request request) {
  163.         ...
  164.         //5.删除临时节点 + 6.移除会话
  165.         rc = zks.processTxn(request);
  166.         ...
  167.         if (request.type == OpCode.closeSession && connClosedByClient(request)) {
  168.             //7.关闭NIOServerCnxn
  169.             if (closeSession(zks.serverCnxnFactory, request.sessionId) ||
  170.                 closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
  171.                 return;
  172.             }
  173.         }
  174.         ...
  175.     }
  176.    
  177.     private boolean closeSession(ServerCnxnFactory serverCnxnFactory, long sessionId) {
  178.         if (serverCnxnFactory == null) {
  179.             return false;
  180.         }
  181.         //7.关闭NIOServerCnxn
  182.         return serverCnxnFactory.closeSession(sessionId);
  183.     }
  184.     ...
  185. }
  186. public class NIOServerCnxnFactory extends ServerCnxnFactory {
  187.     private final ConcurrentHashMap<Long, NIOServerCnxn> sessionMap = new ConcurrentHashMap<Long, NIOServerCnxn>();
  188.     ...
  189.     public void addSession(long sessionId, NIOServerCnxn cnxn) {
  190.         sessionMap.put(sessionId, cnxn);
  191.     }
  192.    
  193.     @Override
  194.     public boolean closeSession(long sessionId) {
  195.         NIOServerCnxn cnxn = sessionMap.remove(sessionId);
  196.         if (cnxn != null) {
  197.             cnxn.close();
  198.             return true;
  199.         }
  200.         return false;
  201.     }
  202.     ...
  203. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

万有斥力

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表