AQS

AQSAbstractQuenedSynchronizer 抽象的队列式同步器。是除了 java 自带的 synchronized 关键字之外的锁机制。
AQS 的全称为(AbstractQueuedSynchronizer),这个类在 java.util.concurrent.locks

AQS 提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,如下图所示。AQS 为一系列同步器依赖于一个单独的原子变量(state)的同步器提供了一个非常有用的基础。子类们必须定义改变 state 变量的 protected 方法,这些方法定义了 state 是如何被获取或释放的。鉴于此,本类中的其他方法执行所有的排队和阻塞机制。子类也可以维护其他的 state 变量,但是为了保证同步,必须原子地操作这些变量。

AQS

AQSstate 的操作是原子的, 且不能被覆写,所有的同步机制的实现均依赖于对改变量的原子操作。为了实现不同的同步机制,子类需要提供一个非公有的(non-public internal) 内部辅助类来实现同步逻辑。AQS 提供了 EXCLUSIVE (互斥) 和 SHARE (共享) 两种模式, 一般情况下子类只需要实现其中一种模式, 也有同时实现两种模式的 ReadWriteLock

state

AQS 维护一个 volatilestate 变量来记录状态,保证 state 的可见性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* The synchronization state.
*/
private volatile int state;

/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}

/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}

/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS 提供三种方法访问 state ,且无法被重写. 其中 compareAndSetState 方法使用的是 Unsafe 类的 compareAndSwapInt

Node

AQS 是对 CLH 锁也是一种基于链表的可扩展高性能公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

Java AQS 的设计对 CLH 锁进行了优化或者说变体。

Node 类就是基于 CLH 锁的 FIFO 队列的实现。

Node 的两种模式:

1
2
3
4
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

SHARED 共享模式(多个线程可同时执行,如 Semaphore/CountDownLatch ), EXCLUSIVE 独占模式 (只有一个线程能执行,如 ReentrantLock)

Node 的几种等待状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
  • SIGNAL 此节点后面的节点已(或即将)被阻塞(通过park),因此当前节点在释放或取消时必须断开后面的节点, 为了避免竞争,acquire 方法时前面的节点必须是 SIGNAL 状态,然后重试原子 acquire,然后在失败时阻塞。
  • CANCELLED 由于超时或中断,此节点被取消。节点一旦被取消了就不会再改变状态。特别是,取消节点的线程不会再阻塞。
  • CONDITION 此节点当前在条件队列中。标记为 CONDITION 的节点会被移动到一个特殊的条件等待队列(此时状态将设置为0),直到条件时才会被重新移动到同步等待队列 。(此处使用此值与字段的其他用途无关,但简化了机制。)
  • PROPAGATE 传播:应将 releaseShared 传播到其他节点。这是在 doReleaseShared 中设置的(仅适用于头部节点),以确保传播继续,即使此后有其他操作介入。
  • 0 非负值表示节点不需要发出信号。所以,大多数代码不需要检查特定的值,只需要检查符号。对于正常同步节点,该字段初始化为0;对于条件节点,该字段初始化为条件。它是使用CAS修改的(或者在可能的情况下,使用无条件的volatile写入)。

Node 的前驱和后继节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;

/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;

当前队列节点的线程

1
2
3
4
5
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;

nextWaiter 下一个等待节点(conditon等待队列的会用到),或者一个特殊的 SHARED 值。

1
2
3
4
5
6
7
8
9
10
11
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;

Node 中的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
  • isShared 判断 SHARED 是否等于 nextWaiter
  • predecessor 获取当前节点的前驱节点, 如果前驱节点为空抛出空指针异常

Node 的构造方法

