Phaser:

PhaserJDK 1.7 新增的一个同步辅助类,在功能上跟 CyclicBarrierCountDownLatch 差不多,但支持更丰富的用法。与 JUC 中多数同步类不同,它并不是通过 AQS 来实现的,而是用了另外一种同步机制。
我们可以将 Phaser 看成一个一个的阶段,每个阶段都有需要执行的线程任务,任务执行完毕就进入下一个阶段。所以 Phaser 特别适合使用在重复执行或者重用的情况。

同步器作用
CountDownLatch倒数计数器,初始时设定计数器值,线程可以在计数器上等待,当计数器值归0后,所有等待的线程继续执行
CyclicBarrier循环栅栏,初始时设定参与线程数,当线程到达栅栏后,会等待其它线程的到达,当到达栅栏的总数满足指定数后,所有等待的线程继续执行
Phaser多阶段栅栏,可以在初始时设定参与线程数,也可以中途注册/注销参与者,当到达的参与者数量满足栅栏设定的数量后,会进行阶段升级(advance

phase

CyclicBarrier 中,只有一个栅栏,线程在到达栅栏后会等待其它线程的到达。

Phaser 也有栅栏,在 Phaser 中,栅栏的名称叫做 phase(阶段),在任意时间点,Phaser 只处于某一个 phase(阶段),初始阶段为0,最大达到 Integer.MAX_VALUE,然后再次归零。当所有 parties 参与者都到达后,phase 值会递增。

parties

parties(参与者)其实就是 CyclicBarrier 中的参与线程的概念。

CyclicBarrier 中的参与者在初始构造指定后就不能变更,而 Phaser 既可以在初始构造时指定参与者的数量,也可以中途通过registerbulkRegisterarriveAndDeregister等方法注册/注销参与者。

arrive/advance

Phaser 注册完 parties(参与者)之后,参与者的初始状态是 unarrived 的,当参与者到达(arrive)当前阶段(phase)后,状态就会变成 arrived。当阶段的到达参与者数满足条件后(注册的数量等于到达的数量),阶段就会发生进阶(advance)——也就是 phase+1

arrive/advance

termination

代表当前 Phaser 对象达到终止状态,有点类似于 CyclicBarrier 中的栅栏被破坏的概念。

tiering

Phaser 支持分层(Tiering) —— 一种树形结构,通过构造函数可以指定当前待构造的 Phaser 对象的父结点。之所以引入 Tiering,是因为当一个 Phaser 有大量参与者(parties)的时候,内部的同步操作会使性能急剧下降,而分层可以降低竞争,从而减小因同步导致的额外开销。
在一个分层 Phasers 的树结构中,注册和撤销子 Phaser 或父 Phaser 是自动被管理的。当一个 Phaser 的参与者(parties)数量变成0时,如果有该 Phaser 有父结点,就会将它从父结点中溢移除。

核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* Primary state representation, holding four bit-fields:
*
* unarrived -- the number of parties yet to hit barrier (bits 0-15)
* parties -- the number of parties to wait (bits 16-31)
* phase -- the generation of the barrier (bits 32-62)
* terminated -- set if barrier is terminated (bit 63 / sign)
*
* Except that a phaser with no registered parties is
* distinguished by the otherwise illegal state of having zero
* parties and one unarrived parties (encoded as EMPTY below).
*
* To efficiently maintain atomicity, these values are packed into
* a single (atomic) long. Good performance relies on keeping
* state decoding and encoding simple, and keeping race windows
* short.
*
* All state updates are performed via CAS except initial
* registration of a sub-phaser (i.e., one with a non-null
* parent). In this (relatively rare) case, we use built-in
* synchronization to lock while first registering with its
* parent.
*
* The phase of a subphaser is allowed to lag that of its
* ancestors until it is actually accessed -- see method
* reconcileState.
*/
private volatile long state;

/**
* The parent of this phaser, or null if none
*/
private final Phaser parent;

/**
* The root of phaser tree. Equals this if not in a tree.
*/
private final Phaser root;

/**
* Heads of Treiber stacks for waiting threads. To eliminate
* contention when releasing some threads while adding others, we
* use two of them, alternating across even and odd phases.
* Subphasers share queues with root to speed up releases.
*/
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;

// Unsafe mechanics

private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
  1. state Phaser 使用一个 longstate 值来标识内部状态:
    • unarrived(低0-15位) 表示未到达 barrierparties
    • parties(中16-31位) 表示等待的 parties
    • phase(中32-62位) 表示 phase 当前的 Generation
    • terminated(高63位) 表示当前 phaser 的终止状态。
  2. parent 表示当前 phaser 的父 phaser, 可能为空
  3. root 表示当前 phaser 数的根 phaser
  4. evenQ/oddQ 等待线程的栈顶元素,根据 phase 取模定义为一个奇数 header 和一个偶数 header
  5. UNSAFE Unsafe 类实例,用于操作内存
  6. stateOffset state 变量的内存偏移量

QNode

QNodePhaser 定义的内部等待队列,用于在阻塞时记录等待线程及相关信息。实现了ForkJoinPool 的一个内部接口 ManagedBlocker
Phaser 也可能被 ForkJoinPool 中的任务使用,这样在其他任务阻塞等待一个 phase 时可以保证足够的并行度来执行任务(通过内部实现方法 isReleasableblock)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* Wait nodes for Treiber stack representing wait queue
*/
static final class QNode implements ForkJoinPool.ManagedBlocker {
// 当前 phaser
final Phaser phaser;
// 当前的 phase 值
final int phase;
// 是否可中断
final boolean interruptible;
// 是否支持超时
final boolean timed;
// 是否已中断
boolean wasInterrupted;
// 超时时长,单位纳秒
long nanos;
// 若支持超时,nanos 等于创建 QNode 的系统时间 + nanos,否则 nanos = 0
final long deadline;
// 当前节点线程
volatile Thread thread; // nulled to cancel wait
// QNode 是一个的单向链表,next 是该节点的后继节点
QNode next;

QNode(Phaser phaser, int phase, boolean interruptible,boolean timed, long nanos) {
this.phaser = phaser;
this.phase = phase;
this.interruptible = interruptible;
this.nanos = nanos;
this.timed = timed;
this.deadline = timed ? System.nanoTime() + nanos : 0L;
thread = Thread.currentThread();
}

public boolean isReleasable() {
if (thread == null)
return true;
if (phaser.getPhase() != phase) {
thread = null;
return true;
}
if (Thread.interrupted())
wasInterrupted = true;
if (wasInterrupted && interruptible) {
thread = null;
return true;
}
if (timed) {
if (nanos > 0L) {
nanos = deadline - System.nanoTime();
}
if (nanos <= 0L) {
thread = null;
return true;
}
}
return false;
}

public boolean block() {
if (isReleasable())
return true;
else if (!timed)
LockSupport.park(this);
else if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
return isReleasable();
}
}

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* Creates a new phaser with no initially registered parties, no
* parent, and initial phase number 0. Any thread using this
* phaser will need to first register for it.
*/
public Phaser() {
this(null, 0);
}

/**
* Creates a new phaser with the given number of registered
* unarrived parties, no parent, and initial phase number 0.
*
* @param parties the number of parties required to advance to the
* next phase
* @throws IllegalArgumentException if parties less than zero
* or greater than the maximum number of parties supported
*/
public Phaser(int parties) {
this(null, parties);
}

/**
* Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}.
*
* @param parent the parent phaser
*/
public Phaser(Phaser parent) {
this(parent, 0);
}

/**
* Creates a new phaser with the given parent and number of
* registered unarrived parties. When the given parent is non-null
* and the given number of parties is greater than zero, this
* child phaser is registered with its parent.
*
* @param parent the parent phaser
* @param parties the number of parties required to advance to the
* next phase
* @throws IllegalArgumentException if parties less than zero
* or greater than the maximum number of parties supported
*/

public Phaser(Phaser parent, int parties) {
if (parties >>> PARTIES_SHIFT != 0) // 无符号右移 16 位,不为 0 说明 parties 超出了最大限制
throw new IllegalArgumentException("Illegal number of parties");
int phase = 0; // 初始 phase 为 0
this.parent = parent;

// 如果 parent 不为空
if (parent != null) {
final Phaser root = parent.root; // 获取 phaser 的根节点
this.root = root; // 将 root 指向根节点
this.evenQ = root.evenQ; // 共用根节点的偶数"无锁栈"
this.oddQ = root.oddQ; // 共用根节点的奇数"无锁栈"
if (parties != 0) // 如果注册的 parties 不为零
phase = parent.doRegister(1); // 向父节点注册 1 个 party 表示自己
}
// 不存在父节点,说明当前节点作为根节点注册
else {
this.root = this;
this.evenQ = new AtomicReference<QNode>();
this.oddQ = new AtomicReference<QNode>();
}
// 计算并更新 state
this.state = (parties == 0) ? (long)EMPTY :
((long)phase << PHASE_SHIFT) |
((long)parties << PARTIES_SHIFT) |
((long)parties);
}

核心方法

register

CyclicBarrierCountDownLatch 中,我们使用计数器来控制程序的顺序执行,同样的在 Phaser 中也是通过计数器来控制。
Phaser 中计数器叫做 parties, 我们可以通过 Phaser 的构造函数或者 register() 方法来注册。
通过调用 register() 方法,我们可以动态的控制 phaser 的个数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* Adds a new unarrived party to this phaser. If an ongoing
* invocation of {@link #onAdvance} is in progress, this method
* may await its completion before returning. If this phaser has
* a parent, and this phaser previously had no registered parties,
* this child phaser is also registered with its parent. If
* this phaser is terminated, the attempt to register has
* no effect, and a negative value is returned.
*
* @return the arrival phase number to which this registration
* applied. If this value is negative, then this phaser has
* terminated, in which case registration has no effect.
* @throws IllegalStateException if attempting to register more
* than the maximum supported number of parties
*/
public int register() {
return doRegister(1);
}

/**
* Adds the given number of new unarrived parties to this phaser.
* If an ongoing invocation of {@link #onAdvance} is in progress,
* this method may await its completion before returning. If this
* phaser has a parent, and the given number of parties is greater
* than zero, and this phaser previously had no registered
* parties, this child phaser is also registered with its parent.
* If this phaser is terminated, the attempt to register has no
* effect, and a negative value is returned.
*
* @param parties the number of additional parties required to
* advance to the next phase
* @return the arrival phase number to which this registration
* applied. If this value is negative, then this phaser has
* terminated, in which case registration has no effect.
* @throws IllegalStateException if attempting to register more
* than the maximum supported number of parties
* @throws IllegalArgumentException if {@code parties < 0}
*/
public int bulkRegister(int parties) {
if (parties < 0)
throw new IllegalArgumentException();
if (parties == 0)
return getPhase();
return doRegister(parties);
}

registerbulkRegister 都是可以动态注册指定数量的 Phaser,其核心都由 doRegister 实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* Implementation of register, bulkRegister
*
* @param registrations number to add to both parties and
* unarrived fields. Must be greater than zero.
*/
private int doRegister(int registrations) {
// adjustment to state
// 首先计算注册后当前State要调整的值adjust
long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent;
int phase;
for (;;) {
long s = (parent == null) ? state : reconcileState(); // reconcileState()调整当前 Phaser 的 State 与root一致
int counts = (int)s;
int parties = counts >>> PARTIES_SHIFT; // 参与者数目
int unarrived = counts & UNARRIVED_MASK; // 未到达的数目
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
phase = (int)(s >>> PHASE_SHIFT); // 当前Phaser所处的阶段phase
if (phase < 0)
break;
if (counts != EMPTY) { // CASE1: 当前Phaser已经注册过参与者
if (parent == null || reconcileState() == s) {
// 参与者已全部到达栅栏, 当前 Phaser 正在 Advance, 需要阻塞等待这一过程完成
if (unarrived == 0) // wait out advance
root.internalAwaitAdvance(phase, null);
// 否则,直接更新 State
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
s, s + adjust))
break;
}
}
// CASE2: 当前 Phaser 未注册过参与者(第一次注册),且没有父结点
else if (parent == null) { // 1st root registration
long next = ((long)phase << PHASE_SHIFT) | adjust;
// CAS更新当前 Phaser 的 state 值
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
}
// CASE3: 当前 Phaser 未注册过参与者(第一次注册),且有父结点
else {
synchronized (this) { // 1st sub registration
if (state == s) { // recheck under lock
phase = parent.doRegister(1); // 向父结点注册一个参与者
if (phase < 0)
break;
// finish registration whenever parent registration
// succeeded, even when racing with termination,
// since these are part of the same "transaction".
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
((long)phase << PHASE_SHIFT) | adjust)) {
s = state;
phase = (int)(root.state >>> PHASE_SHIFT);
// assert (int)s == EMPTY;
}
break;
}
}
}
}
return phase;
}
  1. 如果当前操作不是首次注册,那么直接在当前 phaser 上更新注册 parties 数 (如果当前未到达数为0,说明上一代 phase 正在进行到达操作,此时调用 internalAwaitAdvance() 方法等待其他任务完成到达操作)
  2. 如果是首次注册,并且当前 phaser 没有父节点,说明是 root 节点注册,直接更新 phase
  3. 如果当前操作是首次注册,并且当前 phaser 由父节点,则注册操作交由父节点,并更新当前 phaserphase。由于子 Phaserphase 在没有被真正使用之前,允许滞后于它的 root 节点。非首次注册时,如果 Phaser 有父节点,则调用 reconcileState()方法解决 root 节点的 phase 延迟传递问题。

