《Java并发编程实战》 非阻塞的同步

words: 3.4k    views:    time: 13min
CAS


非阻塞算法可以使多个线程在竞争共享的数据时不发生阻塞,因此它能在粒度更细的层次上进行协调,并且极大地减少调度开销。在基于锁的算法中,如果一个线程在休眠或者自旋时持有锁,那么其他线程都无法执行下去,而非阻塞算法则不会受到单个线程失败的影响。

1. 锁的劣势

通常,Jvm会对非竞争锁的获取和释放进行优化,但如果有多个线程同时请求锁,那么Jvm只能借助操作系统,将一些线程挂起然后在稍后恢复运行。当线程恢复执行时,必须等待其他线程执行完它们的时间片,才能被调度执行。

在挂起和恢复线程等过程中存在着很大的开销,并且通常存在着较常时间的中断。如果在基于锁的类中包含细粒度的操作(例如同步器类中大多数方法中只包含了少量操作),那么当在锁上存在着激烈的竞争时,调度开销与工作开销的比值会非常高。因此,锁的方式对于细粒度操作来说是一种高开销的机制,因为,花在锁竞争导致的线程切换开销比起操作本身的开销还要高。

另外,当一个线程在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行,那么所有需要这个锁的线程都无法执行下去。那么,如果被阻塞的线程优先级高,而持有锁的线程优先级低,就会导致高优先级的线程被迫等待低优先级线程,称为优先级反转。

2. CAS

CAS包含3个操作数:需要读写的内存位置V,进行比较的值A,以及拟写入的新值B。当且仅当V的值等于A时,CAS才会通过原子的方式用新值B来更新V的值,否则不会执行任何操作。无论位置V的值是否等于A,都将返回V原有的值。

CAS的含义是:我认为V的值应该为A,如果是,则将V的值更新为B,否则不修改,并返回V的实际值。CAS是一种乐观的态度,它希望能成功地执行更新操作,并且如果有另一个线程在最近一次检查后更新了该变量,那么CAS能检测到这个错误。

当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其他线程都将失败。然而,失败的线程并不会被挂起,而是被告知在这次竞争中失败,并可以再次尝试。由于一个线程在竞争CAS时不会阻塞,因此它可以决定是否重新尝试,或者执行一些恢复操作,也或者不执行任何操作。

  • 模拟一个CAS操作
SimulatedCAS
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class SimulatedCAS { 
private int value;

public synchronized int get(){
return value;
}

public synchronized int compareAndSwap(int expectedValue, int newValue){
int oldValue = value;
if(oldValue == expectedValue){
value = newValue;
}
return oldValue;
}

public synchronized boolean compareAndSet(int expectedValue, int newValue){
return (expectedValue == compareAndSwap(expectedValue, newValue));
}
}

典型的使用方式是:首先从V中读取值A,并根据A计算新值B,然后再通过CAS以原子方式将V中的值由A变成B。由于CAS能检测到来自其他线程的干扰,因此即使不使用锁也能够实现原子的读—改—写操作。

3. 示例:用CAS实现计数器

CasCounter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CasCounter {
private SimulatedCAS value;

public int getValue(){
return value.get();
}

public int increment(){
int v;
do{
v = value.get();
}while(v != value.compareAndSwap(v, v + 1));
return v + 1;
}
}

CasCounter使用CAS来设置新值,如果CAS失败,那么立即重试。通常,反复重试是一种合理的策略,但在一些竞争很激烈的情况下,更好的方式是在重试之前首先等待一段时间或者回退,从而避免造成活锁问题。

CAS的主要缺点是:它要求调用者自己处理竞争问题,而在锁中能自动处理竞争问题。但是,在程序内部执行CAS不需要执行Jvm代码、系统调用或线程调度操作,省去了一些调用开销。

虽然Java语言的锁定语句比较简洁,但Jvm和操作系统在管理锁时需要完成的工作却并不简单。在实现锁定时需要遍历Jvm中一条非常复杂的代码路径,并可能导致操作系统级的锁定、线程挂起以及上下文却换等动作。在最好的情况下,锁定时至少需要一次CAS,因此,虽然在使用锁时没有用到CAS,但实际上也无法节约任何执行开销。

