队列同步器 AbstractQueuedSynchronizer(AQS),是用来构建锁或者其他同步组件的基础框架。它使用一个int变量来表示同步状态,通过CAS操作对同步状态进行修改,确保状态的改变是安全的。通过内置的FIFO(First In First Out)队列来完成资源获取线程的排队工作。
AQS同步与synchronized同步是采用的两种不同的机制:
synchronized在编译后,会在同步块的前后分别添加monitorenter和monitorexit两个字节码指令,这两个指令需要关联一个监视对象,当线程执行monitorenter指令时,需要首先获得获得监视对象的锁,即进入同步块的凭证,才能进入同步块,当线程离开同步块时,执行monitorexit指令,释放对象锁。
AQS同步中,使用一个int类型的变量state来表示当前同步块的状态。以独占式同步为例,state的有效值为0和1,其中0表示当前同步块中没有线程,1表示同步块中已经有线程在执行。当线程要进入同步块时,需要首先判断state的值是否为0,假设为0,会尝试将state修改为1,只有修改成功了之后,线程才可以进入同步块,并通过CAS操作确保同一时刻只有一个线程操作成功。当线程离开同步块时,会修改state的值为0,并唤醒等待的线程。所以在AQS同步中,线程获得锁实际上是指线程成功修改了状态变量state,而释放锁就是是指线程将状态变量置为了可修改的状态,以便其他线程可以再次尝试修改状态变量
1. 提供方法
AQS是基于模板方法设计的,使用者需要继承AQS并重写指定的方法,然后在提供的模板方法中调用
protected boolean tryAcquire(int arg)
独占式获取锁,即使用CAS操作设置同步状态
protected boolean tryRelease(int arg)
独占式释放锁,实际也是修改同步变量
protected int tryAcquireShared(int arg)
共享式获取锁,返回值<0表示获取失败,>0表示获取成功,=0表示获取成功,但下一个将获取失败
protected boolean tryReleaseShared(int arg)
共享式释放锁
protected boolean isHeldExclusively()
判断调用该方法的线程是否持有互斥锁
AQS提供的模板方法大概分为3类:独占式获取与释放锁,共享式获取与释放锁,以及查询同步队列中的等待线程情况
public final void acquire(int)
独占式获取锁,如果当前线程成功获取锁,那么方法就返回,否则会将当前线程放入同步队列等待,该方法会调用重写的tryAcquire(int arg)来判断是否可以获得锁
public final void acquireInterruptibly(int)
支持中断的独占式锁获取方式,与acquire(int)类似,但是该方法响应中断,当线程在同步队列中等待时,如果线程被中断,会抛出InterruptedException异常并返回
public final boolean tryAcquireNanos(int, long)
支持超时的独占式锁获取方式,在acquireInterruptibly(int)的基础上添加了超时控制,同时支持中断和超时,如果在指定时间内没有获得锁,会返回false,获取到则返回true
public final void acquireShared(int)
共享式获取锁,如果成功获取就返回,否则将当前线程放入同步队列等待,与独占式不同的是,同一时刻可以有多个线程获取
public final acquireSharedInterruptibly(int)
支持中断的共享式获取锁,与上面类似
public final tryAcquireSharedNanos(int, long)
支持超时的共享式获取锁,也与上面类似
public final boolean release(int)
独占式释放锁,该方法会在释放锁后,将同步队列中第一个等待节点唤醒
public final boolean releaseShared(int)
共享式释放锁
public final Collection getQueuedThreads()
获取同步队列中等待的线程集合
2. CAS操作 & unsafe支持 CAS(Compare and Swap),即比较并交换,通过底层硬件平台的特性,实现原子性操作。其涉及到3个操作数,内存地址V,旧的期望值A,修改的新值B,当且仅当内存V中的值和预期值A相同时,才将值修改为B,否则返回失败。
unsafe提供了一些基于CAS操作的api支持,参考笔记[java. Unsafe ],这里的CAS操作被用来修改当前AbstractQueuedSynchronizer
实例以及Node
对象中的属性值,根据传入的对象以及字段的偏移地址便可以定位到属性的内存地址,然后比较并交换对应的值
java.util.concurrent.locks.AbstractQueuedSynchronizer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long stateOffset;private static final long headOffset;private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;static { try { stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } private final boolean compareAndSetHead (Node update) { return unsafe.compareAndSwapObject(this , headOffset, null , update); } private final boolean compareAndSetTail (Node expect, Node update) { return unsafe.compareAndSwapObject(this , tailOffset, expect, update); } protected final boolean compareAndSetState (int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); } private static final boolean compareAndSetNext (Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); } private static final boolean compareAndSetWaitStatus (Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); }
3. 同步队列 AQS内部依赖一个同步队列来完成同步状态的管理,当前线程获取同步状态失败时,会将当前线程以及等待状态等信息构造成一个节点Node并加入同步队列,同时阻塞当前线程,当同步状态释放时,会把队列中第一个等待节点Node1线程唤醒,使其再次尝试获取同步状态
java.util.concurrent.locks.AbstractQueuedSynchronizer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 private transient volatile Node head;private transient volatile Node tail;static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile Thread thread; volatile int waitStatus; volatile Node prev; volatile Node next; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException(); else return p; } Node() {} Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
关于节点中线程的4种状态:
0
初始状态,节点在释放锁会将自己状态置为0,表示自己的后继节点不需要阻塞
CANCELED:1
获取锁异常的线程会将节点置为CANCELED,并从链表中删除
SIGNAL:-1
表示当前节点的后继节点需要阻塞,它由后继节点在尝试获取锁失败时设置
CONDITION:-2
表明当前节点在条件队列中,因为等待某个条件而被阻塞
PROPAGATE:-3
共享模式中,可以理解成是线程释放资源时没能执行唤醒,从而委托给获取资源的线程的一个状态标志
4. 主要实现 4.1. 独占锁的获取和释放
首先尝试获取锁tryAcquire
,比如将状态0改为状态1,交给子类实现;
如果获取失败,则addWaiter创建节点并追加到队列尾部,从链表的角度可以将追加过程看成两个原子动作,第一步让新节点指向当前tail,并将新节点设置为tail,第二步让原先的tail节点指向新节点 ,然后进入acquireQueued,判断是否再次尝试获取,以及是否进行阻塞
如果阻塞是由于中断而退出,那么通过selfInterrupt()来恢复中断请求。要注意的是,一旦节点线程进入了阻塞,将不可能自己醒来,所以已经获得锁的线程在执行完成时,务必主动进行释放锁并唤醒帮忙唤醒阻塞的线程
java.util.concurrent.locks.AbstractQueuedSynchronizer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){ selfInterrupt(); } } private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
对于获取锁流程,可以画一个简单的流程图进行说明:
如果tryAcquire获取锁异常,则进行取消,即将自己状态置为CANCELLED,并进行删除,如果自身是头节点则唤醒后继节点
java.util.concurrent.locks.AbstractQueuedSynchronizer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null ); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0 ) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } }
独占模式下释放锁,是没有其他线程竞争的,所以处理会简单一些。首先尝试释放锁,如果失败就直接返回(失败不是因为多线程竞争,而是线程本身就不拥有锁),而且之前获取锁之后,首先会将自身设为头节点。那么在释放时头节点一定是自己,然后先将状态改为0,并寻找需要唤醒的节点
java.util.concurrent.locks.AbstractQueuedSynchronizer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
这里要说一下寻找后继需要唤醒的节点的思路 :由于上面追加节点的过程并非原子动作,而是分为两步,那么只能保证链表从后往前是连续的,而从前往后则有可能是断开的,因此会先尝试判断当前节点的下一个节点,如果不行,则从tail开始,向前找到最前面一个状态为非CANCELLED的节点
但是还有一个问题,如果有一个线程B,在当前线程A进行tryRelease之前获取锁失败,然后却在线程A唤醒之后将节点追加到链表,那么线程B将会错过线程A的唤醒。 不过线程B在正式进入阻塞之前,会检查如果前驱节点是头节点,那么会再次尝试一下tryAcquire,如果能获取成功,则不会进入阻塞,这样就形成了闭环。而且链表结构没有变化,虽然线程B追加了一个节点,但是其在获取成功后又会将原来的头节点挤出链表
可以简要如下图所示:其中2.1在1.3之前,其它动作次序无论如何排列,都一定能得到正确的结果
下面假设一个场景,线程A、B、C同时获取锁,按照上面流程走一遍:
线程A:tryAcquire CAS(0->1)成功,继续执行代码 线程B:tryAcquire CAS(0->1)失败 线程C:tryAcquire CAS(0->1)失败 线程B:链表:(2步)head = tail = node(status=0) 线程B:链表:(2步)node(status=0) - nodeB(status=0) ,head = node(status=0),tail = nodeB(status=0) 线程C:链表:(2步)node(status=0) - nodeB(status=0) - nodeC(status=0) ,head = node(status=0),tail = nodeC(status=0) 线程B:链表:node(status=SIGNAL) - nodeB(status=0) - nodeC(status=0) ,head = node(status=SIGNAL),tail = nodeC(status=0) 线程B:park阻塞 线程C:链表:node(status=SIGNAL) - nodeB(status=SIGNAL) - nodeC(status=0) ,head = node(status=SIGNAL),tail = nodeC(status=0) 线程C:park阻塞 线程A:tryRelease CAS(1->0) 线程A:链表:node(status=0) - nodeB(status=SIGNAL) - nodeC(status=0) ,head = node(status=0),tail = nodeC(status=0) 线程A:unpark(nodeB.thread) 线程B:tryAcquire CAS(0->1)成功 线程B:链表:nodeB(status=SIGNAL) - nodeC(status=0) ,head = nodeB(status=SIGNAL),tail = nodeC(status=0) 线程B:tryRelease CAS(1->0) 线程B:链表:nodeB(status=0) - nodeC(status=0) ,head = nodeB(status=0),tail = nodeC(status=0) 线程B:unpark(nodeC.thread) 线程C:tryAcquire CAS(0->1)成功 线程C:链表:head = tail = nodeC(status=0) 线程C:tryRelease CAS(1->0)
4.2. 共享锁获取和释放 获取共享锁大体与获取独占锁的过程类似,即如果获取失败了就向链表中追加节点,只是类型为SHARED,然后如果再次尝试失败就将前驱置为SIGNAL,并进入阻塞,区别在于如果获取成功了,除了将自己置为头节点之外,还可能尝试对后继节点进行唤醒,并且唤醒具有传递性,即后面的节点同样会尝试向后进行唤醒。当然这样做主要还是考虑到共享锁在释放时的竞争问题,可能会丢失一些唤醒,具体下面会说明
java.util.concurrent.locks.AbstractQueuedSynchronizer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
这里先不看具体实现,想一下共享锁的实现需要考虑的几个问题:
首先由于是共享锁,即允许有多个线程同时持有资源,那么在释放时必然存在竞争 的可能。即如果同时多个线程进行释放,那么它们在唤醒时可能看到的是同一个head
,也就可能释放了多个资源,但只唤醒了一个等待节点。
于是,doReleaseShared
中首先通过CAS与轮询,保证了就算有多个线程同时检测到状态为SIGNAL
的head
,也只会有一个线程可以CAS成功,即head(SIGNAL -> 0)
,然后去进行唤醒。其它线程全部打回,然后下次重新检测head。如果还能检测状态为SIGNAL
的head
,那一定是唤醒后的节点重置了head,这样就将出现在同一个节点上的唤醒操作错开了。
但是,在CAS操作head(SIGNAL -> 0)
与被唤醒的线程重新设置head之间,可能其它的线程对head状态进行了检测,并看到head状态为0,这样不管它做不做唤醒的操作都没有意义,因为即便唤醒也必然是与CAS操作成功的线程唤醒的同一个节点,也就是说这些线程在释放资源的时候将不能唤醒等待的线程,于是,希望能够将唤醒操作委托给成功获取资源的线程 ,但是如何让获取资源的线程感知呢?
同样是通过CAS操作head(0 -> PROPAGATE)
,如果获取资源的线程在setHead
之后,检测到之前的head状态为PROPAGATE
,那么说明在它被唤醒之后,到setHead
之间,一定有其它线程释放了资源,但是却没能执行唤醒操作,所以自己有义务帮忙唤醒后面的节点 。
代码实现中比较保守,其实就是宁愿存在一些不必要的唤醒也要保证队列不可能被阻塞住。网上有很多讨论关于PROPAGATE
状态的文章,即如果没有这个状态则可能造成阻塞,但是我根据代码的逻辑反复走读,感觉即便PROPAGATE
状态也不会造成阻塞(要是有同学觉得不对,一定帮忙指导一下),下面我简单描述一下这样说的理由:
首先,如果线程释放资源存在阻塞节点,那么head状态必然是-1
。所以主要就看next
节点的状态,如果它也是-1
,那么它后面必然还有阻塞的节点,那么不管怎样,next
节点在唤醒时一定会唤醒它的后继节点,因为它在setHead
之后,将会看到自己当前的状态为-1
,然后将状态改为0
并唤醒下一个节点,即便没有检测到状态-1
,也必然是由其它释放资源的线程帮忙做了这件事
下面再看下next的状态为0的情况,假设所有的资源都已被持有,并且只有一个阻塞节点,然后有一个线程释放资源并唤醒阻塞节点,阻塞节点被唤醒后将资源持有,但是在其setHead
之前刚好有个线程D来申请资源,那么线程D将申请失败,而向链表中添加节点:
下面还有两个动作比较关键:首先线程 D 在进入阻塞之前,会先将前面 next 状态置为 -1
,然后再次进行检测如果发现前驱节点就是 head
,那么还会尝试获取一次。另外,当前获取资源的线程在 setHead
之后,如果看到自己的状态为 -1
,那么便会尝试唤醒后继节点。所以,如果想看到线程 D 被阻塞,只能是发生在 setHead
之后,但这时确实也没有资源可获取,本身唤醒也应该阻塞直到有资源释放。而且这整个期间,只要有任何别的线程释放了资源,不管以怎样的动作次序,线程 D 都不会被阻塞住,可以将所有的原子动作都列出来然后逐个尝试,这里不再赘述。
java.util.concurrent.locks.AbstractQueuedSynchronizer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); }else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
可以简单画一张流程图,来示意一下线程被唤醒后的主要操作,以及执行唤醒的具体过程:
4.3. 中断 先说一下LockSupport.park(this)
是可以因为中断而退出的,而在AQS中是否支持中断就是看对于中断的处理,如果支持中断,就是检测到线程有中断申请就立即抛出InterruptedException
而退出,并在退出之前执行下清理,即cancelAcquire
,否则就像上面一样只是记录下中断请求,然后一直等到获取资源成功后,再恢复线程的中断。
java.util.concurrent.locks.AbstractQueuedSynchronizer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
4.4. 超时 超时是在中断的基础上加了一层时间限制,当然首先也是依赖于LockSupport.parkNanos(this, nanosTimeout)
的支持,如果检测到超时,则直接放弃获取锁,执行清理cancelAcquire
java.util.concurrent.locks.AbstractQueuedSynchronizer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public final boolean tryAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } private boolean doAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L ) return false ; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return true ; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L ) return false ; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
参考:
https://www.zhihu.com/question/50724462
https://blog.csdn.net/anlian523/article/details/106319538
https://blog.csdn.net/ThinkWon/article/details/102469112
https://zhuanlan.zhihu.com/p/91408261
https://cloud.tencent.com/developer/article/1706109