reconcileState

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Resolves lagged phase propagation from root if necessary.
* Reconciliation normally occurs when root has advanced but
* subphasers have not yet done so, in which case they must finish
* their own advance by setting unarrived to parties (or if
* parties is zero, resetting to unregistered EMPTY state).
*
* @return reconciled state
*/
private long reconcileState() {
final Phaser root = this.root;
long s = state;
if (root != this) {
int phase, p;
// CAS to root phase with current parties, tripping unarrived
while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
(int)(s >>> PHASE_SHIFT) &&
!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
s = (((long)phase << PHASE_SHIFT) |
((phase < 0) ? (s & COUNTS_MASK) :
(((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
((s & PARTIES_MASK) | p))))))
s = state;
}
return s;
}

root 节点的 phase 已经 advance 到下一代,但是子节点 phaser 还没有,这种情况下它们必须通过更新未到达 parties 数 完成它们自己的 advance 操作(如果 parties0,重置为 EMPTY 状态)。

internalAwaitAdvance

internalAwaitAdvance 的主要逻辑就是:当前参与者线程等待 Phaser 进入下一个阶段(就是 phase 值变化).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* Possibly blocks and waits for phase to advance unless aborted.
* Call only on root phaser.
*
* @param phase current phase
* @param node if non-null, the wait node to track interrupt and timeout;
* if null, denotes noninterruptible wait
* @return current phase 返回新的阶段
*/
private int internalAwaitAdvance(int phase, QNode node) {
// assert root == this;
// 清空不用的Treiber Stack(奇偶Stack交替使用)
releaseWaiters(phase-1); // ensure old queue clean
// 入队标识
boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
if (node == null) { // spinning in noninterruptible mode
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted();
if (interrupted || --spins < 0) { // need node to record intr
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
}
// 已完成或中止
else if (node.isReleasable()) // done or aborted
break;
// 将结点压入栈顶
else if (!queued) { // push onto queue
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
queued = head.compareAndSet(q, node);
}
else {
try {
// 阻塞等待
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
node.wasInterrupted = true;
}
}
}

if (node != null) {
if (node.thread != null)
node.thread = null; // avoid need for unpark()
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
return abortWait(phase); // possibly clean up on abort
}
releaseWaiters(phase);
return p;
}