4. JVM对CAS的支持

Java5.0中引入了底层的支持,在int,long和对象引用等类型上都公开了CAS操作,并且Jvm把它们编译为底层硬件提供的最有效方法。在支持CAS的平台上,运行时把它们编译为相应的机器指令,如果平台不支持,那么jvm将使用自旋锁。

在原子变量类中,使用了这些底层的Jvm支持为数字类型和引用类型提供一种高效的CAS操作,并且在java.util.concurrent中大多数类的实现中都直接或间接地使用了这些原子变量类。

.Java中对于CAS的支持,主要都封装在Unsafe类中,可以参考笔记:https://shanhm1991.github.io/2017/06/06/20170606/

5. 利用CAS代替锁解决状态依赖

《线程安全类的设计》前中说过,如果一个类能保证它的底层状态是线程安全且彼此独立的,那么可以直接发布它的底层状态变量,但实际大多情况下,底层状态间都会存在一些不变性约束。这时保证单个状态的线程安全已经没有意义,需用使用锁来保证操作的原子性,以维护状态间的约束关系。

不过,有些时候可以利用CAS操作代替锁的使用,这里还是考虑之前的区间问题:区间的上下限可以动态设置,提供一个判断给定数字是否在区间内的方法

CASNumberRange
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class CASNumberRange {

private static class IntPair{

final int lower;

final int upper;

IntPair(int lower, int upper){
this.lower = lower;
this.upper = upper;
}
}

private final AtomicReference<IntPair> values = new AtomicReference<>(new IntPair(0, 0));

public void setLower(int i){
while(true){
IntPair oldV = values.get();
if(i > oldV.upper){
throw new IllegalArgumentException();
}

IntPair newV = new IntPair(i, oldV.upper);
if(values.compareAndSet(oldV, newV)){
return;
}
}
}

public void setUpper(int i){
while(true){
IntPair oldV = values.get();
if(i < oldV.lower){
throw new IllegalArgumentException();
}

IntPair newV = new IntPair(oldV.lower, i);
if(values.compareAndSet(oldV, newV)){
return;
}
}
}

public boolean isInRange(int i){
IntPair v = values.get();
return (i >= v.lower && i <= v.upper);
}
}

6. CAS与锁的性能比较

当交通拥堵时,交通信号灯能够实现更高的吞吐量,而在低拥堵时,环岛能实现更高的吞吐量。在以太网中使用的竞争机制在低通信流量的情况下运行得很好,但在高通信流量的情况下,令牌环网络中的令牌传递机制则表现得更好。

同样,在高度竞争的情况下,锁的性能将超过原子变量,但在更真实的竞争情况下,基于CAS实现的机制的性能会更好。由于CAS在遇到竞争时将立即重试,这通常是一种正确的做法,但在激烈竞争的场景下反而会导致更多的竞争。不过,任何一个真实的程序都不会除了竞争锁或原子变量,其他什么工作都不做,因此,在实际情况中,通常基于CAS实现的机制的可伸缩性要高于锁。

7. 设计非阻塞的栈

栈是最简单的链式数据结构:每个元素仅指向下一个元素,并且每个元素也只被一个元素引用。

栈是由Node元素构成的一个链表,根节点为栈顶top,每个元素中都包含一个值以及指向下一个元素的链接。push方法创建一个新的节点,该节点的next域指向当前的栈顶,然后使用CAS把这个新节点放入栈顶,如果栈顶没有变化,那么CAS就会成功,否则将会失败,然后根据新的栈顶进行再次尝试。

ConcurrentStack
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ConcurrentStack<E> {

AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

public void push(E item){
Node<E> newHead = new Node<E>(item);
Node<E> oldHead;
do{
oldHead = top.get();
newHead.next = oldHead;
}while(!top.compareAndSet(oldHead, newHead));
}

public E pop(){
Node<E> newHead;
Node<E> oldHead;
do{
oldHead = top.get();
if(oldHead == null){
return null;
}
newHead = oldHead.next;
}while(!top.compareAndSet(oldHead, newHead));
return oldHead.item;
}

private static class Node<E>{
public final E item;
public Node<E> next;

public Node(E item){
this.item = item;
}
}
}

