JUC AbstractQueuedSynchronizer

———— JDK 1.8
words: 5.9k    views:    time: 24min

队列同步器 AbstractQueuedSynchronizer(AQS),是用来构建锁或者其他同步组件的基础框架。它使用一个int变量来表示同步状态,通过CAS操作对同步状态进行修改,确保状态的改变是安全的。通过内置的FIFO(First In First Out)队列来完成资源获取线程的排队工作。

AQS同步与synchronized同步是采用的两种不同的机制:

synchronized在编译后,会在同步块的前后分别添加monitorenter和monitorexit两个字节码指令,这两个指令需要关联一个监视对象,当线程执行monitorenter指令时,需要首先获得获得监视对象的锁,即进入同步块的凭证,才能进入同步块,当线程离开同步块时,执行monitorexit指令,释放对象锁。

AQS同步中,使用一个int类型的变量state来表示当前同步块的状态。以独占式同步为例,state的有效值为0和1,其中0表示当前同步块中没有线程,1表示同步块中已经有线程在执行。当线程要进入同步块时,需要首先判断state的值是否为0,假设为0,会尝试将state修改为1,只有修改成功了之后,线程才可以进入同步块,并通过CAS操作确保同一时刻只有一个线程操作成功。当线程离开同步块时,会修改state的值为0,并唤醒等待的线程。所以在AQS同步中,线程获得锁实际上是指线程成功修改了状态变量state,而释放锁就是是指线程将状态变量置为了可修改的状态,以便其他线程可以再次尝试修改状态变量

1. 提供方法

  • AQS是基于模板方法设计的,使用者需要继承AQS并重写指定的方法,然后在提供的模板方法中调用
protected boolean tryAcquire(int arg) 独占式获取锁,即使用CAS操作设置同步状态
protected boolean tryRelease(int arg) 独占式释放锁,实际也是修改同步变量
protected int tryAcquireShared(int arg) 共享式获取锁,返回值<0表示获取失败,>0表示获取成功,=0表示获取成功,但下一个将获取失败
protected boolean tryReleaseShared(int arg) 共享式释放锁
protected boolean isHeldExclusively() 判断调用该方法的线程是否持有互斥锁
  • AQS提供的模板方法大概分为3类:独占式获取与释放锁,共享式获取与释放锁,以及查询同步队列中的等待线程情况
public final void acquire(int) 独占式获取锁,如果当前线程成功获取锁,那么方法就返回,否则会将当前线程放入同步队列等待,该方法会调用重写的tryAcquire(int arg)来判断是否可以获得锁
public final void acquireInterruptibly(int) 支持中断的独占式锁获取方式,与acquire(int)类似,但是该方法响应中断,当线程在同步队列中等待时,如果线程被中断,会抛出InterruptedException异常并返回
public final boolean tryAcquireNanos(int, long) 支持超时的独占式锁获取方式,在acquireInterruptibly(int)的基础上添加了超时控制,同时支持中断和超时,如果在指定时间内没有获得锁,会返回false,获取到则返回true
public final void acquireShared(int) 共享式获取锁,如果成功获取就返回,否则将当前线程放入同步队列等待,与独占式不同的是,同一时刻可以有多个线程获取
public final acquireSharedInterruptibly(int) 支持中断的共享式获取锁,与上面类似
public final tryAcquireSharedNanos(int, long) 支持超时的共享式获取锁,也与上面类似
public final boolean release(int) 独占式释放锁,该方法会在释放锁后,将同步队列中第一个等待节点唤醒
public final boolean releaseShared(int) 共享式释放锁
public final Collection getQueuedThreads() 获取同步队列中等待的线程集合

2. CAS操作 & unsafe支持

CAS(Compare and Swap),即比较并交换,通过底层硬件平台的特性,实现原子性操作。其涉及到3个操作数,内存地址V,旧的期望值A,修改的新值B,当且仅当内存V中的值和预期值A相同时,才将值修改为B,否则返回失败。

unsafe提供了一些基于CAS操作的api支持,参考笔记[java. Unsafe],这里的CAS操作被用来修改当前AbstractQueuedSynchronizer实例以及Node对象中的属性值,根据传入的对象以及字段的偏移地址便可以定位到属性的内存地址,然后比较并交换对应的值

java.util.concurrent.locks.AbstractQueuedSynchronizer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
try {
stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));

waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}

private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}

private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}

3. 同步队列

AQS内部依赖一个同步队列来完成同步状态的管理,当前线程获取同步状态失败时,会将当前线程以及等待状态等信息构造成一个节点Node并加入同步队列,同时阻塞当前线程,当同步状态释放时,会把队列中第一个等待节点Node1线程唤醒,使其再次尝试获取同步状态