第二个参数 node,如果不为空,则说明等待线程需要追踪中断状态超时状态。以 doRegister 中的调用为例,不考虑线程争用,internalAwaitAdvance 大概流程如下:

  1. 首先调用 releaseWaiters 唤醒上一代所有等待线程,确保旧队列中没有遗留的等待线程。
  2. 循环 SPINS_PER_ARRIVAL 指定的次数或者当前线程被中断,创建 node 记录等待线程及相关信息。
  3. 继续循环调用 ForkJoinPool.managedBlock 运行被阻塞的任务
  4. 继续循环,阻塞任务运行成功被释放,跳出循环
  5. 最后唤醒当前 phase 的线程

arrive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* Arrives at this phaser, without waiting for others to arrive.
*
* <p>It is a usage error for an unregistered party to invoke this
* method. However, this error may result in an {@code
* IllegalStateException} only upon some subsequent operation on
* this phaser, if ever.
*
* @return the arrival phase number, or a negative value if terminated
* @throws IllegalStateException if not terminated and the number
* of unarrived parties would become negative
*/
public int arrive() {
return doArrive(ONE_ARRIVAL);
}

/**
* Main implementation for methods arrive and arriveAndDeregister.
* Manually tuned to speed up and minimize race windows for the
* common case of just decrementing unarrived field.
*
* @param adjust value to subtract from state;
* ONE_ARRIVAL for arrive,
* ONE_DEREGISTER for arriveAndDeregister
*/
private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
if (unarrived == 1) {
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
releaseWaiters(phase);
}
else if (nextUnarrived == 0) { // propagate deregistration
phase = parent.doArrive(ONE_DEREGISTER);
UNSAFE.compareAndSwapLong(this, stateOffset,
s, s | EMPTY);
}
else
phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
}
}

arrive 方法手动调整到达数,使当前线程到达 phaserarrivearriveAndDeregister 都调用了 doArrive 实现,大概流程如下:

  1. 首先更新 state(state - adjust)
  2. 如果当前不是最后一个未到达的任务,直接返回 phase
  3. 如果当前是最后一个未到达的任务
    • 如果当前是 root 节点,判断是否需要终止 phaserCAS 更新 phase,最后释放等待的线程;
    • 如果是分层结构,并且已经没有下一代未到达的parties,则交由父节点处理 doArrive 逻辑,然后更新 stateEMPTY

arriveAndAwaitAdvance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* Arrives at this phaser and awaits others. Equivalent in effect
* to {@code awaitAdvance(arrive())}. If you need to await with
* interruption or timeout, you can arrange this with an analogous
* construction using one of the other forms of the {@code
* awaitAdvance} method. If instead you need to deregister upon
* arrival, use {@code awaitAdvance(arriveAndDeregister())}.
*
* <p>It is a usage error for an unregistered party to invoke this
* method. However, this error may result in an {@code
* IllegalStateException} only upon some subsequent operation on
* this phaser, if ever.
*
* @return the arrival phase number, or the (negative)
* {@linkplain #getPhase() current phase} if terminated
* @throws IllegalStateException if not terminated and the number
* of unarrived parties would become negative
*/
public int arriveAndAwaitAdvance() {
// Specialization of doArrive+awaitAdvance eliminating some reads/paths
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); // 获取未到达数
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
s -= ONE_ARRIVAL)) { // 更新state
if (unarrived > 1)
return root.internalAwaitAdvance(phase, null); // 阻塞等待其他任务
if (root != this)
return parent.arriveAndAwaitAdvance(); // 子Phaser交给父节点处理
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (onAdvance(phase, nextUnarrived)) // 全部到达,检查是否可销毁
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE; // 计算下一代phase
n |= (long)nextPhase << PHASE_SHIFT;
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n)) // 更新state
return (int)(state >>> PHASE_SHIFT); // terminated
releaseWaiters(phase); // 释放等待phase的线程
return nextPhase;
}
}
}