ConcurrentStack中能够保证线程安全性,因为compareAndSet像锁定机制一样,既能提供原子性,又能提供可见性。当一个线程需要修改栈的状态时,将调用compareAndSet,这个方法与写入volatile类型的变量有着相同的内存效果。

8. 设计非阻塞的链表

链表队列比栈复杂,它必须支持对头节点和尾节点的快速访问,因此,它需要单独维护头指针head和尾指针tail

考虑链表的尾部插入问题,这个操作不好简单地直接通过原子变量来实现,因为它包含两个CAS动作:将当前tail.next指向新插入的节点,以及将tail更新为新插入的节点。

如果第一个成功,第二个失败,那么队列将处于不一致的状态,即使两个CAS都成功了,那么在执行两个CAS动作之间,仍然可能有另一个线程会访问这个队列。

因此,在设计时需要考虑上面两种场景,思路是:在线程B到达时,如果发现A正在执行更新,那么线程B就可以知道有一个操作已部分完成,并且不能立即执行自己的更新操作。然后B可以等待并直到A完成更新。这样可以保证不同的线程轮流访问数据结构,并且不会造成破坏。

但如果有一个线程在更新操作中失败了,那么其他等待的线程都无法再访问到队列正确的状态了。所以,还要保证当一个线程失败时不会妨碍其他线程继续执行下去。想法是:如果B到达时发现A正在修改数据结构,那么在数据结构中应该有一些信息,使得B能够完成A的更新操作。如果B帮助A完成了更新操作,那么B就可以继续执行自己的操作,而不用等待A的操作完成,当A恢复后再试图完成其操作时,会发现B已经替它完成了。

Node
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private static class Node<E>{

final E item;

final AtomicReference<Node<E>> next;

public Node(E item,Node<E> next){
this.item = item;
this.next = new AtomicReference<Node<E>>(next);
}

private final Node<E> dummy = new Node<E>(null,null);

private final AtomicReference<Node<E>> head = new AtomicReference<Node<E>>(dummy);

private final AtomicReference<Node<E>> tail = new AtomicReference<Node<E>>(dummy);

private boolean put(E item){
Node<E> newNode = new Node<E>(item,null);
while(true){
Node<E> curTail = tail.get();
Node<E> tailNext = curTail.next.get();
if(curTail == tail.get()){
if(tailNext != null){ //A
//队列处于中间状态,推进尾节点
tail.compareAndSet(curTail, tailNext); //B
}else{
//处于稳定状态。尝试插入新节点
if(curTail.next.compareAndSet(null, newNode)){ //C
//插入成功,尝试推进尾节点,这一步如果没来得及完成,可由别的线程帮忙
tail.compareAndSet(curTail, newNode); //D
return true;
}
}
}
}
}
}

实现的关键在于,如果队列处于稳定状态,tail.next应该为空,如果队列处于中间状态,tail.next将为非空。因此,任何线程都能通过检查tail.next来获取队列当前的状态。而且,当队列处于中间状态时,可以通过推进尾节点,使得队列恢复为稳定状态。在ConcurrentLinkedQueue中就是使用的类似算法,只是实现上略有区别。

9. ABA问题

问题描述:CAS存在一种场景,如果V的值首先由A变成B,再由B变成A,那么就算两次看到的值都是A,依然无法确定值有没有被修改过,只是,大多数情况下,这样的问题并不影响结果的正确性。

一种简单的办法是:引入一个版本号,每次同时更新值和版本号,通过版本号确定值有没有变化过。但这样就需要要同时更新两个值,可以使用AtomicStampedReference或者AtomicMarkableReference,它们都支持在两个变量上执行原子的条件更新。


参考:

  1. Copyright ©《java并发编程实战》