java.util.concurrent.locks.AbstractQueuedSynchronizer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private transient volatile Node head;

private transient volatile Node tail;

static final class Node {

static final Node SHARED = new Node(); // 表示共享模式
static final Node EXCLUSIVE = null; // 表示独占模式

// 线程等待状态
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

volatile Thread thread; // 关联的线程
volatile int waitStatus; // 关联的线程状态

// 前后结点
volatile Node prev;
volatile Node next;

Node nextWaiter; // 指向下一个在某个条件上等待的节点,或者指向SHARE节点,表明当前处于共享模式

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() {} // 作为初始化头结点或者共享标志

Node(Thread thread, Node mode) { // 只有addWaiter调用,mode=EXCLUSIVE
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

关于节点中线程的4种状态:

0 初始状态,节点在释放锁会将自己状态置为0,表示自己的后继节点不需要阻塞
CANCELED:1 获取锁异常的线程会将节点置为CANCELED,并从链表中删除
SIGNAL:-1 表示当前节点的后继节点需要阻塞,它由后继节点在尝试获取锁失败时设置
CONDITION:-2 表明当前节点在条件队列中,因为等待某个条件而被阻塞
PROPAGATE:-3 共享模式中,可以理解成是线程释放资源时没能执行唤醒,从而委托给获取资源的线程的一个状态标志

4. 主要实现

4.1. 独占锁的获取和释放

  1. 首先尝试获取锁tryAcquire,比如将状态0改为状态1,交给子类实现;

  2. 如果获取失败,则addWaiter创建节点并追加到队列尾部,从链表的角度可以将追加过程看成两个原子动作,第一步让新节点指向当前tail,并将新节点设置为tail,第二步让原先的tail节点指向新节点,然后进入acquireQueued,判断是否再次尝试获取,以及是否进行阻塞

  3. 如果阻塞是由于中断而退出,那么通过selfInterrupt()来恢复中断请求。要注意的是,一旦节点线程进入了阻塞,将不可能自己醒来,所以已经获得锁的线程在执行完成时,务必主动进行释放锁并唤醒帮忙唤醒阻塞的线程

java.util.concurrent.locks.AbstractQueuedSynchronizer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) { // 首先尝试一下
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); // 如果尝试失败,则交给enq进行循环重试
return node;
}

// 循环尝试将当前节点添加到链表结尾,直至成功
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node())) // 设置头节点,表示当前已经获取到锁的线程
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // 将自己追加到链表末尾
t.next = node;
return t;
}
}
}
}

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 如果前驱节点是head,则尝试获取
setHead(node); // 如果获取成功,则将当前节点置为head节点
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) // 如果tryAcquire异常,则通过if分支取消
cancelAcquire(node);
}
}

// shouldParkAfterFailedAcquire检查是否应该对线程进行阻塞,如果是则调用parkAndCheckInterrupt挂起线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 如果前驱节点为SIGNAL,则对当前节点进行阻塞
return true;