使当前线程到达 phaser 并等待其他任务到达,等价于 awaitAdvance(arrive())。如果需要等待中断或超时,可以使用 awaitAdvance 方法完成一个类似的构造。如果需要在到达后取消注册,可以使用 awaitAdvance(arriveAndDeregister())。效果类似于CyclicBarrier.await。大概流程如下:

  1. 更新 state(state - 1)
  2. 如果未到达数大于1,调用 internalAwaitAdvance 阻塞等待其他任务到达,返回当前 phase
  3. 如果为分层结构,则交由父节点处理 arriveAndAwaitAdvance 逻辑
  4. 如果未到达数 <=1,判断 phaser 终止状态,CAS 更新 phase 到下一代,最后释放等待当前 phase 的线程,并返回下一代 phase

awaitAdvance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Awaits the phase of this phaser to advance from the given phase
* value, returning immediately if the current phase is not equal
* to the given phase value or this phaser is terminated.
*
* @param phase an arrival phase number, or negative value if
* terminated; this argument is normally the value returned by a
* previous call to {@code arrive} or {@code arriveAndDeregister}.
* @return the next arrival phase number, or the argument if it is
* negative, or the (negative) {@linkplain #getPhase() current phase}
* if terminated
*/
public int awaitAdvance(int phase) {
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase)
return root.internalAwaitAdvance(phase, null);
return p;
}

awaitAdvance 用于阻塞等待线程到达,直到 phase 前进到下一代,返回下一代的 phase number。方法很简单,不多赘述。awaitAdvanceInterruptibly 方法是响应中断版的 awaitAdvance,不同之处在于,调用阻塞时会记录线程的中断状态。

