0%

深入理解CAS

CAS (Compare-And-Swap) 是一种无锁的非阻塞算法的实现。CAS 包含了 3 个操作数:1. 需要读写的内存值 V,2. 进行比较的值 A 3. 拟写入的新值 B。当且仅当 V 的值等于 A 时,CAS 通过原子方式用新值 B 来更新 V 的值,否则不会执行任何操作。

CAS也是一种硬件对并发的支持,针对多处理器操作而设计的处理器中的一种特殊指令,用于管理对共享数据的并发访问。

Java中 AtomicBoolean、AtomicInteger、AtomicLong 和 AtomicReference 的实例各自提供对相应类型单个变量的访问和更新。每个类也为该类型提供适当的实用工具方法。

AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray 类进一步扩展了原子操作,对这些类型的数组提供了支持。这些类在为其数组元素提供 volatile 访问语义方 面也引人注目,这对于普通数组来说是不受支持的。

它们都是通过Unsafe这个类实现的原子操作,例如AtomicInteger部分代码(java8):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;

public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

Unsafe

下面演示了如何使用Unsafe实现CAS操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Test_UnSafe {

int i = 0;
private static Test_UnSafe t = new Test_UnSafe();
public static void main(String[] args) throws Exception {

Field unsafeField = Unsafe.class.getDeclaredFields()[0];
unsafeField.setAccessible(true);
Unsafe unsafe = (Unsafe) unsafeField.get(null);

Field f = Test_UnSafe.class.getDeclaredField("i");
long offset = unsafe.objectFieldOffset(f);
System.out.println(offset);

boolean success = unsafe.compareAndSwapInt(t, offset, 0, 1);
System.out.println(success);
System.out.println(t.i);
}
}

getAndAddInt实现

1
2
3
4
5
6
7
8
9
10
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

从上面的代码中可以看到,如果cas操作失败,则会重新读取数据,并再次进行cas操作。

jdk8u: unsafe.cpp

下面是Unsafe类中compareAndSwapInt的实现,可以看到它是通过Atomic类的cmpxchg方法实现的

1
2
3
4
5
6
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

linux_x86实现

下面是linux_x86实现的cmpxchg方法实现:

jdk8u: atomic_linux_x86.inline.hpp 93行

1
2
3
4
5
6
7
8
inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}

jdk8u: os.hpp is_MP():是否是多核处理器( Multi Processor)

1
2
3
4
5
6
7
8
9
10
static inline bool is_MP() {
// During bootstrap if _processor_count is not yet initialized
// we claim to be MP as that is safest. If any platform has a
// stub generator that might be triggered in this phase and for
// which being declared MP when in fact not, is a problem - then
// the bootstrap routine for the stub generator needs to check
// the processor count directly and leave the bootstrap routine
// in place until called after initialization has ocurred.
return (_processor_count != 1) || AssumeMP;
}

jdk8u: atomic_linux_x86.inline.hpp

1
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "

最终实现:

cmpxchg = cas修改变量值

1
lock cmpxchg 指令

汇编实现

汇编指令:

1
lock cmpxchg 指令

lock指令传统方式是锁总线实现,但硬件层级有相应的优化方案。

文档中说的对于Lock指令区分两种实现方法

对于早期的CPU,总是采用的是锁总线的方式。具体方法是,一旦遇到了Lock指令,就由仲裁器选择一个核心独占总线。其余的CPU核心不能再通过总线与内存通讯。从而达到“原子性”的目的。

具体做法是,某一个核心触发总线的“Lock#”那根线,让总线仲裁器工作,把总线完全分给某个核心。

这种方式的确能解决问题,但是非常不高效。为了个原子性结果搞得其他CPU都不能干活了。因此从Intel P6 CPU开始就做了一个优化,改用Ringbus + MESI协议,也就是文档里说的cache conherence机制。这种技术被Intel称为“Cache Locking”。

根据文档原文:如果是P6后的CPU,并且数据已经被CPU缓存了,并且是要写回到主存的,则可以用cache locking处理问题。否则还是得锁总线。因此,lock到底用锁总线,还是用cache locking,完全是看当时的情况。当然能用后者的就肯定用后者。

ABA问题

假设”线程1”正在进行CAS操作,它读取到的指是A,假设再它通过CAS操作把这个值改成B之前,其他线程把A改成C,最终又改成A。然后”线程1“进行CAS依然可以成功,但是这个A已经被修改过了,在某些场景下可能会发生问题。

解决办法:版本号机制(Java中的AtomicStampedReference),但一般基础类型简单值不需要版本号。

更多资料

  1. 8.1.4 Effects of a LOCK Operation on Internal Processor Caches
坚持原创技术分享,您的支持将鼓励我继续创作!