// 否则本轮不对当前线程进行阻塞
if (ws > 0) { // 如果前驱为CANCELED,则向前删除所有状态为CANCELED的节点,再次进入for循环便走else分支
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 如果前驱不是SIGNAL或CANCELED,则将其状态置为SIGNAL,再次进入for循环将会对当前线程进行阻塞
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

// 对当前线程进行阻塞,并在退出阻塞时保留中断状态
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

对于获取锁流程,可以画一个简单的流程图进行说明:

如果tryAcquire获取锁异常,则进行取消,即将自己状态置为CANCELLED,并进行删除,如果自身是头节点则唤醒后继节点

java.util.concurrent.locks.AbstractQueuedSynchronizer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void cancelAcquire(Node node) {
if (node == null)
return;

node.thread = null; // 首先将node关联的线程对象置为空
Node pred = node.prev; // 删除node前面所有状态为CANCELED的节点,直至出现状态不为CANCELED的节点,记为pred
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

Node predNext = pred.next; // 将pred的后继节点记为predNext
node.waitStatus = Node.CANCELLED; // 将node状态置为CANCELLED

// 如果node是当前尾节点,尝试将尾节点置为pred,并删除node自己
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果pred不是头结点,同时线程不为空且状态为SIGNAL,那么尝试将pred的后继节点改为node的后继节点
if (pred != head
&& ((ws = pred.waitStatus) == Node.SIGNAL
|| (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
&& pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node); // 否则唤醒后继节点
}
node.next = node; // help GC
}
}

独占模式下释放锁,是没有其他线程竞争的,所以处理会简单一些。首先尝试释放锁,如果失败就直接返回(失败不是因为多线程竞争,而是线程本身就不拥有锁),而且之前获取锁之后,首先会将自身设为头节点。那么在释放时头节点一定是自己,然后先将状态改为0,并寻找需要唤醒的节点

java.util.concurrent.locks.AbstractQueuedSynchronizer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final boolean release(int arg) {
if (tryRelease(arg)) { // 释放成功
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
if (s == null || s.waitStatus > 0) { // 将node的后继节点记为s,如果为null或状态是CANCELLED,则反过来找
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 从尾部向前,找到一个不是CANCELLED的节点,记为s
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒节点关联的线程
}

这里要说一下寻找后继需要唤醒的节点的思路:由于上面追加节点的过程并非原子动作,而是分为两步,那么只能保证链表从后往前是连续的,而从前往后则有可能是断开的,因此会先尝试判断当前节点的下一个节点,如果不行,则从tail开始,向前找到最前面一个状态为非CANCELLED的节点

但是还有一个问题,如果有一个线程B,在当前线程A进行tryRelease之前获取锁失败,然后却在线程A唤醒之后将节点追加到链表,那么线程B将会错过线程A的唤醒。不过线程B在正式进入阻塞之前,会检查如果前驱节点是头节点,那么会再次尝试一下tryAcquire,如果能获取成功,则不会进入阻塞,这样就形成了闭环。而且链表结构没有变化,虽然线程B追加了一个节点,但是其在获取成功后又会将原来的头节点挤出链表

可以简要如下图所示:其中2.1在1.3之前,其它动作次序无论如何排列,都一定能得到正确的结果

下面假设一个场景,线程A、B、C同时获取锁,按照上面流程走一遍:

线程A:tryAcquire CAS(0->1)成功,继续执行代码
线程B:tryAcquire CAS(0->1)失败
线程C:tryAcquire CAS(0->1)失败
线程B:链表:(2步)head = tail = node(status=0)
线程B:链表:(2步)node(status=0) - nodeB(status=0),head = node(status=0),tail = nodeB(status=0)
线程C:链表:(2步)node(status=0) - nodeB(status=0) - nodeC(status=0),head = node(status=0),tail = nodeC(status=0)
线程B:链表:node(status=SIGNAL) - nodeB(status=0) - nodeC(status=0),head = node(status=SIGNAL),tail = nodeC(status=0)
线程B:park阻塞
线程C:链表:node(status=SIGNAL) - nodeB(status=SIGNAL) - nodeC(status=0),head = node(status=SIGNAL),tail = nodeC(status=0)
线程C:park阻塞
线程A:tryRelease CAS(1->0)
线程A:链表:node(status=0) - nodeB(status=SIGNAL) - nodeC(status=0),head = node(status=0),tail = nodeC(status=0)
线程A:unpark(nodeB.thread)
线程B:tryAcquire CAS(0->1)成功
线程B:链表:nodeB(status=SIGNAL) - nodeC(status=0),head = nodeB(status=SIGNAL),tail = nodeC(status=0)
线程B:tryRelease CAS(1->0)
线程B:链表:nodeB(status=0) - nodeC(status=0),head = nodeB(status=0),tail = nodeC(status=0)
线程B:unpark(nodeC.thread)
线程C:tryAcquire CAS(0->1)成功
线程C:链表:head = tail = nodeC(status=0)
线程C:tryRelease CAS(1->0)

4.2. 共享锁获取和释放

获取共享锁大体与获取独占锁的过程类似,即如果获取失败了就向链表中追加节点,只是类型为SHARED,然后如果再次尝试失败就将前驱置为SIGNAL,并进入阻塞,区别在于如果获取成功了,除了将自己置为头节点之外,还可能尝试对后继节点进行唤醒,并且唤醒具有传递性,即后面的节点同样会尝试向后进行唤醒。当然这样做主要还是考虑到共享锁在释放时的竞争问题,可能会丢失一些唤醒,具体下面会说明

java.util.concurrent.locks.AbstractQueuedSynchronizer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) { // 获取成功后会检查是否需要向后进行唤醒
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 复用独占锁的处理
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记住之前的head
setHead(node);

// tryAcquireShared > 0 || setHead之前的head状态<0 || 当前的head即自己状态<0
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) // 除非遇到独占锁
doReleaseShared();
}
}

这里先不看具体实现,想一下共享锁的实现需要考虑的几个问题:

首先由于是共享锁,即允许有多个线程同时持有资源,那么在释放时必然存在竞争的可能。即如果同时多个线程进行释放,那么它们在唤醒时可能看到的是同一个head,也就可能释放了多个资源,但只唤醒了一个等待节点。

于是,doReleaseShared中首先通过CAS与轮询,保证了就算有多个线程同时检测到状态为SIGNALhead,也只会有一个线程可以CAS成功,即head(SIGNAL -> 0),然后去进行唤醒。其它线程全部打回,然后下次重新检测head。如果还能检测状态为SIGNALhead,那一定是唤醒后的节点重置了head,这样就将出现在同一个节点上的唤醒操作错开了。

