ReentrantReadWriteLock:

ReentrantReadWriteLockLock 的另一种实现方式,我们已经知道了 ReentrantLock 是一个排他锁,同一时间只允许一个线程> 访问,而 ReentrantReadWriteLock 允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相对于排他锁,提> 高了并发性。在实际应用中,大部分情况下对共享数据(如缓存)的访问都是读操作远多于写操作,这时 ReentrantReadWriteLock 能够> 提供比排他锁更好的并发性和吞吐量。

ReentrantReadWriteLock

​ 读写锁内部维护了两个锁,一个用于读操作,一个用于写操作。所有 ReadWriteLock 实现都必须保证 writeLock 操作的内存同步效果也要保持与相关 readLock 的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新。

  1. 读写锁支持公平锁和非公平锁
  2. 读写锁是可重入锁,写锁可以继续获取写锁,也可以再获取读锁;但是读锁只能再获取读锁,不能再获取写锁
  3. 写锁可以降级到读锁–先获取读锁再释放写锁。但不支持从读锁升级到写锁
  4. 读锁和写锁都支持锁获取期间的中断
  5. 写锁支持 Condition,而读锁不支持

属性

1
2
3
4
5
6
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;

readerLock 读锁、writerLock 写锁和 Sync 基于 AQS 的同步队列

Sync

ReentrantReadWriteLock 提供了两个 Sync 实现,分别是 FairSync(公平锁) 和 NonfairSync (非公平锁)

readerShouldBlock

readerShouldBlock 方法用于判断当前读操作是否应该被阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 公平锁
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
// 非公平锁
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}

FairSync下判断当前读操作是否应该被阻塞,调用的是 hasQueuedPredecessors,返回当前节点是否为头节点。

NonfairSync 下的实现是调用 apparentlyFirstQueuedIsExclusive 方法,判断头节点的后继节点是否为共享模式(读)

As a heuristic to avoid indefinite writer starvation,block if the thread that momentarily appears to be head of queue, if one exists, is a waiting writer. This is only a probabilistic effect since a new reader will not block if there is a waiting writer behind other enabled readers that have not yet drained from the queue.

根据官方注释的翻译:

