- 36 package cn.com.pep;
- 37 import java.util.concurrent.TimeUnit;
- 38 import java.util.concurrent.locks.AbstractOwnableSynchronizer;
- 39 import java.util.concurrent.locks.Condition;
- 40 import java.util.concurrent.locks.LockSupport;
- 41 import java.util.ArrayList;
- 42 import java.util.Collection;
- 43 import java.util.Date;
- 44
- 45 import sun.misc.Unsafe;
- 46
- 47 /**
- 48 * Provides a framework for implementing blocking locks and related
- 49 * synchronizers (semaphores, events, etc) that rely on
- 50 * first-in-first-out (FIFO) wait queues. This class is designed to
- 51 * be a useful basis for most kinds of synchronizers that rely on a
- 52 * single atomic {@code int} value to represent state. Subclasses
- 53 * must define the protected methods that change this state, and which
- 54 * define what that state means in terms of this object being acquired
- 55 * or released. Given these, the other methods in this class carry
- 56 * out all queuing and blocking mechanics. Subclasses can maintain
- 57 * other state fields, but only the atomically updated {@code int}
- 58 * value manipulated using methods {@link #getState}, {@link
- 59 * #setState} and {@link #compareAndSetState} is tracked with respect
- 60 * to synchronization.
- 61 *
- 62 * <p>Subclasses should be defined as non-public internal helper
- 63 * classes that are used to implement the synchronization properties
- 64 * of their enclosing class. Class
- 65 * {@code AbstractQueuedSynchronizer} does not implement any
- 66 * synchronization interface. Instead it defines methods such as
- 67 * {@link #acquireInterruptibly} that can be invoked as
- 68 * appropriate by concrete locks and related synchronizers to
- 69 * implement their public methods.
- 70 *
- 71 * <p>This class supports either or both a default <em>exclusive</em>
- 72 * mode and a <em>shared</em> mode. When acquired in exclusive mode,
- 73 * attempted acquires by other threads cannot succeed. Shared mode
- 74 * acquires by multiple threads may (but need not) succeed. This class
- 75 * does not "understand" these differences except in the
- 76 * mechanical sense that when a shared mode acquire succeeds, the next
- 77 * waiting thread (if one exists) must also determine whether it can
- 78 * acquire as well. Threads waiting in the different modes share the
- 79 * same FIFO queue. Usually, implementation subclasses support only
- 80 * one of these modes, but both can come into play for example in a
- 81 * {@link ReadWriteLock}. Subclasses that support only exclusive or
- 82 * only shared modes need not define the methods supporting the unused mode.
- 83 *
- 84 * <p>This class defines a nested {@link ConditionObject} class that
- 85 * can be used as a {@link Condition} implementation by subclasses
- 86 * supporting exclusive mode for which method {@link
- 87 * #isHeldExclusively} reports whether synchronization is exclusively
- 88 * held with respect to the current thread, method {@link #release}
- 89 * invoked with the current {@link #getState} value fully releases
- 90 * this object, and {@link #acquire}, given this saved state value,
- 91 * eventually restores this object to its previous acquired state. No
- 92 * {@code AbstractQueuedSynchronizer} method otherwise creates such a
- 93 * condition, so if this constraint cannot be met, do not use it. The
- 94 * behavior of {@link ConditionObject} depends of course on the
- 95 * semantics of its synchronizer implementation.
- 96 *
- 97 * <p>This class provides inspection, instrumentation, and monitoring
- 98 * methods for the internal queue, as well as similar methods for
- 99 * condition objects. These can be exported as desired into classes
- 100 * using an {@code AbstractQueuedSynchronizer} for their
- 101 * synchronization mechanics.
- 102 *
- 103 * <p>Serialization of this class stores only the underlying atomic
- 104 * integer maintaining state, so deserialized objects have empty
- 105 * thread queues. Typical subclasses requiring serializability will
- 106 * define a {@code readObject} method that restores this to a known
- 107 * initial state upon deserialization.
- 108 *
- 109 * <h3>Usage</h3>
- 110 *
- 111 * <p>To use this class as the basis of a synchronizer, redefine the
- 112 * following methods, as applicable, by inspecting and/or modifying
- 113 * the synchronization state using {@link #getState}, {@link
- 114 * #setState} and/or {@link #compareAndSetState}:
- 115 *
- 116 * <ul>
- 117 * <li> {@link #tryAcquire}
- 118 * <li> {@link #tryRelease}
- 119 * <li> {@link #tryAcquireShared}
- 120 * <li> {@link #tryReleaseShared}
- 121 * <li> {@link #isHeldExclusively}
- 122 * </ul>
- 123 *
- 124 * Each of these methods by default throws {@link
- 125 * UnsupportedOperationException}. Implementations of these methods
- 126 * must be internally thread-safe, and should in general be short and
- 127 * not block. Defining these methods is the <em>only</em> supported
- 128 * means of using this class. All other methods are declared
- 129 * {@code final} because they cannot be independently varied.
- 130 *
- 131 * <p>You may also find the inherited methods from {@link
- 132 * AbstractOwnableSynchronizer} useful to keep track of the thread
- 133 * owning an exclusive synchronizer. You are encouraged to use them
- 134 * -- this enables monitoring and diagnostic tools to assist users in
- 135 * determining which threads hold locks.
- 136 *
- 137 * <p>Even though this class is based on an internal FIFO queue, it
- 138 * does not automatically enforce FIFO acquisition policies. The core
- 139 * of exclusive synchronization takes the form:
- 140 *
- 141 * <pre>
- 142 * Acquire:
- 143 * while (!tryAcquire(arg)) {
- 144 * <em>enqueue thread if it is not already queued</em>;
- 145 * <em>possibly block current thread</em>;
- 146 * }
- 147 *
- 148 * Release:
- 149 * if (tryRelease(arg))
- 150 * <em>unblock the first queued thread</em>;
- 151 * </pre>
- 152 *
- 153 * (Shared mode is similar but may involve cascading signals.)
- 154 *
- 155 * <p id="barging">Because checks in acquire are invoked before
- 156 * enqueuing, a newly acquiring thread may <em>barge</em> ahead of
- 157 * others that are blocked and queued. However, you can, if desired,
- 158 * define {@code tryAcquire} and/or {@code tryAcquireShared} to
- 159 * disable barging by internally invoking one or more of the inspection
- 160 * methods, thereby providing a <em>fair</em> FIFO acquisition order.
- 161 * In particular, most fair synchronizers can define {@code tryAcquire}
- 162 * to return {@code false} if {@link #hasQueuedPredecessors} (a method
- 163 * specifically designed to be used by fair synchronizers) returns
- 164 * {@code true}. Other variations are possible.
- 165 *
- 166 * <p>Throughput and scalability are generally highest for the
- 167 * default barging (also known as <em>greedy</em>,
- 168 * <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
- 169 * While this is not guaranteed to be fair or starvation-free, earlier
- 170 * queued threads are allowed to recontend before later queued
- 171 * threads, and each recontention has an unbiased chance to succeed
- 172 * against incoming threads. Also, while acquires do not
- 173 * "spin" in the usual sense, they may perform multiple
- 174 * invocations of {@code tryAcquire} interspersed with other
- 175 * computations before blocking. This gives most of the benefits of
- 176 * spins when exclusive synchronization is only briefly held, without
- 177 * most of the liabilities when it isn't. If so desired, you can
- 178 * augment this by preceding calls to acquire methods with
- 179 * "fast-path" checks, possibly prechecking {@link #hasContended}
- 180 * and/or {@link #hasQueuedThreads} to only do so if the synchronizer
- 181 * is likely not to be contended.
- 182 *
- 183 * <p>This class provides an efficient and scalable basis for
- 184 * synchronization in part by specializing its range of use to
- 185 * synchronizers that can rely on {@code int} state, acquire, and
- 186 * release parameters, and an internal FIFO wait queue. When this does
- 187 * not suffice, you can build synchronizers from a lower level using
- 188 * {@link java.util.concurrent.atomic atomic} classes, your own custom
- 189 * {@link java.util.Queue} classes, and {@link LockSupport} blocking
- 190 * support.
- 191 *
- 192 * <h3>Usage Examples</h3>
- 193 *
- 194 * <p>Here is a non-reentrant mutual exclusion lock class that uses
- 195 * the value zero to represent the unlocked state, and one to
- 196 * represent the locked state. While a non-reentrant lock
- 197 * does not strictly require recording of the current owner
- 198 * thread, this class does so anyway to make usage easier to monitor.
- 199 * It also supports conditions and exposes
- 200 * one of the instrumentation methods:
- 201 *
- 202 * <pre> {@code
- 203 * class Mutex implements Lock, java.io.Serializable {
- 204 *
- 205 * // Our internal helper class
- 206 * private static class Sync extends AbstractQueuedSynchronizer {
- 207 * // Reports whether in locked state
- 208 * protected boolean isHeldExclusively() {
- 209 * return getState() == 1;
- 210 * }
- 211 *
- 212 * // Acquires the lock if state is zero
- 213 * public boolean tryAcquire(int acquires) {
- 214 * assert acquires == 1; // Otherwise unused
- 215 * if (compareAndSetState(0, 1)) {
- 216 * setExclusiveOwnerThread(Thread.currentThread());
- 217 * return true;
- 218 * }
- 219 * return false;
- 220 * }
- 221 *
- 222 * // Releases the lock by setting state to zero
- 223 * protected boolean tryRelease(int releases) {
- 224 * assert releases == 1; // Otherwise unused
- 225 * if (getState() == 0) throw new IllegalMonitorStateException();
- 226 * setExclusiveOwnerThread(null);
- 227 * setState(0);
- 228 * return true;
- 229 * }
- 230 *
- 231 * // Provides a Condition
- 232 * Condition newCondition() { return new ConditionObject(); }
- 233 *
- 234 * // Deserializes properly
- 235 * private void readObject(ObjectInputStream s)
- 236 * throws IOException, ClassNotFoundException {
- 237 * s.defaultReadObject();
- 238 * setState(0); // reset to unlocked state
- 239 * }
- 240 * }
- 241 *
- 242 * // The sync object does all the hard work. We just forward to it.
- 243 * private final Sync sync = new Sync();
- 244 *
- 245 * public void lock() { sync.acquire(1); }
- 246 * public boolean tryLock() { return sync.tryAcquire(1); }
- 247 * public void unlock() { sync.release(1); }
- 248 * public Condition newCondition() { return sync.newCondition(); }
- 249 * public boolean isLocked() { return sync.isHeldExclusively(); }
- 250 * public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
- 251 * public void lockInterruptibly() throws InterruptedException {
- 252 * sync.acquireInterruptibly(1);
- 253 * }
- 254 * public boolean tryLock(long timeout, TimeUnit unit)
- 255 * throws InterruptedException {
- 256 * return sync.tryAcquireNanos(1, unit.toNanos(timeout));
- 257 * }
- 258 * }}</pre>
- 259 *
- 260 * <p>Here is a latch class that is like a
- 261 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
- 262 * except that it only requires a single {@code signal} to
- 263 * fire. Because a latch is non-exclusive, it uses the {@code shared}
- 264 * acquire and release methods.
- 265 *
- 266 * <pre> {@code
- 267 * class BooleanLatch {
- 268 *
- 269 * private static class Sync extends AbstractQueuedSynchronizer {
- 270 * boolean isSignalled() { return getState() != 0; }
- 271 *
- 272 * protected int tryAcquireShared(int ignore) {
- 273 * return isSignalled() ? 1 : -1;
- 274 * }
- 275 *
- 276 * protected boolean tryReleaseShared(int ignore) {
- 277 * setState(1);
- 278 * return true;
- 279 * }
- 280 * }
- 281 *
- 282 * private final Sync sync = new Sync();
- 283 * public boolean isSignalled() { return sync.isSignalled(); }
- 284 * public void signal() { sync.releaseShared(1); }
- 285 * public void await() throws InterruptedException {
- 286 * sync.acquireSharedInterruptibly(1);
- 287 * }
- 288 * }}</pre>
- 289 *
- 290 * @since 1.5
- 291 * @author Doug Lea
- 292 */
- 293 public abstract class AbstractQueuedSynchronizer
- 294 extends AbstractOwnableSynchronizer
- 295 implements java.io.Serializable {
- 296
- 297 private static final long serialVersionUID = 7373984972572414691L;
- 298
- 299 /**
- 300 * Creates a new {@code AbstractQueuedSynchronizer} instance
- 301 * with initial synchronization state of zero.
- 302 */
- 303 protected AbstractQueuedSynchronizer() { }
- 304
- 305 /**
- 306 * Wait queue node class.
- 307 *
- 308 * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
- 309 * Hagersten) lock queue. CLH locks are normally used for
- 310 * spinlocks. We instead use them for blocking synchronizers, but
- 311 * use the same basic tactic of holding some of the control
- 312 * information about a thread in the predecessor of its node. A
- 313 * "status" field in each node keeps track of whether a thread
- 314 * should block. A node is signalled when its predecessor
- 315 * releases. Each node of the queue otherwise serves as a
- 316 * specific-notification-style monitor holding a single waiting
- 317 * thread. The status field does NOT control whether threads are
- 318 * granted locks etc though. A thread may try to acquire if it is
- 319 * first in the queue. But being first does not guarantee success;
- 320 * it only gives the right to contend. So the currently released
- 321 * contender thread may need to rewait.
- 322 *
- 323 * <p>To enqueue into a CLH lock, you atomically splice it in as new
- 324 * tail. To dequeue, you just set the head field.
- 325 * <pre>
- 326 * +------+ prev +-----+ +-----+
- 327 * head | | <---- | | <---- | | tail
- 328 * +------+ +-----+ +-----+
- 329 * </pre>
- 330 *
- 331 * <p>Insertion into a CLH queue requires only a single atomic
- 332 * operation on "tail", so there is a simple atomic point of
- 333 * demarcation from unqueued to queued. Similarly, dequeuing
- 334 * involves only updating the "head". However, it takes a bit
- 335 * more work for nodes to determine who their successors are,
- 336 * in part to deal with possible cancellation due to timeouts
- 337 * and interrupts.
- 338 *
- 339 * <p>The "prev" links (not used in original CLH locks), are mainly
- 340 * needed to handle cancellation. If a node is cancelled, its
- 341 * successor is (normally) relinked to a non-cancelled
- 342 * predecessor. For explanation of similar mechanics in the case
- 343 * of spin locks, see the papers by Scott and Scherer at
- 344 * http://www.cs.rochester.edu/u/scott/synchronization/
- 345 *
- 346 * <p>We also use "next" links to implement blocking mechanics.
- 347 * The thread id for each node is kept in its own node, so a
- 348 * predecessor signals the next node to wake up by traversing
- 349 * next link to determine which thread it is. Determination of
- 350 * successor must avoid races with newly queued nodes to set
- 351 * the "next" fields of their predecessors. This is solved
- 352 * when necessary by checking backwards from the atomically
- 353 * updated "tail" when a node's successor appears to be null.
- 354 * (Or, said differently, the next-links are an optimization
- 355 * so that we don't usually need a backward scan.)
- 356 *
- 357 * <p>Cancellation introduces some conservatism to the basic
- 358 * algorithms. Since we must poll for cancellation of other
- 359 * nodes, we can miss noticing whether a cancelled node is
- 360 * ahead or behind us. This is dealt with by always unparking
- 361 * successors upon cancellation, allowing them to stabilize on
- 362 * a new predecessor, unless we can identify an uncancelled
- 363 * predecessor who will carry this responsibility.
- 364 *
- 365 * <p>CLH queues need a dummy header node to get started. But
- 366 * we don't create them on construction, because it would be wasted
- 367 * effort if there is never contention. Instead, the node
- 368 * is constructed and head and tail pointers are set upon first
- 369 * contention.
- 370 *
- 371 * <p>Threads waiting on Conditions use the same nodes, but
- 372 * use an additional link. Conditions only need to link nodes
- 373 * in simple (non-concurrent) linked queues because they are
- 374 * only accessed when exclusively held. Upon await, a node is
- 375 * inserted into a condition queue. Upon signal, the node is
- 376 * transferred to the main queue. A special value of status
- 377 * field is used to mark which queue a node is on.
- 378 *
- 379 * <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
- 380 * Scherer and Michael Scott, along with members of JSR-166
- 381 * expert group, for helpful ideas, discussions, and critiques
- 382 * on the design of this class.
- 383 */
- 384 static final class Node {
- 385 /** Marker to indicate a node is waiting in shared mode */
- 386 static final Node SHARED = new Node();
- 387 /** Marker to indicate a node is waiting in exclusive mode */
- 388 static final Node EXCLUSIVE = null;
- 389
- 390 /** waitStatus value to indicate thread has cancelled */
- 391 static final int CANCELLED = 1;
- 392 /** waitStatus value to indicate successor's thread needs unparking */
- 393 static final int SIGNAL = -1;
- 394 /** waitStatus value to indicate thread is waiting on condition */
- 395 static final int CONDITION = -2;
- 396 /**
- 397 * waitStatus value to indicate the next acquireShared should
- 398 * unconditionally propagate
- 399 */
- 400 static final int PROPAGATE = -3;
- 401
- 402 /**
- 403 * Status field, taking on only the values:
- 404 * SIGNAL: The successor of this node is (or will soon be)
- 405 * blocked (via park), so the current node must
- 406 * unpark its successor when it releases or
- 407 * cancels. To avoid races, acquire methods must
- 408 * first indicate they need a signal,
- 409 * then retry the atomic acquire, and then,
- 410 * on failure, block.
- 411 * CANCELLED: This node is cancelled due to timeout or interrupt.
- 412 * Nodes never leave this state. In particular,
- 413 * a thread with cancelled node never again blocks.
- 414 * CONDITION: This node is currently on a condition queue.
- 415 * It will not be used as a sync queue node
- 416 * until transferred, at which time the status
- 417 * will be set to 0. (Use of this value here has
- 418 * nothing to do with the other uses of the
- 419 * field, but simplifies mechanics.)
- 420 * PROPAGATE: A releaseShared should be propagated to other
- 421 * nodes. This is set (for head node only) in
- 422 * doReleaseShared to ensure propagation
- 423 * continues, even if other operations have
- 424 * since intervened.
- 425 * 0: None of the above
- 426 *
- 427 * The values are arranged numerically to simplify use.
- 428 * Non-negative values mean that a node doesn't need to
- 429 * signal. So, most code doesn't need to check for particular
- 430 * values, just for sign.
- 431 *
- 432 * The field is initialized to 0 for normal sync nodes, and
- 433 * CONDITION for condition nodes. It is modified using CAS
- 434 * (or when possible, unconditional volatile writes).
- 435 */
- 436 volatile int waitStatus;
- 437
- 438 /**
- 439 * Link to predecessor node that current node/thread relies on
- 440 * for checking waitStatus. Assigned during enqueuing, and nulled
- 441 * out (for sake of GC) only upon dequeuing. Also, upon
- 442 * cancellation of a predecessor, we short-circuit while
- 443 * finding a non-cancelled one, which will always exist
- 444 * because the head node is never cancelled: A node becomes
- 445 * head only as a result of successful acquire. A
- 446 * cancelled thread never succeeds in acquiring, and a thread only
- 447 * cancels itself, not any other node.
- 448 */
- 449 volatile Node prev;
- 450
- 451 /**
- 452 * Link to the successor node that the current node/thread
- 453 * unparks upon release. Assigned during enqueuing, adjusted
- 454 * when bypassing cancelled predecessors, and nulled out (for
- 455 * sake of GC) when dequeued. The enq operation does not
- 456 * assign next field of a predecessor until after attachment,
- 457 * so seeing a null next field does not necessarily mean that
- 458 * node is at end of queue. However, if a next field appears
- 459 * to be null, we can scan prev's from the tail to
- 460 * double-check. The next field of cancelled nodes is set to
- 461 * point to the node itself instead of null, to make life
- 462 * easier for isOnSyncQueue.
- 463 */
- 464 volatile Node next;
- 465
- 466 /**
- 467 * The thread that enqueued this node. Initialized on
- 468 * construction and nulled out after use.
- 469 */
- 470 volatile Thread thread;
- 471
- 472 /**
- 473 * Link to next node waiting on condition, or the special
- 474 * value SHARED. Because condition queues are accessed only
- 475 * when holding in exclusive mode, we just need a simple
- 476 * linked queue to hold nodes while they are waiting on
- 477 * conditions. They are then transferred to the queue to
- 478 * re-acquire. And because conditions can only be exclusive,
- 479 * we save a field by using special value to indicate shared
- 480 * mode.
- 481 */
- 482 Node nextWaiter;
- 483
- 484 /**
- 485 * Returns true if node is waiting in shared mode.
- 486 */
- 487 final boolean isShared() {
- 488 return nextWaiter == SHARED;
- 489 }
- 490
- 491 /**
- 492 * Returns previous node, or throws NullPointerException if null.
- 493 * Use when predecessor cannot be null. The null check could
- 494 * be elided, but is present to help the VM.
- 495 *
- 496 * @return the predecessor of this node
- 497 */
- 498 final Node predecessor() throws NullPointerException {
- 499 Node p = prev;
- 500 if (p == null)
- 501 throw new NullPointerException();
- 502 else
- 503 return p;
- 504 }
- 505
- 506 Node() { // Used to establish initial head or SHARED marker
- 507 }
- 508
- 509 Node(Thread thread, Node mode) { // Used by addWaiter
- 510 this.nextWaiter = mode;
- 511 this.thread = thread;
- 512 }
- 513
- 514 Node(Thread thread, int waitStatus) { // Used by Condition
- 515 this.waitStatus = waitStatus;
- 516 this.thread = thread;
- 517 }
- 518 }
- 519
- 520 /**
- 521 * Head of the wait queue, lazily initialized. Except for
- 522 * initialization, it is modified only via method setHead. Note:
- 523 * If head exists, its waitStatus is guaranteed not to be
- 524 * CANCELLED.
- 525 */
- 526 private transient volatile Node head;
- 527
- 528 /**
- 529 * Tail of the wait queue, lazily initialized. Modified only via
- 530 * method enq to add new wait node.
- 531 */
- 532 private transient volatile Node tail;
- 533
- 534 /**
- 535 * The synchronization state.
- 536 */
- 537 private volatile int state;
- 538
- 539 /**
- 540 * Returns the current value of synchronization state.
- 541 * This operation has memory semantics of a {@code volatile} read.
- 542 * @return current state value
- 543 */
- 544 protected final int getState() {
- 545 return state;
- 546 }
- 547
- 548 /**
- 549 * Sets the value of synchronization state.
- 550 * This operation has memory semantics of a {@code volatile} write.
- 551 * @param newState the new state value
- 552 */
- 553 protected final void setState(int newState) {
- 554 state = newState;
- 555 }
- 556
- 557 /**
- 558 * Atomically sets synchronization state to the given updated
- 559 * value if the current state value equals the expected value.
- 560 * This operation has memory semantics of a {@code volatile} read
- 561 * and write.
- 562 *
- 563 * @param expect the expected value
- 564 * @param update the new value
- 565 * @return {@code true} if successful. False return indicates that the actual
- 566 * value was not equal to the expected value.
- 567 */
- 568 protected final boolean compareAndSetState(int expect, int update) {
- 569 // See below for intrinsics setup to support this
- 570 return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
- 571 }
- 572
- 573 // Queuing utilities
- 574
- 575 /**
- 576 * The number of nanoseconds for which it is faster to spin
- 577 * rather than to use timed park. A rough estimate suffices
- 578 * to improve responsiveness with very short timeouts.
- 579 */
- 580 static final long spinForTimeoutThreshold = 1000L;
- 581
- 582 /**
- 583 * Inserts node into queue, initializing if necessary. See picture above.
- 584 * @param node the node to insert
- 585 * @return node's predecessor
- 586 */
- 587 private Node enq(final Node node) {
- 588 /*"自旋",将给定的节点插入到同步队列的尾部*/
- 589 for (;;) {
- 590 Node t = tail;
- 591 /*同步队列为空,则dummy哑节点作为同步队列的头结点head,并且将尾节点tail也指向头结点head*/
- 592 if (t == null) {
- 593 /*CAS操作,设置同步队列的头结点*/
- 594 if (compareAndSetHead(new Node())) {
- 595 /*将尾节点设置为头结点,进入下次"自旋"*/
- 596 tail = head;
- 597 }
- 598 }else {
- 599 /*尾部节点不为空,则进行正常添加动作*/
- 600 node.prev = t;
- 601 /*CAS操作,设置同步队列的头结点*/
- 602 if (compareAndSetTail(t, node)) {
- 603 t.next = node;
- 604 return t;
- 605 }
- 606 }
- 607 }
- 608 }
- 609
- 610 /**
- 611 * Creates and enqueues node for current thread and given mode.
- 612 * 以给定的模式包装当前线程节点,将当前节点加入到阻塞队列的队尾
- 613 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
- 614 * @return the new node
- 615 */
- 616 private Node addWaiter(Node mode) {
- 617 /*以给定的模式将当前线程包装成node节点*/
- 618 Node node = new Node(Thread.currentThread(), mode);
- 619
- 620 /*快速采用尾插法,将当前节点插入到同步队列的队尾*/
- 621 Node predNode = tail;
- 622 if (predNode != null) {
- 623 /*preNode <-- node*/
- 624 node.prev = predNode;
- 625 /*采用CAS将node设置阻塞队列的尾节点,设置成功,说明没有并发*/
- 626 if (compareAndSetTail(tail, node)) {
- 627 predNode.next = node;
- 628 /*尾插法插入成功,则直接返回当前节点*/
- 629 return node;
- 630 }
- 631 }
- 632 /*"自旋"将节点加入到队列的尾部,直到成功为止*/
- 633 enq(node);
- 634 return node;
- 635 }
- 636
- 637 /**
- 638 * Sets head of queue to be node, thus dequeuing. Called only by
- 639 * acquire methods. Also nulls out unused fields for sake of GC
- 640 * and to suppress unnecessary signals and traversals.
- 641 *
- 642 * @param node the node
- 643 */
- 644 private void setHead(Node node) {
- 645 head = node;
- 646 node.thread = null;
- 647 node.prev = null;
- 648 }
- 649
- 650 /**
- 651 * Wakes up node's successor, if one exists.
- 652 *
- 653 * @param node the node
- 654 */
- 655 private void unparkSuccessor(Node node) {
- 656 /*
- 657 * If status is negative (i.e., possibly needing signal) try
- 658 * to clear in anticipation of signalling. It is OK if this
- 659 * fails or if status is changed by waiting thread.
- 660 */
- 661 //在这里,这个节点其实是同步队列的头结点,头结点唤醒后继节点之后,使命就完成了,所以应该将其状态置为0
- 662 int ws = node.waitStatus;
- 663 if (ws < 0)
- 664 compareAndSetWaitStatus(node, ws, 0);
- 665
- 666 /*
- 667 * Thread to unpark is held in successor, which is normally
- 668 * just the next node. But if cancelled or apparently null,
- 669 * traverse backwards from tail to find the actual
- 670 * non-cancelled successor.
- 671 */
- 672 Node s = node.next;
- 673 //因为s.next相当于从同步队列的头部遍历所以可能会出现s == null的情况,上面分析过原因,不再赘述了。
- 674 if (s == null || s.waitStatus > 0) {
- 675 s = null;
- 676 //从同步队列的尾部向前遍历,找到当前node节点(头结点)的最近的有效后继节点
- 677 for (Node t = tail; t != null && t != node; t = t.prev)
- 678 if (t.waitStatus <= 0)
- 679 s = t;
- 680 }
- 681
- 682 /**
- 683 * 找到最近的有效后继节点,则唤醒后继节点中的线程在parkAndCheckInterrupt()方法上的阻塞,去尝试竞争共享资源,
- 684 * 这就体现了线程之间的协作,而在这个竞争的过程中也会忽略这个Node.CANCELLED状态的节点,这当前node节点也就放弃了竞争共享资源的机会,相当于出队了。
- 685 */
- 686 if (s != null)
- 687 LockSupport.unpark(s.thread);
- 688 }
- 689
- 690 /**
- 691 * Release action for shared mode -- signals successor and ensures
- 692 * propagation. (Note: For exclusive mode, release just amounts
- 693 * to calling unparkSuccessor of head if it needs signal.)
- 694 */
- 695 private void doReleaseShared() {
- 696 /*
- 697 * Ensure that a release propagates, even if there are other
- 698 * in-progress acquires/releases. This proceeds in the usual
- 699 * way of trying to unparkSuccessor of head if it needs
- 700 * signal. But if it does not, status is set to PROPAGATE to
- 701 * ensure that upon release, propagation continues.
- 702 * Additionally, we must loop in case a new node is added
- 703 * while we are doing this. Also, unlike other uses of
- 704 * unparkSuccessor, we need to know if CAS to reset status
- 705 * fails, if so rechecking.
- 706 */
- 707 for (;;) {
- 708 Node h = head;
- 709 if (h != null && h != tail) {
- 710 int ws = h.waitStatus;
- 711 if (ws == Node.SIGNAL) {
- 712 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- 713 continue; // loop to recheck cases
- 714 unparkSuccessor(h);
- 715 }
- 716 else if (ws == 0 &&
- 717 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- 718 continue; // loop on failed CAS
- 719 }
- 720 if (h == head) // loop if head changed
- 721 break;
- 722 }
- 723 }
- 724
- 725 /**
- 726 * Sets head of queue, and checks if successor may be waiting
- 727 * in shared mode, if so propagating if either propagate > 0 or
- 728 * PROPAGATE status was set.
- 729 *
- 730 * @param node the node
- 731 * @param propagate the return value from a tryAcquireShared
- 732 */
- 733 private void setHeadAndPropagate(Node node, int propagate) {
- 734 //后继节点成功获取了共享锁,队列的"旧head"还没有改变,将其保存下来,锁定到方法的局部变量做后序的判断使用;
- 735 Node h = head; // Record old head for check below
- 736 /**
- 737 * 将这个获取共享锁成功的后继节点设置为同步队列的“新head”,此时同步队列的head发生变化, 此线程还未唤起任何线程。
- 738 */
- 739 setHead(node);
- 740 /**
- 741 * 1、h == null这个条件什么时候成立呢?仔细翻了下AQS中的源码发现:
- 742 * 这个setHeadAndPropagate()方法只在共享锁模式下,同步队列head的后继节点成功获取了共享锁才会调用。
- 743 * 获取到共享锁的当前线程是同步队列的头结点的后继节点,"旧head"有后继节点,说明同步队列不为空,那么"旧head"也必定不为空,
- 744 * 此方法中第一行通过h == head,在执行setHead(node)方法之前将"旧head"保存了下来,所以h == null必定不会成立,
- 745 * 至于为什么这么写呢? 查阅了下资料网上说"发现这个是防止空指针异常发生的标准写法(既如果要取一个对象的某个属性进行判断的时候,首先对这个对象进行null判断)。"
- 746 * 这说的过去吧?
- 747 *
- 748 * 2、(h = head) == null这个条件什么时候成立呢?
- 749 * 这个条件也是不可能成立的,下面这种情况应该是最常见的:
- 750 * (1)、例如有个Semaphore实例s初始化了2个许可,线程A首先调用s.acquire(2)申请了两个许可,成功申请到了许可;
- 751 * (2)、线程B调用了s.acquire()方法申请一个许可,申请失败,加入到同步队列;
- 752 * (3)、线程C调用了s.acquire()方法申请一个许可,申请失败,加入到同步队列;
- 753 * (4)、线程A调用了s.releaseShared(2)方法释放了两个许可,再调用doReleaseShared()方法,进行同步队列唤醒;
- 754 * (5)、首先唤醒了同步队列中的线程B,B线程获取到共享锁:
- 755 * a)、如果此时线程B还未setHead(Node)方法,还未改变同步队列的head头结点,那么线程A的唤醒工作就结束,也仅仅只是唤醒了同步队列中的线程B,
- 756 * 则必定有(h = head) == Node(C) != null成立,线程C的唤醒工作仍然需要线程B去执行;
- 757 * b)、如果此时线程B执行了SetHead(Node)方法,改变了同步队列的head头结点,那么线程A同时也会唤醒线程C,相当于线程A同时唤醒了线程B和线程C:
- 758 * 1)、如果线程C中的setHeadAndPropagate()在线程B前调用完毕(即线程C执行了setHead()方法改变了同步队列的head),那么 (h = head) == Node(C);
- 759 * 2)、如果线程C中的setHeadAndPropagate()在线程B之后才调用(即线程C此时还未执行setHead()方法,未改变同步队列的head),那么 (h = head) == Node(B)
- 760 * 所以综上所述,只要执行过addWaiter()方法,向同步队列中添加过线程,那么(h = head)== null必定不成立。只能理解为“防止空指针的标准写法”。
- 761 */
- 762 if (propagate > 0 || h == null || h.waitStatus < 0 ||
- 763 (h = head) == null || h.waitStatus < 0) {
- 764 Node s = node.next;
- 765 /**
- 766 * s == null这种情况是可能存在的,如果当前唤醒的这个node节点是同步队列的尾节点就可能出现node.next == null;
- 767 * s.isShared()指定是共享锁模式,当前线程获取共享锁之后,是需要尝试唤醒同步队列中的其它线程的。
- 768 */
- 769 if (s == null || s.isShared())
- 770 doReleaseShared();
- 771 }
- 772 }
- 773
- 774 // Utilities for various versions of acquire
- 775
- 776 /**
- 777 * Cancels an ongoing attempt to acquire.
- 778 * @param node the node
- 779 */
- 780 private void cancelAcquire(Node node) {
- 781 //当前节点为空,则说明当前线程永远不会被调度到了,所以直接返回
- 782 if (node == null) {
- 783 return;
- 784 }
- 785
- 786 /**
- 787 * 接下来将点前Node节点从同步队列出队,主要做以下几件事:
- 788 * 1、将当前节点不与任何线程绑定,设置当前节点为Node.CANCELLED状态;
- 789 * 2、将当前取消节点的前置非取消节点和后置非取消节点"链接"起来;
- 790 * 3、如果前置节点释放了锁,那么当前取消节点承担起后续节点的唤醒职责。
- 791 */
- 792
- 793 //1、取消当前节点与线程的绑定
- 794 node.thread = null;
- 795
- 796 //2、找到当前节点的有效前继节点pred
- 797 Node pred = node.prev;
- 798 while (pred.waitStatus > 0) {
- 799 //为什么双向链表从后往前遍历呢?而不是从前往后遍历呢?
- 800 node.prev = pred = pred.prev;
- 801 }
- 802
- 803 //用作CAS操作时候的条件判断需要使用的值
- 804 Node predNext = pred.next;
- 805
- 806 //3、将当前节点设置为取消状态
- 807 node.waitStatus = Node.CANCELLED;
- 808
- 809 /**
- 810 * 接下来就需要将当前取消节点的前后两个有效节点"链接"起来了,"达成让当前node节点出队的目的"。
- 811 * 这里按照node节点在同步队列中的不同位置分了三种情况:
- 812 * 1、node节点是同步队列的尾节点tail;
- 813 * 2、node节点既不是同步队列头结点head的后继节点,也不是尾节点tail;
- 814 * 3、node节点是同步队列头结点head的后继节点;
- 815 */
- 816
- 817 //1、node是尾节点,并且执行过程中没有并发,直接将pred设置为同步队列的tail
- 818 if (node == tail && compareAndSetTail(node, pred)) {
- 819 /*
- 820 * 此时pred已经设置为同步队列的tail,需要通过CAS操作,将pred的next指向null,没有节点再引用node,就完成了node节点的出队
- 821 * 可以看出出队操作会破坏这个同步队列的next指针,这应该“向链表从后往前遍历呢?而不是从前往后遍历呢”的原因吧?
- 822 */
- 823 compareAndSetNext(pred, predNext, null);
- 824 }else {
- 825 /*
- 826 * 2、node不是尾节点,也不是头结点head的后继节点,那么当前节点node出队以后,node的有效前继结点pred,
- 827 * 就有义务在它自身释放资源的时候,唤醒node的有效后继节点successor,即将pred的状态设置为Node.SIGNAL;
- 828 */
- 829 int ws;
- 830 //能执行到这里,说明当前node节点不是head的后继节点,也不是同步队列tail节点
- 831 if (pred != head &&
- 832 ((ws = pred.waitStatus) == Node.SIGNAL ||
- 833 //前继节点状态虽然有效但不是SIGNAL,采用CAS操作设置为SIGNAL确保后继有效节点可以被唤醒
- 834 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
- 835 pred.thread != null) {
- 836 Node next = node.next;
- 837 //只负责唤醒有效后继节点
- 838 if (next != null && next.waitStatus <= 0) {
- 839 /**
- 840 * 下面这段代码相当于将pred-->next,我们提到这个同步队列是个双向队列,那么pred<--next这是谁执行的呢?
- 841 * 答案是其他线程:其它线程在获取共享资源在同步队列中阻塞的时候,调用shouldParkAfterFailedAcquire()方法,
- 842 * 从后向前遍历队列,寻找能唤醒它的有效前继节点,当找到node的时候,因为它的状态已经是Node.CANCELLED,所以会忽略node节点,
- 843 * 直到遍历到有效前继节点pred,将next.prev执行pred,即next--->pred,没有节点再引用node节点,所以node节点至此才完成出队。
- 844 */
- 845 compareAndSetNext(pred, predNext, next);
- 846 }
- 847 }else {
- 848 //3、说明node节点是同步队列head的后继节点,调用unparkSuccessor(Node)"出队"。
- 849 unparkSuccessor(node);
- 850 }
- 851
- 852 node.next = node;//help GC
- 853 }
- 854 }
- 855
- 856 /**
- 857 * Checks and updates status for a node that failed to acquire.
- 858 * Returns true if thread should block. This is the main signal
- 859 * control in all acquire loops. Requires that pred == node.prev.
- 860 *
- 861 * @param pred node's predecessor holding status
- 862 * @param node the node
- 863 * @return {@code true} if thread should block
- 864 */
- 865 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- 866 int ws = pred.waitStatus;
- 867 /*判断前驱结点的状态,只有前驱结点的状态为SIGNAL,后继节点才能被唤醒,所以其可以安心地挂起来了*/
- 868 if (ws == node.waitStatus) {
- 869 return true;
- 870 }
- 871
- 872 /*ws>0表示前驱结点中的线程已经被取消调度了,则认为其是无效节点,继续向前查找,直至找到有效状态的节点*/
- 873 if (ws > 0) {
- 874 do {
- 875 //前驱结点已经被取消,则将前驱结点设置为pred = pred.prev
- 876 node.prev = pred = pred.prev;
- 877 //不断遍历,直到找到第一个不是取消状态的节点
- 878 } while (pred.waitStatus > 0);
- 879 //
- 880 pred.next = node;
- 881 }else {
- 882 /*前驱结点状态正常,将前驱结点状态设置为SIGNAL,则前驱结点释放资源的时候,就可以尝试唤醒它的后继节点了*/
- 883 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- 884 }
- 885 return false;
- 886 }
- 887
- 888 /**
- 889 * Convenience method to interrupt current thread.
- 890 */
- 891 static void selfInterrupt() {
- 892 Thread.currentThread().interrupt();
- 893 }
- 894
- 895 /**
- 896 * Convenience method to park and then check if interrupted
- 897 *
- 898 * @return {@code true} if interrupted
- 899 */
- 900 private final boolean parkAndCheckInterrupt() {
- 901 //将当前线程挂起
- 902 LockSupport.park(this);
- 903 //检测当前线程的中断状态,并且会清除线程的中断标记
- 904 return Thread.interrupted();
- 905 }
- 906
- 907 /*
- 908 * Various flavors of acquire, varying in exclusive/shared and
- 909 * control modes. Each is mostly the same, but annoyingly
- 910 * different. Only a little bit of factoring is possible due to
- 911 * interactions of exception mechanics (including ensuring that we
- 912 * cancel if tryAcquire throws exception) and other control, at
- 913 * least not without hurting performance too much.
- 914 */
- 915
- 916 /**
- 917 * Acquires in exclusive uninterruptible mode for thread already in
- 918 * queue. Used by condition wait methods as well as acquire.
- 919 * 使线程阻塞在同步队列中等待获取资源,直到获取资源成功才返回,此过程中线程发生了中断就返回true,否则就返回false
- 920 * @param node the node
- 921 * @param arg the acquire argument
- 922 * @return {@code true} if interrupted while waiting
- 923 */
- 924 final boolean acquireQueued(final Node node, int arg) {
- 925 /*标记等待过程中是否发生了异常*/
- 926 boolean failed = true;
- 927
- 928 try {
- 929 /*标记线程阻塞的过程中是否发生了中断*/
- 930 boolean interrupted = false;
- 931 /*线程自旋阻塞*/
- 932 for(;;){
- 933 /*获取当前节点的前驱结点*/
- 934 final Node p = node.predecessor();
- 935 /*前驱结点是头结点,则说明当前线程有资格获取共享资源,尝试获取,获取成功,将当前节点设置为头结点*/
- 936 if (p == head && tryAcquire(arg)) {
- 937 /*将当前节点设置为头结点*/
- 938 setHead(node);
- 939 p.next = null; //help GC
- 940 failed = false;
- 941 return interrupted;
- 942 }
- 943
- 944 /*判断当前线程是否可以挂起*/
- 945 if (shouldParkAfterFailedAcquire(p, node)
- 946 /*当前线程可以挂起,则挂起线程,并且线程被unpark()唤醒,检查线程的状态*/
- 947 && parkAndCheckInterrupt()) {
- 948 interrupted = true;
- 949 }
- 950 }
- 951 }finally{
- 952 if (failed) {
- 953 /*阻塞获取同步资源的时候,发生了异常,取消当前线程的在同步队列中的排队*/
- 954 cancelAcquire(node);
- 955 }
- 956 }
- 957 }
- 958
- 959 /**
- 960 * Acquires in exclusive interruptible mode.
- 961 * @param arg the acquire argument
- 962 */
- 963 //尝试获取锁,并响应中断
- 964 private void doAcquireInterruptibly(int arg)
- 965 throws InterruptedException {
- 966 //将当前线程包装成Node节点,加入到同步队列sync queue中
- 967 final Node node = addWaiter(Node.EXCLUSIVE);
- 968 //标记是否获取锁的过程中是否发生了异常
- 969 boolean failed = true;
- 970 try {
- 971 //自旋
- 972 for (;;) {
- 973 //获取当前节点的前驱结点
- 974 final Node p = node.predecessor();
- 975 //如果当前节点的前驱结点是头结点head,尝试获取锁,当前线程才有资格获取排他锁(头结点head是个哑结点,不代表任何线程)
- 976 if (p == head && tryAcquire(arg)) {
- 977 //当前节点获取锁成功,将当前节点设置为头结点head
- 978 setHead(node);
- 979 //将原来的头结点从同步队列sync queue中移除
- 980 p.next = null; // help GC
- 981 failed = false;
- 982 return;
- 983 }
- 984 //尝试获取锁失败,判断是否可以将当前线程安全地挂起,只有当前线程有效的前驱结点状态为Node.SIGNAL,当前线程才可以安全地被挂起,后续也会被及时地唤醒
- 985 if (shouldParkAfterFailedAcquire(p, node) &&
- 986 //将当前线程挂起,并等他唤醒的时候判断是否发生了线程中断,发生了线程中断,则进入finally块中,取消当前线程对锁的尝试获取
- 987 parkAndCheckInterrupt())
- 988 throw new InterruptedException();
- 989 }
- 990 } finally {
- 991 if (failed)
- 992 cancelAcquire(node);
- 993 }
- 994 }
- 995
- 996 /**
- 997 * Acquires in exclusive timed mode.
- 998 *
- 999 * @param arg the acquire argument
- 1000 * @param nanosTimeout max wait time
- 1001 * @return {@code true} if acquired
- 1002 */
- 1003 private boolean doAcquireNanos(int arg, long nanosTimeout)
- 1004 throws InterruptedException {
- 1005 //指定的时间<=0则直接返回false
- 1006 if (nanosTimeout <= 0L)
- 1007 return false;
- 1008 //计算当前线程阻塞的截至时间
- 1009 final long deadline = System.nanoTime() + nanosTimeout;
- 1010 //将当前线程包装成Node节点,加入到同步队列sync queue中
- 1011 final Node node = addWaiter(Node.EXCLUSIVE);
- 1012 boolean failed = true;
- 1013 try {
- 1014 //自旋尝试获取锁
- 1015 for (;;) {
- 1016 //获取当前节点的前驱结点
- 1017 final Node p = node.predecessor();
- 1018 //前驱结点是头结点,则当前节点有资格尝试获取锁,则尝试获取锁
- 1019 if (p == head && tryAcquire(arg)) {
- 1020 //获取锁成功,则将当前节点设置为头结点head
- 1021 setHead(node);
- 1022 //将同步队列sync queue中原来的头结点移除队列
- 1023 p.next = null; // help GC
- 1024 //表示获取锁的过程没有发生异常
- 1025 failed = false;
- 1026 return true;
- 1027 }
- 1028 //计算剩余的等待时间
- 1029 nanosTimeout = deadline - System.nanoTime();
- 1030 //剩余的等待时间<=0,直接返回false
- 1031 if (nanosTimeout <= 0L)
- 1032 return false;
- 1033 //当前线程获取锁失败,判断是否可以将当前线程安全地挂起
- 1034 if (shouldParkAfterFailedAcquire(p, node) &&
- 1035 //如果剩余的等待时间<= spinForTimeoutThreshold,则不用将当前线程挂起,进行自旋即可
- 1036 nanosTimeout > spinForTimeoutThreshold)
- 1037 LockSupport.parkNanos(this, nanosTimeout);
- 1038 //获取锁的过程中发生了中断,则直接抛出InterruptedException异常
- 1039 if (Thread.interrupted())
- 1040 throw new InterruptedException();
- 1041 }
- 1042 } finally {
- 1043 //获取锁的过程中发生了异常,则将当前线程取消
- 1044 if (failed)
- 1045 cancelAcquire(node);
- 1046 }
- 1047 }
- 1048
- 1049 /**
- 1050 * Acquires in shared uninterruptible mode.
- 1051 * @param arg the acquire argument
- 1052 */
- 1053 private void doAcquireShared(int arg) {
- 1054 //将竞争共享资源失败的线程加入到同步队列中,并标记为共享模式
- 1055 final Node node = addWaiter(Node.SHARED);
- 1056 //标记阻塞获取资源的过程中是否发生了异常
- 1057 boolean failed = true;
- 1058
- 1059 try {
- 1060 //标记阻塞获取资源的过程中,是否发生了线程中断请求
- 1061 boolean interrupted = false;
- 1062 //线程阻塞等待获取资源,被有效前继节点唤醒后,尝试竞争共享资源
- 1063 for(;;){
- 1064 /**
- 1065 * 当前线程被唤醒之后,什么时候有资格竞争共享资源呢?
- 1066 * 之后当它的前继节点是头结点(头结点是当前持有共享资源的线程),在唤醒后继节点的过程中,可能释放了资源,所以后继节点尝试获取一次共享资源。
- 1067 */
- 1068 final Node p = node.predecessor();
- 1069 if (p == head) {
- 1070 int r = tryAcquireShared(arg);
- 1071 if (r >= 0) {
- 1072 setHeadAndPropagate(node, r);
- 1073 p.next = null;//help GC
- 1074 if (interrupted) {
- 1075 selfInterrupt();
- 1076 }
- 1077 failed = false;
- 1078 return;
- 1079 }
- 1080 }
- 1081
- 1082 if (shouldParkAfterFailedAcquire(p, node) &&
- 1083 parkAndCheckInterrupt()) {
- 1084 interrupted = true;
- 1085 }
- 1086 }
- 1087 } finally{
- 1088 if (failed) {
- 1089 cancelAcquire(node);
- 1090 }
- 1091 }
- 1092 }
- 1093
- 1094 /**
- 1095 * Acquires in shared interruptible mode.
- 1096 * @param arg the acquire argument
- 1097 */
- 1098 private void doAcquireSharedInterruptibly(int arg)
- 1099 throws InterruptedException {
- 1100 final Node node = addWaiter(Node.SHARED);
- 1101 boolean failed = true;
- 1102 try {
- 1103 for (;;) {
- 1104 final Node p = node.predecessor();
- 1105 if (p == head) {
- 1106 int r = tryAcquireShared(arg);
- 1107 if (r >= 0) {
- 1108 setHeadAndPropagate(node, r);
- 1109 p.next = null; // help GC
- 1110 failed = false;
- 1111 return;
- 1112 }
- 1113 }
- 1114 if (shouldParkAfterFailedAcquire(p, node) &&
- 1115 parkAndCheckInterrupt())
- 1116 throw new InterruptedException();
- 1117 }
- 1118 } finally {
- 1119 if (failed)
- 1120 cancelAcquire(node);
- 1121 }
- 1122 }
- 1123
- 1124 /**
- 1125 * Acquires in shared timed mode.
- 1126 *
- 1127 * @param arg the acquire argument
- 1128 * @param nanosTimeout max wait time
- 1129 * @return {@code true} if acquired
- 1130 */
- 1131 private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
- 1132 throws InterruptedException {
- 1133 if (nanosTimeout <= 0L)
- 1134 return false;
- 1135 final long deadline = System.nanoTime() + nanosTimeout;
- 1136 final Node node = addWaiter(Node.SHARED);
- 1137 boolean failed = true;
- 1138 try {
- 1139 for (;;) {
- 1140 final Node p = node.predecessor();
- 1141 if (p == head) {
- 1142 int r = tryAcquireShared(arg);
- 1143 if (r >= 0) {
- 1144 setHeadAndPropagate(node, r);
- 1145 p.next = null; // help GC
- 1146 failed = false;
- 1147 return true;
- 1148 }
- 1149 }
- 1150 nanosTimeout = deadline - System.nanoTime();
- 1151 if (nanosTimeout <= 0L)
- 1152 return false;
- 1153 if (shouldParkAfterFailedAcquire(p, node) &&
- 1154 nanosTimeout > spinForTimeoutThreshold)
- 1155 LockSupport.parkNanos(this, nanosTimeout);
- 1156 if (Thread.interrupted())
- 1157 throw new InterruptedException();
- 1158 }
- 1159 } finally {
- 1160 if (failed)
- 1161 cancelAcquire(node);
- 1162 }
- 1163 }
- 1164
- 1165 // Main exported methods
- 1166
- 1167 /**
- 1168 * Attempts to acquire in exclusive mode. This method should query
- 1169 * if the state of the object permits it to be acquired in the
- 1170 * exclusive mode, and if so to acquire it.
- 1171 *
- 1172 * <p>This method is always invoked by the thread performing
- 1173 * acquire. If this method reports failure, the acquire method
- 1174 * may queue the thread, if it is not already queued, until it is
- 1175 * signalled by a release from some other thread. This can be used
- 1176 * to implement method {@link Lock#tryLock()}.
- 1177 *
- 1178 * <p>The default
- 1179 * implementation throws {@link UnsupportedOperationException}.
- 1180 *
- 1181 * @param arg the acquire argument. This value is always the one
- 1182 * passed to an acquire method, or is the value saved on entry
- 1183 * to a condition wait. The value is otherwise uninterpreted
- 1184 * and can represent anything you like.
- 1185 * @return {@code true} if successful. Upon success, this object has
- 1186 * been acquired.
- 1187 * @throws IllegalMonitorStateException if acquiring would place this
- 1188 * synchronizer in an illegal state. This exception must be
- 1189 * thrown in a consistent fashion for synchronization to work
- 1190 * correctly.
- 1191 * @throws UnsupportedOperationException if exclusive mode is not supported
- 1192 */
protected boolean tryAcquire(int arg) {
    // Base-class hook: performs no acquisition and unconditionally throws.
    throw new UnsupportedOperationException();
}
- 1196
- 1197 /**
- 1198 * Attempts to set the state to reflect a release in exclusive
- 1199 * mode.
- 1200 *
- 1201 * <p>This method is always invoked by the thread performing release.
- 1202 *
- 1203 * <p>The default implementation throws
- 1204 * {@link UnsupportedOperationException}.
- 1205 *
- 1206 * @param arg the release argument. This value is always the one
- 1207 * passed to a release method, or the current state value upon
- 1208 * entry to a condition wait. The value is otherwise
- 1209 * uninterpreted and can represent anything you like.
- 1210 * @return {@code true} if this object is now in a fully released
- 1211 * state, so that any waiting threads may attempt to acquire;
- 1212 * and {@code false} otherwise.
- 1213 * @throws IllegalMonitorStateException if releasing would place this
- 1214 * synchronizer in an illegal state. This exception must be
- 1215 * thrown in a consistent fashion for synchronization to work
- 1216 * correctly.
- 1217 * @throws UnsupportedOperationException if exclusive mode is not supported
- 1218 */
protected boolean tryRelease(int arg) {
    // Base-class hook: performs no release and unconditionally throws.
    throw new UnsupportedOperationException();
}
- 1222
- 1223 /**
- 1224 * Attempts to acquire in shared mode. This method should query if
- 1225 * the state of the object permits it to be acquired in the shared
- 1226 * mode, and if so to acquire it.
- 1227 *
- 1228 * <p>This method is always invoked by the thread performing
- 1229 * acquire. If this method reports failure, the acquire method
- 1230 * may queue the thread, if it is not already queued, until it is
- 1231 * signalled by a release from some other thread.
- 1232 *
- 1233 * <p>The default implementation throws {@link
- 1234 * UnsupportedOperationException}.
- 1235 *
- 1236 * @param arg the acquire argument. This value is always the one
- 1237 * passed to an acquire method, or is the value saved on entry
- 1238 * to a condition wait. The value is otherwise uninterpreted
- 1239 * and can represent anything you like.
- 1240 * @return a negative value on failure; zero if acquisition in shared
- 1241 * mode succeeded but no subsequent shared-mode acquire can
- 1242 * succeed; and a positive value if acquisition in shared
- 1243 * mode succeeded and subsequent shared-mode acquires might
- 1244 * also succeed, in which case a subsequent waiting thread
- 1245 * must check availability. (Support for three different
- 1246 * return values enables this method to be used in contexts
- 1247 * where acquires only sometimes act exclusively.) Upon
- 1248 * success, this object has been acquired.
- 1249 * @throws IllegalMonitorStateException if acquiring would place this
- 1250 * synchronizer in an illegal state. This exception must be
- 1251 * thrown in a consistent fashion for synchronization to work
- 1252 * correctly.
- 1253 * @throws UnsupportedOperationException if shared mode is not supported
- 1254 */
- 1255 protected int tryAcquireShared(int arg) {
- 1256 for(;;){
- 1257 //计算可用资源量
- 1258 int available = getState();
- 1259 //计算剩余资源量
- 1260 int remaining = available - arg;
- 1261 //计算资源数量,不够直接放回,加入CAS是为了保证state状态一定能够更新成功
- 1262 if (remaining < 0 ||
- 1263 compareAndSetState(available, remaining)) {
- 1264 return remaining;
- 1265 }
- 1266 }
- 1267
- 1268 //throw new UnsupportedOperationException();
- 1269 }
- 1270
- 1271 /**
- 1272 * Attempts to set the state to reflect a release in shared mode.
- 1273 *
- 1274 * <p>This method is always invoked by the thread performing release.
- 1275 *
- 1276 * <p>The default implementation throws
- 1277 * {@link UnsupportedOperationException}.
- 1278 *
- 1279 * @param arg the release argument. This value is always the one
- 1280 * passed to a release method, or the current state value upon
- 1281 * entry to a condition wait. The value is otherwise
- 1282 * uninterpreted and can represent anything you like.
- 1283 * @return {@code true} if this release of shared mode may permit a
- 1284 * waiting acquire (shared or exclusive) to succeed; and
- 1285 * {@code false} otherwise
- 1286 * @throws IllegalMonitorStateException if releasing would place this
- 1287 * synchronizer in an illegal state. This exception must be
- 1288 * thrown in a consistent fashion for synchronization to work
- 1289 * correctly.
- 1290 * @throws UnsupportedOperationException if shared mode is not supported
- 1291 */
protected boolean tryReleaseShared(int arg) {
    // Base-class hook: performs no release and unconditionally throws.
    throw new UnsupportedOperationException();
}
- 1295
- 1296 /**
- 1297 * Returns {@code true} if synchronization is held exclusively with
- 1298 * respect to the current (calling) thread. This method is invoked
- 1299 * upon each call to a non-waiting {@link ConditionObject} method.
- 1300 * (Waiting methods instead invoke {@link #release}.)
- 1301 *
- 1302 * <p>The default implementation throws {@link
- 1303 * UnsupportedOperationException}. This method is invoked
- 1304 * internally only within {@link ConditionObject} methods, so need
- 1305 * not be defined if conditions are not used.
- 1306 *
- 1307 * @return {@code true} if synchronization is held exclusively;
- 1308 * {@code false} otherwise
- 1309 * @throws UnsupportedOperationException if conditions are not supported
- 1310 */
protected boolean isHeldExclusively() {
    // Base-class hook: reports nothing and unconditionally throws.
    throw new UnsupportedOperationException();
}
- 1314
- 1315 /**
- 1316 * Acquires in exclusive mode, ignoring interrupts. Implemented
- 1317 * by invoking at least once {@link #tryAcquire},
- 1318 * returning on success. Otherwise the thread is queued, possibly
- 1319 * repeatedly blocking and unblocking, invoking {@link
- 1320 * #tryAcquire} until success. This method can be used
- 1321 * to implement method {@link Lock#lock}.
- 1322 *
- 1323 * @param arg the acquire argument. This value is conveyed to
- 1324 * {@link #tryAcquire} but is otherwise uninterpreted and
- 1325 * can represent anything you like.
- 1326 */
- 1327 public final void acquire(int arg) {
- 1328 /*加塞抢占共享资源(因为同步队列中可能还有其它节点等待),获取成功则直接返回*/
- 1329 if (!tryAcquire(arg)
- 1330 /* 1、抢占共享资源失败,则将当前节点放入到同步队列的尾部,并标记为独占模式;
- 1331 * 2、使线程阻塞在同步队列中获取资源,直到获取成功才返回;如果整个过程中被中断过就返回true,否则就返回false;*/
- 1332 && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
- 1333 /*阻塞获取资源的过程中是不响应线程中断的,内部进行了中断检测,所以这块进行线程中断*/
- 1334 selfInterrupt();
- 1335 }
- 1336 }
- 1337
- 1338 /**
- 1339 * Acquires in exclusive mode, aborting if interrupted.
- 1340 * Implemented by first checking interrupt status, then invoking
- 1341 * at least once {@link #tryAcquire}, returning on
- 1342 * success. Otherwise the thread is queued, possibly repeatedly
- 1343 * blocking and unblocking, invoking {@link #tryAcquire}
- 1344 * until success or the thread is interrupted. This method can be
- 1345 * used to implement method {@link Lock#lockInterruptibly}.
- 1346 *
- 1347 * @param arg the acquire argument. This value is conveyed to
- 1348 * {@link #tryAcquire} but is otherwise uninterpreted and
- 1349 * can represent anything you like.
- 1350 * @throws InterruptedException if the current thread is interrupted
- 1351 */
- 1352 public final void acquireInterruptibly(int arg)
- 1353 throws InterruptedException {
- 1354 if (Thread.interrupted())
- 1355 throw new InterruptedException();
- 1356 if (!tryAcquire(arg))
- 1357 doAcquireInterruptibly(arg);
- 1358 }
- 1359
- 1360 /**
- 1361 * Attempts to acquire in exclusive mode, aborting if interrupted,
- 1362 * and failing if the given timeout elapses. Implemented by first
- 1363 * checking interrupt status, then invoking at least once {@link
- 1364 * #tryAcquire}, returning on success. Otherwise, the thread is
- 1365 * queued, possibly repeatedly blocking and unblocking, invoking
- 1366 * {@link #tryAcquire} until success or the thread is interrupted
- 1367 * or the timeout elapses. This method can be used to implement
- 1368 * method {@link Lock#tryLock(long, TimeUnit)}.
- 1369 *
- 1370 * @param arg the acquire argument. This value is conveyed to
- 1371 * {@link #tryAcquire} but is otherwise uninterpreted and
- 1372 * can represent anything you like.
- 1373 * @param nanosTimeout the maximum number of nanoseconds to wait
- 1374 * @return {@code true} if acquired; {@code false} if timed out
- 1375 * @throws InterruptedException if the current thread is interrupted
- 1376 */
- 1377 public final boolean tryAcquireNanos(int arg, long nanosTimeout)
- 1378 throws InterruptedException {
- 1379 //当前线程被中断,直接抛出异常
- 1380 if (Thread.interrupted())
- 1381 throw new InterruptedException();
- 1382 //尝试直接获取锁,获取成功就直接返回
- 1383 return tryAcquire(arg) ||
- 1384 //否则就“阻塞”指定时间,再尝试获取锁
- 1385 doAcquireNanos(arg, nanosTimeout);
- 1386 }
- 1387
- 1388 /**
- 1389 * Releases in exclusive mode. Implemented by unblocking one or
- 1390 * more threads if {@link #tryRelease} returns true.
- 1391 * This method can be used to implement method {@link Lock#unlock}.
- 1392 *
- 1393 * @param arg the release argument. This value is conveyed to
- 1394 * {@link #tryRelease} but is otherwise uninterpreted and
- 1395 * can represent anything you like.
- 1396 * @return the value returned from {@link #tryRelease}
- 1397 */
- 1398 public final boolean release(int arg) {
- 1399 if (tryRelease(arg)) {
- 1400 Node h = head;
- 1401 //当前线程已经释放锁,但是sync queue同步队列不为空,并且头结点head未被取消
- 1402 if (h != null && h.waitStatus != 0)
- 1403 //说明还有其它线程等待竞争锁,则尝试唤醒sync queue队列中等待锁的线程
- 1404 unparkSuccessor(h);
- 1405 return true;
- 1406 }
- 1407 return false;
- 1408 }
- 1409
- 1410 /**
- 1411 * Acquires in shared mode, ignoring interrupts. Implemented by
- 1412 * first invoking at least once {@link #tryAcquireShared},
- 1413 * returning on success. Otherwise the thread is queued, possibly
- 1414 * repeatedly blocking and unblocking, invoking {@link
- 1415 * #tryAcquireShared} until success.
- 1416 *
- 1417 * @param arg the acquire argument. This value is conveyed to
- 1418 * {@link #tryAcquireShared} but is otherwise uninterpreted
- 1419 * and can represent anything you like.
- 1420 */
- 1421 public final void acquireShared(int arg) {
- 1422 if (tryAcquireShared(arg) < 0)
- 1423 doAcquireShared(arg);
- 1424 }
- 1425
- 1426 /**
- 1427 * Acquires in shared mode, aborting if interrupted. Implemented
- 1428 * by first checking interrupt status, then invoking at least once
- 1429 * {@link #tryAcquireShared}, returning on success. Otherwise the
- 1430 * thread is queued, possibly repeatedly blocking and unblocking,
- 1431 * invoking {@link #tryAcquireShared} until success or the thread
- 1432 * is interrupted.
- 1433 * @param arg the acquire argument.
- 1434 * This value is conveyed to {@link #tryAcquireShared} but is
- 1435 * otherwise uninterpreted and can represent anything
- 1436 * you like.
- 1437 * @throws InterruptedException if the current thread is interrupted
- 1438 */
- 1439 public final void acquireSharedInterruptibly(int arg)
- 1440 throws InterruptedException {
- 1441 //当前线程已经被中断,则直接抛出InterruptedException异常
- 1442 if (Thread.interrupted())
- 1443 throw new InterruptedException();
- 1444 if (tryAcquireShared(arg) < 0)
- 1445 doAcquireSharedInterruptibly(arg);
- 1446 }
- 1447
- 1448 /**
- 1449 * Attempts to acquire in shared mode, aborting if interrupted, and
- 1450 * failing if the given timeout elapses. Implemented by first
- 1451 * checking interrupt status, then invoking at least once {@link
- 1452 * #tryAcquireShared}, returning on success. Otherwise, the
- 1453 * thread is queued, possibly repeatedly blocking and unblocking,
- 1454 * invoking {@link #tryAcquireShared} until success or the thread
- 1455 * is interrupted or the timeout elapses.
- 1456 *
- 1457 * @param arg the acquire argument. This value is conveyed to
- 1458 * {@link #tryAcquireShared} but is otherwise uninterpreted
- 1459 * and can represent anything you like.
- 1460 * @param nanosTimeout the maximum number of nanoseconds to wait
- 1461 * @return {@code true} if acquired; {@code false} if timed out
- 1462 * @throws InterruptedException if the current thread is interrupted
- 1463 */
- 1464 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
- 1465 throws InterruptedException {
- 1466 if (Thread.interrupted())
- 1467 throw new InterruptedException();
- 1468 return tryAcquireShared(arg) >= 0 ||
- 1469 doAcquireSharedNanos(arg, nanosTimeout);
- 1470 }
- 1471
- 1472 /**
- 1473 * Releases in shared mode. Implemented by unblocking one or more
- 1474 * threads if {@link #tryReleaseShared} returns true.
- 1475 *
- 1476 * @param arg the release argument. This value is conveyed to
- 1477 * {@link #tryReleaseShared} but is otherwise uninterpreted
- 1478 * and can represent anything you like.
- 1479 * @return the value returned from {@link #tryReleaseShared}
- 1480 */
- 1481 public final boolean releaseShared(int arg) {
- 1482 //尝试释放锁,释放成功,则返回true
- 1483 if (tryReleaseShared(arg)) {
- 1484 //唤醒同步队列中,等待获取锁的线程
- 1485 doReleaseShared();
- 1486 return true;
- 1487 }
- 1488 return false;
- 1489 }
- 1490
- 1491 // Queue inspection methods
- 1492
- 1493 /**
- 1494 * Queries whether any threads are waiting to acquire. Note that
- 1495 * because cancellations due to interrupts and timeouts may occur
- 1496 * at any time, a {@code true} return does not guarantee that any
- 1497 * other thread will ever acquire.
- 1498 *
- 1499 * <p>In this implementation, this operation returns in
- 1500 * constant time.
- 1501 *
- 1502 * @return {@code true} if there may be other threads waiting to acquire
- 1503 */
- 1504 public final boolean hasQueuedThreads() {
- 1505 return head != tail;
- 1506 }
- 1507
- 1508 /**
- 1509 * Queries whether any threads have ever contended to acquire this
- 1510 * synchronizer; that is if an acquire method has ever blocked.
- 1511 *
- 1512 * <p>In this implementation, this operation returns in
- 1513 * constant time.
- 1514 *
- 1515 * @return {@code true} if there has ever been contention
- 1516 */
- 1517 public final boolean hasContended() {
- 1518 return head != null;
- 1519 }
- 1520
- 1521 /**
- 1522 * Returns the first (longest-waiting) thread in the queue, or
- 1523 * {@code null} if no threads are currently queued.
- 1524 *
- 1525 * <p>In this implementation, this operation normally returns in
- 1526 * constant time, but may iterate upon contention if other threads are
- 1527 * concurrently modifying the queue.
- 1528 *
- 1529 * @return the first (longest-waiting) thread in the queue, or
- 1530 * {@code null} if no threads are currently queued
- 1531 */
- 1532 public final Thread getFirstQueuedThread() {
- 1533 // handle only fast path, else relay
- 1534 return (head == tail) ? null : fullGetFirstQueuedThread();
- 1535 }
- 1536
- 1537 /**
- 1538 * Version of getFirstQueuedThread called when fastpath fails
- 1539 */
- 1540 private Thread fullGetFirstQueuedThread() {
- 1541 /*
- 1542 * The first node is normally head.next. Try to get its
- 1543 * thread field, ensuring consistent reads: If thread
- 1544 * field is nulled out or s.prev is no longer head, then
- 1545 * some other thread(s) concurrently performed setHead in
- 1546 * between some of our reads. We try this twice before
- 1547 * resorting to traversal.
- 1548 */
- 1549 Node h, s;
- 1550 Thread st;
- 1551 if (((h = head) != null && (s = h.next) != null &&
- 1552 s.prev == head && (st = s.thread) != null) ||
- 1553 ((h = head) != null && (s = h.next) != null &&
- 1554 s.prev == head && (st = s.thread) != null))
- 1555 return st;
- 1556
- 1557 /*
- 1558 * Head's next field might not have been set yet, or may have
- 1559 * been unset after setHead. So we must check to see if tail
- 1560 * is actually first node. If not, we continue on, safely
- 1561 * traversing from tail back to head to find first,
- 1562 * guaranteeing termination.
- 1563 */
- 1564
- 1565 Node t = tail;
- 1566 Thread firstThread = null;
- 1567 while (t != null && t != head) {
- 1568 Thread tt = t.thread;
- 1569 if (tt != null)
- 1570 firstThread = tt;
- 1571 t = t.prev;
- 1572 }
- 1573 return firstThread;
- 1574 }
- 1575
- 1576 /**
- 1577 * Returns true if the given thread is currently queued.
- 1578 *
- 1579 * <p>This implementation traverses the queue to determine
- 1580 * presence of the given thread.
- 1581 *
- 1582 * @param thread the thread
- 1583 * @return {@code true} if the given thread is on the queue
- 1584 * @throws NullPointerException if the thread is null
- 1585 */
- 1586 public final boolean isQueued(Thread thread) {
- 1587 if (thread == null)
- 1588 throw new NullPointerException();
- 1589 for (Node p = tail; p != null; p = p.prev)
- 1590 if (p.thread == thread)
- 1591 return true;
- 1592 return false;
- 1593 }
- 1594
- 1595 /**
- 1596 * Returns {@code true} if the apparent first queued thread, if one
- 1597 * exists, is waiting in exclusive mode. If this method returns
- 1598 * {@code true}, and the current thread is attempting to acquire in
- 1599 * shared mode (that is, this method is invoked from {@link
- 1600 * #tryAcquireShared}) then it is guaranteed that the current thread
- 1601 * is not the first queued thread. Used only as a heuristic in
- 1602 * ReentrantReadWriteLock.
- 1603 */
- 1604 final boolean apparentlyFirstQueuedIsExclusive() {
- 1605 Node h, s;
- 1606 return (h = head) != null &&
- 1607 (s = h.next) != null &&
- 1608 !s.isShared() &&
- 1609 s.thread != null;
- 1610 }
- 1611
- 1612 /**
- 1613 * Queries whether any threads have been waiting to acquire longer
- 1614 * than the current thread.
- 1615 *
- 1616 * <p>An invocation of this method is equivalent to (but may be
- 1617 * more efficient than):
- 1618 * <pre> {@code
- 1619 * getFirstQueuedThread() != Thread.currentThread() &&
- 1620 * hasQueuedThreads()}</pre>
- 1621 *
- 1622 * <p>Note that because cancellations due to interrupts and
- 1623 * timeouts may occur at any time, a {@code true} return does not
- 1624 * guarantee that some other thread will acquire before the current
- 1625 * thread. Likewise, it is possible for another thread to win a
- 1626 * race to enqueue after this method has returned {@code false},
- 1627 * due to the queue being empty.
- 1628 *
- 1629 * <p>This method is designed to be used by a fair synchronizer to
- 1630 * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
- 1631 * Such a synchronizer's {@link #tryAcquire} method should return
- 1632 * {@code false}, and its {@link #tryAcquireShared} method should
- 1633 * return a negative value, if this method returns {@code true}
- 1634 * (unless this is a reentrant acquire). For example, the {@code
- 1635 * tryAcquire} method for a fair, reentrant, exclusive mode
- 1636 * synchronizer might look like this:
- 1637 *
- 1638 * <pre> {@code
- 1639 * protected boolean tryAcquire(int arg) {
- 1640 * if (isHeldExclusively()) {
- 1641 * // A reentrant acquire; increment hold count
- 1642 * return true;
- 1643 * } else if (hasQueuedPredecessors()) {
- 1644 * return false;
- 1645 * } else {
- 1646 * // try to acquire normally
- 1647 * }
- 1648 * }}</pre>
- 1649 *
- 1650 * @return {@code true} if there is a queued thread preceding the
- 1651 * current thread, and {@code false} if the current thread
- 1652 * is at the head of the queue or the queue is empty
- 1653 * @since 1.7
- 1654 */
- 1655 public final boolean hasQueuedPredecessors() {
- 1656 // The correctness of this depends on head being initialized
- 1657 // before tail and on head.next being accurate if the current
- 1658 // thread is first in queue.
- 1659 Node t = tail; // Read fields in reverse initialization order
- 1660 Node h = head;
- 1661 Node s;
- 1662 return h != t &&
- 1663 ((s = h.next) == null || s.thread != Thread.currentThread());
- 1664 }
- 1665
- 1666
- 1667 // Instrumentation and monitoring methods
- 1668
- 1669 /**
- 1670 * Returns an estimate of the number of threads waiting to
- 1671 * acquire. The value is only an estimate because the number of
- 1672 * threads may change dynamically while this method traverses
- 1673 * internal data structures. This method is designed for use in
- 1674 * monitoring system state, not for synchronization
- 1675 * control.
- 1676 *
- 1677 * @return the estimated number of threads waiting to acquire
- 1678 */
- 1679 public final int getQueueLength() {
- 1680 int n = 0;
- 1681 for (Node p = tail; p != null; p = p.prev) {
- 1682 if (p.thread != null)
- 1683 ++n;
- 1684 }
- 1685 return n;
- 1686 }
- 1687
- 1688 /**
- 1689 * Returns a collection containing threads that may be waiting to
- 1690 * acquire. Because the actual set of threads may change
- 1691 * dynamically while constructing this result, the returned
- 1692 * collection is only a best-effort estimate. The elements of the
- 1693 * returned collection are in no particular order. This method is
- 1694 * designed to facilitate construction of subclasses that provide
- 1695 * more extensive monitoring facilities.
- 1696 *
- 1697 * @return the collection of threads
- 1698 */
- 1699 public final Collection<Thread> getQueuedThreads() {
- 1700 ArrayList<Thread> list = new ArrayList<Thread>();
- 1701 for (Node p = tail; p != null; p = p.prev) {
- 1702 Thread t = p.thread;
- 1703 if (t != null)
- 1704 list.add(t);
- 1705 }
- 1706 return list;
- 1707 }
- 1708
- 1709 /**
- 1710 * Returns a collection containing threads that may be waiting to
- 1711 * acquire in exclusive mode. This has the same properties
- 1712 * as {@link #getQueuedThreads} except that it only returns
- 1713 * those threads waiting due to an exclusive acquire.
- 1714 *
- 1715 * @return the collection of threads
- 1716 */
- 1717 public final Collection<Thread> getExclusiveQueuedThreads() {
- 1718 ArrayList<Thread> list = new ArrayList<Thread>();
- 1719 for (Node p = tail; p != null; p = p.prev) {
- 1720 if (!p.isShared()) {
- 1721 Thread t = p.thread;
- 1722 if (t != null)
- 1723 list.add(t);
- 1724 }
- 1725 }
- 1726 return list;
- 1727 }
- 1728
- 1729 /**
- 1730 * Returns a collection containing threads that may be waiting to
- 1731 * acquire in shared mode. This has the same properties
- 1732 * as {@link #getQueuedThreads} except that it only returns
- 1733 * those threads waiting due to a shared acquire.
- 1734 *
- 1735 * @return the collection of threads
- 1736 */
- 1737 public final Collection<Thread> getSharedQueuedThreads() {
- 1738 ArrayList<Thread> list = new ArrayList<Thread>();
- 1739 for (Node p = tail; p != null; p = p.prev) {
- 1740 if (p.isShared()) {
- 1741 Thread t = p.thread;
- 1742 if (t != null)
- 1743 list.add(t);
- 1744 }
- 1745 }
- 1746 return list;
- 1747 }
- 1748
- 1749 /**
- 1750 * Returns a string identifying this synchronizer, as well as its state.
- 1751 * The state, in brackets, includes the String {@code "State ="}
- 1752 * followed by the current value of {@link #getState}, and either
- 1753 * {@code "nonempty"} or {@code "empty"} depending on whether the
- 1754 * queue is empty.
- 1755 *
- 1756 * @return a string identifying this synchronizer, as well as its state
- 1757 */
- 1758 public String toString() {
- 1759 int s = getState();
- 1760 String q = hasQueuedThreads() ? "non" : "";
- 1761 return super.toString() +
- 1762 "[State = " + s + ", " + q + "empty queue]";
- 1763 }
- 1764
- 1765
- 1766 // Internal support methods for Conditions
- 1767
- 1768 /**
- 1769 * Returns true if a node, always one that was initially placed on
- 1770 * a condition queue, is now waiting to reacquire on sync queue.
- 1771 * @param node the node
- 1772 * @return true if is reacquiring
- 1773 */
- 1774 // Checks whether the given node (not the calling thread) is already on the sync queue.
- 1775 final boolean isOnSyncQueue(Node node) {
- 1776 /**
- 1777 * 1、当前节点node的状态waitStatus为Node.CONDITION,则其必定还在条件队列condition中;
- 1778 * 2、如果当前节点node的node.prev == null,说明当前节点还未被加入到sync queue队列中,因为node节点要加入到sync queue中(无论本次加入是否成功),则本次尝试必有node.prev != null;
- 1779 */
- 1780 if (node.waitStatus == Node.CONDITION || node.prev == null)
- 1781 return false;
- 1782 //如果当前节点node.next != null,说明其已经有后继节点了,则其必定在sync queue同步队列中
- 1783 if (node.next != null) // If has successor, it must be on queue
- 1784 return true;
- 1785 /*
- 1786 * node.prev can be non-null, but not yet on queue because
- 1787 * the CAS to place it on queue can fail. So we have to
- 1788 * traverse from tail to make sure it actually made it. It
- 1789 * will always be near the tail in calls to this method, and
- 1790 * unless the CAS failed (which is unlikely), it will be
- 1791 * there, so we hardly ever traverse much.
- 1792 */
- 1793 /**
- 1794 * 能走到这里来,说明当前节点node.waitStatus != Node.CONDITION && node.prev != null && node.next == null
- 1795 * ,说明已经开始尝试将当前节点node加入到sync queue同步队列中,已经完成了node.prev = tail,但是进行接下来的CAS操作设置sync queue的尾节点,有可能失败
- 1796 * 导致当前节点入队失败,所以从sync queue的尾节点开始遍历,进一步确认当前节点是否成功加入到了sync queue中。
- 1797 */
- 1798 return findNodeFromTail(node);
- 1799 }
- 1800
- 1801 /**
- 1802 * Returns true if node is on sync queue by searching backwards from tail.
- 1803 * Called only when needed by isOnSyncQueue.
- 1804 * @return true if present
- 1805 */
- 1806 // Checks whether the given node is present on the sync queue.
- 1807 private boolean findNodeFromTail(Node node) {
- 1808 Node t = tail;
- 1809 //从链表的尾部开始遍历
- 1810 for (;;) {
- 1811 //遍历到的当前节点等于指定的节点,则返回true
- 1812 if (t == node)
- 1813 return true;
- 1814 //遍历到的当前节点已经为空,说明链表已经遍历结束,未找到指定的节点,则返回false
- 1815 if (t == null)
- 1816 return false;
- 1817 //移动指针,进入下一次自旋遍历
- 1818 t = t.prev;
- 1819 }
- 1820 }
- 1821
- 1822 /**
- 1823 * Transfers a node from a condition queue onto sync queue.
- 1824 * Returns true if successful.
- 1825 * @param node the node
- 1826 * @return true if successfully transferred (else the node was
- 1827 * cancelled before signal)
- 1828 */
- 1829 final boolean transferForSignal(Node node) {
- 1830 /*
- 1831 * If cannot change waitStatus, the node has been cancelled.
- 1832 */
- 1833 //调用signal方法的时候,当前node节点已经取消了等待,则忽略这个节点
- 1834 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
- 1835 return false;
- 1836
- 1837 /*
- 1838 * Splice onto queue and try to set waitStatus of predecessor to
- 1839 * indicate that thread is (probably) waiting. If cancelled or
- 1840 * attempt to set waitStatus fails, wake up to resync (in which
- 1841 * case the waitStatus can be transiently and harmlessly wrong).
- 1842 */
- 1843 //如果这个节点node在条件队列condition中正常等待,则将node加入到sync queue同步队列中,并返回这个新结点的前驱结点
- 1844 Node p = enq(node);
- 1845 int ws = p.waitStatus;
- 1846 /**
- 1847 * 同步队列sync queue中的节点是通过前驱结点来唤醒的,前驱结点已经被取消,或者前驱结点的waitStatus设置Node.SIGNAL失败,
- 1848 * 后继节点不能通过前驱结点正常唤醒,则直接唤醒这个刚刚加入到同步队列sync queue中的节点
- 1849 */
- 1850 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
- 1851 LockSupport.unpark(node.thread);
- 1852 return true;
- 1853 }
- 1854
- 1855 /**
- 1856 * Transfers node, if necessary, to sync queue after a cancelled wait.
- 1857 * Returns true if thread was cancelled before being signalled.
- 1858 *
- 1859 * @param node the node
- 1860 * @return true if cancelled before the node was signalled
- 1861 */
- 1862 final boolean transferAfterCancelledWait(Node node) {
- 1863 /**
- 1864 * 判断一个node是否被signal()过,最简单有效的方式就是是否离开了condition条件队列,进入到了sync queue同步队列中
- 1865 */
- 1866 //如果一个节点的状态还是Node.CONDITION,说明它还未被signal过,是因为中断导致了它被唤醒(从condition条件队列转移到sync queue同步队列中)
- 1867 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
- 1868 //将当前被唤醒的节点加入到同步队列sync queue中,准备竞争锁,但是还没有将这个节点从条件队列中移除,所以后面的处理一下
- 1869 enq(node);
- 1870 return true;
- 1871 }
- 1872 /*
- 1873 * If we lost out to a signal(), then we can't proceed
- 1874 * until it finishes its enq(). Cancelling during an
- 1875 * incomplete transfer is both rare and transient, so just
- 1876 * spin.
- 1877 */
- 1878 while (!isOnSyncQueue(node))
- 1879 Thread.yield();
- 1880 return false;
- 1881 }
- 1882
- 1883 /**
- 1884 * Invokes release with current state value; returns saved state.
- 1885 * Cancels node and throws exception on failure.
- 1886 * @param node the condition node for this wait
- 1887 * @return previous sync state
- 1888 */
- 1889 // Fully releases the lock held by the current thread.
- 1890 final int fullyRelease(Node node) {
- 1891 boolean failed = true;
- 1892 try {
- 1893 //获取当前锁状态
- 1894 int savedState = getState();
- 1895 //完全释放锁,对于重入锁而言,无论重入了几次都一次释放;同时持有锁的线程不是当前线程时候,也会直接抛出IllegalMonitorStateException异常,直接进入finally块
- 1896 if (release(savedState)) {
- 1897 failed = false;
- 1898 return savedState;
- 1899 } else {
- 1900 throw new IllegalMonitorStateException();
- 1901 }
- 1902 } finally {
- 1903 /**
- 1904 * 如果释放锁失败,则将刚才新加入到条件队列尾节点的node节点的waitStatus设置为Node.CANCELLED,
- 1905 * 后序其它线程被包装成Node节点加入到条件队列中时候,会将这些已经被取消的节点移除
- 1906 */
- 1907 if (failed)
- 1908 node.waitStatus = Node.CANCELLED;
- 1909 }
- 1910 }
- 1911
- 1912 // Instrumentation methods for conditions
- 1913
- 1914 /**
- 1915 * Queries whether the given ConditionObject
- 1916 * uses this synchronizer as its lock.
- 1917 *
- 1918 * @param condition the condition
- 1919 * @return {@code true} if owned
- 1920 * @throws NullPointerException if the condition is null
- 1921 */
- 1922 public final boolean owns(ConditionObject condition) {
- 1923 return condition.isOwnedBy(this);
- 1924 }
- 1925
- 1926 /**
- 1927 * Queries whether any threads are waiting on the given condition
- 1928 * associated with this synchronizer. Note that because timeouts
- 1929 * and interrupts may occur at any time, a {@code true} return
- 1930 * does not guarantee that a future {@code signal} will awaken
- 1931 * any threads. This method is designed primarily for use in
- 1932 * monitoring of the system state.
- 1933 *
- 1934 * @param condition the condition
- 1935 * @return {@code true} if there are any waiting threads
- 1936 * @throws IllegalMonitorStateException if exclusive synchronization
- 1937 * is not held
- 1938 * @throws IllegalArgumentException if the given condition is
- 1939 * not associated with this synchronizer
- 1940 * @throws NullPointerException if the condition is null
- 1941 */
- 1942 public final boolean hasWaiters(ConditionObject condition) {
- 1943 if (!owns(condition))
- 1944 throw new IllegalArgumentException("Not owner");
- 1945 return condition.hasWaiters();
- 1946 }
- 1947
- 1948 /**
- 1949 * Returns an estimate of the number of threads waiting on the
- 1950 * given condition associated with this synchronizer. Note that
- 1951 * because timeouts and interrupts may occur at any time, the
- 1952 * estimate serves only as an upper bound on the actual number of
- 1953 * waiters. This method is designed for use in monitoring of the
- 1954 * system state, not for synchronization control.
- 1955 *
- 1956 * @param condition the condition
- 1957 * @return the estimated number of waiting threads
- 1958 * @throws IllegalMonitorStateException if exclusive synchronization
- 1959 * is not held
- 1960 * @throws IllegalArgumentException if the given condition is
- 1961 * not associated with this synchronizer
- 1962 * @throws NullPointerException if the condition is null
- 1963 */
- 1964 public final int getWaitQueueLength(ConditionObject condition) {
- 1965 if (!owns(condition))
- 1966 throw new IllegalArgumentException("Not owner");
- 1967 return condition.getWaitQueueLength();
- 1968 }
- 1969
- 1970 /**
- 1971 * Returns a collection containing those threads that may be
- 1972 * waiting on the given condition associated with this
- 1973 * synchronizer. Because the actual set of threads may change
- 1974 * dynamically while constructing this result, the returned
- 1975 * collection is only a best-effort estimate. The elements of the
- 1976 * returned collection are in no particular order.
- 1977 *
- 1978 * @param condition the condition
- 1979 * @return the collection of threads
- 1980 * @throws IllegalMonitorStateException if exclusive synchronization
- 1981 * is not held
- 1982 * @throws IllegalArgumentException if the given condition is
- 1983 * not associated with this synchronizer
- 1984 * @throws NullPointerException if the condition is null
- 1985 */
- 1986 public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
- 1987 if (!owns(condition))
- 1988 throw new IllegalArgumentException("Not owner");
- 1989 return condition.getWaitingThreads();
- 1990 }
- 1991
- 1992 /**
- 1993 * Condition implementation for a {@link
- 1994 * AbstractQueuedSynchronizer} serving as the basis of a {@link
- 1995 * Lock} implementation.
- 1996 *
- 1997 * <p>Method documentation for this class describes mechanics,
- 1998 * not behavioral specifications from the point of view of Lock
- 1999 * and Condition users. Exported versions of this class will in
- 2000 * general need to be accompanied by documentation describing
- 2001 * condition semantics that rely on those of the associated
- 2002 * {@code AbstractQueuedSynchronizer}.
- 2003 *
- 2004 * <p>This class is Serializable, but all fields are transient,
- 2005 * so deserialized conditions have no waiters.
- 2006 */
- 2007 public class ConditionObject implements Condition, java.io.Serializable {
- 2008 private static final long serialVersionUID = 1173984872572414699L;
- 2009 /** First node of condition queue. */
- 2010 private transient Node firstWaiter;
- 2011 /** Last node of condition queue. */
- 2012 private transient Node lastWaiter;
- 2013
- 2014 /**
- 2015 * Creates a new {@code ConditionObject} instance.
- 2016 */
- 2017 public ConditionObject() { }
- 2018
- 2019 // Internal methods
- 2020
- 2021 /**
- 2022 * Adds a new waiter to wait queue.
- 2023 * @return its new wait node
- 2024 */
- 2025 // Wraps the current thread in a Node and appends it to the condition queue.
- 2026 private Node addConditionWaiter() {
- 2027 //条件队列的尾节点
- 2028 Node t = lastWaiter;
- 2029 // If lastWaiter is cancelled, clean out.
- 2030 //条件尾节点不为空,说明条件队列不为空,但是它的尾节点的waitStatus已经不是Node.CONDITION了,说明其已经取消了,则清除掉条件队列中已经取消的节点
- 2031 if (t != null && t.waitStatus != Node.CONDITION) {
- 2032 //清除掉条件队列中已经取消的节点
- 2033 unlinkCancelledWaiters();
- 2034 t = lastWaiter;
- 2035 }
- 2036 //将当前线程包装成Node节点,并将新的node节点的状态设置为Node.CONDITION
- 2037 Node node = new Node(Thread.currentThread(), Node.CONDITION);
- 2038 //如果条件队列为空,则将条件队列的头结点firstWaiter指向刚创建出来的node节点
- 2039 if (t == null)
- 2040 firstWaiter = node;
- 2041 else
- 2042 //否则, 将条件队列旧的尾节点的nextWaiter指向新创建的node节点
- 2043 t.nextWaiter = node;
- 2044 //将新创建的node节点设置为条件队列的尾节点
- 2045 lastWaiter = node;
- 2046 return node;
- 2047 }
- 2048
- 2049 /**
- 2050 * Removes and transfers nodes until hit non-cancelled one or
- 2051 * null. Split out from signal in part to encourage compilers
- 2052 * to inline the case of no waiters.
- 2053 * @param first (non-null) the first node on condition queue
- 2054 */
- 2055 // Signals the first non-cancelled node on the condition queue.
- 2056 private void doSignal(Node first) {
- 2057 do {
- 2058 //如果条件队列中只有一个节点,则将条件队列清空
- 2059 if ( (firstWaiter = first.nextWaiter) == null)
- 2060 //则将条件队列的尾节点lastWaiter也置为null
- 2061 lastWaiter = null;
- 2062 //将当前要被唤醒的节点从条件队列中移除
- 2063 first.nextWaiter = null;
- 2064 //当前节点已经被取消了,并且条件队列中还有节点,则进行下一次循环
- 2065 } while (!transferForSignal(first) &&
- 2066 (first = firstWaiter) != null);
- 2067 }
- 2068
- 2069 /**
- 2070 * Removes and transfers all nodes.
- 2071 * @param first (non-null) the first node on condition queue
- 2072 */
- 2073 private void doSignalAll(Node first) {
- 2074 //清空条件队列
- 2075 lastWaiter = firstWaiter = null;
- 2076 do {
- 2077 //获取条件队列头结点的下一个结点
- 2078 Node next = first.nextWaiter;
- 2079 //将条件队列的头结点从链表上移除
- 2080 first.nextWaiter = null;
- 2081 //将这个刚从条件队列condition中移除的节点转移到sync queue同步队列中
- 2082 transferForSignal(first);
- 2083 //修改指针,准备继续下一次的遍历
- 2084 first = next;
- 2085 //first != null表示链表还未遍历完成,继续下次遍历
- 2086 } while (first != null);
- 2087 }
- 2088
- 2089 /**
- 2090 * Unlinks cancelled waiter nodes from condition queue.
- 2091 * Called only while holding lock. This is called when
- 2092 * cancellation occurred during condition wait, and upon
- 2093 * insertion of a new waiter when lastWaiter is seen to have
- 2094 * been cancelled. This method is needed to avoid garbage
- 2095 * retention in the absence of signals. So even though it may
- 2096 * require a full traversal, it comes into play only when
- 2097 * timeouts or cancellations occur in the absence of
- 2098 * signals. It traverses all nodes rather than stopping at a
- 2099 * particular target to unlink all pointers to garbage nodes
- 2100 * without requiring many re-traversals during cancellation
- 2101 * storms.
- 2102 */
- 2103 private void unlinkCancelledWaiters() {
- 2104 //当前节点,从表示条件队列链表的头结点firstWaiter开始遍历
- 2105 Node t = firstWaiter;
- 2106 //表示条件队列中最后一个waitStatus不为Node.CANCELLED的节点
- 2107 Node trail = null;
- 2108 //条件队列不为空
- 2109 while (t != null) {
- 2110 //当前节点的后继节点
- 2111 Node next = t.nextWaiter;
- 2112 //如果当前节点t已经取消了正常等待,则将节点t从条件队列中移除
- 2113 if (t.waitStatus != Node.CONDITION) {
- 2114 //将当前节点从条件队列移除
- 2115 t.nextWaiter = null;
- 2116 //trail == null表示条件队列当前节点之前的都被取消了,所以将条件队列的头结点firstWaiter重置指向next
- 2117 if (trail == null)
- 2118 firstWaiter = next;
- 2119 else
- 2120 //当前节点已经被取消,trail != null,则需要将trail与当前节点断开,指向next;
- 2121 trail.nextWaiter = next;
- 2122 //如果next == null说明队列已经完成遍历,将原来的条件队列中最后一个正常等待的节点赋值给lastWaiter;
- 2123 if (next == null)
- 2124 lastWaiter = trail;
- 2125 }
- 2126 else{
- 2127 //如果当前节点未被取消,就将当前节点记为队列中最后一个未被取消的节点,令trail = t;
- 2128 trail = t;
- 2129 }
- 2130 //修改当前节点的指针,指向下一个节点next
- 2131 t = next;
- 2132 }
- 2133 }
- 2134
- 2135 // public methods
- 2136
- 2137 /**
- 2138 * Moves the longest-waiting thread, if one exists, from the
- 2139 * wait queue for this condition to the wait queue for the
- 2140 * owning lock.
- 2141 *
- 2142 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
- 2143 * returns {@code false}
- 2144 */
- 2145 public final void signal() {
- 2146 //判断当前线程是否是持有锁的线程,如果不是则直接抛出IllegalMonitorStateException异常
- 2147 if (!isHeldExclusively())
- 2148 throw new IllegalMonitorStateException();
- 2149 Node first = firstWaiter;
- 2150 //条件队列condition不为空
- 2151 if (first != null)
- 2152 //唤醒条件队列condition中的头结点
- 2153 doSignal(first);
- 2154 }
- 2155
- 2156 /**
- 2157 * Moves all threads from the wait queue for this condition to
- 2158 * the wait queue for the owning lock.
- 2159 *
- 2160 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
- 2161 * returns {@code false}
- 2162 */
- 2163 public final void signalAll() {
- 2164 //判断当前线程是否是持有锁的线程,如果不是,则直接抛出IllegalMonitorStateException异常
- 2165 if (!isHeldExclusively())
- 2166 throw new IllegalMonitorStateException();
- 2167 Node first = firstWaiter;
- 2168 //判断条件队列是否为空
- 2169 if (first != null)
- 2170 //唤醒条件队列中进行等待的节点
- 2171 doSignalAll(first);
- 2172 }
- 2173
- 2174 /**
- 2175 * Implements uninterruptible condition wait.
- 2176 * <ol>
- 2177 * <li> Save lock state returned by {@link #getState}.
- 2178 * <li> Invoke {@link #release} with saved state as argument,
- 2179 * throwing IllegalMonitorStateException if it fails.
- 2180 * <li> Block until signalled.
- 2181 * <li> Reacquire by invoking specialized version of
- 2182 * {@link #acquire} with saved state as argument.
- 2183 * </ol>
- 2184 */
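- 2185 /**
- 2186 * In await(), an interrupt has the same effect as signal(): it wakes the waiting thread. However, a thread
- 2187 * woken by an interrupt may re-acquire the lock only to find that its condition is still not met and park
- 2188 * again. Sometimes a wait that does not respond to interrupts is preferable;
- 2189 * awaitUninterruptibly() below provides exactly that.
- 2189 */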
- 2190 public final void awaitUninterruptibly() {
- 2191 //将当前线程包装成Node节点,加入到条件队列condition中
- 2192 Node node = addConditionWaiter();
- 2193 //释放当前线程所持有的锁
- 2194 int savedState = fullyRelease(node);
- 2195 boolean interrupted = false;
- 2196 //判断当前线程是否已经在sync queue同步队列
- 2197 while (!isOnSyncQueue(node)) {
- 2198 //当前线程未在sync queue同步队列中,说明其还没有资格竞争独占锁,将当前线程挂起
- 2199 LockSupport.park(this);
- 2200 /**
- 2201 * 判断当前线程是否是由于被其他线程中断唤醒的,如果是则只是简单记录下其中断标记即可,
- 2202 * 当前线程要跳出此while循环则满足的唯一条件是,被其他线程signal(),进入到同步队列
- 2203 */
- 2204 if (Thread.interrupted())
- 2205 interrupted = true;
- 2206 }
- 2207
- 2208 /**
- 2209 * 这也是一个阻塞操作,当前线程已经加入到同步队列中,尝试竞争锁,竞争失败,找一个合适的地方,
- 2210 * 将当前线程挂起,等待其他线程signal(),如果在竞争锁的过程中,发生了异常,则需要记录一下
- 2211 */
- 2212 if (acquireQueued(node, savedState) || interrupted)
- 2213 selfInterrupt();
- 2214 }
- 2215
- 2216 /*
- 2217 * For interruptible waits, we need to track whether to throw
- 2218 * InterruptedException, if interrupted while blocked on
- 2219 * condition, versus reinterrupt current thread, if
- 2220 * interrupted while blocked waiting to re-acquire.
- 2221 */
- 2222
- 2223 /** Mode meaning to reinterrupt on exit from wait */
- 2224 private static final int REINTERRUPT = 1;
- 2225 /** Mode meaning to throw InterruptedException on exit from wait */
- 2226 private static final int THROW_IE = -1;
- 2227
- 2228 /**
- 2229 * Checks for interrupt, returning THROW_IE if interrupted
- 2230 * before signalled, REINTERRUPT if after signalled, or
- 2231 * 0 if not interrupted.
- 2232 */
- 2233 private int checkInterruptWhileWaiting(Node node) {
- 2234 //判断在等待的过程中,线程是否发生了中断
- 2235 return Thread.interrupted() ?
- 2236 //判断线程中断时发生在signal()之前,还是signal()之后,THROW_IE表示中断发生在signal()之前,REINTERRUPT表示中断发生在signal()之后,中断来得太晚了
- 2237 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
- 2238 0;
- 2239 }
- 2240
- 2241 /**
- 2242 * Throws InterruptedException, reinterrupts current thread, or
- 2243 * does nothing, depending on mode.
- 2244 */
- 2245 private void reportInterruptAfterWait(int interruptMode)
- 2246 throws InterruptedException {
- 2247 //表示还未被其他线程signal()就发生了中断,当前线程是由于中断被唤醒的,所以抛出InterruptedException异常
- 2248 if (interruptMode == THROW_IE)
- 2249 throw new InterruptedException();
- 2250 //表示被其它线程signal()过了之后才发生了线程中断,中断来得太迟了,所以补一下中断标记就行了
- 2251 else if (interruptMode == REINTERRUPT)
- 2252 selfInterrupt();
- 2253 }
- 2254
- 2255 /**
- 2256 * Implements interruptible condition wait.
- 2257 * <ol>
- 2258 * <li> If current thread is interrupted, throw InterruptedException.
- 2259 * <li> Save lock state returned by {@link #getState}.
- 2260 * <li> Invoke {@link #release} with saved state as argument,
- 2261 * throwing IllegalMonitorStateException if it fails.
- 2262 * <li> Block until signalled or interrupted.
- 2263 * <li> Reacquire by invoking specialized version of
- 2264 * {@link #acquire} with saved state as argument.
- 2265 * <li> If interrupted while blocked in step 4, throw InterruptedException.
- 2266 * </ol>
- 2267 */
- 2268 public final void await() throws InterruptedException {
- 2269 //调用await()方法时,当前线程已经被中断,则直接抛出中断异常
- 2270 if (Thread.interrupted())
- 2271 throw new InterruptedException();
- 2272 //将当前线程包装成Node节点,加入到条件队列中
- 2273 Node node = addConditionWaiter();
- 2274 //释放当前线程所占用的锁
- 2275 int savedState = fullyRelease(node);
- 2276 int interruptMode = 0;
- 2277 //判断代表当前线程节点是否在同步队列中,如果不在,说明还没有其它线程调用signal()方法,将当前线程从条件队列Conditon种转移到同步队列中
- 2278 while (!isOnSyncQueue(node)) {
- 2279 //当前线程还在条件队列中,没有资格竞争锁,则直接将当前线程挂起
- 2280 LockSupport.park(this);
- 2281 //能走到这里,说明当前线程被其他线程signal()或者被其它线程中断了
- 2282 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- 2283 break;
- 2284 }
- 2285 //interruptMode != THROW_IE表示当前线程不是由于线程中断而被唤醒的
- 2286 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- 2287 interruptMode = REINTERRUPT;
- 2288 //上面将从条件队列condition中唤醒的节点加入到sync queue条件队列中时候,并没有从条件队列中移除,这里需要清理一下
- 2289 if (node.nextWaiter != null) // clean up if cancelled
- 2290 unlinkCancelledWaiters();
- 2291 //当前节点在等待的过程中发生了中断
- 2292 if (interruptMode != 0)
- 2293 reportInterruptAfterWait(interruptMode);
- 2294 }
- 2295
- 2296 /**
- 2297 * Implements timed condition wait.
- 2298 * <ol>
- 2299 * <li> If current thread is interrupted, throw InterruptedException.
- 2300 * <li> Save lock state returned by {@link #getState}.
- 2301 * <li> Invoke {@link #release} with saved state as argument,
- 2302 * throwing IllegalMonitorStateException if it fails.
- 2303 * <li> Block until signalled, interrupted, or timed out.
- 2304 * <li> Reacquire by invoking specialized version of
- 2305 * {@link #acquire} with saved state as argument.
- 2306 * <li> If interrupted while blocked in step 4, throw InterruptedException.
- 2307 * </ol>
- 2308 */
- 2309 public final long awaitNanos(long nanosTimeout)
- 2310 throws InterruptedException {
- 2311 if (Thread.interrupted())
- 2312 throw new InterruptedException();
- 2313 Node node = addConditionWaiter();
- 2314 int savedState = fullyRelease(node);
- 2315 final long deadline = System.nanoTime() + nanosTimeout;
- 2316 int interruptMode = 0;
- 2317 while (!isOnSyncQueue(node)) {
- 2318 if (nanosTimeout <= 0L) {
- 2319 transferAfterCancelledWait(node);
- 2320 break;
- 2321 }
- 2322 if (nanosTimeout >= spinForTimeoutThreshold)
- 2323 LockSupport.parkNanos(this, nanosTimeout);
- 2324 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- 2325 break;
- 2326 nanosTimeout = deadline - System.nanoTime();
- 2327 }
- 2328 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- 2329 interruptMode = REINTERRUPT;
- 2330 if (node.nextWaiter != null)
- 2331 unlinkCancelledWaiters();
- 2332 if (interruptMode != 0)
- 2333 reportInterruptAfterWait(interruptMode);
- 2334 return deadline - System.nanoTime();
- 2335 }
- 2336
- 2337 /**
- 2338 * Implements absolute timed condition wait.
- 2339 * <ol>
- 2340 * <li> If current thread is interrupted, throw InterruptedException.
- 2341 * <li> Save lock state returned by {@link #getState}.
- 2342 * <li> Invoke {@link #release} with saved state as argument,
- 2343 * throwing IllegalMonitorStateException if it fails.
- 2344 * <li> Block until signalled, interrupted, or timed out.
- 2345 * <li> Reacquire by invoking specialized version of
- 2346 * {@link #acquire} with saved state as argument.
- 2347 * <li> If interrupted while blocked in step 4, throw InterruptedException.
- 2348 * <li> If timed out while blocked in step 4, return false, else true.
- 2349 * </ol>
- 2350 */
- 2351 public final boolean awaitUntil(Date deadline)
- 2352 throws InterruptedException {
- 2353 long abstime = deadline.getTime();
- 2354 if (Thread.interrupted())
- 2355 throw new InterruptedException();
- 2356 Node node = addConditionWaiter();
- 2357 int savedState = fullyRelease(node);
- 2358 boolean timedout = false;
- 2359 int interruptMode = 0;
- 2360 while (!isOnSyncQueue(node)) {
- 2361 if (System.currentTimeMillis() > abstime) {
- 2362 timedout = transferAfterCancelledWait(node);
- 2363 break;
- 2364 }
- 2365 LockSupport.parkUntil(this, abstime);
- 2366 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- 2367 break;
- 2368 }
- 2369 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- 2370 interruptMode = REINTERRUPT;
- 2371 if (node.nextWaiter != null)
- 2372 unlinkCancelledWaiters();
- 2373 if (interruptMode != 0)
- 2374 reportInterruptAfterWait(interruptMode);
- 2375 return !timedout;
- 2376 }
- 2377
- 2378 /**
- 2379 * Implements timed condition wait.
- 2380 * <ol>
- 2381 * <li> If current thread is interrupted, throw InterruptedException.
- 2382 * <li> Save lock state returned by {@link #getState}.
- 2383 * <li> Invoke {@link #release} with saved state as argument,
- 2384 * throwing IllegalMonitorStateException if it fails.
- 2385 * <li> Block until signalled, interrupted, or timed out.
- 2386 * <li> Reacquire by invoking specialized version of
- 2387 * {@link #acquire} with saved state as argument.
- 2388 * <li> If interrupted while blocked in step 4, throw InterruptedException.
- 2389 * <li> If timed out while blocked in step 4, return false, else true.
- 2390 * </ol>
- 2391 */
- 2392 public final boolean await(long time, TimeUnit unit)
- 2393 throws InterruptedException {
- 2394 long nanosTimeout = unit.toNanos(time);
- 2395 if (Thread.interrupted())
- 2396 throw new InterruptedException();
- 2397 Node node = addConditionWaiter();
- 2398 int savedState = fullyRelease(node);
- 2399 final long deadline = System.nanoTime() + nanosTimeout;
- 2400 boolean timedout = false;
- 2401 int interruptMode = 0;
- 2402 while (!isOnSyncQueue(node)) {
- 2403 if (nanosTimeout <= 0L) {
- 2404 timedout = transferAfterCancelledWait(node);
- 2405 break;
- 2406 }
- 2407 if (nanosTimeout >= spinForTimeoutThreshold)
- 2408 LockSupport.parkNanos(this, nanosTimeout);
- 2409 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- 2410 break;
- 2411 nanosTimeout = deadline - System.nanoTime();
- 2412 }
- 2413 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- 2414 interruptMode = REINTERRUPT;
- 2415 if (node.nextWaiter != null)
- 2416 unlinkCancelledWaiters();
- 2417 if (interruptMode != 0)
- 2418 reportInterruptAfterWait(interruptMode);
- 2419 return !timedout;
- 2420 }
- 2421
- 2422 // support for instrumentation
- 2423
- 2424 /**
- 2425 * Returns true if this condition was created by the given
- 2426 * synchronization object.
- 2427 *
- 2428 * @return {@code true} if owned
- 2429 */
- 2430 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
- 2431 return sync == AbstractQueuedSynchronizer.this;
- 2432 }
- 2433
- 2434 /**
- 2435 * Queries whether any threads are waiting on this condition.
- 2436 * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
- 2437 *
- 2438 * @return {@code true} if there are any waiting threads
- 2439 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
- 2440 * returns {@code false}
- 2441 */
- 2442 protected final boolean hasWaiters() {
- 2443 if (!isHeldExclusively())
- 2444 throw new IllegalMonitorStateException();
- 2445 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
- 2446 if (w.waitStatus == Node.CONDITION)
- 2447 return true;
- 2448 }
- 2449 return false;
- 2450 }
- 2451
- 2452 /**
- 2453 * Returns an estimate of the number of threads waiting on
- 2454 * this condition.
- 2455 * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
- 2456 *
- 2457 * @return the estimated number of waiting threads
- 2458 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
- 2459 * returns {@code false}
- 2460 */
- 2461 protected final int getWaitQueueLength() {
- 2462 if (!isHeldExclusively())
- 2463 throw new IllegalMonitorStateException();
- 2464 int n = 0;
- 2465 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
- 2466 if (w.waitStatus == Node.CONDITION)
- 2467 ++n;
- 2468 }
- 2469 return n;
- 2470 }
- 2471
- 2472 /**
- 2473 * Returns a collection containing those threads that may be
- 2474 * waiting on this Condition.
- 2475 * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
- 2476 *
- 2477 * @return the collection of threads
- 2478 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
- 2479 * returns {@code false}
- 2480 */
- 2481 protected final Collection<Thread> getWaitingThreads() {
- 2482 if (!isHeldExclusively())
- 2483 throw new IllegalMonitorStateException();
- 2484 ArrayList<Thread> list = new ArrayList<Thread>();
- 2485 for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
- 2486 if (w.waitStatus == Node.CONDITION) {
- 2487 Thread t = w.thread;
- 2488 if (t != null)
- 2489 list.add(t);
- 2490 }
- 2491 }
- 2492 return list;
- 2493 }
- 2494 }
- 2495
- 2496 /**
- 2497 * Setup to support compareAndSet. We need to natively implement
- 2498 * this here: For the sake of permitting future enhancements, we
- 2499 * cannot explicitly subclass AtomicInteger, which would be
- 2500 * efficient and useful otherwise. So, as the lesser of evils, we
- 2501 * natively implement using hotspot intrinsics API. And while we
- 2502 * are at it, we do the same for other CASable fields (which could
- 2503 * otherwise be done with atomic field updaters).
- 2504 */
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
    // Resolve, once at class initialization, the raw offsets of the fields
    // that the CAS helpers below operate on.
    final Class<?> syncType = AbstractQueuedSynchronizer.class;
    final Class<?> nodeType = Node.class;
    try {
        stateOffset = unsafe.objectFieldOffset(syncType.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset(syncType.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset(syncType.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset(nodeType.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset(nodeType.getDeclaredField("next"));
    } catch (Exception ex) {
        // A failed field lookup is unrecoverable here; rethrow as an Error.
        throw new Error(ex);
    }
}
- 2527
- 2528 /**
- 2529 * CAS head field. Used only by enq.
- 2530 */
- 2531 private final boolean compareAndSetHead(Node update) {
- 2532 return unsafe.compareAndSwapObject(this, headOffset, null, update);
- 2533 }
- 2534
- 2535 /**
- 2536 * CAS tail field. Used only by enq.
- 2537 */
- 2538 private final boolean compareAndSetTail(Node expect, Node update) {
- 2539 return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
- 2540 }
- 2541
- 2542 /**
- 2543 * CAS waitStatus field of a node.
- 2544 */
- 2545 private static final boolean compareAndSetWaitStatus(Node node,
- 2546 int expect,
- 2547 int update) {
- 2548 return unsafe.compareAndSwapInt(node, waitStatusOffset,
- 2549 expect, update);
- 2550 }
- 2551
- 2552 /**
- 2553 * CAS next field of a node.
- 2554 */
- 2555 private static final boolean compareAndSetNext(Node node,
- 2556 Node expect,
- 2557 Node update) {
- 2558 return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
- 2559 }
- 2560 }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |