《Java并发编程实战》 同步工具类的设计
words: 5.5k views: time: 21min创建状态依赖类最简单的方法通常是在类库中现有状态依赖类的基础上进行构造,如果类库不支持,则可以使用Java语言和类库提供的底层机制来构造自己的同步机制,包括内置的条件队列、显示的Condition
对象以及AbstractQueuedSynchronizer
框架。
在单线程的程序中调用方法时,如果基于某个状态的前提条件未得到满足,那么这个条件永远无法成真。而在并发程序中,基于状态的条件可能会由于其他线程的操作而改变。
1. 可阻塞的状态依赖操作
1 | acquire lock on object state |
这种加锁模式有些不寻常,因为锁是在操作的执行过程中被释放与重新获取的。构成前提条件的状态变量必须由对象的锁来保护,从而使它们在测试前提条件的同时保持不变。如果前提条件尚未满足,就必须释放锁,以便其他线程可以修改对象的状态,否则前提条件永远无法成真,而再次测试前提条件之前,必须重新获得锁。
2. 示例:有界缓存
基于数组实现一个循环缓存BaseBoundedBuffer
,其缓存状态buf
,tail
,head
,count
均由内置锁来保护,并对子类隐藏。isFull()
和isEmpty()
用来提供同步的条件检查,doput
和dotake
方法用来提供同步的put
和take
操作,以便我们可以在子类中执行操作前先检查条件。
1 | public abstract class BaseBoundedBuffer<V> { |
2.1. 反馈失败
1 | public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V>{ |
由于put
和take
中采用先检查再运行的策略,所以尽管父类中的检查和执行两个操作都已经使用同步,但在子类操作上依然需要同步,这里利用了锁的可重入性。
虽然这样直接将前提条件失败转化为异常抛给调用者处理的实现方式很简单,但是并不能抵消在使用时的复杂性。调用者必须做好捕获异常的准备,并且每次缓存操作时都需要重试。
2.2. 轮询重试
1 | public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V>{ |
SleepyBoundedBuffer
通过轮询的方式来进行重试,如果检测成功,则执行操作,而如果检测失败,则休眠一段时间,以便其他线程能够访问缓存,当醒来时再进行尝试。
.要注意的是,检测和执行需要通过锁来保护,但是休眠的时候不可以持有锁。以前经常会提到sleep与wait的区别,这里就能体现出来了,线程在sleep休眠时不会释放它持有的锁,因此将会造成死锁,使其他线程永远获取不到锁来使条件检测变为真。
SleepyBoundedBuffer
简化了对缓存的使用,调用者无须处理失败和重试,但需要处理InterruptedException
,这也是它提供的一个取消策略,当等待条件非真而休眠时,如果中断,则抛出InterruptedException
。另外,SleepyBoundedBuffer
需要选择一个合适的休眠时间,以便在响应性与cpu使用率之间达到平衡,比如休眠越短,则响应性越高,但cpu消耗也越高。
3. 条件队列
通过轮询与休眠来实现阻塞操作的方式需要付出大量的努力,如果存在某种挂起线程的方法(并且挂起前释放锁),并且这种方法能够确保当条件为真时线程立即醒来(响应性),那么将极大地简化实现工作,而这正是条件队列实现的功能。
条件队列的名字来源于:它使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变成真。因此,条件队列中的元素是一个个正在等待相关条件的线程,而传统队列中的元素是一个个数据。
正如每个java对象都可以作为一个锁,每个对象同样可以作为一个条件队列,我们称为内置条件队列,比如Object
中的wait
,notify
,notifyAll
方法就构成了内部条件队列的API。Object.wait
会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并修改对象的状态。当挂起的线程醒来时,他将在返回前重新获取锁。
2.3. 改进:使用条件队列
1 | public class BoundedBuffer<V> extends BaseBoundedBuffer<V> { |
BoundedBuffer
不仅简单易用,而且实现了清晰的状态依赖管理,与使用休眠的SleepyBoundedBuffer
相比,条件队列并没有改变原来的语义,它只是在cpu效率、上下文切换开销、响应性等方面进行了优化。
2.4. 改进:优化条件队列使用
BoundedBuffer
中put
和take
采用的通知机制比较保守,每当将一个对象放入缓存或者从缓存中移走一个对象时,就执行一次通知。实际上,仅当缓存从空变为非空或者从满变为非满,而且当put
或take
影响到这些状态转换时,才需要发出通知唤醒线程,即有条件的通知。但是,虽然这样可以提升性能,但很难被正确的实现,在使用时需要谨慎。
1 | public class BoundedBuffer<V> extends BaseBoundedBuffer<V> { |
4. 条件谓词
要正确的使用条件队列,关键是找出对象在哪个条件谓词上等待。条件谓词是使某个操作成为状态依赖操作的前提,比如在BoundedBuffer
中,take
的条件谓词是缓存非空,而put
的条件谓词是缓存非满。因此,条件谓词通常是由类中各个状态变量构成的表达式。
在条件等待中存在一种重要的三元关系,包括加锁、wait方法和一个条件谓词。在条件谓词中包含多个状态变量,这些状态变量由一个锁来保护,因此,在测试条件谓词之前必须先持有这个锁,并且锁对象与条件队列对象(即调用wait和notify等方法所在的对象)必须是同一个对象。
5. 示例:阀门类
之前介绍的闭锁有两种状态:关闭和打开,但是一旦打开了就无法再关闭,借助条件队列,这里可以设计一个可以重新关闭的闭锁
1 | public class ThreadGate { |
ThreadGate
可以打开和关闭阀门,并提供一个await
方法,该方法能一直阻塞直到阀门打开。在await
中使用的条件谓词比测试isOpen
复杂得多,这是必需的,因为如果当阀门打开时有N个线程正在等待它,那么这些线程都应该允许通过。然而,如果阀门在打开后又非常快速的关闭了,并且await
方法只检查isOpen
,那么线程可能无法释放。因此,每次阀门打开时,递增一个generation
计数器,如果阀门现在是打开的,或者现在阻塞之后阀门打开过,那么线程都可以通过await
。
.await()
中,arrivalGeneration
表示线程到达阀门被阻塞挂起时阀门的打开次数。当线程收到通知最终被唤醒并获取锁后,如果阀门状态是打开的那么直接通过,否则再看一下当前阀门的打开次数generation
,并与被挂起时阀门的打开次数进行比较。
.如果不等,则表示线程在被挂起后,阀门被打开过,虽然当前阀门状态又是关闭的,所以之前阻塞的线程应该通过。这样通过比较arrivalGeneration
与generation
,确保了每个在wait
上阻塞的线程在收到阀门打开通知后,最终都能通过阀门,即便在发出通知后立马又重新关闭。
6. 过早唤醒问题
内置条件队列可以与多个条件谓词一起使用,当一个线程由于调用notifyAll
而醒来时,并不意味着该线程正在等待的条件谓词以已经变成真了。举个例子,如果面包机与咖啡机如果共用一个铃声,那么当响铃后,你并不能确定就是面包烤好了。
在BoundedBuffer
中,条件队列就与缓存非空和缓存非满两个条件谓词相关,设想当一个线程put
成功之后使缓存满了,它希望唤醒的是在take
上等待的线程,但是它同时会唤醒在put
上等待的其他线程,而所有这些线程将一起重新竞争锁,如果put
上的线程竞争成功了,它又将重新挂起。当竞争非常激烈时,将导致很大的上下文切换开销。
另一种情况是,在发出通知的线程调用notifyAll
时,条件谓词为真,但在重新获取锁时又再次变成假。因为在线程被唤醒到wait
重新获取锁这段时间里,可能有其他线程已经获取了这个锁,并修改了条件谓词的状态。
类似的,竞争失败的线程又会重新挂起,而导致不必要的上下文切换开销。notify
是可以从等待队列中只选一个线程进行唤醒,但在BoundedBuffer
中行不通,因为共用了条件谓词,可能导致通知丢失,比如put
的线程希望唤醒一个take
上阻塞的线程,却被另一个在put
上阻塞的线程收到了,而收到唤醒的put
线程又只能重新挂起。最后只能等待重新再来一个新的请求take
操作的线程来解除这个困境。
所以,内置条件队列存在一些缺陷。因为每个内置锁都只能有一个关联的条件队列,因而在像BoundedBuffer
这种类中,多个线程只能在同一个条件队列上等待不同的条件谓词,并且在最常见的加锁模式下暴露条件队列对象。
7. Condition对象
1 | public interface Condition{ |
正如Lock
比内置锁提供了更丰富的功能,Condition同样也比内置条件队列提供了更丰富的功能:在每个锁上可存在多个等待队列、并支持可中断等待,限时等待,以及公平的或非公平的队列操作。
每个Lock
可以有任意数量的Condition
对象,它们继承了Lock
对象的公平性。
2.5. 改进:使用Condition
1 | public class ConditionBoundedBuffer<T> { |
ConditionBoundedBuffer
的行为与BoundedBuffer
相同,但分析使用多个Condition
的类,比分析使用单一内部队列加多个条件谓词的类要简单。
通过将两个条件谓词分开并放到两个等待线程集中,Condition
使其更容易满足单次通知的需求。signal
比signalAll
更高效,它能极大地减少在每次缓存操作中发生的上下文切换与锁请求的次数。与内置锁和条件队列一样,当使用显示的Lock
和Condition
时,也必须满足锁、条件谓词和条件变量之间的三元关系。在条件谓词中包含的变量必须由Lock
来保护,并且在检查条件谓词以及调用await
和signal
时,必须持有Lock
对象。
8. AbstractQueuedSynchronizer
在ReentrantLock
和Semaphore
这两个接口之间存在许多共同点,这两个类都可以用做一个阀门,即每次只允许一定数量的线程通过。或许我们会认为Semaphore
是基于ReentrantLock
实现的,或者认为ReentrantLock
实际上是带有一个许可的Semaphore
,这些实现方式都是可行的。
1 | public class SemaphoreOnLock { |
但事实上,它们的实现都是基于AbstractQueuedSynchronizer
,这个类也是其他许多同步类的基类。AQS是一个用于构建锁和同步器的框架,包括CountDownLatch
、ReentrantReadWriteLock
、SynchronousQueue
和FutureTask
。
在SemaphoreOnLock
中,获取许可的操作可能在两个时刻阻塞:当锁保护信号量状态时,以及当许可不可用时。而在基于AQS构建的同步器中,只可能在一个时刻发生阻塞,从而降低上下文切换的开销,并提高吞吐量。AQS在设计时充分考虑了可伸缩性,java.util.concurrent中所有基于AQS构建的同步器都能获得这个优势。
如果一个类想成为状态依赖的类,那么它必须拥有一些状态。AQS负责管理同步器类中的状态,它管理一个整数状态信息,可以通过getState
,setState
以及compareAndSetState
等protected类型方法来进行操作,这个整数可以用于表示任意状态。
在ReentrantLock
中,它用来表示所有者线程已经重复获取该锁的次数,Semaphore
用它来表示剩余的许可数量,FutureTask
用它来表示任务的状态。在同步器类中还可以自行管理一些额外的状态变量,如ReentrantLock
保存了锁的当前所有者信息,这样就能区分某个获取操作是重入还是竞争的。事实上,java.util.concurrent中的所有同步器类都没有直接扩展AQS,而是都将它们的相应功能委托给私有的AQS子类来实现。
- AQS中获取操作和释放操作的标准形式:
1 | boolean acquire() throws InterruptedException{ |
8.1. 示例:一个简单的闭锁
1 | public class OneShotLatch { |
OneShotLatch
中,AQS用来管理闭锁状态:关闭(0)或者打开(1)。
await
方法调用AQS的acquireSharedInterruptibly
,然后接着调用tryAcquireShared
。在tryAcquireShared
的实现中必须返回一个值来表示该获取操作能否执行。如果之前已经打开了闭锁,那么tryAcquireShared
将返回成功并允许线程通过,否则就会返回一个表示获取操作失败的值。acquireSharedInterruptibly
处理失败的方式,是把这个线程放入等待线程队列中。类似的,signal
将调用releaseShared
,接下来又会调用tryReleaseShared
。在tryReleaseShared
中将无条件地把闭锁的状态设置为打开,表示该同步类处于完全释放的状态。
.关于AQS的源码分析,可以参考笔记:https://shanhm1991.github.io/2018/04/06/20180406/
8.2. java.util.concurrent同步器类中的AQS
8.2.1. ReentrantLock
ReentrantLock
只支持独占式的获取操作,因此它实现了tryAcquire
、tryRelease
和isHeldExclusively
。
ReentrantLock
将AQS的同步状态用来保存锁获取操作的次数,并且维护一个owner
变量来保存当前所有者线程的标识符,在tryAcquire
中用来区分获取操作是重入还是竞争的,只有在当前线程刚获取到锁,或者正要释放锁的时候,才会修改这个变量。然后在tryRelease
中检查owner
域,从而确保当前线程在执行unlock
操作之前已经获取了锁
1 | protected boolean tryAcquire(int ignored){ |
当一个线程尝试获取锁时,tryAcquire
将首先检查锁的状态。如果未持有锁,那么它将尝试更新锁的状态以表示锁已经被持有。由于状态可能在检查后被立即修改,因此,tryAcquire
中使用compareAndSetState
来更新状态。如果锁已经被持有,并且如果当前线程是锁的持有者,那么获取计数会递增。如果当前线程不是锁的拥有者,那么获取操作将失败。
8.2.2. Semaphore与CountDownLatch
Semaphore
将AQS用于保存当前可用许可的数量
1 | protected int tryAcquireShared(int acquires){ |
tryAcquireShared
方法首先计算剩余许可的数量,如果没有足够的许可,那么会返回一个值表示获取操作失败。如果还有剩余的许可,那么tryAcquireShared
会通过compareAndSetState
方式来降低许可的计数。如果这个操作成功,那么将返回一个值表示获取操作的成功。
当没有足够的许可,或者当tryAcquireShared
可以通过原子方式来更新许可计数以响应获取操作时,while
循环将终止。虽然compareAndSetState
的调用可能由于与另一个线程发生竞争而失败,并使其重新尝试,但在经过了一定次数的重试操作以后,在这两个结束条件中将有一个会变为真。
CountDownLatch
使用AQS的方式很相似,在同步状态中保存的是当前的计数值。countDown
方法调用release
,从而计数值递减,当计数值为零时,解除所有等待线程的阻塞。await
调用acquire
,当计数器为零时acquire
立即返回,否则阻塞。
8.2.3. FutureTask
Future.get()
的语义也类似于闭锁:如果发生了某个事件,那么线程就可以恢复执行,否则这些线程将停留在队列中并直到该事件发生。
在FutureTask
中,AQS同步状态被用来保存任务的状态,如:正在执行、已完成或已取消。另外,FutureTask
还维护一些额外的状态变量,用来保存计算结果或者抛出的异常,以及一个指向正在执行计算任务的线程的引用,因此,如果任务取消,则可以中断该线程。
8.2.4. ReentrantReadWriteLock
ReentrantReadWriteLock
使用一个16位的状态来表示写入锁的计数,以及另一个16位的状态来表示读取锁的计数,由单个AQS来管理,然后在读取锁上的操作使用共享的方式获取与释放,在写入锁上的操作将使用独占的方式获取与释放。
AQS在内部维护一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问。在ReentrantReadWriteLock
中,当锁可用时,如果位于队列头部的线程执行写入操作,那么线程就会得到这个锁,如果位于队列头部的线程执行读取访问,那么队列中在第一个写入线程之前的所有线程都将获得这个锁。
参考:
- Copyright ©《java并发编程实战》