1
2
3
4
5
6
7
8
9
10
11
12
Node() {    // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}

Node 有三个构造函数,默认无参构造函数、指定模式构造函数(用于新增等待节点),指定等待状态的构造函数(用于 conditon 等待队列)

AQS 维护一个双向链表的 FIFO队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;

/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;

自定义资源共享方式

获取资源

共享模式资源获取:

  1. 多个线程通过调用 tryAcquireShared 方法获取共享资源,返回值大于等于0则获取资源成功,返回值小于0则获取失败。
  2. 当前线程获取共享资源失败后,通过调用 addWaiter 方法把该线程封装为 Node 节点,并设置该节点为共享模式。然后把该节点添加到队列的尾部。
  3. 添加到尾部后,判断该节点的前驱节点是不是头节点,如果前驱节点是头节点,那么该节点的前驱节点出队列并获取共享资源,同时调用 setHeadAndPropagate 方法把该节点设置为新的头节点,同时唤醒队列中所有共享类型的节点,去获取共享资源。如果获取失败,则再次加入到队列中。
  4. 如果该节点的前驱节点不是头节点,那么通过for循环进行自旋转等待,直到当前节点的前驱节点是头节点,结束自旋。

独占模式资源获取:

  1. 多个线程通过调用 tryAcquire 方法获取独占资源,返回值为 true 则获取资源成功,返回值为 false 则获取失败。
  2. 当前线程获取共享资源失败后,通过调用 addWaiter 方法把该线程封装为 Node 节点,并设置该节点为独占模式。然后把该节点添加到队列的尾部。
  3. 添加到尾部后,判断该节点的前驱节点是不是头节点,如果前驱节点是头节点,继续调用 tryAcquire 获取资源,获取成功将当前节点设置为头节点并返回 interrupted(是否被中断)
  4. 如果该节点的前驱节点不是头节点,那么通过for循环进行自旋转等待,直到当前节点的前驱节点是头节点,结束自旋。

由此可见共享模式和独占模式获取资源的流程不同点就是在设置头节点。共享模式下回调用 setHeadAndPropagete 方法,该方法会同时唤醒队列中所有共享类型的节点,去获取共享资源

释放资源

共享模式资源释放:

  1. 通过调用 tryReleaseShared 方法释放共享资源,返回值为 true 则释放资源成功,返回值为 false 则释放失败
  2. 当前线程获取释放资源成功后,调用 doReleaseShared 方法,唤醒队列中所有共享类型的节点,去获取共享资源

独占模式资源释放:

  1. 通过调用 tryRelease 方法释放共享资源,返回值为 true 则释放资源成功,返回值为 false 则释放失败。
  2. 当前线程获取释放资源成功后,且后继节点的等待状态不为0,调用 unparkSuccessor 方法,唤醒后继节点线程

不同的自定义同步器争用资源的方式也不同。自定义同步器在实现时只需要实现资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到 condition 才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回 true,失败则返回 false
  • tryRelease(int):独占方式。尝试释放资源,成功则返回 true,失败则返回 false
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Main exported methods

/**
* Attempts to acquire in exclusive mode. This method should query
* if the state of the object permits it to be acquired in the
* exclusive mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread. This can be used
* to implement method {@link Lock#tryLock()}.
*
* <p>The default
* implementation throws {@link UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return {@code true} if successful. Upon success, this object has
* been acquired.
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

/**
* Attempts to set the state to reflect a release in exclusive
* mode.
*
* <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return {@code true} if this object is now in a fully released
* state, so that any waiting threads may attempt to acquire;
* and {@code false} otherwise.
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

/**
* Attempts to acquire in shared mode. This method should query if
* the state of the object permits it to be acquired in the shared
* mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread.
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return a negative value on failure; zero if acquisition in shared
* mode succeeded but no subsequent shared-mode acquire can
* succeed; and a positive value if acquisition in shared
* mode succeeded and subsequent shared-mode acquires might
* also succeed, in which case a subsequent waiting thread
* must check availability. (Support for three different
* return values enables this method to be used in contexts
* where acquires only sometimes act exclusively.) Upon
* success, this object has been acquired.
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

/**
* Attempts to set the state to reflect a release in shared mode.
*
* <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return {@code true} if this release of shared mode may permit a
* waiting acquire (shared or exclusive) to succeed; and
* {@code false} otherwise
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

/**
* Returns {@code true} if synchronization is held exclusively with
* respect to the current (calling) thread. This method is invoked
* upon each call to a non-waiting {@link ConditionObject} method.
* (Waiting methods instead invoke {@link #release}.)
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}. This method is invoked
* internally only within {@link ConditionObject} methods, so need
* not be defined if conditions are not used.
*
* @return {@code true} if synchronization is held exclusively;
* {@code false} otherwise
* @throws UnsupportedOperationException if conditions are not supported
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}

接下来我们开始看 AQS 的源码实现。依照 acquire-releaseacquireShared-releaseShared 的次序来。

tryAcquire

需要独占模式的自定义 AQS 子类去实现的

acquire(int)

acquire 是一种以独占方式获取资源,如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。该方法是独占模式下线程获取共享资源的顶层入口。获取到资源后,线程就可以去执行其临界区代码了。下面是 acquire() 的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
  1. 首先调用 tryAcquire(int) 尝试竞争锁资源
  2. 如果竞争失败就会调用 acquireQueued, addWaiter 将该线程加入等待队列的尾部,并标记为独占模式;
  3. 如果等待过程中线程被终端就调用 selfInterrupt,调用当前线程的 interrupt 方法

addWaiter(Node)

该方法用于将当前线程根据不同的模式(Node.EXCLUSIVE 互斥模式、Node.SHARED 共享模式)加入到等待队列的队尾,并返回当前线程所在的结点。
如果队列不为空,则以通过 compareAndSetTail 方法以 CAS 的方式将当前线程节点加入到等待队列的末尾。否则,通过 enq(node) 方法初始化一个等待队列,并返回当前节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

enq(Node)

enq(node) 用于将当前节点插入等待队列,如果队列为空,则初始化当前队列。整个过程以 CAS 自旋的方式进行,直到成功加入队尾为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

acquireQueued(Node, int)

acquireQueued() 用于队列中的线程自旋地以独占且不可中断的方式获取同步状态(acquire),直到拿到锁之后再返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
  1. 如果当前节点已经成为头结点,尝试获取锁(tryAcquire)成功,然后返回
  2. 否则检查当前节点是否应该被 park,然后将该线程 park 并且检查当前线程是否可以被中断。

shouldParkAfterFailedAcquire(Node, Node)

shouldParkAfterFailedAcquire 方法通过对当前节点的前一个节点的状态进行判断,对当前节点做出不同的操作,至于每个 Node 的状态表示,可以参考接口文档。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
  1. 如果前驱节点状态为 SIGNAL, 返回 true 标识当前节点需要被 park
  2. 如果前驱节点状态大于 0,即前驱节点状态为 CANCELLED, 循环遍历对前驱节点遍历, 移除 CANCELLED 节点
  3. 否则就说明前驱节点状态为 0 或者 PROPAGATE 状态, 通过 CAS 设置前驱节点的状态为 SIGNAL

parkAndCheckInterrup()

该方法让线程去休息,真正进入等待状态。park() 会让当前线程进入 waiting 状态。在此状态下,可以通过 unpark()interrupt()。需要注意的是,Thread.interrupted() 会清除当前线程的中断标记位。

1
2
3
4
5
6
7
8
9
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

tryRelease

需要独占模式的自定义 AQS 子类去实现的。

release(int)

release 方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即 state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是 unlock() 的语义,当然不仅仅只限于 unlock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

unparkSuccessor(Node)

如果存在后继节点, 唤醒后继节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
  1. 如果当前节点的等待状态小于 0,通过 CAS 将等待状态置 0
  2. 如果后继节点为 null 或 后继节点等待状态为 CANCELLED, 从队列尾部开始寻找最靠前的不为 null 且等待状态不为 CANCELLED 的节点
  3. 如果找到指定节点, 就 unpark 唤醒该节点的线程

tryAcquireShared

需要共享模式的自定义 AQS 子类去实现的。

acquireShared(int)

该方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

doAcquireShared(int)

将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

此方法在 setHead() 的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式!

tryReleaseShared

需要共享模式的自定义 AQS 子类去实现的。

releaseShared(int)

该方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

注意:

独占模式下的 tryRelease() 在完全释放掉资源(state=0)后,才会返回 true 去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的 releaseShared() 则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。

doReleaseShared()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
  • 逻辑是一个死循环,每次循环中重新读取一次 head,然后保存在局部变量h中,再配合 if(h == head) break;,这样,循环检测到head 没有变化时就会退出循环。注意,head 变化一定是因为:acquire thread 被唤醒,之后它成功获取锁,然后 setHead 设置了新 head。而且注意,只有通过 if(h == head) break; 即 head 不变才能退出循环,不然会执行多次循环。
  • if (h != null && h != tail) 判断队列是否至少有两个 node,如果队列从来没有初始化过(headnull),或者 head 就是 tail,那么中间逻辑直接不走,直接判断 head 是否变化了。
  • 如果队列中有两个或以上个 node,那么检查局部变量h的状态:
    • 如果状态为 SIGNAL,说明 h 的后继是需要被通知的。通过对 CAS 操作结果取反,将 compareAndSetWaitStatus(h, Node.SIGNAL, 0)unparkSuccessor(h) 绑定在了一起。说明了只要 head 成功得从 SIGNAL 修改为 0,那么 head 的后继的代表线程肯定会被唤醒了。
    • 如果状态为 0,说明h的后继所代表的线程已经被唤醒或即将被唤醒,并且这个中间状态即将消失,要么由于 acquire thread 获取锁失败再次设置 headSIGNAL 并再次阻塞,要么由于 acquire thread 获取锁成功而将自己(head 后继)设置为新 head并且只要 head 后继不是队尾,那么新 head 肯定为 SIGNAL。所以设置这种中间状态的 headstatusPROPAGATE,让其 status 又变成负数,这样可能被被唤醒线程检测到。
    • 如果状态为 PROPAGATE,直接判断 head 是否变化。
  • 两个 continue 保证了进入那两个分支后,只有当 CAS 操作成功后,才可能去执行 if(h == head) break;,才可能退出循环。
    if(h == head) break; 保证了,只要在某个循环的过程中有线程刚获取了锁且设置了新 head,就会再次循环。目的当然是为了再次执行 unparkSuccessor(h),即唤醒队列中第一个等待的线程。

Condition

AQS 中不光有等待队列,还有一个条件队列,AQS 提供了一个 Condition 接口的实现 ConditionObject

Condition 的作用是对锁进行更精确的控制。Condition 中的 await()signal()signalAll()方法相当于 Objectwait()notify()notifyAll() 方法。不同的是,Object 中的 wait()notify()notifyAll() 方法是和"同步锁"( synchronized 关键字)捆绑使用的;而 Condition 是需要与 Lock 捆绑使用的。

await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
  1. 如果当前线程被中断,抛出 InterruptedException异常
  2. 首先调用addConditionWaiter方法把当前节点(线程)添加到条件队列
  3. 调用fullyRelease方法释放已经持有的锁(就是在调用 condition.await() 之前持有的 lock.lock()锁),并返回释放前的锁状态
  4. 调用 isOnSyncQueue 方法检查当前节点是否在同步队列中,直到当前节点被加入到同步队列或者被中断
  5. 调用 acquireQueued 方法将当前节点加入到同步队列中
  6. 如果 nextWaiter 不为空,说明节点在获取锁时由于遭遇异常或者被中断而被取消,此时需要清除 CANCELLED 节点
  7. 如果 interruptMode 不等于 0,说明当前节点被中断, 调用 reportInterruptAfterWait 处理中断

addConditionWaiter

创建一个 Condition 节点,并加入到 firstWaiter.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

signal

signal 方法唤醒条件队列中的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
  1. 如果当前节点已经获取到锁资源,抛出 IllegalMonitorStateException 异常
  2. 获取等待头节点,如果该节点不为空, doSignal唤醒节点线程

doSignal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
  1. 更新 firstWaiter 为条件队列的下一个等待节点;
  2. 解除当前节点的链接;
  3. 调用 transferForSignal 方法把当前节点转移到等待队列,等待获取锁

transferForSignal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
  1. CAS 将节点等待状态设为 0,如果设置失败,返回 false
  2. enq 将节点插入到等待队列,并返回前驱节点
  3. 如果前继节点被取消或者设置前继节点的状态失败,则直接唤醒当前节点线程尝试获取锁

参考