这只是一种启发式地避免写锁无限等待的做法,它在遇到同步队列的 head 后继为写锁节点时,会让 readerShouldBlock 返回 true代表新来的读> 锁(new reader)需要阻塞等待这个 head 后继。但只是一定概率下能起到作用,如果同步队列的 head 后继是一个读锁,之后才是写锁的话,> readerShouldBlock 就肯定会返回 false 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Returns {@code true} if the apparent first queued thread, if one
* exists, is waiting in exclusive mode. If this method returns
* {@code true}, and the current thread is attempting to acquire in
* shared mode (that is, this method is invoked from {@link
* #tryAcquireShared}) then it is guaranteed that the current thread
* is not the first queued thread. Used only as a heuristic in
* ReentrantReadWriteLock.
*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

这里就是判断头节点的后继节点是否为非共享模式,且节点绑定的线程不为空。
也就是说头节点的后继节点如果不是共享模式就返回 true,表示当前读操作应该被阻塞。

new reader 意思就是,线程第一次来尝试获取读锁。

写锁无限等待 indefinite writer starvation

考虑 readerShouldBlock 的现有实现的话,写锁节点只需要等待线程 AB 释放读锁后,就可以获得到写锁了。而线程 CDE 作为 new reader,不会去尝试获取读锁,而是将自己包装成读锁节点排在写锁节点的后面。这个具体的流程,请查看 ReentrantReadWriteLock 源码解析的读锁的获取章节内容里面对 fullTryAcquireShared 函数的讲解,我们只需要知道 readerShouldBlock 返回了 true,代表读锁获取应该阻塞,如果这个读锁是个 new reader

heuristic启发式地防止new reader

但尽管 readerShouldBlock 是这样的非公平实现,也无法防止上图第二种情况的 new reader 的获取读锁动作,所以说这只是一定概率下防止 new reader 获取读锁,但有概率的防止总比啥都不做强。

writerShouldBlock

writerShouldBlock 方法用于判断当前写操作是否应该被阻塞。

1
2
3
4
5
6
7
8
// 公平锁
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
// 非公平锁
final boolean writerShouldBlock() {
return false; // writers can always barge
}

在非公平锁中直接返回 false 无需阻塞等待, 公平锁中也是调用 hasQueuedPredecessors

lock

1
2
3
4
5
6
7
8
// 读锁
public void lock() {
sync.acquireShared(1);
}
// 写锁
public void lock() {
sync.acquire(1);
}

lock 读锁通过共享模式获取锁资源,而写锁通过独占模式获取锁资源

共享方式获取锁资源,acquireShare 调用 tryAcquireShare 尝试获取锁资源,如果获取失败再调用 doAcquireShare方法,下面是 ReentrantReadWriteLocktryAcquireShare 的实现。

acquireShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

1、如果有其他线程持有写锁,获取锁资源失败.
2、如果读操作无需阻塞,且当前持有锁数量相遇最大限制(65536),且 CAS 修改状态成功,返回 1
* 若当前节点状态为 0,设置当前节点为头节点,设置头结点持有锁数量为 1
* 若当前节点为头节点,头节点持有锁数量 +1
* 否则,获取当前线程的持有计数器,并将计数器持有数量 +1
3、调用 fullTryAcquireSHared 方法获取锁资源。

fullTryAcquireShared

完整版本的获取读取,可处理 tryAcquireShared 中未处理的 CAS 丢失和可重入读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

该方法与 tryAcquireShared 部分冗余,但总体上更简单. 由于没有 retriedlazily 读取持有计数之间的复杂交互

独占模式获取锁资源,acquire 调用 tryAcquire 尝试获取锁资源,如果获取失败再调用 acquireQueued方法加入同步队列,下面是 ReentrantReadWriteLocktryAcquire 的实现。

tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

1、若读写锁数量都不为 0 且 其他线程持有锁资源,则获取锁资源失败
2、若持有锁数量超过最大限制,则获取锁资源失败
3、若当前线程是可重入获取或队列策略允许的话,则可以获取锁资源。

unlock

1
2
3
4
5
6
7
8
// 读锁
public void unlock() {
sync.releaseShared(1);
}
// 写锁
public void unlock() {
sync.release(1);
}

共享方式释放锁资源,releaseShared 调用 tryReleaseShared 尝试释放锁资源,如果释放后允许唤醒后续等待结点,调用 doReleaseShared 尝试唤醒 head 后继节点。

tryReleaseShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
  1. 若当前线程为firstReader,判断当前头部节点线程持有的锁数量是否为 1 ,如果是就设置 firstReader = null,否则 firstReaderHoldCount 数量减 1
  2. 否则获取线程的持有锁数量计数器,并将持有数据量减 1
  3. 循环使用 CAS 设置 state,并返回当前 state 是否为 0(读锁释放不影响其他读操作,但如果现在读取和写入锁定均已释放,则可能允许等待的写入器继续进行。)

独占方式释放锁资源,release 调用 tryRelease 尝试释放锁资源,如果释放成功.判断头节点不为空且等待状态不为 0,尝试 unpark 后继节点。

tryRelease

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
* Note that tryRelease and tryAcquire can be called by
* Conditions. So it is possible that their arguments contain
* both read and write holds that are all released during a
* condition wait and re-established in tryAcquire.
*/
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
  1. 如果当前线程未持有锁资源,抛出 IllegalMonitorStateException 异常
  2. 如果释放锁资源后,该线程持有锁资源数量为 0 则设置持有排它锁线程为 null
  3. 更新 state 状态并返回当前线程是否已完全释放锁资源(持有锁资源数量为 0)

参考

JUC源码分析-JUC锁:ReentrantLock
深入理解读锁的非公平实现