AQS源码解读----AbstractQueuedSynchronizer

打印 上一主题 下一主题

主题 948|帖子 948|积分 2844

  1.   36 package cn.com.pep;
  2.   37 import java.util.concurrent.TimeUnit;
  3.   38 import java.util.concurrent.locks.AbstractOwnableSynchronizer;
  4.   39 import java.util.concurrent.locks.Condition;
  5.   40 import java.util.concurrent.locks.LockSupport;
  6.   41 import java.util.ArrayList;
  7.   42 import java.util.Collection;
  8.   43 import java.util.Date;
  9.   44
  10.   45 import sun.misc.Unsafe;
  11.   46
  12.   47 /**
  13.   48  * Provides a framework for implementing blocking locks and related
  14.   49  * synchronizers (semaphores, events, etc) that rely on
  15.   50  * first-in-first-out (FIFO) wait queues.  This class is designed to
  16.   51  * be a useful basis for most kinds of synchronizers that rely on a
  17.   52  * single atomic {@code int} value to represent state. Subclasses
  18.   53  * must define the protected methods that change this state, and which
  19.   54  * define what that state means in terms of this object being acquired
  20.   55  * or released.  Given these, the other methods in this class carry
  21.   56  * out all queuing and blocking mechanics. Subclasses can maintain
  22.   57  * other state fields, but only the atomically updated {@code int}
  23.   58  * value manipulated using methods {@link #getState}, {@link
  24.   59  * #setState} and {@link #compareAndSetState} is tracked with respect
  25.   60  * to synchronization.
  26.   61  *
  27.   62  * <p>Subclasses should be defined as non-public internal helper
  28.   63  * classes that are used to implement the synchronization properties
  29.   64  * of their enclosing class.  Class
  30.   65  * {@code AbstractQueuedSynchronizer} does not implement any
  31.   66  * synchronization interface.  Instead it defines methods such as
  32.   67  * {@link #acquireInterruptibly} that can be invoked as
  33.   68  * appropriate by concrete locks and related synchronizers to
  34.   69  * implement their public methods.
  35.   70  *
  36.   71  * <p>This class supports either or both a default <em>exclusive</em>
  37.   72  * mode and a <em>shared</em> mode. When acquired in exclusive mode,
  38.   73  * attempted acquires by other threads cannot succeed. Shared mode
  39.   74  * acquires by multiple threads may (but need not) succeed. This class
  40.   75  * does not "understand" these differences except in the
  41.   76  * mechanical sense that when a shared mode acquire succeeds, the next
  42.   77  * waiting thread (if one exists) must also determine whether it can
  43.   78  * acquire as well. Threads waiting in the different modes share the
  44.   79  * same FIFO queue. Usually, implementation subclasses support only
  45.   80  * one of these modes, but both can come into play for example in a
  46.   81  * {@link ReadWriteLock}. Subclasses that support only exclusive or
  47.   82  * only shared modes need not define the methods supporting the unused mode.
  48.   83  *
  49.   84  * <p>This class defines a nested {@link ConditionObject} class that
  50.   85  * can be used as a {@link Condition} implementation by subclasses
  51.   86  * supporting exclusive mode for which method {@link
  52.   87  * #isHeldExclusively} reports whether synchronization is exclusively
  53.   88  * held with respect to the current thread, method {@link #release}
  54.   89  * invoked with the current {@link #getState} value fully releases
  55.   90  * this object, and {@link #acquire}, given this saved state value,
  56.   91  * eventually restores this object to its previous acquired state.  No
  57.   92  * {@code AbstractQueuedSynchronizer} method otherwise creates such a
  58.   93  * condition, so if this constraint cannot be met, do not use it.  The
  59.   94  * behavior of {@link ConditionObject} depends of course on the
  60.   95  * semantics of its synchronizer implementation.
  61.   96  *
  62.   97  * <p>This class provides inspection, instrumentation, and monitoring
  63.   98  * methods for the internal queue, as well as similar methods for
  64.   99  * condition objects. These can be exported as desired into classes
  65. 100  * using an {@code AbstractQueuedSynchronizer} for their
  66. 101  * synchronization mechanics.
  67. 102  *
  68. 103  * <p>Serialization of this class stores only the underlying atomic
  69. 104  * integer maintaining state, so deserialized objects have empty
  70. 105  * thread queues. Typical subclasses requiring serializability will
  71. 106  * define a {@code readObject} method that restores this to a known
  72. 107  * initial state upon deserialization.
  73. 108  *
  74. 109  * <h3>Usage</h3>
  75. 110  *
  76. 111  * <p>To use this class as the basis of a synchronizer, redefine the
  77. 112  * following methods, as applicable, by inspecting and/or modifying
  78. 113  * the synchronization state using {@link #getState}, {@link
  79. 114  * #setState} and/or {@link #compareAndSetState}:
  80. 115  *
  81. 116  * <ul>
  82. 117  * <li> {@link #tryAcquire}
  83. 118  * <li> {@link #tryRelease}
  84. 119  * <li> {@link #tryAcquireShared}
  85. 120  * <li> {@link #tryReleaseShared}
  86. 121  * <li> {@link #isHeldExclusively}
  87. 122  * </ul>
  88. 123  *
  89. 124  * Each of these methods by default throws {@link
  90. 125  * UnsupportedOperationException}.  Implementations of these methods
  91. 126  * must be internally thread-safe, and should in general be short and
  92. 127  * not block. Defining these methods is the <em>only</em> supported
  93. 128  * means of using this class. All other methods are declared
  94. 129  * {@code final} because they cannot be independently varied.
  95. 130  *
  96. 131  * <p>You may also find the inherited methods from {@link
  97. 132  * AbstractOwnableSynchronizer} useful to keep track of the thread
  98. 133  * owning an exclusive synchronizer.  You are encouraged to use them
  99. 134  * -- this enables monitoring and diagnostic tools to assist users in
  100. 135  * determining which threads hold locks.
  101. 136  *
  102. 137  * <p>Even though this class is based on an internal FIFO queue, it
  103. 138  * does not automatically enforce FIFO acquisition policies.  The core
  104. 139  * of exclusive synchronization takes the form:
  105. 140  *
  106. 141  * <pre>
  107. 142  * Acquire:
  108. 143  *     while (!tryAcquire(arg)) {
  109. 144  *        <em>enqueue thread if it is not already queued</em>;
  110. 145  *        <em>possibly block current thread</em>;
  111. 146  *     }
  112. 147  *
  113. 148  * Release:
  114. 149  *     if (tryRelease(arg))
  115. 150  *        <em>unblock the first queued thread</em>;
  116. 151  * </pre>
  117. 152  *
  118. 153  * (Shared mode is similar but may involve cascading signals.)
  119. 154  *
  120. 155  * <p id="barging">Because checks in acquire are invoked before
  121. 156  * enqueuing, a newly acquiring thread may <em>barge</em> ahead of
  122. 157  * others that are blocked and queued.  However, you can, if desired,
  123. 158  * define {@code tryAcquire} and/or {@code tryAcquireShared} to
  124. 159  * disable barging by internally invoking one or more of the inspection
  125. 160  * methods, thereby providing a <em>fair</em> FIFO acquisition order.
  126. 161  * In particular, most fair synchronizers can define {@code tryAcquire}
  127. 162  * to return {@code false} if {@link #hasQueuedPredecessors} (a method
  128. 163  * specifically designed to be used by fair synchronizers) returns
  129. 164  * {@code true}.  Other variations are possible.
  130. 165  *
  131. 166  * <p>Throughput and scalability are generally highest for the
  132. 167  * default barging (also known as <em>greedy</em>,
  133. 168  * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
  134. 169  * While this is not guaranteed to be fair or starvation-free, earlier
  135. 170  * queued threads are allowed to recontend before later queued
  136. 171  * threads, and each recontention has an unbiased chance to succeed
  137. 172  * against incoming threads.  Also, while acquires do not
  138. 173  * "spin" in the usual sense, they may perform multiple
  139. 174  * invocations of {@code tryAcquire} interspersed with other
  140. 175  * computations before blocking.  This gives most of the benefits of
  141. 176  * spins when exclusive synchronization is only briefly held, without
  142. 177  * most of the liabilities when it isn't. If so desired, you can
  143. 178  * augment this by preceding calls to acquire methods with
  144. 179  * "fast-path" checks, possibly prechecking {@link #hasContended}
  145. 180  * and/or {@link #hasQueuedThreads} to only do so if the synchronizer
  146. 181  * is likely not to be contended.
  147. 182  *
  148. 183  * <p>This class provides an efficient and scalable basis for
  149. 184  * synchronization in part by specializing its range of use to
  150. 185  * synchronizers that can rely on {@code int} state, acquire, and
  151. 186  * release parameters, and an internal FIFO wait queue. When this does
  152. 187  * not suffice, you can build synchronizers from a lower level using
  153. 188  * {@link java.util.concurrent.atomic atomic} classes, your own custom
  154. 189  * {@link java.util.Queue} classes, and {@link LockSupport} blocking
  155. 190  * support.
  156. 191  *
  157. 192  * <h3>Usage Examples</h3>
  158. 193  *
  159. 194  * <p>Here is a non-reentrant mutual exclusion lock class that uses
  160. 195  * the value zero to represent the unlocked state, and one to
  161. 196  * represent the locked state. While a non-reentrant lock
  162. 197  * does not strictly require recording of the current owner
  163. 198  * thread, this class does so anyway to make usage easier to monitor.
  164. 199  * It also supports conditions and exposes
  165. 200  * one of the instrumentation methods:
  166. 201  *
  167. 202  *  <pre> {@code
  168. 203  * class Mutex implements Lock, java.io.Serializable {
  169. 204  *
  170. 205  *   // Our internal helper class
  171. 206  *   private static class Sync extends AbstractQueuedSynchronizer {
  172. 207  *     // Reports whether in locked state
  173. 208  *     protected boolean isHeldExclusively() {
  174. 209  *       return getState() == 1;
  175. 210  *     }
  176. 211  *
  177. 212  *     // Acquires the lock if state is zero
  178. 213  *     public boolean tryAcquire(int acquires) {
  179. 214  *       assert acquires == 1; // Otherwise unused
  180. 215  *       if (compareAndSetState(0, 1)) {
  181. 216  *         setExclusiveOwnerThread(Thread.currentThread());
  182. 217  *         return true;
  183. 218  *       }
  184. 219  *       return false;
  185. 220  *     }
  186. 221  *
  187. 222  *     // Releases the lock by setting state to zero
  188. 223  *     protected boolean tryRelease(int releases) {
  189. 224  *       assert releases == 1; // Otherwise unused
  190. 225  *       if (getState() == 0) throw new IllegalMonitorStateException();
  191. 226  *       setExclusiveOwnerThread(null);
  192. 227  *       setState(0);
  193. 228  *       return true;
  194. 229  *     }
  195. 230  *
  196. 231  *     // Provides a Condition
  197. 232  *     Condition newCondition() { return new ConditionObject(); }
  198. 233  *
  199. 234  *     // Deserializes properly
  200. 235  *     private void readObject(ObjectInputStream s)
  201. 236  *         throws IOException, ClassNotFoundException {
  202. 237  *       s.defaultReadObject();
  203. 238  *       setState(0); // reset to unlocked state
  204. 239  *     }
  205. 240  *   }
  206. 241  *
  207. 242  *   // The sync object does all the hard work. We just forward to it.
  208. 243  *   private final Sync sync = new Sync();
  209. 244  *
  210. 245  *   public void lock()                { sync.acquire(1); }
  211. 246  *   public boolean tryLock()          { return sync.tryAcquire(1); }
  212. 247  *   public void unlock()              { sync.release(1); }
  213. 248  *   public Condition newCondition()   { return sync.newCondition(); }
  214. 249  *   public boolean isLocked()         { return sync.isHeldExclusively(); }
  215. 250  *   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
  216. 251  *   public void lockInterruptibly() throws InterruptedException {
  217. 252  *     sync.acquireInterruptibly(1);
  218. 253  *   }
  219. 254  *   public boolean tryLock(long timeout, TimeUnit unit)
  220. 255  *       throws InterruptedException {
  221. 256  *     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  222. 257  *   }
  223. 258  * }}</pre>
  224. 259  *
  225. 260  * <p>Here is a latch class that is like a
  226. 261  * {@link java.util.concurrent.CountDownLatch CountDownLatch}
  227. 262  * except that it only requires a single {@code signal} to
  228. 263  * fire. Because a latch is non-exclusive, it uses the {@code shared}
  229. 264  * acquire and release methods.
  230. 265  *
  231. 266  *  <pre> {@code
  232. 267  * class BooleanLatch {
  233. 268  *
  234. 269  *   private static class Sync extends AbstractQueuedSynchronizer {
  235. 270  *     boolean isSignalled() { return getState() != 0; }
  236. 271  *
  237. 272  *     protected int tryAcquireShared(int ignore) {
  238. 273  *       return isSignalled() ? 1 : -1;
  239. 274  *     }
  240. 275  *
  241. 276  *     protected boolean tryReleaseShared(int ignore) {
  242. 277  *       setState(1);
  243. 278  *       return true;
  244. 279  *     }
  245. 280  *   }
  246. 281  *
  247. 282  *   private final Sync sync = new Sync();
  248. 283  *   public boolean isSignalled() { return sync.isSignalled(); }
  249. 284  *   public void signal()         { sync.releaseShared(1); }
  250. 285  *   public void await() throws InterruptedException {
  251. 286  *     sync.acquireSharedInterruptibly(1);
  252. 287  *   }
  253. 288  * }}</pre>
  254. 289  *
  255. 290  * @since 1.5
  256. 291  * @author Doug Lea
  257. 292  */
  258. 293 public abstract class AbstractQueuedSynchronizer
  259. 294     extends AbstractOwnableSynchronizer
  260. 295     implements java.io.Serializable {
  261. 296
  262. 297     private static final long serialVersionUID = 7373984972572414691L;
  263. 298
  264. 299     /**
  265. 300      * Creates a new {@code AbstractQueuedSynchronizer} instance
  266. 301      * with initial synchronization state of zero.
  267. 302      */
  268. 303     protected AbstractQueuedSynchronizer() { }
  269. 304
  270. 305     /**
  271. 306      * Wait queue node class.
  272. 307      *
  273. 308      * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
  274. 309      * Hagersten) lock queue. CLH locks are normally used for
  275. 310      * spinlocks.  We instead use them for blocking synchronizers, but
  276. 311      * use the same basic tactic of holding some of the control
  277. 312      * information about a thread in the predecessor of its node.  A
  278. 313      * "status" field in each node keeps track of whether a thread
  279. 314      * should block.  A node is signalled when its predecessor
  280. 315      * releases.  Each node of the queue otherwise serves as a
  281. 316      * specific-notification-style monitor holding a single waiting
  282. 317      * thread. The status field does NOT control whether threads are
  283. 318      * granted locks etc though.  A thread may try to acquire if it is
  284. 319      * first in the queue. But being first does not guarantee success;
  285. 320      * it only gives the right to contend.  So the currently released
  286. 321      * contender thread may need to rewait.
  287. 322      *
  288. 323      * <p>To enqueue into a CLH lock, you atomically splice it in as new
  289. 324      * tail. To dequeue, you just set the head field.
  290. 325      * <pre>
  291. 326      *      +------+  prev +-----+       +-----+
  292. 327      * head |      | <---- |     | <---- |     |  tail
  293. 328      *      +------+       +-----+       +-----+
  294. 329      * </pre>
  295. 330      *
  296. 331      * <p>Insertion into a CLH queue requires only a single atomic
  297. 332      * operation on "tail", so there is a simple atomic point of
  298. 333      * demarcation from unqueued to queued. Similarly, dequeuing
  299. 334      * involves only updating the "head". However, it takes a bit
  300. 335      * more work for nodes to determine who their successors are,
  301. 336      * in part to deal with possible cancellation due to timeouts
  302. 337      * and interrupts.
  303. 338      *
  304. 339      * <p>The "prev" links (not used in original CLH locks), are mainly
  305. 340      * needed to handle cancellation. If a node is cancelled, its
  306. 341      * successor is (normally) relinked to a non-cancelled
  307. 342      * predecessor. For explanation of similar mechanics in the case
  308. 343      * of spin locks, see the papers by Scott and Scherer at
  309. 344      * http://www.cs.rochester.edu/u/scott/synchronization/
  310. 345      *
  311. 346      * <p>We also use "next" links to implement blocking mechanics.
  312. 347      * The thread id for each node is kept in its own node, so a
  313. 348      * predecessor signals the next node to wake up by traversing
  314. 349      * next link to determine which thread it is.  Determination of
  315. 350      * successor must avoid races with newly queued nodes to set
  316. 351      * the "next" fields of their predecessors.  This is solved
  317. 352      * when necessary by checking backwards from the atomically
  318. 353      * updated "tail" when a node's successor appears to be null.
  319. 354      * (Or, said differently, the next-links are an optimization
  320. 355      * so that we don't usually need a backward scan.)
  321. 356      *
  322. 357      * <p>Cancellation introduces some conservatism to the basic
  323. 358      * algorithms.  Since we must poll for cancellation of other
  324. 359      * nodes, we can miss noticing whether a cancelled node is
  325. 360      * ahead or behind us. This is dealt with by always unparking
  326. 361      * successors upon cancellation, allowing them to stabilize on
  327. 362      * a new predecessor, unless we can identify an uncancelled
  328. 363      * predecessor who will carry this responsibility.
  329. 364      *
  330. 365      * <p>CLH queues need a dummy header node to get started. But
  331. 366      * we don't create them on construction, because it would be wasted
  332. 367      * effort if there is never contention. Instead, the node
  333. 368      * is constructed and head and tail pointers are set upon first
  334. 369      * contention.
  335. 370      *
  336. 371      * <p>Threads waiting on Conditions use the same nodes, but
  337. 372      * use an additional link. Conditions only need to link nodes
  338. 373      * in simple (non-concurrent) linked queues because they are
  339. 374      * only accessed when exclusively held.  Upon await, a node is
  340. 375      * inserted into a condition queue.  Upon signal, the node is
  341. 376      * transferred to the main queue.  A special value of status
  342. 377      * field is used to mark which queue a node is on.
  343. 378      *
  344. 379      * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
  345. 380      * Scherer and Michael Scott, along with members of JSR-166
  346. 381      * expert group, for helpful ideas, discussions, and critiques
  347. 382      * on the design of this class.
  348. 383      */
  349. 384     static final class Node {
  350. 385         /** Marker to indicate a node is waiting in shared mode */
  351. 386         static final Node SHARED = new Node();
  352. 387         /** Marker to indicate a node is waiting in exclusive mode */
  353. 388         static final Node EXCLUSIVE = null;
  354. 389
  355. 390         /** waitStatus value to indicate thread has cancelled */
  356. 391         static final int CANCELLED =  1;
  357. 392         /** waitStatus value to indicate successor's thread needs unparking */
  358. 393         static final int SIGNAL    = -1;
  359. 394         /** waitStatus value to indicate thread is waiting on condition */
  360. 395         static final int CONDITION = -2;
  361. 396         /**
  362. 397          * waitStatus value to indicate the next acquireShared should
  363. 398          * unconditionally propagate
  364. 399          */
  365. 400         static final int PROPAGATE = -3;
  366. 401
  367. 402         /**
  368. 403          * Status field, taking on only the values:
  369. 404          *   SIGNAL:     The successor of this node is (or will soon be)
  370. 405          *               blocked (via park), so the current node must
  371. 406          *               unpark its successor when it releases or
  372. 407          *               cancels. To avoid races, acquire methods must
  373. 408          *               first indicate they need a signal,
  374. 409          *               then retry the atomic acquire, and then,
  375. 410          *               on failure, block.
  376. 411          *   CANCELLED:  This node is cancelled due to timeout or interrupt.
  377. 412          *               Nodes never leave this state. In particular,
  378. 413          *               a thread with cancelled node never again blocks.
  379. 414          *   CONDITION:  This node is currently on a condition queue.
  380. 415          *               It will not be used as a sync queue node
  381. 416          *               until transferred, at which time the status
  382. 417          *               will be set to 0. (Use of this value here has
  383. 418          *               nothing to do with the other uses of the
  384. 419          *               field, but simplifies mechanics.)
  385. 420          *   PROPAGATE:  A releaseShared should be propagated to other
  386. 421          *               nodes. This is set (for head node only) in
  387. 422          *               doReleaseShared to ensure propagation
  388. 423          *               continues, even if other operations have
  389. 424          *               since intervened.
  390. 425          *   0:          None of the above
  391. 426          *
  392. 427          * The values are arranged numerically to simplify use.
  393. 428          * Non-negative values mean that a node doesn't need to
  394. 429          * signal. So, most code doesn't need to check for particular
  395. 430          * values, just for sign.
  396. 431          *
  397. 432          * The field is initialized to 0 for normal sync nodes, and
  398. 433          * CONDITION for condition nodes.  It is modified using CAS
  399. 434          * (or when possible, unconditional volatile writes).
  400. 435          */
  401. 436         volatile int waitStatus;
  402. 437
  403. 438         /**
  404. 439          * Link to predecessor node that current node/thread relies on
  405. 440          * for checking waitStatus. Assigned during enqueuing, and nulled
  406. 441          * out (for sake of GC) only upon dequeuing.  Also, upon
  407. 442          * cancellation of a predecessor, we short-circuit while
  408. 443          * finding a non-cancelled one, which will always exist
  409. 444          * because the head node is never cancelled: A node becomes
  410. 445          * head only as a result of successful acquire. A
  411. 446          * cancelled thread never succeeds in acquiring, and a thread only
  412. 447          * cancels itself, not any other node.
  413. 448          */
  414. 449         volatile Node prev;
  415. 450
  416. 451         /**
  417. 452          * Link to the successor node that the current node/thread
  418. 453          * unparks upon release. Assigned during enqueuing, adjusted
  419. 454          * when bypassing cancelled predecessors, and nulled out (for
  420. 455          * sake of GC) when dequeued.  The enq operation does not
  421. 456          * assign next field of a predecessor until after attachment,
  422. 457          * so seeing a null next field does not necessarily mean that
  423. 458          * node is at end of queue. However, if a next field appears
  424. 459          * to be null, we can scan prev's from the tail to
  425. 460          * double-check.  The next field of cancelled nodes is set to
  426. 461          * point to the node itself instead of null, to make life
  427. 462          * easier for isOnSyncQueue.
  428. 463          */
  429. 464         volatile Node next;
  430. 465
  431. 466         /**
  432. 467          * The thread that enqueued this node.  Initialized on
  433. 468          * construction and nulled out after use.
  434. 469          */
  435. 470         volatile Thread thread;
  436. 471
  437. 472         /**
  438. 473          * Link to next node waiting on condition, or the special
  439. 474          * value SHARED.  Because condition queues are accessed only
  440. 475          * when holding in exclusive mode, we just need a simple
  441. 476          * linked queue to hold nodes while they are waiting on
  442. 477          * conditions. They are then transferred to the queue to
  443. 478          * re-acquire. And because conditions can only be exclusive,
  444. 479          * we save a field by using special value to indicate shared
  445. 480          * mode.
  446. 481          */
  447. 482         Node nextWaiter;
  448. 483
  449. 484         /**
  450. 485          * Returns true if node is waiting in shared mode.
  451. 486          */
  452. 487         final boolean isShared() {
  453. 488             return nextWaiter == SHARED;
  454. 489         }
  455. 490
  456. 491         /**
  457. 492          * Returns previous node, or throws NullPointerException if null.
  458. 493          * Use when predecessor cannot be null.  The null check could
  459. 494          * be elided, but is present to help the VM.
  460. 495          *
  461. 496          * @return the predecessor of this node
  462. 497          */
  463. 498         final Node predecessor() throws NullPointerException {
  464. 499             Node p = prev;
  465. 500             if (p == null)
  466. 501                 throw new NullPointerException();
  467. 502             else
  468. 503                 return p;
  469. 504         }
  470. 505
  471. 506         Node() {    // Used to establish initial head or SHARED marker
  472. 507         }
  473. 508
  474. 509         Node(Thread thread, Node mode) {     // Used by addWaiter
  475. 510             this.nextWaiter = mode;
  476. 511             this.thread = thread;
  477. 512         }
  478. 513
  479. 514         Node(Thread thread, int waitStatus) { // Used by Condition
  480. 515             this.waitStatus = waitStatus;
  481. 516             this.thread = thread;
  482. 517         }
  483. 518     }
  484. 519
  485. 520     /**
  486. 521      * Head of the wait queue, lazily initialized.  Except for
  487. 522      * initialization, it is modified only via method setHead.  Note:
  488. 523      * If head exists, its waitStatus is guaranteed not to be
  489. 524      * CANCELLED.
  490. 525      */
  491. 526     private transient volatile Node head;
  492. 527
  493. 528     /**
  494. 529      * Tail of the wait queue, lazily initialized.  Modified only via
  495. 530      * method enq to add new wait node.
  496. 531      */
  497. 532     private transient volatile Node tail;
  498. 533
  499. 534     /**
  500. 535      * The synchronization state.
  501. 536      */
  502. 537     private volatile int state;
  503. 538
  504. 539     /**
  505. 540      * Returns the current value of synchronization state.
  506. 541      * This operation has memory semantics of a {@code volatile} read.
  507. 542      * @return current state value
  508. 543      */
  509. 544     protected final int getState() {
  510. 545         return state;
  511. 546     }
  512. 547
  513. 548     /**
  514. 549      * Sets the value of synchronization state.
  515. 550      * This operation has memory semantics of a {@code volatile} write.
  516. 551      * @param newState the new state value
  517. 552      */
  518. 553     protected final void setState(int newState) {
  519. 554         state = newState;
  520. 555     }
  521. 556
  522. 557     /**
  523. 558      * Atomically sets synchronization state to the given updated
  524. 559      * value if the current state value equals the expected value.
  525. 560      * This operation has memory semantics of a {@code volatile} read
  526. 561      * and write.
  527. 562      *
  528. 563      * @param expect the expected value
  529. 564      * @param update the new value
  530. 565      * @return {@code true} if successful. False return indicates that the actual
  531. 566      *         value was not equal to the expected value.
  532. 567      */
  533. 568     protected final boolean compareAndSetState(int expect, int update) {
  534. 569         // See below for intrinsics setup to support this
  535. 570         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  536. 571     }
  537. 572
  538. 573     // Queuing utilities
  539. 574
  540. 575     /**
  541. 576      * The number of nanoseconds for which it is faster to spin
  542. 577      * rather than to use timed park. A rough estimate suffices
  543. 578      * to improve responsiveness with very short timeouts.
  544. 579      */
  545. 580     static final long spinForTimeoutThreshold = 1000L;
  546. 581
  547. 582     /**
  548. 583      * Inserts node into queue, initializing if necessary. See picture above.
  549. 584      * @param node the node to insert
  550. 585      * @return node's predecessor
  551. 586      */
  552. 587     private Node enq(final Node node) {
  553. 588         /*"自旋",将给定的节点插入到同步队列的尾部*/
  554. 589         for (;;) {
  555. 590             Node t = tail;
  556. 591             /*同步队列为空,则dummy哑节点作为同步队列的头结点head,并且将尾节点tail也指向头结点head*/
  557. 592             if (t == null) {
  558. 593                 /*CAS操作,设置同步队列的头结点*/
  559. 594                 if (compareAndSetHead(new Node())) {
  560. 595                     /*将尾节点设置为头结点,进入下次"自旋"*/
  561. 596                     tail = head;
  562. 597                 }
  563. 598             }else {
  564. 599                 /*尾部节点不为空,则进行正常添加动作*/
  565. 600                 node.prev = t;
  566. 601                 /*CAS操作,设置同步队列的头结点*/
  567. 602                 if (compareAndSetTail(t, node)) {
  568. 603                     t.next = node;
  569. 604                     return t;
  570. 605                 }
  571. 606             }
  572. 607         }
  573. 608     }
  574. 609
  575. 610     /**
  576. 611      * Creates and enqueues node for current thread and given mode.
  577. 612      * 以给定的模式包装当前线程节点,将当前节点加入到阻塞队列的队尾
  578. 613      * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
  579. 614      * @return the new node
  580. 615      */
  581. 616     private Node addWaiter(Node mode) {
  582. 617         /*以给定的模式将当前线程包装成node节点*/
  583. 618         Node node = new Node(Thread.currentThread(), mode);
  584. 619         
  585. 620         /*快速采用尾插法,将当前节点插入到同步队列的队尾*/
  586. 621         Node predNode = tail;
  587. 622         if (predNode != null) {
  588. 623             /*preNode <-- node*/
  589. 624             node.prev = predNode;
  590. 625             /*采用CAS将node设置阻塞队列的尾节点,设置成功,说明没有并发*/
  591. 626             if (compareAndSetTail(tail, node)) {
  592. 627                 predNode.next = node;
  593. 628                 /*尾插法插入成功,则直接返回当前节点*/
  594. 629                 return node;
  595. 630             }
  596. 631         }
  597. 632         /*"自旋"将节点加入到队列的尾部,直到成功为止*/
  598. 633         enq(node);
  599. 634         return node;
  600. 635     }
  601. 636
  602. 637     /**
  603. 638      * Sets head of queue to be node, thus dequeuing. Called only by
  604. 639      * acquire methods.  Also nulls out unused fields for sake of GC
  605. 640      * and to suppress unnecessary signals and traversals.
  606. 641      *
  607. 642      * @param node the node
  608. 643      */
  609. 644     private void setHead(Node node) {
  610. 645         head = node;
  611. 646         node.thread = null;
  612. 647         node.prev = null;
  613. 648     }
  614. 649
  615. 650     /**
  616. 651      * Wakes up node's successor, if one exists.
  617. 652      *
  618. 653      * @param node the node
  619. 654      */
  620. 655     private void unparkSuccessor(Node node) {
  621. 656         /*
  622. 657          * If status is negative (i.e., possibly needing signal) try
  623. 658          * to clear in anticipation of signalling.  It is OK if this
  624. 659          * fails or if status is changed by waiting thread.
  625. 660          */
  626. 661         //在这里,这个节点其实是同步队列的头结点,头结点唤醒后继节点之后,使命就完成了,所以应该将其状态置为0
  627. 662         int ws = node.waitStatus;
  628. 663         if (ws < 0)
  629. 664             compareAndSetWaitStatus(node, ws, 0);
  630. 665
  631. 666         /*
  632. 667          * Thread to unpark is held in successor, which is normally
  633. 668          * just the next node.  But if cancelled or apparently null,
  634. 669          * traverse backwards from tail to find the actual
  635. 670          * non-cancelled successor.
  636. 671          */
  637. 672         Node s = node.next;
  638. 673         //因为s.next相当于从同步队列的头部遍历所以可能会出现s == null的情况,上面分析过原因,不再赘述了。
  639. 674         if (s == null || s.waitStatus > 0) {
  640. 675             s = null;
  641. 676             //从同步队列的尾部向前遍历,找到当前node节点(头结点)的最近的有效后继节点
  642. 677             for (Node t = tail; t != null && t != node; t = t.prev)
  643. 678                 if (t.waitStatus <= 0)
  644. 679                     s = t;
  645. 680         }
  646. 681         
  647. 682         /**
  648. 683          * 找到最近的有效后继节点,则唤醒后继节点中的线程在parkAndCheckInterrupt()方法上的阻塞,去尝试竞争共享资源,
  649. 684          * 这就体现了线程之间的协作,而在这个竞争的过程中也会忽略这个Node.CANCELLED状态的节点,这当前node节点也就放弃了竞争共享资源的机会,相当于出队了。
  650. 685          */
  651. 686         if (s != null)
  652. 687             LockSupport.unpark(s.thread);
  653. 688     }
  654. 689
  655. 690     /**
  656. 691      * Release action for shared mode -- signals successor and ensures
  657. 692      * propagation. (Note: For exclusive mode, release just amounts
  658. 693      * to calling unparkSuccessor of head if it needs signal.)
  659. 694      */
  660. 695     private void doReleaseShared() {
  661. 696         /*
  662. 697          * Ensure that a release propagates, even if there are other
  663. 698          * in-progress acquires/releases.  This proceeds in the usual
  664. 699          * way of trying to unparkSuccessor of head if it needs
  665. 700          * signal. But if it does not, status is set to PROPAGATE to
  666. 701          * ensure that upon release, propagation continues.
  667. 702          * Additionally, we must loop in case a new node is added
  668. 703          * while we are doing this. Also, unlike other uses of
  669. 704          * unparkSuccessor, we need to know if CAS to reset status
  670. 705          * fails, if so rechecking.
  671. 706          */
  672. 707         for (;;) {
  673. 708             Node h = head;
  674. 709             if (h != null && h != tail) {
  675. 710                 int ws = h.waitStatus;
  676. 711                 if (ws == Node.SIGNAL) {
  677. 712                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  678. 713                         continue;            // loop to recheck cases
  679. 714                     unparkSuccessor(h);
  680. 715                 }
  681. 716                 else if (ws == 0 &&
  682. 717                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  683. 718                     continue;                // loop on failed CAS
  684. 719             }
  685. 720             if (h == head)                   // loop if head changed
  686. 721                 break;
  687. 722         }
  688. 723     }
  689. 724
  690. 725     /**
  691. 726      * Sets head of queue, and checks if successor may be waiting
  692. 727      * in shared mode, if so propagating if either propagate > 0 or
  693. 728      * PROPAGATE status was set.
  694. 729      *
  695. 730      * @param node the node
  696. 731      * @param propagate the return value from a tryAcquireShared
  697. 732      */
  698. 733     private void setHeadAndPropagate(Node node, int propagate) {
  699. 734         //后继节点成功获取了共享锁,队列的"旧head"还没有改变,将其保存下来,锁定到方法的局部变量做后序的判断使用;
  700. 735         Node h = head; // Record old head for check below
  701. 736         /**
  702. 737          * 将这个获取共享锁成功的后继节点设置为同步队列的“新head”,此时同步队列的head发生变化, 此线程还未唤起任何线程。
  703. 738          */
  704. 739         setHead(node);
  705. 740         /**
  706. 741          * 1、h == null这个条件什么时候成立呢?仔细翻了下AQS中的源码发现:
  707. 742          * 这个setHeadAndPropagate()方法只在共享锁模式下,同步队列head的后继节点成功获取了共享锁才会调用。
  708. 743          * 获取到共享锁的当前线程是同步队列的头结点的后继节点,"旧head"有后继节点,说明同步队列不为空,那么"旧head"也必定不为空,
  709. 744          * 此方法中第一行通过h == head,在执行setHead(node)方法之前将"旧head"保存了下来,所以h == null必定不会成立,
  710. 745          * 至于为什么这么写呢? 查阅了下资料网上说"发现这个是防止空指针异常发生的标准写法(既如果要取一个对象的某个属性进行判断的时候,首先对这个对象进行null判断)。"
  711. 746          * 这说的过去吧?
  712. 747          *
  713. 748          * 2、(h = head) == null这个条件什么时候成立呢?
  714. 749          * 这个条件也是不可能成立的,下面这种情况应该是最常见的:
  715. 750          *     (1)、例如有个Semaphore实例s初始化了2个许可,线程A首先调用s.acquire(2)申请了两个许可,成功申请到了许可;
  716. 751          *     (2)、线程B调用了s.acquire()方法申请一个许可,申请失败,加入到同步队列;
  717. 752          *     (3)、线程C调用了s.acquire()方法申请一个许可,申请失败,加入到同步队列;
  718. 753          *  (4)、线程A调用了s.releaseShared(2)方法释放了两个许可,再调用doReleaseShared()方法,进行同步队列唤醒;
  719. 754          *  (5)、首先唤醒了同步队列中的线程B,B线程获取到共享锁:
  720. 755          *      a)、如果此时线程B还未setHead(Node)方法,还未改变同步队列的head头结点,那么线程A的唤醒工作就结束,也仅仅只是唤醒了同步队列中的线程B,
  721. 756          *              则必定有(h = head) == Node(C) != null成立,线程C的唤醒工作仍然需要线程B去执行;
  722. 757          *      b)、如果此时线程B执行了SetHead(Node)方法,改变了同步队列的head头结点,那么线程A同时也会唤醒线程C,相当于线程A同时唤醒了线程B和线程C:
  723. 758          *         1)、如果线程C中的setHeadAndPropagate()在线程B前调用完毕(即线程C执行了setHead()方法改变了同步队列的head),那么 (h = head) == Node(C);
  724. 759          *         2)、如果线程C中的setHeadAndPropagate()在线程B之后才调用(即线程C此时还未执行setHead()方法,未改变同步队列的head),那么 (h = head) == Node(B)
  725. 760          *  所以综上所述,只要执行过addWaiter()方法,向同步队列中添加过线程,那么(h = head)== null必定不成立。只能理解为“防止空指针的标准写法”。
  726. 761          */
  727. 762         if (propagate > 0 || h == null || h.waitStatus < 0 ||
  728. 763             (h = head) == null || h.waitStatus < 0) {
  729. 764             Node s = node.next;
  730. 765             /**
  731. 766              * s == null这种情况是可能存在的,如果当前唤醒的这个node节点是同步队列的尾节点就可能出现node.next == null;
  732. 767              * s.isShared()指定是共享锁模式,当前线程获取共享锁之后,是需要尝试唤醒同步队列中的其它线程的。
  733. 768              */
  734. 769             if (s == null || s.isShared())
  735. 770                 doReleaseShared();
  736. 771         }
  737. 772     }
  738. 773
  739. 774     // Utilities for various versions of acquire
  740. 775
  741. 776     /**
  742. 777      * Cancels an ongoing attempt to acquire.
  743. 778      * @param node the node
  744. 779      */
  745. 780     private void cancelAcquire(Node node) {
  746. 781         //当前节点为空,则说明当前线程永远不会被调度到了,所以直接返回
  747. 782         if (node == null) {
  748. 783             return;
  749. 784         }
  750. 785         
  751. 786         /**
  752. 787          * 接下来将点前Node节点从同步队列出队,主要做以下几件事:
  753. 788          * 1、将当前节点不与任何线程绑定,设置当前节点为Node.CANCELLED状态;
  754. 789          * 2、将当前取消节点的前置非取消节点和后置非取消节点"链接"起来;
  755. 790          * 3、如果前置节点释放了锁,那么当前取消节点承担起后续节点的唤醒职责。
  756. 791          */
  757. 792         
  758. 793         //1、取消当前节点与线程的绑定
  759. 794         node.thread = null;
  760. 795         
  761. 796         //2、找到当前节点的有效前继节点pred
  762. 797         Node pred = node.prev;
  763. 798         while (pred.waitStatus > 0) {
  764. 799             //为什么双向链表从后往前遍历呢?而不是从前往后遍历呢?
  765. 800             node.prev = pred = pred.prev;
  766. 801         }
  767. 802         
  768. 803         //用作CAS操作时候的条件判断需要使用的值
  769. 804         Node predNext = pred.next;
  770. 805         
  771. 806         //3、将当前节点设置为取消状态
  772. 807         node.waitStatus = Node.CANCELLED;
  773. 808         
  774. 809         /**
  775. 810          * 接下来就需要将当前取消节点的前后两个有效节点"链接"起来了,"达成让当前node节点出队的目的"。
  776. 811          * 这里按照node节点在同步队列中的不同位置分了三种情况:
  777. 812          * 1、node节点是同步队列的尾节点tail;
  778. 813          * 2、node节点既不是同步队列头结点head的后继节点,也不是尾节点tail;
  779. 814          * 3、node节点是同步队列头结点head的后继节点;
  780. 815          */
  781. 816         
  782. 817         //1、node是尾节点,并且执行过程中没有并发,直接将pred设置为同步队列的tail
  783. 818         if (node == tail && compareAndSetTail(node, pred)) {
  784. 819             /*
  785. 820              * 此时pred已经设置为同步队列的tail,需要通过CAS操作,将pred的next指向null,没有节点再引用node,就完成了node节点的出队
  786. 821              * 可以看出出队操作会破坏这个同步队列的next指针,这应该“向链表从后往前遍历呢?而不是从前往后遍历呢”的原因吧?
  787. 822              */
  788. 823             compareAndSetNext(pred, predNext, null);
  789. 824         }else {
  790. 825             /*
  791. 826              * 2、node不是尾节点,也不是头结点head的后继节点,那么当前节点node出队以后,node的有效前继结点pred,
  792. 827              *  就有义务在它自身释放资源的时候,唤醒node的有效后继节点successor,即将pred的状态设置为Node.SIGNAL;
  793. 828              */
  794. 829             int ws;
  795. 830             //能执行到这里,说明当前node节点不是head的后继节点,也不是同步队列tail节点
  796. 831             if (pred != head &&
  797. 832                     ((ws = pred.waitStatus) == Node.SIGNAL ||
  798. 833                     //前继节点状态虽然有效但不是SIGNAL,采用CAS操作设置为SIGNAL确保后继有效节点可以被唤醒
  799. 834                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  800. 835                     pred.thread != null) {
  801. 836                 Node next = node.next;
  802. 837                 //只负责唤醒有效后继节点
  803. 838                 if (next != null && next.waitStatus <= 0) {
  804. 839                     /**
  805. 840                      * 下面这段代码相当于将pred-->next,我们提到这个同步队列是个双向队列,那么pred<--next这是谁执行的呢?
  806. 841                      * 答案是其他线程:其它线程在获取共享资源在同步队列中阻塞的时候,调用shouldParkAfterFailedAcquire()方法,
  807. 842                      * 从后向前遍历队列,寻找能唤醒它的有效前继节点,当找到node的时候,因为它的状态已经是Node.CANCELLED,所以会忽略node节点,
  808. 843                      * 直到遍历到有效前继节点pred,将next.prev执行pred,即next--->pred,没有节点再引用node节点,所以node节点至此才完成出队。
  809. 844                      */
  810. 845                     compareAndSetNext(pred, predNext, next);
  811. 846                 }
  812. 847             }else {
  813. 848                 //3、说明node节点是同步队列head的后继节点,调用unparkSuccessor(Node)"出队"。
  814. 849                 unparkSuccessor(node);
  815. 850             }
  816. 851            
  817. 852             node.next = node;//help GC
  818. 853         }
  819. 854     }
  820. 855
  821. 856     /**
  822. 857      * Checks and updates status for a node that failed to acquire.
  823. 858      * Returns true if thread should block. This is the main signal
  824. 859      * control in all acquire loops.  Requires that pred == node.prev.
  825. 860      *
  826. 861      * @param pred node's predecessor holding status
  827. 862      * @param node the node
  828. 863      * @return {@code true} if thread should block
  829. 864      */
  830. 865     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  831. 866         int ws = pred.waitStatus;
  832. 867         /*判断前驱结点的状态,只有前驱结点的状态为SIGNAL,后继节点才能被唤醒,所以其可以安心地挂起来了*/
  833. 868         if (ws == node.waitStatus) {
  834. 869             return true;
  835. 870         }
  836. 871         
  837. 872         /*ws>0表示前驱结点中的线程已经被取消调度了,则认为其是无效节点,继续向前查找,直至找到有效状态的节点*/
  838. 873         if (ws > 0) {
  839. 874             do {
  840. 875                 //前驱结点已经被取消,则将前驱结点设置为pred = pred.prev
  841. 876                 node.prev = pred = pred.prev;
  842. 877                 //不断遍历,直到找到第一个不是取消状态的节点
  843. 878             } while (pred.waitStatus > 0);
  844. 879             //
  845. 880             pred.next = node;
  846. 881         }else {
  847. 882             /*前驱结点状态正常,将前驱结点状态设置为SIGNAL,则前驱结点释放资源的时候,就可以尝试唤醒它的后继节点了*/
  848. 883             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  849. 884         }
  850. 885         return false;
  851. 886     }
  852. 887
  853. 888     /**
  854. 889      * Convenience method to interrupt current thread.
  855. 890      */
  856. 891     static void selfInterrupt() {
  857. 892         Thread.currentThread().interrupt();
  858. 893     }
  859. 894
  860. 895     /**
  861. 896      * Convenience method to park and then check if interrupted
  862. 897      *
  863. 898      * @return {@code true} if interrupted
  864. 899      */
  865. 900     private final boolean parkAndCheckInterrupt() {
  866. 901         //将当前线程挂起
  867. 902         LockSupport.park(this);
  868. 903         //检测当前线程的中断状态,并且会清除线程的中断标记
  869. 904         return Thread.interrupted();
  870. 905     }
  871. 906
  872. 907     /*
  873. 908      * Various flavors of acquire, varying in exclusive/shared and
  874. 909      * control modes.  Each is mostly the same, but annoyingly
  875. 910      * different.  Only a little bit of factoring is possible due to
  876. 911      * interactions of exception mechanics (including ensuring that we
  877. 912      * cancel if tryAcquire throws exception) and other control, at
  878. 913      * least not without hurting performance too much.
  879. 914      */
  880. 915
  881. 916     /**
  882. 917      * Acquires in exclusive uninterruptible mode for thread already in
  883. 918      * queue. Used by condition wait methods as well as acquire.
  884. 919      * 使线程阻塞在同步队列中等待获取资源,直到获取资源成功才返回,此过程中线程发生了中断就返回true,否则就返回false
  885. 920      * @param node the node
  886. 921      * @param arg the acquire argument
  887. 922      * @return {@code true} if interrupted while waiting
  888. 923      */
  889. 924     final boolean acquireQueued(final Node node, int arg) {
  890. 925         /*标记等待过程中是否发生了异常*/
  891. 926         boolean failed = true;
  892. 927         
  893. 928         try {
  894. 929             /*标记线程阻塞的过程中是否发生了中断*/
  895. 930             boolean interrupted = false;
  896. 931             /*线程自旋阻塞*/
  897. 932             for(;;){
  898. 933                 /*获取当前节点的前驱结点*/
  899. 934                 final Node p = node.predecessor();
  900. 935                 /*前驱结点是头结点,则说明当前线程有资格获取共享资源,尝试获取,获取成功,将当前节点设置为头结点*/
  901. 936                 if (p == head && tryAcquire(arg)) {
  902. 937                     /*将当前节点设置为头结点*/
  903. 938                     setHead(node);
  904. 939                     p.next = null; //help GC
  905. 940                     failed = false;
  906. 941                     return interrupted;
  907. 942                 }
  908. 943                 
  909. 944                 /*判断当前线程是否可以挂起*/
  910. 945                 if (shouldParkAfterFailedAcquire(p, node)
  911. 946                         /*当前线程可以挂起,则挂起线程,并且线程被unpark()唤醒,检查线程的状态*/
  912. 947                         && parkAndCheckInterrupt()) {
  913. 948                     interrupted = true;
  914. 949                 }
  915. 950             }
  916. 951         }finally{
  917. 952             if (failed) {
  918. 953                 /*阻塞获取同步资源的时候,发生了异常,取消当前线程的在同步队列中的排队*/
  919. 954                 cancelAcquire(node);
  920. 955             }
  921. 956         }
  922. 957     }
  923. 958
  924. 959     /**
  925. 960      * Acquires in exclusive interruptible mode.
  926. 961      * @param arg the acquire argument
  927. 962      */
  928. 963     //尝试获取锁,并响应中断
  929. 964     private void doAcquireInterruptibly(int arg)
  930. 965         throws InterruptedException {
  931. 966         //将当前线程包装成Node节点,加入到同步队列sync queue中
  932. 967         final Node node = addWaiter(Node.EXCLUSIVE);
  933. 968         //标记是否获取锁的过程中是否发生了异常
  934. 969         boolean failed = true;
  935. 970         try {
  936. 971             //自旋
  937. 972             for (;;) {
  938. 973                 //获取当前节点的前驱结点
  939. 974                 final Node p = node.predecessor();
  940. 975                 //如果当前节点的前驱结点是头结点head,尝试获取锁,当前线程才有资格获取排他锁(头结点head是个哑结点,不代表任何线程)
  941. 976                 if (p == head && tryAcquire(arg)) {
  942. 977                     //当前节点获取锁成功,将当前节点设置为头结点head
  943. 978                     setHead(node);
  944. 979                     //将原来的头结点从同步队列sync queue中移除
  945. 980                     p.next = null; // help GC
  946. 981                     failed = false;
  947. 982                     return;
  948. 983                 }
  949. 984                 //尝试获取锁失败,判断是否可以将当前线程安全地挂起,只有当前线程有效的前驱结点状态为Node.SIGNAL,当前线程才可以安全地被挂起,后续也会被及时地唤醒
  950. 985                 if (shouldParkAfterFailedAcquire(p, node) &&
  951. 986                         //将当前线程挂起,并等他唤醒的时候判断是否发生了线程中断,发生了线程中断,则进入finally块中,取消当前线程对锁的尝试获取
  952. 987                     parkAndCheckInterrupt())
  953. 988                     throw new InterruptedException();
  954. 989             }
  955. 990         } finally {
  956. 991             if (failed)
  957. 992                 cancelAcquire(node);
  958. 993         }
  959. 994     }
  960. 995
  961. 996     /**
  962. 997      * Acquires in exclusive timed mode.
  963. 998      *
  964. 999      * @param arg the acquire argument
  965. 1000      * @param nanosTimeout max wait time
  966. 1001      * @return {@code true} if acquired
  967. 1002      */
  968. 1003     private boolean doAcquireNanos(int arg, long nanosTimeout)
  969. 1004             throws InterruptedException {
  970. 1005         //指定的时间<=0则直接返回false
  971. 1006         if (nanosTimeout <= 0L)
  972. 1007             return false;
  973. 1008         //计算当前线程阻塞的截至时间
  974. 1009         final long deadline = System.nanoTime() + nanosTimeout;
  975. 1010         //将当前线程包装成Node节点,加入到同步队列sync queue中
  976. 1011         final Node node = addWaiter(Node.EXCLUSIVE);
  977. 1012         boolean failed = true;
  978. 1013         try {
  979. 1014             //自旋尝试获取锁
  980. 1015             for (;;) {
  981. 1016                 //获取当前节点的前驱结点
  982. 1017                 final Node p = node.predecessor();
  983. 1018                 //前驱结点是头结点,则当前节点有资格尝试获取锁,则尝试获取锁
  984. 1019                 if (p == head && tryAcquire(arg)) {
  985. 1020                     //获取锁成功,则将当前节点设置为头结点head
  986. 1021                     setHead(node);
  987. 1022                     //将同步队列sync queue中原来的头结点移除队列
  988. 1023                     p.next = null; // help GC
  989. 1024                     //表示获取锁的过程没有发生异常
  990. 1025                     failed = false;
  991. 1026                     return true;
  992. 1027                 }
  993. 1028                 //计算剩余的等待时间
  994. 1029                 nanosTimeout = deadline - System.nanoTime();
  995. 1030                 //剩余的等待时间<=0,直接返回false
  996. 1031                 if (nanosTimeout <= 0L)
  997. 1032                     return false;
  998. 1033                 //当前线程获取锁失败,判断是否可以将当前线程安全地挂起
  999. 1034                 if (shouldParkAfterFailedAcquire(p, node) &&
  1000. 1035                         //如果剩余的等待时间<= spinForTimeoutThreshold,则不用将当前线程挂起,进行自旋即可
  1001. 1036                     nanosTimeout > spinForTimeoutThreshold)
  1002. 1037                     LockSupport.parkNanos(this, nanosTimeout);
  1003. 1038                 //获取锁的过程中发生了中断,则直接抛出InterruptedException异常
  1004. 1039                 if (Thread.interrupted())
  1005. 1040                     throw new InterruptedException();
  1006. 1041             }
  1007. 1042         } finally {
  1008. 1043             //获取锁的过程中发生了异常,则将当前线程取消
  1009. 1044             if (failed)
  1010. 1045                 cancelAcquire(node);
  1011. 1046         }
  1012. 1047     }
  1013. 1048
  1014. 1049     /**
  1015. 1050      * Acquires in shared uninterruptible mode.
  1016. 1051      * @param arg the acquire argument
  1017. 1052      */
  1018. 1053     private void doAcquireShared(int arg) {
  1019. 1054         //将竞争共享资源失败的线程加入到同步队列中,并标记为共享模式
  1020. 1055         final Node node = addWaiter(Node.SHARED);
  1021. 1056         //标记阻塞获取资源的过程中是否发生了异常
  1022. 1057         boolean failed = true;
  1023. 1058         
  1024. 1059         try {
  1025. 1060             //标记阻塞获取资源的过程中,是否发生了线程中断请求
  1026. 1061             boolean interrupted = false;
  1027. 1062             //线程阻塞等待获取资源,被有效前继节点唤醒后,尝试竞争共享资源
  1028. 1063             for(;;){
  1029. 1064                 /**
  1030. 1065                  * 当前线程被唤醒之后,什么时候有资格竞争共享资源呢?
  1031. 1066                  * 之后当它的前继节点是头结点(头结点是当前持有共享资源的线程),在唤醒后继节点的过程中,可能释放了资源,所以后继节点尝试获取一次共享资源。
  1032. 1067                  */
  1033. 1068                 final Node p = node.predecessor();
  1034. 1069                 if (p == head) {
  1035. 1070                     int r = tryAcquireShared(arg);
  1036. 1071                     if (r >= 0) {
  1037. 1072                         setHeadAndPropagate(node, r);
  1038. 1073                         p.next = null;//help GC
  1039. 1074                         if (interrupted) {
  1040. 1075                             selfInterrupt();
  1041. 1076                         }
  1042. 1077                         failed = false;
  1043. 1078                         return;
  1044. 1079                     }
  1045. 1080                 }
  1046. 1081                 
  1047. 1082                 if (shouldParkAfterFailedAcquire(p, node) &&
  1048. 1083                         parkAndCheckInterrupt()) {
  1049. 1084                     interrupted = true;
  1050. 1085                 }
  1051. 1086             }
  1052. 1087         } finally{
  1053. 1088             if (failed) {
  1054. 1089                 cancelAcquire(node);
  1055. 1090             }
  1056. 1091         }
  1057. 1092     }
  1058. 1093
  1059. 1094     /**
  1060. 1095      * Acquires in shared interruptible mode.
  1061. 1096      * @param arg the acquire argument
  1062. 1097      */
  1063. 1098     private void doAcquireSharedInterruptibly(int arg)
  1064. 1099         throws InterruptedException {
  1065. 1100         final Node node = addWaiter(Node.SHARED);
  1066. 1101         boolean failed = true;
  1067. 1102         try {
  1068. 1103             for (;;) {
  1069. 1104                 final Node p = node.predecessor();
  1070. 1105                 if (p == head) {
  1071. 1106                     int r = tryAcquireShared(arg);
  1072. 1107                     if (r >= 0) {
  1073. 1108                         setHeadAndPropagate(node, r);
  1074. 1109                         p.next = null; // help GC
  1075. 1110                         failed = false;
  1076. 1111                         return;
  1077. 1112                     }
  1078. 1113                 }
  1079. 1114                 if (shouldParkAfterFailedAcquire(p, node) &&
  1080. 1115                     parkAndCheckInterrupt())
  1081. 1116                     throw new InterruptedException();
  1082. 1117             }
  1083. 1118         } finally {
  1084. 1119             if (failed)
  1085. 1120                 cancelAcquire(node);
  1086. 1121         }
  1087. 1122     }
  1088. 1123
  1089. 1124     /**
  1090. 1125      * Acquires in shared timed mode.
  1091. 1126      *
  1092. 1127      * @param arg the acquire argument
  1093. 1128      * @param nanosTimeout max wait time
  1094. 1129      * @return {@code true} if acquired
  1095. 1130      */
  1096. 1131     private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
  1097. 1132             throws InterruptedException {
  1098. 1133         if (nanosTimeout <= 0L)
  1099. 1134             return false;
  1100. 1135         final long deadline = System.nanoTime() + nanosTimeout;
  1101. 1136         final Node node = addWaiter(Node.SHARED);
  1102. 1137         boolean failed = true;
  1103. 1138         try {
  1104. 1139             for (;;) {
  1105. 1140                 final Node p = node.predecessor();
  1106. 1141                 if (p == head) {
  1107. 1142                     int r = tryAcquireShared(arg);
  1108. 1143                     if (r >= 0) {
  1109. 1144                         setHeadAndPropagate(node, r);
  1110. 1145                         p.next = null; // help GC
  1111. 1146                         failed = false;
  1112. 1147                         return true;
  1113. 1148                     }
  1114. 1149                 }
  1115. 1150                 nanosTimeout = deadline - System.nanoTime();
  1116. 1151                 if (nanosTimeout <= 0L)
  1117. 1152                     return false;
  1118. 1153                 if (shouldParkAfterFailedAcquire(p, node) &&
  1119. 1154                     nanosTimeout > spinForTimeoutThreshold)
  1120. 1155                     LockSupport.parkNanos(this, nanosTimeout);
  1121. 1156                 if (Thread.interrupted())
  1122. 1157                     throw new InterruptedException();
  1123. 1158             }
  1124. 1159         } finally {
  1125. 1160             if (failed)
  1126. 1161                 cancelAcquire(node);
  1127. 1162         }
  1128. 1163     }
  1129. 1164
  1130. 1165     // Main exported methods
  1131. 1166
  1132. 1167     /**
  1133. 1168      * Attempts to acquire in exclusive mode. This method should query
  1134. 1169      * if the state of the object permits it to be acquired in the
  1135. 1170      * exclusive mode, and if so to acquire it.
  1136. 1171      *
  1137. 1172      * <p>This method is always invoked by the thread performing
  1138. 1173      * acquire.  If this method reports failure, the acquire method
  1139. 1174      * may queue the thread, if it is not already queued, until it is
  1140. 1175      * signalled by a release from some other thread. This can be used
  1141. 1176      * to implement method {@link Lock#tryLock()}.
  1142. 1177      *
  1143. 1178      * <p>The default
  1144. 1179      * implementation throws {@link UnsupportedOperationException}.
  1145. 1180      *
  1146. 1181      * @param arg the acquire argument. This value is always the one
  1147. 1182      *        passed to an acquire method, or is the value saved on entry
  1148. 1183      *        to a condition wait.  The value is otherwise uninterpreted
  1149. 1184      *        and can represent anything you like.
  1150. 1185      * @return {@code true} if successful. Upon success, this object has
  1151. 1186      *         been acquired.
  1152. 1187      * @throws IllegalMonitorStateException if acquiring would place this
  1153. 1188      *         synchronizer in an illegal state. This exception must be
  1154. 1189      *         thrown in a consistent fashion for synchronization to work
  1155. 1190      *         correctly.
  1156. 1191      * @throws UnsupportedOperationException if exclusive mode is not supported
  1157. 1192      */
  1158. 1193     protected boolean tryAcquire(int arg) {
  1159. 1194         throw new UnsupportedOperationException();
  1160. 1195     }
  1161. 1196
  1162. 1197     /**
  1163. 1198      * Attempts to set the state to reflect a release in exclusive
  1164. 1199      * mode.
  1165. 1200      *
  1166. 1201      * <p>This method is always invoked by the thread performing release.
  1167. 1202      *
  1168. 1203      * <p>The default implementation throws
  1169. 1204      * {@link UnsupportedOperationException}.
  1170. 1205      *
  1171. 1206      * @param arg the release argument. This value is always the one
  1172. 1207      *        passed to a release method, or the current state value upon
  1173. 1208      *        entry to a condition wait.  The value is otherwise
  1174. 1209      *        uninterpreted and can represent anything you like.
  1175. 1210      * @return {@code true} if this object is now in a fully released
  1176. 1211      *         state, so that any waiting threads may attempt to acquire;
  1177. 1212      *         and {@code false} otherwise.
  1178. 1213      * @throws IllegalMonitorStateException if releasing would place this
  1179. 1214      *         synchronizer in an illegal state. This exception must be
  1180. 1215      *         thrown in a consistent fashion for synchronization to work
  1181. 1216      *         correctly.
  1182. 1217      * @throws UnsupportedOperationException if exclusive mode is not supported
  1183. 1218      */
  1184. 1219     protected boolean tryRelease(int arg) {
  1185. 1220         throw new UnsupportedOperationException();
  1186. 1221     }
  1187. 1222
  1188. 1223     /**
  1189. 1224      * Attempts to acquire in shared mode. This method should query if
  1190. 1225      * the state of the object permits it to be acquired in the shared
  1191. 1226      * mode, and if so to acquire it.
  1192. 1227      *
  1193. 1228      * <p>This method is always invoked by the thread performing
  1194. 1229      * acquire.  If this method reports failure, the acquire method
  1195. 1230      * may queue the thread, if it is not already queued, until it is
  1196. 1231      * signalled by a release from some other thread.
  1197. 1232      *
  1198. 1233      * <p>The default implementation throws {@link
  1199. 1234      * UnsupportedOperationException}.
  1200. 1235      *
  1201. 1236      * @param arg the acquire argument. This value is always the one
  1202. 1237      *        passed to an acquire method, or is the value saved on entry
  1203. 1238      *        to a condition wait.  The value is otherwise uninterpreted
  1204. 1239      *        and can represent anything you like.
  1205. 1240      * @return a negative value on failure; zero if acquisition in shared
  1206. 1241      *         mode succeeded but no subsequent shared-mode acquire can
  1207. 1242      *         succeed; and a positive value if acquisition in shared
  1208. 1243      *         mode succeeded and subsequent shared-mode acquires might
  1209. 1244      *         also succeed, in which case a subsequent waiting thread
  1210. 1245      *         must check availability. (Support for three different
  1211. 1246      *         return values enables this method to be used in contexts
  1212. 1247      *         where acquires only sometimes act exclusively.)  Upon
  1213. 1248      *         success, this object has been acquired.
  1214. 1249      * @throws IllegalMonitorStateException if acquiring would place this
  1215. 1250      *         synchronizer in an illegal state. This exception must be
  1216. 1251      *         thrown in a consistent fashion for synchronization to work
  1217. 1252      *         correctly.
  1218. 1253      * @throws UnsupportedOperationException if shared mode is not supported
  1219. 1254      */
  1220. 1255     protected int tryAcquireShared(int arg) {
  1221. 1256         for(;;){
  1222. 1257             //计算可用资源量
  1223. 1258             int available = getState();
  1224. 1259             //计算剩余资源量
  1225. 1260             int remaining = available - arg;
  1226. 1261             //计算资源数量,不够直接放回,加入CAS是为了保证state状态一定能够更新成功
  1227. 1262             if (remaining < 0 ||
  1228. 1263                     compareAndSetState(available, remaining)) {
  1229. 1264                     return remaining;
  1230. 1265             }
  1231. 1266         }
  1232. 1267         
  1233. 1268         //throw new UnsupportedOperationException();
  1234. 1269     }
  1235. 1270
  1236. 1271     /**
  1237. 1272      * Attempts to set the state to reflect a release in shared mode.
  1238. 1273      *
  1239. 1274      * <p>This method is always invoked by the thread performing release.
  1240. 1275      *
  1241. 1276      * <p>The default implementation throws
  1242. 1277      * {@link UnsupportedOperationException}.
  1243. 1278      *
  1244. 1279      * @param arg the release argument. This value is always the one
  1245. 1280      *        passed to a release method, or the current state value upon
  1246. 1281      *        entry to a condition wait.  The value is otherwise
  1247. 1282      *        uninterpreted and can represent anything you like.
  1248. 1283      * @return {@code true} if this release of shared mode may permit a
  1249. 1284      *         waiting acquire (shared or exclusive) to succeed; and
  1250. 1285      *         {@code false} otherwise
  1251. 1286      * @throws IllegalMonitorStateException if releasing would place this
  1252. 1287      *         synchronizer in an illegal state. This exception must be
  1253. 1288      *         thrown in a consistent fashion for synchronization to work
  1254. 1289      *         correctly.
  1255. 1290      * @throws UnsupportedOperationException if shared mode is not supported
  1256. 1291      */
  1257. 1292     protected boolean tryReleaseShared(int arg) {
  1258. 1293         throw new UnsupportedOperationException();
  1259. 1294     }
  1260. 1295
  1261. 1296     /**
  1262. 1297      * Returns {@code true} if synchronization is held exclusively with
  1263. 1298      * respect to the current (calling) thread.  This method is invoked
  1264. 1299      * upon each call to a non-waiting {@link ConditionObject} method.
  1265. 1300      * (Waiting methods instead invoke {@link #release}.)
  1266. 1301      *
  1267. 1302      * <p>The default implementation throws {@link
  1268. 1303      * UnsupportedOperationException}. This method is invoked
  1269. 1304      * internally only within {@link ConditionObject} methods, so need
  1270. 1305      * not be defined if conditions are not used.
  1271. 1306      *
  1272. 1307      * @return {@code true} if synchronization is held exclusively;
  1273. 1308      *         {@code false} otherwise
  1274. 1309      * @throws UnsupportedOperationException if conditions are not supported
  1275. 1310      */
  1276. 1311     protected boolean isHeldExclusively() {
  1277. 1312         throw new UnsupportedOperationException();
  1278. 1313     }
  1279. 1314
  1280. 1315     /**
  1281. 1316      * Acquires in exclusive mode, ignoring interrupts.  Implemented
  1282. 1317      * by invoking at least once {@link #tryAcquire},
  1283. 1318      * returning on success.  Otherwise the thread is queued, possibly
  1284. 1319      * repeatedly blocking and unblocking, invoking {@link
  1285. 1320      * #tryAcquire} until success.  This method can be used
  1286. 1321      * to implement method {@link Lock#lock}.
  1287. 1322      *
  1288. 1323      * @param arg the acquire argument.  This value is conveyed to
  1289. 1324      *        {@link #tryAcquire} but is otherwise uninterpreted and
  1290. 1325      *        can represent anything you like.
  1291. 1326      */
  1292. 1327     public final void acquire(int arg) {
  1293. 1328         /*加塞抢占共享资源(因为同步队列中可能还有其它节点等待),获取成功则直接返回*/
  1294. 1329         if (!tryAcquire(arg)  
  1295. 1330                 /* 1、抢占共享资源失败,则将当前节点放入到同步队列的尾部,并标记为独占模式;
  1296. 1331                  * 2、使线程阻塞在同步队列中获取资源,直到获取成功才返回;如果整个过程中被中断过就返回true,否则就返回false;*/
  1297. 1332                 && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
  1298. 1333             /*阻塞获取资源的过程中是不响应线程中断的,内部进行了中断检测,所以这块进行线程中断*/
  1299. 1334             selfInterrupt();
  1300. 1335         }
  1301. 1336     }
  1302. 1337
  1303. 1338     /**
  1304. 1339      * Acquires in exclusive mode, aborting if interrupted.
  1305. 1340      * Implemented by first checking interrupt status, then invoking
  1306. 1341      * at least once {@link #tryAcquire}, returning on
  1307. 1342      * success.  Otherwise the thread is queued, possibly repeatedly
  1308. 1343      * blocking and unblocking, invoking {@link #tryAcquire}
  1309. 1344      * until success or the thread is interrupted.  This method can be
  1310. 1345      * used to implement method {@link Lock#lockInterruptibly}.
  1311. 1346      *
  1312. 1347      * @param arg the acquire argument.  This value is conveyed to
  1313. 1348      *        {@link #tryAcquire} but is otherwise uninterpreted and
  1314. 1349      *        can represent anything you like.
  1315. 1350      * @throws InterruptedException if the current thread is interrupted
  1316. 1351      */
  1317. 1352     public final void acquireInterruptibly(int arg)
  1318. 1353             throws InterruptedException {
  1319. 1354         if (Thread.interrupted())
  1320. 1355             throw new InterruptedException();
  1321. 1356         if (!tryAcquire(arg))
  1322. 1357             doAcquireInterruptibly(arg);
  1323. 1358     }
  1324. 1359
  1325. 1360     /**
  1326. 1361      * Attempts to acquire in exclusive mode, aborting if interrupted,
  1327. 1362      * and failing if the given timeout elapses.  Implemented by first
  1328. 1363      * checking interrupt status, then invoking at least once {@link
  1329. 1364      * #tryAcquire}, returning on success.  Otherwise, the thread is
  1330. 1365      * queued, possibly repeatedly blocking and unblocking, invoking
  1331. 1366      * {@link #tryAcquire} until success or the thread is interrupted
  1332. 1367      * or the timeout elapses.  This method can be used to implement
  1333. 1368      * method {@link Lock#tryLock(long, TimeUnit)}.
  1334. 1369      *
  1335. 1370      * @param arg the acquire argument.  This value is conveyed to
  1336. 1371      *        {@link #tryAcquire} but is otherwise uninterpreted and
  1337. 1372      *        can represent anything you like.
  1338. 1373      * @param nanosTimeout the maximum number of nanoseconds to wait
  1339. 1374      * @return {@code true} if acquired; {@code false} if timed out
  1340. 1375      * @throws InterruptedException if the current thread is interrupted
  1341. 1376      */
  1342. 1377     public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  1343. 1378             throws InterruptedException {
  1344. 1379         //当前线程被中断,直接抛出异常
  1345. 1380         if (Thread.interrupted())
  1346. 1381             throw new InterruptedException();
  1347. 1382         //尝试直接获取锁,获取成功就直接返回
  1348. 1383         return tryAcquire(arg) ||
  1349. 1384                 //否则就“阻塞”指定时间,再尝试获取锁
  1350. 1385             doAcquireNanos(arg, nanosTimeout);
  1351. 1386     }
  1352. 1387
  1353. 1388     /**
  1354. 1389      * Releases in exclusive mode.  Implemented by unblocking one or
  1355. 1390      * more threads if {@link #tryRelease} returns true.
  1356. 1391      * This method can be used to implement method {@link Lock#unlock}.
  1357. 1392      *
  1358. 1393      * @param arg the release argument.  This value is conveyed to
  1359. 1394      *        {@link #tryRelease} but is otherwise uninterpreted and
  1360. 1395      *        can represent anything you like.
  1361. 1396      * @return the value returned from {@link #tryRelease}
  1362. 1397      */
  1363. 1398     public final boolean release(int arg) {
  1364. 1399         if (tryRelease(arg)) {
  1365. 1400             Node h = head;
  1366. 1401             //当前线程已经释放锁,但是sync queue同步队列不为空,并且头结点head未被取消
  1367. 1402             if (h != null && h.waitStatus != 0)
  1368. 1403                 //说明还有其它线程等待竞争锁,则尝试唤醒sync queue队列中等待锁的线程
  1369. 1404                 unparkSuccessor(h);
  1370. 1405             return true;
  1371. 1406         }
  1372. 1407         return false;
  1373. 1408     }
  1374. 1409
  1375. 1410     /**
  1376. 1411      * Acquires in shared mode, ignoring interrupts.  Implemented by
  1377. 1412      * first invoking at least once {@link #tryAcquireShared},
  1378. 1413      * returning on success.  Otherwise the thread is queued, possibly
  1379. 1414      * repeatedly blocking and unblocking, invoking {@link
  1380. 1415      * #tryAcquireShared} until success.
  1381. 1416      *
  1382. 1417      * @param arg the acquire argument.  This value is conveyed to
  1383. 1418      *        {@link #tryAcquireShared} but is otherwise uninterpreted
  1384. 1419      *        and can represent anything you like.
  1385. 1420      */
  1386. 1421     public final void acquireShared(int arg) {
  1387. 1422         if (tryAcquireShared(arg) < 0)
  1388. 1423             doAcquireShared(arg);
  1389. 1424     }
  1390. 1425
  1391. 1426     /**
  1392. 1427      * Acquires in shared mode, aborting if interrupted.  Implemented
  1393. 1428      * by first checking interrupt status, then invoking at least once
  1394. 1429      * {@link #tryAcquireShared}, returning on success.  Otherwise the
  1395. 1430      * thread is queued, possibly repeatedly blocking and unblocking,
  1396. 1431      * invoking {@link #tryAcquireShared} until success or the thread
  1397. 1432      * is interrupted.
  1398. 1433      * @param arg the acquire argument.
  1399. 1434      * This value is conveyed to {@link #tryAcquireShared} but is
  1400. 1435      * otherwise uninterpreted and can represent anything
  1401. 1436      * you like.
  1402. 1437      * @throws InterruptedException if the current thread is interrupted
  1403. 1438      */
  1404. 1439     public final void acquireSharedInterruptibly(int arg)
  1405. 1440             throws InterruptedException {
  1406. 1441         //当前线程已经被中断,则直接抛出InterruptedException异常
  1407. 1442         if (Thread.interrupted())
  1408. 1443             throw new InterruptedException();
  1409. 1444         if (tryAcquireShared(arg) < 0)
  1410. 1445             doAcquireSharedInterruptibly(arg);
  1411. 1446     }
  1412. 1447
  1413. 1448     /**
  1414. 1449      * Attempts to acquire in shared mode, aborting if interrupted, and
  1415. 1450      * failing if the given timeout elapses.  Implemented by first
  1416. 1451      * checking interrupt status, then invoking at least once {@link
  1417. 1452      * #tryAcquireShared}, returning on success.  Otherwise, the
  1418. 1453      * thread is queued, possibly repeatedly blocking and unblocking,
  1419. 1454      * invoking {@link #tryAcquireShared} until success or the thread
  1420. 1455      * is interrupted or the timeout elapses.
  1421. 1456      *
  1422. 1457      * @param arg the acquire argument.  This value is conveyed to
  1423. 1458      *        {@link #tryAcquireShared} but is otherwise uninterpreted
  1424. 1459      *        and can represent anything you like.
  1425. 1460      * @param nanosTimeout the maximum number of nanoseconds to wait
  1426. 1461      * @return {@code true} if acquired; {@code false} if timed out
  1427. 1462      * @throws InterruptedException if the current thread is interrupted
  1428. 1463      */
  1429. 1464     public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
  1430. 1465             throws InterruptedException {
  1431. 1466         if (Thread.interrupted())
  1432. 1467             throw new InterruptedException();
  1433. 1468         return tryAcquireShared(arg) >= 0 ||
  1434. 1469             doAcquireSharedNanos(arg, nanosTimeout);
  1435. 1470     }
  1436. 1471
  1437. 1472     /**
  1438. 1473      * Releases in shared mode.  Implemented by unblocking one or more
  1439. 1474      * threads if {@link #tryReleaseShared} returns true.
  1440. 1475      *
  1441. 1476      * @param arg the release argument.  This value is conveyed to
  1442. 1477      *        {@link #tryReleaseShared} but is otherwise uninterpreted
  1443. 1478      *        and can represent anything you like.
  1444. 1479      * @return the value returned from {@link #tryReleaseShared}
  1445. 1480      */
  1446. 1481     public final boolean releaseShared(int arg) {
  1447. 1482         //尝试释放锁,释放成功,则返回true
  1448. 1483         if (tryReleaseShared(arg)) {
  1449. 1484             //唤醒同步队列中,等待获取锁的线程
  1450. 1485             doReleaseShared();
  1451. 1486             return true;
  1452. 1487         }
  1453. 1488         return false;
  1454. 1489     }
  1455. 1490
  1456. 1491     // Queue inspection methods
  1457. 1492
  1458. 1493     /**
  1459. 1494      * Queries whether any threads are waiting to acquire. Note that
  1460. 1495      * because cancellations due to interrupts and timeouts may occur
  1461. 1496      * at any time, a {@code true} return does not guarantee that any
  1462. 1497      * other thread will ever acquire.
  1463. 1498      *
  1464. 1499      * <p>In this implementation, this operation returns in
  1465. 1500      * constant time.
  1466. 1501      *
  1467. 1502      * @return {@code true} if there may be other threads waiting to acquire
  1468. 1503      */
  1469. 1504     public final boolean hasQueuedThreads() {
  1470. 1505         return head != tail;
  1471. 1506     }
  1472. 1507
  1473. 1508     /**
  1474. 1509      * Queries whether any threads have ever contended to acquire this
  1475. 1510      * synchronizer; that is if an acquire method has ever blocked.
  1476. 1511      *
  1477. 1512      * <p>In this implementation, this operation returns in
  1478. 1513      * constant time.
  1479. 1514      *
  1480. 1515      * @return {@code true} if there has ever been contention
  1481. 1516      */
  1482. 1517     public final boolean hasContended() {
  1483. 1518         return head != null;
  1484. 1519     }
  1485. 1520
  1486. 1521     /**
  1487. 1522      * Returns the first (longest-waiting) thread in the queue, or
  1488. 1523      * {@code null} if no threads are currently queued.
  1489. 1524      *
  1490. 1525      * <p>In this implementation, this operation normally returns in
  1491. 1526      * constant time, but may iterate upon contention if other threads are
  1492. 1527      * concurrently modifying the queue.
  1493. 1528      *
  1494. 1529      * @return the first (longest-waiting) thread in the queue, or
  1495. 1530      *         {@code null} if no threads are currently queued
  1496. 1531      */
  1497. 1532     public final Thread getFirstQueuedThread() {
  1498. 1533         // handle only fast path, else relay
  1499. 1534         return (head == tail) ? null : fullGetFirstQueuedThread();
  1500. 1535     }
  1501. 1536
  1502. 1537     /**
  1503. 1538      * Version of getFirstQueuedThread called when fastpath fails
  1504. 1539      */
  1505. 1540     private Thread fullGetFirstQueuedThread() {
  1506. 1541         /*
  1507. 1542          * The first node is normally head.next. Try to get its
  1508. 1543          * thread field, ensuring consistent reads: If thread
  1509. 1544          * field is nulled out or s.prev is no longer head, then
  1510. 1545          * some other thread(s) concurrently performed setHead in
  1511. 1546          * between some of our reads. We try this twice before
  1512. 1547          * resorting to traversal.
  1513. 1548          */
  1514. 1549         Node h, s;
  1515. 1550         Thread st;
  1516. 1551         if (((h = head) != null && (s = h.next) != null &&
  1517. 1552              s.prev == head && (st = s.thread) != null) ||
  1518. 1553             ((h = head) != null && (s = h.next) != null &&
  1519. 1554              s.prev == head && (st = s.thread) != null))
  1520. 1555             return st;
  1521. 1556
  1522. 1557         /*
  1523. 1558          * Head's next field might not have been set yet, or may have
  1524. 1559          * been unset after setHead. So we must check to see if tail
  1525. 1560          * is actually first node. If not, we continue on, safely
  1526. 1561          * traversing from tail back to head to find first,
  1527. 1562          * guaranteeing termination.
  1528. 1563          */
  1529. 1564
  1530. 1565         Node t = tail;
  1531. 1566         Thread firstThread = null;
  1532. 1567         while (t != null && t != head) {
  1533. 1568             Thread tt = t.thread;
  1534. 1569             if (tt != null)
  1535. 1570                 firstThread = tt;
  1536. 1571             t = t.prev;
  1537. 1572         }
  1538. 1573         return firstThread;
  1539. 1574     }
  1540. 1575
  1541. 1576     /**
  1542. 1577      * Returns true if the given thread is currently queued.
  1543. 1578      *
  1544. 1579      * <p>This implementation traverses the queue to determine
  1545. 1580      * presence of the given thread.
  1546. 1581      *
  1547. 1582      * @param thread the thread
  1548. 1583      * @return {@code true} if the given thread is on the queue
  1549. 1584      * @throws NullPointerException if the thread is null
  1550. 1585      */
  1551. 1586     public final boolean isQueued(Thread thread) {
  1552. 1587         if (thread == null)
  1553. 1588             throw new NullPointerException();
  1554. 1589         for (Node p = tail; p != null; p = p.prev)
  1555. 1590             if (p.thread == thread)
  1556. 1591                 return true;
  1557. 1592         return false;
  1558. 1593     }
  1559. 1594
  1560. 1595     /**
  1561. 1596      * Returns {@code true} if the apparent first queued thread, if one
  1562. 1597      * exists, is waiting in exclusive mode.  If this method returns
  1563. 1598      * {@code true}, and the current thread is attempting to acquire in
  1564. 1599      * shared mode (that is, this method is invoked from {@link
  1565. 1600      * #tryAcquireShared}) then it is guaranteed that the current thread
  1566. 1601      * is not the first queued thread.  Used only as a heuristic in
  1567. 1602      * ReentrantReadWriteLock.
  1568. 1603      */
  1569. 1604     final boolean apparentlyFirstQueuedIsExclusive() {
  1570. 1605         Node h, s;
  1571. 1606         return (h = head) != null &&
  1572. 1607             (s = h.next)  != null &&
  1573. 1608             !s.isShared()         &&
  1574. 1609             s.thread != null;
  1575. 1610     }
  1576. 1611
  1577. 1612     /**
  1578. 1613      * Queries whether any threads have been waiting to acquire longer
  1579. 1614      * than the current thread.
  1580. 1615      *
  1581. 1616      * <p>An invocation of this method is equivalent to (but may be
  1582. 1617      * more efficient than):
  1583. 1618      *  <pre> {@code
  1584. 1619      * getFirstQueuedThread() != Thread.currentThread() &&
  1585. 1620      * hasQueuedThreads()}</pre>
  1586. 1621      *
  1587. 1622      * <p>Note that because cancellations due to interrupts and
  1588. 1623      * timeouts may occur at any time, a {@code true} return does not
  1589. 1624      * guarantee that some other thread will acquire before the current
  1590. 1625      * thread.  Likewise, it is possible for another thread to win a
  1591. 1626      * race to enqueue after this method has returned {@code false},
  1592. 1627      * due to the queue being empty.
  1593. 1628      *
  1594. 1629      * <p>This method is designed to be used by a fair synchronizer to
  1595. 1630      * avoid <a target="_blank" href="https://www.cnblogs.com/AbstractQueuedSynchronizer#barging">barging</a>.
  1596. 1631      * Such a synchronizer's {@link #tryAcquire} method should return
  1597. 1632      * {@code false}, and its {@link #tryAcquireShared} method should
  1598. 1633      * return a negative value, if this method returns {@code true}
  1599. 1634      * (unless this is a reentrant acquire).  For example, the {@code
  1600. 1635      * tryAcquire} method for a fair, reentrant, exclusive mode
  1601. 1636      * synchronizer might look like this:
  1602. 1637      *
  1603. 1638      *  <pre> {@code
  1604. 1639      * protected boolean tryAcquire(int arg) {
  1605. 1640      *   if (isHeldExclusively()) {
  1606. 1641      *     // A reentrant acquire; increment hold count
  1607. 1642      *     return true;
  1608. 1643      *   } else if (hasQueuedPredecessors()) {
  1609. 1644      *     return false;
  1610. 1645      *   } else {
  1611. 1646      *     // try to acquire normally
  1612. 1647      *   }
  1613. 1648      * }}</pre>
  1614. 1649      *
  1615. 1650      * @return {@code true} if there is a queued thread preceding the
  1616. 1651      *         current thread, and {@code false} if the current thread
  1617. 1652      *         is at the head of the queue or the queue is empty
  1618. 1653      * @since 1.7
  1619. 1654      */
  1620. 1655     public final boolean hasQueuedPredecessors() {
  1621. 1656         // The correctness of this depends on head being initialized
  1622. 1657         // before tail and on head.next being accurate if the current
  1623. 1658         // thread is first in queue.
  1624. 1659         Node t = tail; // Read fields in reverse initialization order
  1625. 1660         Node h = head;
  1626. 1661         Node s;
  1627. 1662         return h != t &&
  1628. 1663             ((s = h.next) == null || s.thread != Thread.currentThread());
  1629. 1664     }
  1630. 1665
  1631. 1666
  1632. 1667     // Instrumentation and monitoring methods
  1633. 1668
  1634. 1669     /**
  1635. 1670      * Returns an estimate of the number of threads waiting to
  1636. 1671      * acquire.  The value is only an estimate because the number of
  1637. 1672      * threads may change dynamically while this method traverses
  1638. 1673      * internal data structures.  This method is designed for use in
  1639. 1674      * monitoring system state, not for synchronization
  1640. 1675      * control.
  1641. 1676      *
  1642. 1677      * @return the estimated number of threads waiting to acquire
  1643. 1678      */
  1644. 1679     public final int getQueueLength() {
  1645. 1680         int n = 0;
  1646. 1681         for (Node p = tail; p != null; p = p.prev) {
  1647. 1682             if (p.thread != null)
  1648. 1683                 ++n;
  1649. 1684         }
  1650. 1685         return n;
  1651. 1686     }
  1652. 1687
  1653. 1688     /**
  1654. 1689      * Returns a collection containing threads that may be waiting to
  1655. 1690      * acquire.  Because the actual set of threads may change
  1656. 1691      * dynamically while constructing this result, the returned
  1657. 1692      * collection is only a best-effort estimate.  The elements of the
  1658. 1693      * returned collection are in no particular order.  This method is
  1659. 1694      * designed to facilitate construction of subclasses that provide
  1660. 1695      * more extensive monitoring facilities.
  1661. 1696      *
  1662. 1697      * @return the collection of threads
  1663. 1698      */
  1664. 1699     public final Collection<Thread> getQueuedThreads() {
  1665. 1700         ArrayList<Thread> list = new ArrayList<Thread>();
  1666. 1701         for (Node p = tail; p != null; p = p.prev) {
  1667. 1702             Thread t = p.thread;
  1668. 1703             if (t != null)
  1669. 1704                 list.add(t);
  1670. 1705         }
  1671. 1706         return list;
  1672. 1707     }
  1673. 1708
  1674. 1709     /**
  1675. 1710      * Returns a collection containing threads that may be waiting to
  1676. 1711      * acquire in exclusive mode. This has the same properties
  1677. 1712      * as {@link #getQueuedThreads} except that it only returns
  1678. 1713      * those threads waiting due to an exclusive acquire.
  1679. 1714      *
  1680. 1715      * @return the collection of threads
  1681. 1716      */
  1682. 1717     public final Collection<Thread> getExclusiveQueuedThreads() {
  1683. 1718         ArrayList<Thread> list = new ArrayList<Thread>();
  1684. 1719         for (Node p = tail; p != null; p = p.prev) {
  1685. 1720             if (!p.isShared()) {
  1686. 1721                 Thread t = p.thread;
  1687. 1722                 if (t != null)
  1688. 1723                     list.add(t);
  1689. 1724             }
  1690. 1725         }
  1691. 1726         return list;
  1692. 1727     }
  1693. 1728
  1694. 1729     /**
  1695. 1730      * Returns a collection containing threads that may be waiting to
  1696. 1731      * acquire in shared mode. This has the same properties
  1697. 1732      * as {@link #getQueuedThreads} except that it only returns
  1698. 1733      * those threads waiting due to a shared acquire.
  1699. 1734      *
  1700. 1735      * @return the collection of threads
  1701. 1736      */
  1702. 1737     public final Collection<Thread> getSharedQueuedThreads() {
  1703. 1738         ArrayList<Thread> list = new ArrayList<Thread>();
  1704. 1739         for (Node p = tail; p != null; p = p.prev) {
  1705. 1740             if (p.isShared()) {
  1706. 1741                 Thread t = p.thread;
  1707. 1742                 if (t != null)
  1708. 1743                     list.add(t);
  1709. 1744             }
  1710. 1745         }
  1711. 1746         return list;
  1712. 1747     }
  1713. 1748
  1714. 1749     /**
  1715. 1750      * Returns a string identifying this synchronizer, as well as its state.
  1716. 1751      * The state, in brackets, includes the String {@code "State ="}
  1717. 1752      * followed by the current value of {@link #getState}, and either
  1718. 1753      * {@code "nonempty"} or {@code "empty"} depending on whether the
  1719. 1754      * queue is empty.
  1720. 1755      *
  1721. 1756      * @return a string identifying this synchronizer, as well as its state
  1722. 1757      */
  1723. 1758     public String toString() {
  1724. 1759         int s = getState();
  1725. 1760         String q  = hasQueuedThreads() ? "non" : "";
  1726. 1761         return super.toString() +
  1727. 1762             "[State = " + s + ", " + q + "empty queue]";
  1728. 1763     }
  1729. 1764
  1730. 1765
  1731. 1766     // Internal support methods for Conditions
  1732. 1767
  1733. 1768     /**
  1734. 1769      * Returns true if a node, always one that was initially placed on
  1735. 1770      * a condition queue, is now waiting to reacquire on sync queue.
  1736. 1771      * @param node the node
  1737. 1772      * @return true if is reacquiring
  1738. 1773      */
  1739. 1774     //判断当前线程是否在sync queue同步队列中。
  1740. 1775     final boolean isOnSyncQueue(Node node) {
  1741. 1776         /**
  1742. 1777          * 1、当前节点node的状态waitStatus为Node.CONDITION,则其必定还在条件队列condition中;
  1743. 1778          * 2、如果当前节点node的node.prev == null,说明当前节点还未被加入到sync queue队列中,因为node节点要加入到sync queue中(无论本次加入是否成功),则本次尝试必有node.prev != null;
  1744. 1779          */
  1745. 1780         if (node.waitStatus == Node.CONDITION || node.prev == null)
  1746. 1781             return false;
  1747. 1782         //如果当前节点node.next != null,说明其已经有后继节点了,则其必定在sync queue同步队列中
  1748. 1783         if (node.next != null) // If has successor, it must be on queue
  1749. 1784             return true;
  1750. 1785         /*
  1751. 1786          * node.prev can be non-null, but not yet on queue because
  1752. 1787          * the CAS to place it on queue can fail. So we have to
  1753. 1788          * traverse from tail to make sure it actually made it.  It
  1754. 1789          * will always be near the tail in calls to this method, and
  1755. 1790          * unless the CAS failed (which is unlikely), it will be
  1756. 1791          * there, so we hardly ever traverse much.
  1757. 1792          */
  1758. 1793         /**
  1759. 1794          * 能走到这里来,说明当前节点node.waitStatus != Node.CONDITION && node.prev != null && node.next == null
  1760. 1795          * ,说明已经开始尝试将当前节点node加入到sync queue同步队列中,已经完成了node.prev = tail,但是进行接下来的CAS操作设置sync queue的尾节点,有可能失败
  1761. 1796          * 导致当前节点入队失败,所以从sync queue的尾节点开始遍历,进一步确认当前节点是否成功加入到了sync queue中。
  1762. 1797          */
  1763. 1798         return findNodeFromTail(node);
  1764. 1799     }
  1765. 1800
  1766. 1801     /**
  1767. 1802      * Returns true if node is on sync queue by searching backwards from tail.
  1768. 1803      * Called only when needed by isOnSyncQueue.
  1769. 1804      * @return true if present
  1770. 1805      */
  1771. 1806     //判断指定的node节点是否在sync queue同步队列中
  1772. 1807     private boolean findNodeFromTail(Node node) {
  1773. 1808         Node t = tail;
  1774. 1809         //从链表的尾部开始遍历
  1775. 1810         for (;;) {
  1776. 1811             //遍历到的当前节点等于指定的节点,则返回true
  1777. 1812             if (t == node)
  1778. 1813                 return true;
  1779. 1814             //遍历到的当前节点已经为空,说明链表已经遍历结束,未找到指定的节点,则返回false
  1780. 1815             if (t == null)
  1781. 1816                 return false;
  1782. 1817             //移动指针,进入下一次自旋遍历
  1783. 1818             t = t.prev;
  1784. 1819         }
  1785. 1820     }
  1786. 1821
  1787. 1822     /**
  1788. 1823      * Transfers a node from a condition queue onto sync queue.
  1789. 1824      * Returns true if successful.
  1790. 1825      * @param node the node
  1791. 1826      * @return true if successfully transferred (else the node was
  1792. 1827      * cancelled before signal)
  1793. 1828      */
  1794. 1829     final boolean transferForSignal(Node node) {
  1795. 1830         /*
  1796. 1831          * If cannot change waitStatus, the node has been cancelled.
  1797. 1832          */
  1798. 1833         //调用signal方法的时候,当前node节点已经取消了等待,则忽略这个节点
  1799. 1834         if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  1800. 1835             return false;
  1801. 1836
  1802. 1837         /*
  1803. 1838          * Splice onto queue and try to set waitStatus of predecessor to
  1804. 1839          * indicate that thread is (probably) waiting. If cancelled or
  1805. 1840          * attempt to set waitStatus fails, wake up to resync (in which
  1806. 1841          * case the waitStatus can be transiently and harmlessly wrong).
  1807. 1842          */
  1808. 1843         //如果这个节点node在条件队列condition中正常等待,则将node加入到sync queue同步队列中,并返回这个新结点的前驱结点
  1809. 1844         Node p = enq(node);
  1810. 1845         int ws = p.waitStatus;
  1811. 1846         /**
  1812. 1847          * 同步队列sync queue中的节点是通过前驱结点来唤醒的,前驱结点已经被取消,或者前驱结点的waitStatus设置Node.SIGNAL失败,
  1813. 1848          * 后继节点不能通过前驱结点正常唤醒,则直接唤醒这个刚刚加入到同步队列sync queue中的节点
  1814. 1849          */
  1815. 1850         if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  1816. 1851             LockSupport.unpark(node.thread);
  1817. 1852         return true;
  1818. 1853     }
  1819. 1854
  1820. 1855     /**
  1821. 1856      * Transfers node, if necessary, to sync queue after a cancelled wait.
  1822. 1857      * Returns true if thread was cancelled before being signalled.
  1823. 1858      *
  1824. 1859      * @param node the node
  1825. 1860      * @return true if cancelled before the node was signalled
  1826. 1861      */
  1827. 1862     final boolean transferAfterCancelledWait(Node node) {
  1828. 1863         /**
  1829. 1864          * 判断一个node是否被signal()过,最简单有效的方式就是是否离开了condition条件队列,进入到了sync queue同步队列中
  1830. 1865          */
  1831. 1866         //如果一个节点的状态还是Node.CONDITION,说明它还未被signal过,是因为中断导致了它被唤醒(从condition条件队列转移到sync queue同步队列中)
  1832. 1867         if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  1833. 1868             //将当前被唤醒的节点加入到同步队列sync queue中,准备竞争锁,但是还没有将这个节点从条件队列中移除,所以后面的处理一下
  1834. 1869             enq(node);
  1835. 1870             return true;
  1836. 1871         }
  1837. 1872         /*
  1838. 1873          * If we lost out to a signal(), then we can't proceed
  1839. 1874          * until it finishes its enq().  Cancelling during an
  1840. 1875          * incomplete transfer is both rare and transient, so just
  1841. 1876          * spin.
  1842. 1877          */
  1843. 1878         while (!isOnSyncQueue(node))
  1844. 1879             Thread.yield();
  1845. 1880         return false;
  1846. 1881     }
  1847. 1882
  1848. 1883     /**
  1849. 1884      * Invokes release with current state value; returns saved state.
  1850. 1885      * Cancels node and throws exception on failure.
  1851. 1886      * @param node the condition node for this wait
  1852. 1887      * @return previous sync state
  1853. 1888      */
  1854. 1889     //完全释放线程所占用的锁
  1855. 1890     final int fullyRelease(Node node) {
  1856. 1891         boolean failed = true;
  1857. 1892         try {
  1858. 1893             //获取当前锁状态
  1859. 1894             int savedState = getState();
  1860. 1895             //完全释放锁,对于重入锁而言,无论重入了几次都一次释放;同时持有锁的线程不是当前线程时候,也会直接抛出IllegalMonitorStateException异常,直接进入finally块
  1861. 1896             if (release(savedState)) {
  1862. 1897                 failed = false;
  1863. 1898                 return savedState;
  1864. 1899             } else {
  1865. 1900                 throw new IllegalMonitorStateException();
  1866. 1901             }
  1867. 1902         } finally {
  1868. 1903             /**
  1869. 1904              * 如果释放锁失败,则将刚才新加入到条件队列尾节点的node节点的waitStatus设置为Node.CANCELLED,
  1870. 1905              * 后序其它线程被包装成Node节点加入到条件队列中时候,会将这些已经被取消的节点移除
  1871. 1906              */
  1872. 1907             if (failed)
  1873. 1908                 node.waitStatus = Node.CANCELLED;
  1874. 1909         }
  1875. 1910     }
  1876. 1911
  1877. 1912     // Instrumentation methods for conditions
  1878. 1913
  1879. 1914     /**
  1880. 1915      * Queries whether the given ConditionObject
  1881. 1916      * uses this synchronizer as its lock.
  1882. 1917      *
  1883. 1918      * @param condition the condition
  1884. 1919      * @return {@code true} if owned
  1885. 1920      * @throws NullPointerException if the condition is null
  1886. 1921      */
  1887. 1922     public final boolean owns(ConditionObject condition) {
  1888. 1923         return condition.isOwnedBy(this);
  1889. 1924     }
  1890. 1925
  1891. 1926     /**
  1892. 1927      * Queries whether any threads are waiting on the given condition
  1893. 1928      * associated with this synchronizer. Note that because timeouts
  1894. 1929      * and interrupts may occur at any time, a {@code true} return
  1895. 1930      * does not guarantee that a future {@code signal} will awaken
  1896. 1931      * any threads.  This method is designed primarily for use in
  1897. 1932      * monitoring of the system state.
  1898. 1933      *
  1899. 1934      * @param condition the condition
  1900. 1935      * @return {@code true} if there are any waiting threads
  1901. 1936      * @throws IllegalMonitorStateException if exclusive synchronization
  1902. 1937      *         is not held
  1903. 1938      * @throws IllegalArgumentException if the given condition is
  1904. 1939      *         not associated with this synchronizer
  1905. 1940      * @throws NullPointerException if the condition is null
  1906. 1941      */
  1907. 1942     public final boolean hasWaiters(ConditionObject condition) {
  1908. 1943         if (!owns(condition))
  1909. 1944             throw new IllegalArgumentException("Not owner");
  1910. 1945         return condition.hasWaiters();
  1911. 1946     }
  1912. 1947
  1913. 1948     /**
  1914. 1949      * Returns an estimate of the number of threads waiting on the
  1915. 1950      * given condition associated with this synchronizer. Note that
  1916. 1951      * because timeouts and interrupts may occur at any time, the
  1917. 1952      * estimate serves only as an upper bound on the actual number of
  1918. 1953      * waiters.  This method is designed for use in monitoring of the
  1919. 1954      * system state, not for synchronization control.
  1920. 1955      *
  1921. 1956      * @param condition the condition
  1922. 1957      * @return the estimated number of waiting threads
  1923. 1958      * @throws IllegalMonitorStateException if exclusive synchronization
  1924. 1959      *         is not held
  1925. 1960      * @throws IllegalArgumentException if the given condition is
  1926. 1961      *         not associated with this synchronizer
  1927. 1962      * @throws NullPointerException if the condition is null
  1928. 1963      */
  1929. 1964     public final int getWaitQueueLength(ConditionObject condition) {
  1930. 1965         if (!owns(condition))
  1931. 1966             throw new IllegalArgumentException("Not owner");
  1932. 1967         return condition.getWaitQueueLength();
  1933. 1968     }
  1934. 1969
  1935. 1970     /**
  1936. 1971      * Returns a collection containing those threads that may be
  1937. 1972      * waiting on the given condition associated with this
  1938. 1973      * synchronizer.  Because the actual set of threads may change
  1939. 1974      * dynamically while constructing this result, the returned
  1940. 1975      * collection is only a best-effort estimate. The elements of the
  1941. 1976      * returned collection are in no particular order.
  1942. 1977      *
  1943. 1978      * @param condition the condition
  1944. 1979      * @return the collection of threads
  1945. 1980      * @throws IllegalMonitorStateException if exclusive synchronization
  1946. 1981      *         is not held
  1947. 1982      * @throws IllegalArgumentException if the given condition is
  1948. 1983      *         not associated with this synchronizer
  1949. 1984      * @throws NullPointerException if the condition is null
  1950. 1985      */
  1951. 1986     public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
  1952. 1987         if (!owns(condition))
  1953. 1988             throw new IllegalArgumentException("Not owner");
  1954. 1989         return condition.getWaitingThreads();
  1955. 1990     }
  1956. 1991
  1957. 1992     /**
  1958. 1993      * Condition implementation for a {@link
  1959. 1994      * AbstractQueuedSynchronizer} serving as the basis of a {@link
  1960. 1995      * Lock} implementation.
  1961. 1996      *
  1962. 1997      * <p>Method documentation for this class describes mechanics,
  1963. 1998      * not behavioral specifications from the point of view of Lock
  1964. 1999      * and Condition users. Exported versions of this class will in
  1965. 2000      * general need to be accompanied by documentation describing
  1966. 2001      * condition semantics that rely on those of the associated
  1967. 2002      * {@code AbstractQueuedSynchronizer}.
  1968. 2003      *
  1969. 2004      * <p>This class is Serializable, but all fields are transient,
  1970. 2005      * so deserialized conditions have no waiters.
  1971. 2006      */
  1972. 2007     public class ConditionObject implements Condition, java.io.Serializable {
  1973. 2008         private static final long serialVersionUID = 1173984872572414699L;
  1974. 2009         /** First node of condition queue. */
  1975. 2010         private transient Node firstWaiter;
  1976. 2011         /** Last node of condition queue. */
  1977. 2012         private transient Node lastWaiter;
  1978. 2013
  1979. 2014         /**
  1980. 2015          * Creates a new {@code ConditionObject} instance.
  1981. 2016          */
  1982. 2017         public ConditionObject() { }
  1983. 2018
  1984. 2019         // Internal methods
  1985. 2020
  1986. 2021         /**
  1987. 2022          * Adds a new waiter to wait queue.
  1988. 2023          * @return its new wait node
  1989. 2024          */
  1990. 2025         //将当前线程包装成Node节点,加入到条件队列中
  1991. 2026         private Node addConditionWaiter() {
  1992. 2027             //条件队列的尾节点
  1993. 2028             Node t = lastWaiter;
  1994. 2029             // If lastWaiter is cancelled, clean out.
  1995. 2030             //条件尾节点不为空,说明条件队列不为空,但是它的尾节点的waitStatus已经不是Node.CONDITION了,说明其已经取消了,则清除掉条件队列中已经取消的节点
  1996. 2031             if (t != null && t.waitStatus != Node.CONDITION) {
  1997. 2032                 //清除掉条件队列中已经取消的节点
  1998. 2033                 unlinkCancelledWaiters();
  1999. 2034                 t = lastWaiter;
  2000. 2035             }
  2001. 2036             //将当前线程包装成Node节点,并将新的node节点的状态设置为Node.CONDITION
  2002. 2037             Node node = new Node(Thread.currentThread(), Node.CONDITION);
  2003. 2038             //如果条件队列为空,则将条件队列的头结点firstWaiter指向刚创建出来的node节点
  2004. 2039             if (t == null)
  2005. 2040                 firstWaiter = node;
  2006. 2041             else
  2007. 2042                 //否则, 将条件队列旧的尾节点的nextWaiter指向新创建的node节点
  2008. 2043                 t.nextWaiter = node;
  2009. 2044             //将新创建的node节点设置为条件队列的尾节点
  2010. 2045             lastWaiter = node;
  2011. 2046             return node;
  2012. 2047         }
  2013. 2048
  2014. 2049         /**
  2015. 2050          * Removes and transfers nodes until hit non-cancelled one or
  2016. 2051          * null. Split out from signal in part to encourage compilers
  2017. 2052          * to inline the case of no waiters.
  2018. 2053          * @param first (non-null) the first node on condition queue
  2019. 2054          */
  2020. 2055         //唤醒条件队列中第一个未被取消的节点
  2021. 2056         private void doSignal(Node first) {
  2022. 2057             do {
  2023. 2058                 //如果条件队列中只有一个节点,则将条件队列清空
  2024. 2059                 if ( (firstWaiter = first.nextWaiter) == null)
  2025. 2060                     //则将条件队列的尾节点lastWaiter也置为null
  2026. 2061                     lastWaiter = null;
  2027. 2062                 //将当前要被唤醒的节点从条件队列中移除
  2028. 2063                 first.nextWaiter = null;
  2029. 2064                 //当前节点已经被取消了,并且条件队列中还有节点,则进行下一次循环
  2030. 2065             } while (!transferForSignal(first) &&
  2031. 2066                      (first = firstWaiter) != null);
  2032. 2067         }
  2033. 2068
  2034. 2069         /**
  2035. 2070          * Removes and transfers all nodes.
  2036. 2071          * @param first (non-null) the first node on condition queue
  2037. 2072          */
  2038. 2073         private void doSignalAll(Node first) {
  2039. 2074             //清空条件队列
  2040. 2075             lastWaiter = firstWaiter = null;
  2041. 2076             do {
  2042. 2077                 //获取条件队列头结点的下一个结点
  2043. 2078                 Node next = first.nextWaiter;
  2044. 2079                 //将条件队列的头结点从链表上移除
  2045. 2080                 first.nextWaiter = null;
  2046. 2081                 //将这个刚从条件队列condition中移除的节点转移到sync queue同步队列中
  2047. 2082                 transferForSignal(first);
  2048. 2083                 //修改指针,准备继续下一次的遍历
  2049. 2084                 first = next;
  2050. 2085                 //first != null表示链表还未遍历完成,继续下次遍历
  2051. 2086             } while (first != null);
  2052. 2087         }
  2053. 2088
  2054. 2089         /**
  2055. 2090          * Unlinks cancelled waiter nodes from condition queue.
  2056. 2091          * Called only while holding lock. This is called when
  2057. 2092          * cancellation occurred during condition wait, and upon
  2058. 2093          * insertion of a new waiter when lastWaiter is seen to have
  2059. 2094          * been cancelled. This method is needed to avoid garbage
  2060. 2095          * retention in the absence of signals. So even though it may
  2061. 2096          * require a full traversal, it comes into play only when
  2062. 2097          * timeouts or cancellations occur in the absence of
  2063. 2098          * signals. It traverses all nodes rather than stopping at a
  2064. 2099          * particular target to unlink all pointers to garbage nodes
  2065. 2100          * without requiring many re-traversals during cancellation
  2066. 2101          * storms.
  2067. 2102          */
  2068. 2103         private void unlinkCancelledWaiters() {
  2069. 2104             //当前节点,从表示条件队列链表的头结点firstWaiter开始遍历
  2070. 2105             Node t = firstWaiter;
  2071. 2106             //表示条件队列中最后一个waitStatus不为Node.CANCELLED的节点
  2072. 2107             Node trail = null;
  2073. 2108             //条件队列不为空
  2074. 2109             while (t != null) {
  2075. 2110                 //当前节点的后继节点
  2076. 2111                 Node next = t.nextWaiter;
  2077. 2112                 //如果当前节点t已经取消了正常等待,则将节点t从条件队列中移除
  2078. 2113                 if (t.waitStatus != Node.CONDITION) {
  2079. 2114                     //将当前节点从条件队列移除
  2080. 2115                     t.nextWaiter = null;
  2081. 2116                     //trail == null表示条件队列当前节点之前的都被取消了,所以将条件队列的头结点firstWaiter重置指向next
  2082. 2117                     if (trail == null)
  2083. 2118                         firstWaiter = next;
  2084. 2119                     else
  2085. 2120                         //当前节点已经被取消,trail != null,则需要将trail与当前节点断开,指向next;
  2086. 2121                         trail.nextWaiter = next;
  2087. 2122                     //如果next == null说明队列已经完成遍历,将原来的条件队列中最后一个正常等待的节点赋值给lastWaiter;
  2088. 2123                     if (next == null)
  2089. 2124                         lastWaiter = trail;
  2090. 2125                 }
  2091. 2126                 else{
  2092. 2127                     //如果当前节点未被取消,就将当前节点记为队列中最后一个未被取消的节点,令trail = t;
  2093. 2128                     trail = t;
  2094. 2129                 }
  2095. 2130                 //修改当前节点的指针,指向下一个节点next
  2096. 2131                 t = next;
  2097. 2132             }
  2098. 2133         }
  2099. 2134
  2100. 2135         // public methods
  2101. 2136
  2102. 2137         /**
  2103. 2138          * Moves the longest-waiting thread, if one exists, from the
  2104. 2139          * wait queue for this condition to the wait queue for the
  2105. 2140          * owning lock.
  2106. 2141          *
  2107. 2142          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  2108. 2143          *         returns {@code false}
  2109. 2144          */
  2110. 2145         public final void signal() {
  2111. 2146             //判断当前线程是否是持有锁的线程,如果不是则直接抛出IllegalMonitorStateException异常
  2112. 2147             if (!isHeldExclusively())
  2113. 2148                 throw new IllegalMonitorStateException();
  2114. 2149             Node first = firstWaiter;
  2115. 2150             //条件队列condition不为空
  2116. 2151             if (first != null)
  2117. 2152                 //唤醒条件队列condition中的头结点
  2118. 2153                 doSignal(first);
  2119. 2154         }
  2120. 2155
  2121. 2156         /**
  2122. 2157          * Moves all threads from the wait queue for this condition to
  2123. 2158          * the wait queue for the owning lock.
  2124. 2159          *
  2125. 2160          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  2126. 2161          *         returns {@code false}
  2127. 2162          */
  2128. 2163         public final void signalAll() {
  2129. 2164             //判断当前线程是否是持有锁的线程,如果不是,则直接抛出IllegalMonitorStateException异常
  2130. 2165             if (!isHeldExclusively())
  2131. 2166                 throw new IllegalMonitorStateException();
  2132. 2167             Node first = firstWaiter;
  2133. 2168             //判断条件队列是否为空
  2134. 2169             if (first != null)
  2135. 2170                 //唤醒条件队列中进行等待的节点
  2136. 2171                 doSignalAll(first);
  2137. 2172         }
  2138. 2173
  2139. 2174         /**
  2140. 2175          * Implements uninterruptible condition wait.
  2141. 2176          * <ol>
  2142. 2177          * <li> Save lock state returned by {@link #getState}.
  2143. 2178          * <li> Invoke {@link #release} with saved state as argument,
  2144. 2179          *      throwing IllegalMonitorStateException if it fails.
  2145. 2180          * <li> Block until signalled.
  2146. 2181          * <li> Reacquire by invoking specialized version of
  2147. 2182          *      {@link #acquire} with saved state as argument.
  2148. 2183          * </ol>
  2149. 2184          */
  2150. 2185         /**
  2151. 2186          * await()方法中,中断和signal()起到了相同的效果,将当前线程唤醒,但是中断一个正常等待的线程,线程被唤醒后,
  2152. 2187          * 抢到了锁,但是发现等待的条件还未满足,线程会重新被挂起,所以我们希望有一个方法,在等待的时候,不要响应线程中断,
  2153. 2188          * 所以下面awaitUnitertuptibly()方法就实现了这个功能。
  2154. 2189          */
  2155. 2190         public final void awaitUninterruptibly() {
  2156. 2191             //将当前线程包装成Node节点,加入到条件队列condition中
  2157. 2192             Node node = addConditionWaiter();
  2158. 2193             //释放当前线程所持有的锁
  2159. 2194             int savedState = fullyRelease(node);
  2160. 2195             boolean interrupted = false;
  2161. 2196             //判断当前线程是否已经在sync queue同步队列
  2162. 2197             while (!isOnSyncQueue(node)) {
  2163. 2198                 //当前线程未在sync queue同步队列中,说明其还没有资格竞争独占锁,将当前线程挂起
  2164. 2199                 LockSupport.park(this);
  2165. 2200                 /**
  2166. 2201                  * 判断当前线程是否是由于被其他线程中断唤醒的,如果是则只是简单记录下其中断标记即可,
  2167. 2202                  * 当前线程要跳出此while循环则满足的唯一条件是,被其他线程signal(),进入到同步队列
  2168. 2203                  */
  2169. 2204                 if (Thread.interrupted())
  2170. 2205                     interrupted = true;
  2171. 2206             }
  2172. 2207            
  2173. 2208             /**
  2174. 2209              * 这也是一个阻塞操作,当前线程已经加入到同步队列中,尝试竞争锁,竞争失败,找一个合适的地方,
  2175. 2210              * 将当前线程挂起,等待其他线程signal(),如果在竞争锁的过程中,发生了异常,则需要记录一下
  2176. 2211              */
  2177. 2212             if (acquireQueued(node, savedState) || interrupted)
  2178. 2213                 selfInterrupt();
  2179. 2214         }
  2180. 2215
  2181. 2216         /*
  2182. 2217          * For interruptible waits, we need to track whether to throw
  2183. 2218          * InterruptedException, if interrupted while blocked on
  2184. 2219          * condition, versus reinterrupt current thread, if
  2185. 2220          * interrupted while blocked waiting to re-acquire.
  2186. 2221          */
  2187. 2222
  2188. 2223         /** Mode meaning to reinterrupt on exit from wait */
  2189. 2224         private static final int REINTERRUPT =  1;
  2190. 2225         /** Mode meaning to throw InterruptedException on exit from wait */
  2191. 2226         private static final int THROW_IE    = -1;
  2192. 2227
  2193. 2228         /**
  2194. 2229          * Checks for interrupt, returning THROW_IE if interrupted
  2195. 2230          * before signalled, REINTERRUPT if after signalled, or
  2196. 2231          * 0 if not interrupted.
  2197. 2232          */
  2198. 2233         private int checkInterruptWhileWaiting(Node node) {
  2199. 2234             //判断在等待的过程中,线程是否发生了中断
  2200. 2235             return Thread.interrupted() ?
  2201. 2236                     //判断线程中断时发生在signal()之前,还是signal()之后,THROW_IE表示中断发生在signal()之前,REINTERRUPT表示中断发生在signal()之后,中断来得太晚了
  2202. 2237                 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  2203. 2238                 0;
  2204. 2239         }
  2205. 2240
  2206. 2241         /**
  2207. 2242          * Throws InterruptedException, reinterrupts current thread, or
  2208. 2243          * does nothing, depending on mode.
  2209. 2244          */
  2210. 2245         private void reportInterruptAfterWait(int interruptMode)
  2211. 2246             throws InterruptedException {
  2212. 2247             //表示还未被其他线程signal()就发生了中断,当前线程是由于中断被唤醒的,所以抛出InterruptedException异常
  2213. 2248             if (interruptMode == THROW_IE)
  2214. 2249                 throw new InterruptedException();
  2215. 2250             //表示被其它线程signal()过了之后才发生了线程中断,中断来得太迟了,所以补一下中断标记就行了
  2216. 2251             else if (interruptMode == REINTERRUPT)
  2217. 2252                 selfInterrupt();
  2218. 2253         }
  2219. 2254
  2220. 2255         /**
  2221. 2256          * Implements interruptible condition wait.
  2222. 2257          * <ol>
  2223. 2258          * <li> If current thread is interrupted, throw InterruptedException.
  2224. 2259          * <li> Save lock state returned by {@link #getState}.
  2225. 2260          * <li> Invoke {@link #release} with saved state as argument,
  2226. 2261          *      throwing IllegalMonitorStateException if it fails.
  2227. 2262          * <li> Block until signalled or interrupted.
  2228. 2263          * <li> Reacquire by invoking specialized version of
  2229. 2264          *      {@link #acquire} with saved state as argument.
  2230. 2265          * <li> If interrupted while blocked in step 4, throw InterruptedException.
  2231. 2266          * </ol>
  2232. 2267          */
  2233. 2268         public final void await() throws InterruptedException {
  2234. 2269             //调用await()方法时,当前线程已经被中断,则直接抛出中断异常
  2235. 2270             if (Thread.interrupted())
  2236. 2271                 throw new InterruptedException();
  2237. 2272             //将当前线程包装成Node节点,加入到条件队列中
  2238. 2273             Node node = addConditionWaiter();
  2239. 2274             //释放当前线程所占用的锁
  2240. 2275             int savedState = fullyRelease(node);
  2241. 2276             int interruptMode = 0;
  2242. 2277             //判断代表当前线程节点是否在同步队列中,如果不在,说明还没有其它线程调用signal()方法,将当前线程从条件队列Conditon种转移到同步队列中
  2243. 2278             while (!isOnSyncQueue(node)) {
  2244. 2279                 //当前线程还在条件队列中,没有资格竞争锁,则直接将当前线程挂起
  2245. 2280                 LockSupport.park(this);
  2246. 2281                 //能走到这里,说明当前线程被其他线程signal()或者被其它线程中断了
  2247. 2282                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  2248. 2283                     break;
  2249. 2284             }
  2250. 2285             //interruptMode != THROW_IE表示当前线程不是由于线程中断而被唤醒的
  2251. 2286             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  2252. 2287                 interruptMode = REINTERRUPT;
  2253. 2288             //上面将从条件队列condition中唤醒的节点加入到sync queue条件队列中时候,并没有从条件队列中移除,这里需要清理一下
  2254. 2289             if (node.nextWaiter != null) // clean up if cancelled
  2255. 2290                 unlinkCancelledWaiters();
  2256. 2291             //当前节点在等待的过程中发生了中断
  2257. 2292             if (interruptMode != 0)
  2258. 2293                 reportInterruptAfterWait(interruptMode);
  2259. 2294         }
  2260. 2295
  2261. 2296         /**
  2262. 2297          * Implements timed condition wait.
  2263. 2298          * <ol>
  2264. 2299          * <li> If current thread is interrupted, throw InterruptedException.
  2265. 2300          * <li> Save lock state returned by {@link #getState}.
  2266. 2301          * <li> Invoke {@link #release} with saved state as argument,
  2267. 2302          *      throwing IllegalMonitorStateException if it fails.
  2268. 2303          * <li> Block until signalled, interrupted, or timed out.
  2269. 2304          * <li> Reacquire by invoking specialized version of
  2270. 2305          *      {@link #acquire} with saved state as argument.
  2271. 2306          * <li> If interrupted while blocked in step 4, throw InterruptedException.
  2272. 2307          * </ol>
  2273. 2308          */
  2274. 2309         public final long awaitNanos(long nanosTimeout)
  2275. 2310                 throws InterruptedException {
  2276. 2311             if (Thread.interrupted())
  2277. 2312                 throw new InterruptedException();
  2278. 2313             Node node = addConditionWaiter();
  2279. 2314             int savedState = fullyRelease(node);
  2280. 2315             final long deadline = System.nanoTime() + nanosTimeout;
  2281. 2316             int interruptMode = 0;
  2282. 2317             while (!isOnSyncQueue(node)) {
  2283. 2318                 if (nanosTimeout <= 0L) {
  2284. 2319                     transferAfterCancelledWait(node);
  2285. 2320                     break;
  2286. 2321                 }
  2287. 2322                 if (nanosTimeout >= spinForTimeoutThreshold)
  2288. 2323                     LockSupport.parkNanos(this, nanosTimeout);
  2289. 2324                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  2290. 2325                     break;
  2291. 2326                 nanosTimeout = deadline - System.nanoTime();
  2292. 2327             }
  2293. 2328             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  2294. 2329                 interruptMode = REINTERRUPT;
  2295. 2330             if (node.nextWaiter != null)
  2296. 2331                 unlinkCancelledWaiters();
  2297. 2332             if (interruptMode != 0)
  2298. 2333                 reportInterruptAfterWait(interruptMode);
  2299. 2334             return deadline - System.nanoTime();
  2300. 2335         }
  2301. 2336
  2302. 2337         /**
  2303. 2338          * Implements absolute timed condition wait.
  2304. 2339          * <ol>
  2305. 2340          * <li> If current thread is interrupted, throw InterruptedException.
  2306. 2341          * <li> Save lock state returned by {@link #getState}.
  2307. 2342          * <li> Invoke {@link #release} with saved state as argument,
  2308. 2343          *      throwing IllegalMonitorStateException if it fails.
  2309. 2344          * <li> Block until signalled, interrupted, or timed out.
  2310. 2345          * <li> Reacquire by invoking specialized version of
  2311. 2346          *      {@link #acquire} with saved state as argument.
  2312. 2347          * <li> If interrupted while blocked in step 4, throw InterruptedException.
  2313. 2348          * <li> If timed out while blocked in step 4, return false, else true.
  2314. 2349          * </ol>
  2315. 2350          */
  2316. 2351         public final boolean awaitUntil(Date deadline)
  2317. 2352                 throws InterruptedException {
  2318. 2353             long abstime = deadline.getTime();
  2319. 2354             if (Thread.interrupted())
  2320. 2355                 throw new InterruptedException();
  2321. 2356             Node node = addConditionWaiter();
  2322. 2357             int savedState = fullyRelease(node);
  2323. 2358             boolean timedout = false;
  2324. 2359             int interruptMode = 0;
  2325. 2360             while (!isOnSyncQueue(node)) {
  2326. 2361                 if (System.currentTimeMillis() > abstime) {
  2327. 2362                     timedout = transferAfterCancelledWait(node);
  2328. 2363                     break;
  2329. 2364                 }
  2330. 2365                 LockSupport.parkUntil(this, abstime);
  2331. 2366                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  2332. 2367                     break;
  2333. 2368             }
  2334. 2369             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  2335. 2370                 interruptMode = REINTERRUPT;
  2336. 2371             if (node.nextWaiter != null)
  2337. 2372                 unlinkCancelledWaiters();
  2338. 2373             if (interruptMode != 0)
  2339. 2374                 reportInterruptAfterWait(interruptMode);
  2340. 2375             return !timedout;
  2341. 2376         }
  2342. 2377
  2343. 2378         /**
  2344. 2379          * Implements timed condition wait.
  2345. 2380          * <ol>
  2346. 2381          * <li> If current thread is interrupted, throw InterruptedException.
  2347. 2382          * <li> Save lock state returned by {@link #getState}.
  2348. 2383          * <li> Invoke {@link #release} with saved state as argument,
  2349. 2384          *      throwing IllegalMonitorStateException if it fails.
  2350. 2385          * <li> Block until signalled, interrupted, or timed out.
  2351. 2386          * <li> Reacquire by invoking specialized version of
  2352. 2387          *      {@link #acquire} with saved state as argument.
  2353. 2388          * <li> If interrupted while blocked in step 4, throw InterruptedException.
  2354. 2389          * <li> If timed out while blocked in step 4, return false, else true.
  2355. 2390          * </ol>
  2356. 2391          */
  2357. 2392         public final boolean await(long time, TimeUnit unit)
  2358. 2393                 throws InterruptedException {
  2359. 2394             long nanosTimeout = unit.toNanos(time);
  2360. 2395             if (Thread.interrupted())
  2361. 2396                 throw new InterruptedException();
  2362. 2397             Node node = addConditionWaiter();
  2363. 2398             int savedState = fullyRelease(node);
  2364. 2399             final long deadline = System.nanoTime() + nanosTimeout;
  2365. 2400             boolean timedout = false;
  2366. 2401             int interruptMode = 0;
  2367. 2402             while (!isOnSyncQueue(node)) {
  2368. 2403                 if (nanosTimeout <= 0L) {
  2369. 2404                     timedout = transferAfterCancelledWait(node);
  2370. 2405                     break;
  2371. 2406                 }
  2372. 2407                 if (nanosTimeout >= spinForTimeoutThreshold)
  2373. 2408                     LockSupport.parkNanos(this, nanosTimeout);
  2374. 2409                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  2375. 2410                     break;
  2376. 2411                 nanosTimeout = deadline - System.nanoTime();
  2377. 2412             }
  2378. 2413             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  2379. 2414                 interruptMode = REINTERRUPT;
  2380. 2415             if (node.nextWaiter != null)
  2381. 2416                 unlinkCancelledWaiters();
  2382. 2417             if (interruptMode != 0)
  2383. 2418                 reportInterruptAfterWait(interruptMode);
  2384. 2419             return !timedout;
  2385. 2420         }
  2386. 2421
  2387. 2422         //  support for instrumentation
  2388. 2423
  2389. 2424         /**
  2390. 2425          * Returns true if this condition was created by the given
  2391. 2426          * synchronization object.
  2392. 2427          *
  2393. 2428          * @return {@code true} if owned
  2394. 2429          */
  2395. 2430         final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
  2396. 2431             return sync == AbstractQueuedSynchronizer.this;
  2397. 2432         }
  2398. 2433
  2399. 2434         /**
  2400. 2435          * Queries whether any threads are waiting on this condition.
  2401. 2436          * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
  2402. 2437          *
  2403. 2438          * @return {@code true} if there are any waiting threads
  2404. 2439          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  2405. 2440          *         returns {@code false}
  2406. 2441          */
  2407. 2442         protected final boolean hasWaiters() {
  2408. 2443             if (!isHeldExclusively())
  2409. 2444                 throw new IllegalMonitorStateException();
  2410. 2445             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
  2411. 2446                 if (w.waitStatus == Node.CONDITION)
  2412. 2447                     return true;
  2413. 2448             }
  2414. 2449             return false;
  2415. 2450         }
  2416. 2451
  2417. 2452         /**
  2418. 2453          * Returns an estimate of the number of threads waiting on
  2419. 2454          * this condition.
  2420. 2455          * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
  2421. 2456          *
  2422. 2457          * @return the estimated number of waiting threads
  2423. 2458          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  2424. 2459          *         returns {@code false}
  2425. 2460          */
  2426. 2461         protected final int getWaitQueueLength() {
  2427. 2462             if (!isHeldExclusively())
  2428. 2463                 throw new IllegalMonitorStateException();
  2429. 2464             int n = 0;
  2430. 2465             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
  2431. 2466                 if (w.waitStatus == Node.CONDITION)
  2432. 2467                     ++n;
  2433. 2468             }
  2434. 2469             return n;
  2435. 2470         }
  2436. 2471
  2437. 2472         /**
  2438. 2473          * Returns a collection containing those threads that may be
  2439. 2474          * waiting on this Condition.
  2440. 2475          * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
  2441. 2476          *
  2442. 2477          * @return the collection of threads
  2443. 2478          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
  2444. 2479          *         returns {@code false}
  2445. 2480          */
  2446. 2481         protected final Collection<Thread> getWaitingThreads() {
  2447. 2482             if (!isHeldExclusively())
  2448. 2483                 throw new IllegalMonitorStateException();
  2449. 2484             ArrayList<Thread> list = new ArrayList<Thread>();
  2450. 2485             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
  2451. 2486                 if (w.waitStatus == Node.CONDITION) {
  2452. 2487                     Thread t = w.thread;
  2453. 2488                     if (t != null)
  2454. 2489                         list.add(t);
  2455. 2490                 }
  2456. 2491             }
  2457. 2492             return list;
  2458. 2493         }
  2459. 2494     }
  2460. 2495
  2461. 2496     /**
  2462. 2497      * Setup to support compareAndSet. We need to natively implement
  2463. 2498      * this here: For the sake of permitting future enhancements, we
  2464. 2499      * cannot explicitly subclass AtomicInteger, which would be
  2465. 2500      * efficient and useful otherwise. So, as the lesser of evils, we
  2466. 2501      * natively implement using hotspot intrinsics API. And while we
  2467. 2502      * are at it, we do the same for other CASable fields (which could
  2468. 2503      * otherwise be done with atomic field updaters).
  2469. 2504      */
  2470. 2505     private static final Unsafe unsafe = Unsafe.getUnsafe();
  2471. 2506     private static final long stateOffset;
  2472. 2507     private static final long headOffset;
  2473. 2508     private static final long tailOffset;
  2474. 2509     private static final long waitStatusOffset;
  2475. 2510     private static final long nextOffset;
  2476. 2511
  2477. 2512     static {
  2478. 2513         try {
  2479. 2514             stateOffset = unsafe.objectFieldOffset
  2480. 2515                 (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
  2481. 2516             headOffset = unsafe.objectFieldOffset
  2482. 2517                 (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
  2483. 2518             tailOffset = unsafe.objectFieldOffset
  2484. 2519                 (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
  2485. 2520             waitStatusOffset = unsafe.objectFieldOffset
  2486. 2521                 (Node.class.getDeclaredField("waitStatus"));
  2487. 2522             nextOffset = unsafe.objectFieldOffset
  2488. 2523                 (Node.class.getDeclaredField("next"));
  2489. 2524
  2490. 2525         } catch (Exception ex) { throw new Error(ex); }
  2491. 2526     }
  2492. 2527
  2493. 2528     /**
  2494. 2529      * CAS head field. Used only by enq.
  2495. 2530      */
  2496. 2531     private final boolean compareAndSetHead(Node update) {
  2497. 2532         return unsafe.compareAndSwapObject(this, headOffset, null, update);
  2498. 2533     }
  2499. 2534
  2500. 2535     /**
  2501. 2536      * CAS tail field. Used only by enq.
  2502. 2537      */
  2503. 2538     private final boolean compareAndSetTail(Node expect, Node update) {
  2504. 2539         return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
  2505. 2540     }
  2506. 2541
  2507. 2542     /**
  2508. 2543      * CAS waitStatus field of a node.
  2509. 2544      */
  2510. 2545     private static final boolean compareAndSetWaitStatus(Node node,
  2511. 2546                                                          int expect,
  2512. 2547                                                          int update) {
  2513. 2548         return unsafe.compareAndSwapInt(node, waitStatusOffset,
  2514. 2549                                         expect, update);
  2515. 2550     }
  2516. 2551
  2517. 2552     /**
  2518. 2553      * CAS next field of a node.
  2519. 2554      */
  2520. 2555     private static final boolean compareAndSetNext(Node node,
  2521. 2556                                                    Node expect,
  2522. 2557                                                    Node update) {
  2523. 2558         return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
  2524. 2559     }
  2525. 2560 }
复制代码
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

莫张周刘王

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表