示例

通过 Phaser 控制多个线程的执行时机:有时候我们希望所有线程到达指定点后再同时开始执行,我们可以利用CyclicBarrierCountDownLatch 来实现,这里给出使用 Phaser 的版本。

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) {
Phaser phaser = new Phaser();
for (int index = 0; index < 10; index++) {
phaser.register(); // 注册各个参与者线程
new Thread(() -> {
int phase = phaser.arriveAndAwaitAdvance(); // 等待其它参与者线程到达
// TODO do something
System.out.println(String.format("【%s】: 执行完任务,当前 phase = %s", Thread.currentThread().getName(), phase));
}, "Thread-" + index).start();
}
}

以上示例中,创建了10个线程,并通过 register 方法注册 Phaser 的参与者数量为 10。当某个线程调用arriveAndAwaitAdvance 方法后,arrive 数量会加1,如果数量没有满足总数(参与者数量10),当前线程就是一直等待,当最后一个线程到达后,所有线程都会继续往下执行。

注意
arriveAndAwaitAdvance 方法是不响应中断的,也就是说即使当前线程被中断,arriveAndAwaitAdvance 方法也不会返回或抛出异常,而是继续等待。如果希望能够响应中断,可以参考 awaitAdvanceInterruptibly 方法。

通过 Phaser 实现开关。我们希望一些外部条件得到满足后,然后打开开关,线程才能继续执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@SneakyThrows
public static void main(String[] args) {
Phaser phaser = new Phaser(1); // 注册主线程,当外部条件满足时,由主线程打开开关
for (int i = 0; i < 10; i++) {
phaser.register(); // 注册各个参与者线程
new Thread(() -> {
int phase = phaser.arriveAndAwaitAdvance(); // 等待其它参与者线程到达
// TODO do something
System.out.println(String.format("【%s】: 执行完任务,当前 phase = %s", Thread.currentThread().getName(), phase));
}, "Thread-" + i).start();
}

// 外部条件:等待用户输入命令
System.out.println("Press ENTER to continue");
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
reader.readLine();

// 打开开关
phaser.arriveAndDeregister();
System.out.println("主线程打开了开关");
}

Phaser 支持分层功能,我们先来考虑下如何用利用 Phaser 的分层来实现高并发时的优化

phaser

如果任务数继续增大,那么同步产生的开销会非常大,利用 Phaser 分层的功能,我们可以限定每个 Phaser 对象的最大使用线程(任务数)

phaser

可以看到,上述 Phasers 其实构成了一颗多叉树,如果任务数继续增多,还可以将 Phaser 的叶子结点继续分裂,然后将分裂出的子结点供工作线程使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public static void main(String[] args) {
int repeats = 3; // 指定任务最多执行的次数
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(String.format(
"---------------PHASE[%s],Parties[%s] ---------------",
phase, registeredParties));
return phase + 1 >= repeats || registeredParties == 0;
}
};

Runnable[] tasks = new Runnable[10];
build(tasks, 0, tasks.length, phaser); // 根据任务数,为每个任务分配 Phaser 对象

for (int index = 0; index < tasks.length; index++) {
new Thread(new Tasker(index, phaser)).start();
}
}

private static void build(Runnable[] taskers, int lo, int hi, Phaser phaser) {
int tasksPerPhaser = 4; // 每个Phaser对象对应的工作线程(任务)数
if (hi - lo > tasksPerPhaser) {
for (int i = lo; i < hi; i += tasksPerPhaser) {
int j = Math.min(i + tasksPerPhaser, hi);
build(taskers, i, j, new Phaser(phaser));
}
} else {
for (int i = lo; i < hi; ++i)
taskers[i] = () -> {

};
}
}

static class Tasker implements Runnable {
private final Phaser phaser;
private int count;

Tasker(Phaser phaser) {
this.phaser = phaser;
this.phaser.register();
}

Tasker(int count, Phaser phaser) {
this(phaser);
this.count = count;
}

@Override
public void run() {
while (!phaser.isTerminated()) { //只要 Phaser 没有终止, 各个线程的任务就会一直执行
int phase = phaser.arriveAndAwaitAdvance(); // 等待其它参与者线程到达
// TODO do something
System.out.println(String.format("【%s】: 执行完任务,当前 count = %s", Thread.currentThread().getName(), count));
}
}
}

参考