但是,在CAS操作head(SIGNAL -> 0)与被唤醒的线程重新设置head之间,可能其它的线程对head状态进行了检测,并看到head状态为0,这样不管它做不做唤醒的操作都没有意义,因为即便唤醒也必然是与CAS操作成功的线程唤醒的同一个节点,也就是说这些线程在释放资源的时候将不能唤醒等待的线程,于是,希望能够将唤醒操作委托给成功获取资源的线程,但是如何让获取资源的线程感知呢?

同样是通过CAS操作head(0 -> PROPAGATE)如果获取资源的线程在setHead之后,检测到之前的head状态为PROPAGATE,那么说明在它被唤醒之后,到setHead之间,一定有其它线程释放了资源,但是却没能执行唤醒操作,所以自己有义务帮忙唤醒后面的节点

代码实现中比较保守,其实就是宁愿存在一些不必要的唤醒也要保证队列不可能被阻塞住。网上有很多讨论关于PROPAGATE状态的文章,即如果没有这个状态则可能造成阻塞,但是我根据代码的逻辑反复走读,感觉即便PROPAGATE状态也不会造成阻塞(要是有同学觉得不对,一定帮忙指导一下),下面我简单描述一下这样说的理由:

首先,如果线程释放资源存在阻塞节点,那么head状态必然是-1。所以主要就看next节点的状态,如果它也是-1,那么它后面必然还有阻塞的节点,那么不管怎样,next节点在唤醒时一定会唤醒它的后继节点,因为它在setHead之后,将会看到自己当前的状态为-1,然后将状态改为0并唤醒下一个节点,即便没有检测到状态-1,也必然是由其它释放资源的线程帮忙做了这件事

下面再看下next的状态为0的情况,假设所有的资源都已被持有,并且只有一个阻塞节点,然后有一个线程释放资源并唤醒阻塞节点,阻塞节点被唤醒后将资源持有,但是在其setHead之前刚好有个线程D来申请资源,那么线程D将申请失败,而向链表中添加节点:

下面还有两个动作比较关键:首先线程 D 在进入阻塞之前,会先将前面 next 状态置为 -1,然后再次进行检测如果发现前驱节点就是 head ,那么还会尝试获取一次。另外,当前获取资源的线程在 setHead 之后,如果看到自己的状态为 -1,那么便会尝试唤醒后继节点。所以,如果想看到线程 D 被阻塞,只能是发生在 setHead 之后,但这时确实也没有资源可获取,本身唤醒也应该阻塞直到有资源释放。而且这整个期间,只要有任何别的线程释放了资源,不管以怎样的动作次序,线程 D 都不会被阻塞住,可以将所有的原子动作都列出来然后逐个尝试,这里不再赘述。

java.util.concurrent.locks.AbstractQueuedSynchronizer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 链表中至少有两个以上节点,否则没有阻塞线程,不需要唤醒
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // 同时检测到head状态为SIGNAL,但是竞争失败,于是回头重试
unparkSuccessor(h); // 只有当head状态为SIGNAL时,后面才有线程阻塞,才需要唤醒
}else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // 同时检测到head状态为0,但是委托失败了,也回头重试

// 除非检测到状态PROPAGATE,说明其它线程已经委托过了,所以自己可以放心去了
}

// 如果head发生变化,则重新检查新的head
if (h == head)
break;
}
}

可以简单画一张流程图,来示意一下线程被唤醒后的主要操作,以及执行唤醒的具体过程:

4.3. 中断

先说一下LockSupport.park(this)是可以因为中断而退出的,而在AQS中是否支持中断就是看对于中断的处理,如果支持中断,就是检测到线程有中断申请就立即抛出InterruptedException而退出,并在退出之前执行下清理,即cancelAcquire,否则就像上面一样只是记录下中断请求,然后一直等到获取资源成功后,再恢复线程的中断。

java.util.concurrent.locks.AbstractQueuedSynchronizer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

4.4. 超时

超时是在中断的基础上加了一层时间限制,当然首先也是依赖于LockSupport.parkNanos(this, nanosTimeout)的支持,如果检测到超时,则直接放弃获取锁,执行清理cancelAcquire

java.util.concurrent.locks.AbstractQueuedSynchronizer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}


参考:

  1. https://www.zhihu.com/question/50724462
  2. https://blog.csdn.net/anlian523/article/details/106319538
  3. https://blog.csdn.net/ThinkWon/article/details/102469112
  4. https://zhuanlan.zhihu.com/p/91408261
  5. https://cloud.tencent.com/developer/article/1706109