CAS

什么是CAS

**CAS(Compare And Swap)**是由硬件实现的,CAS 可以将 read- modify - write 这类的操作转换为原子操作。

CAS 原理: 在把数据更新到主内存时,再次读取主内存变量的值,如果现在变量的值与期望的值(操作起始时读取的值)一样就更新。

CAS实现简单计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class CASTest {

public static void main(String[] args) {
CASCounter casCounter = new CASCounter();
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
System.out.println(casCounter.incrementAndGet());
}).start();
}
}

static class CASCounter {
public volatile long value = 0;

public boolean compareAndSwap(long expectedValue, long newValue) {
synchronized (this) {
if (value == expectedValue) {
value = newValue;
return true;
} else {
return false;
}
}
}

public long incrementAndGet() {
long oldvalue;
long newValue;
do {
oldvalue = value;
newValue = oldvalue + 1;
} while (!compareAndSwap(oldvalue, newValue));
return newValue;
}
}
}

CAS常见问题

CAS ABA问题

  CAS 实现原子操作背后有一个假设: 共享变量的当前值与当前线程提供的期望值相同, 就认为这个变量没有被其他线程修改过,实际上这种假设不一定总是成立,如有共享变量 count = 0,发生了如下情况:

  A 线程对 count 值修改为 10,B 线程对 count 值修改为 20,C 线程对 count 值修改为 0

  当前线程看到 count 变量的值现在是0,现在是否认为 count变量的值没有被其他线程更新呢? 这种结果是否能够接受?

这就是 CAS 中的 ABA 问题,即共享变量经历了 A->B->A 的更新

  是否能够接受 ABA 问题跟实现的算法有关。如果想要规避 ABA 问题,可以为共享变量引入一个修订号(时间戳), 每次修改共享变量时,相应的修订号就会增加1。 ABA 变量更 新过程变量: [A,0] ->[B,1]->[A,2], 每次对共享变量的修改都会导致修订号的增加,通过修订号依然可以准确判断变量是否被其他线程修改过。 AtomicStampedReference 以及 AtomicMarkableReference类就是基于这种思想产生的。

循环时间长开销大

  自旋 CAS 如果长时间不成功,会给 CPU 带来非常大的执行开销。如果 JVM 能支持处理器提供的 pause 指令,那么效率会有一定的提升。pause 指令有两个作用:第一,它可以延迟流水线执行指令(de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零;第二,它可以避免在退出循环的时候因内存顺序冲突(Memory Order Violation)而引起 CPU 流水线被清空(CPU Pipeline Flush),从而提高 CPU 的执行效率。

  比较常见的解决方法为:

  • 分散操作热点,使用 LongAdder 替代基础原子类 AtomicLong,LongAdder 将单个 CAS 热点(value 值)的分散到一个 cells 数组中。
  • 使用队列削峰,将发生 CAS 争用的线程加入一个队列中排队,降低 CAS 争用的激烈程度。JUC 中非常重要的基础类 AQS(抽象队列同步器)就是这么做的。

只能保证一个共享变量的原子操作

  当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁。还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量 i=2,j=a,合并一下 ij=2a,然后用CAS 来操作 ij。从 Java 1.5 开始, JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行 CAS 操作。

CAS在JDK中的应用

  CAS 在 java.util.concurrent.atomic 包中的原子类、Java AQS 以及显示锁、CurrentHashMap 等重要并发容器类的实现上,都有非常广泛的应用。

  在 java.util.concurrent.atomic 包的原子类如 AtomicXXX 中,都使用了 CAS 保障对数字成员进行操作的原子性。java.util.concurrent 的大多数类(包括显示锁、并发容器)都基于 AQS 和 AtomicXXX 实现, 而 AQS 通过 CAS 保障其内部双向队列头部、尾部操作的原子性。

常用原子类

  原子变量类基于CAS实现的, 当对共享变量进行read-modify-write 更新操作时,通过原子变量类可以保障操作的原子性与可见性.对变量的 read-modify-write 更新操作是指当前操作不是一个简单的赋值,而 是变量的新值依赖变量的旧值,如自增操作i++. 由于volatile只能保证 可见性,无法保障原子性, 原子变量类内部就是借助一个Volatile变量, 并且保障了该变量的 read-modify-write 操作的原子性, 有时把原子变 量类看作增强的 volatile 变量. 原子变量类有 12 个,如下:

分组 原子变量类
基础数据型 AtomicInteger、 AtomicLong、 AtomicBoolean
数组型 AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
字段更新器 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
引用型 AtomicReference、 AtomicStampedReference、AtomicMarkableReference

  除了用Synchronized 进行同步外,也可以使用原子类来进行实现。

AtomicInteger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class AtomicTest02 {

public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
new AtomicThread().start();
}
}

static class AtomicThread extends Thread {
// 用synchronized不需要volatile修饰
private static AtomicInteger count = new AtomicInteger();

@Override
public void run() {
// 不具备原子性
addCount();
}

//这段代码运行后不是线程安全的,想要线程安全,需要使用 synchronized 进行同步,
// 如果使用 synchronized 同时,也就不需要 volatile 关键了
private static void addCount() {
for (int i = 0; i < 1000; i++) {
//自增的后缀形式
count.getAndIncrement();
}
System.out.println(Thread.currentThread().getName() + " count:" + count);
}
}
}

AtomicIntegerArray

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class AtomicIntegerArrayTest {

public static void main(String[] args) {
//1)创建一个指定长度的原子数组
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);
System.out.println(atomicIntegerArray); //[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
//2)返回指定位置的元素
System.out.println(atomicIntegerArray.get(0));//0
System.out.println(atomicIntegerArray.get(1));//0
//3)设置指定位置的元素
atomicIntegerArray.set(0, 10);
//在设置数组元素的新值时, 同时返回数组元素的旧值
System.out.println(atomicIntegerArray.getAndSet(1, 11)); //0
System.out.println(atomicIntegerArray); //[10, 11, 0, 0, 0, 0, 0, 0, 0, 0]
//4)修改数组元素的值,把数组元素加上某个值
System.out.println(atomicIntegerArray.addAndGet(0, 22)); //32
System.out.println(atomicIntegerArray.getAndAdd(1, 33)); //11
System.out.println(atomicIntegerArray); //[32, 44, 0, 0, 0, 0, 0, 0, 0, 0]
//5)CAS 操作
//如果数组中索引值为 0 的元素的值是 32 , 就修改为 222
System.out.println(atomicIntegerArray.compareAndSet(0, 32, 222)); //true
System.out.println(atomicIntegerArray); //[222, 44, 0, 0, 0, 0, 0, 0, 0, 0]
System.out.println(atomicIntegerArray.compareAndSet(1, 11, 333)); //false
System.out.println(atomicIntegerArray);
//6)自增/自减
System.out.println(atomicIntegerArray.incrementAndGet(0));//223, 相当于前缀
System.out.println(atomicIntegerArray.getAndIncrement(1));//44, 相当于后缀
System.out.println(atomicIntegerArray); //[223, 45, 0, 0, 0, 0, 0, 0, 0, 0]
System.out.println(atomicIntegerArray.decrementAndGet(2)); //-1
System.out.println(atomicIntegerArray); //[223, 45, -1, 0, 0, 0, 0, 0, 0, 0]
System.out.println(atomicIntegerArray.getAndDecrement(3)); //0
System.out.println(atomicIntegerArray);//[223, 45, -1, -1, 0, 0, 0, 0, 0, 0]
}
}

AtomicIntegerFieldUpdater

  AtomicIntegerFieldUpdater 可以对原子整数字段进行更新,要求:

  1. 字符必须使用 volatile 修饰,使线程之间可见

  2. 只能是实例变量,不能是静态变量,也不能使用 final 修饰

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class AtomicIntegerFieldUpdaterTest {

public static void main(String[] args) {
User user = new User(1234, 10);
//开启 10 个线程
for (int i = 0; i < 10; i++) {
new SubThread(user).start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(user);
}


static class SubThread extends Thread {
private User user; //要更新的 User 对象
//创建AtomicIntegerFieldUpdater更新器
private static final AtomicIntegerFieldUpdater<User> updater =
AtomicIntegerFieldUpdater.newUpdater(User.class, "age");

public SubThread(User user) {
this.user = user;
}

@Override
public void run() {
//在子线程中对 user 对象的 age 字段自增 10 次
for (int i = 0; i < 10; i++) {
System.out.println(updater.getAndIncrement(user));
}
}
}

static class User {
int id;
volatile int age;

public User(int id, int age) {
this.id = id;
this.age = age;
}

@Override
public String toString() {
return "User{" +
"id=" + id +
", age=" + age +
'}';
}
}
}

AtomicReference

  1. AtomicReference和AtomicInteger非常类似,不同之处就在于AtomicInteger是对整数的封装AtomicReference则对应普通的对象引用。也就是它可以保证你在修改对象引用时的线程安全性。

  2. AtomicReference是作用是对”对象”进行原子操作。 提供了一种读和写都是原子性的对象引用变量。原子意味着多个线程试图改变同一个AtomicReference(例如比较和交换操作)将不会使得AtomicReference处于不一致的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class AtomicReferenceTest {
//创建一个 AtomicReference 对象
static AtomicReference<String> atomicReference = new AtomicReference<>("abc");

public static void main(String[] args) throws InterruptedException {
//创建 100 个线程修改字符串
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
Thread.sleep(new Random().nextInt(20));
} catch (InterruptedException e) {
e.printStackTrace();
}
if (atomicReference.compareAndSet("abc", "def")) {
System.out.println(Thread.currentThread().getName() + "把字符串 abc 更改为 def");
}
}).start();
}
//再创建 100 个线程
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
Thread.sleep(new Random().nextInt(20));
} catch (InterruptedException e) {
e.printStackTrace();
}
if (atomicReference.compareAndSet("def", "abc")) {
System.out.println(Thread.currentThread().getName() + "把字符串 还原为 abc");
}
}).start();
}
Thread.sleep(1000);
System.out.println(atomicReference.get());
}
}

AtomicReference 可能会出现CAS的ABA问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class AtomicReferenceABATest {
private static AtomicReference<String> atomicReference = new AtomicReference<>("abc");

public static void main(String[] args) throws InterruptedException {
//创建第一个线程,先把 abc 字符串改为"def",再把字符串还原为 abc
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
atomicReference.compareAndSet("abc", "def");
System.out.println(Thread.currentThread().getName() + "--" + atomicReference.get());
atomicReference.compareAndSet("def", "abc");
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicReference.compareAndSet("abc", "ghg"));
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(atomicReference.get());
}
}

// 输出
Thread-0--def
true
ghg

ABA问题解决

AtomicStampedReference方式

在 AtomicStampedReference 原子类中有一个 stamp 印戳(或标记),使用这个印戳可以用来觉察数据是否发生变化,给数据带上了一种实效性的检验。 AtomicStampReference 的 compareAndSet 方法首先检查当前的对象引用值是否等于预期引用, 并且当前印戳(Stamp)标志是否等于预期标志,如果全部相等,则以原子方式将引用值和印戳(Stamp)标志的值更新为给定的更新值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class AtomicStampedReferenceTest {
// 定义 AtomicStampedReference 引用操作"abc"字符串,指定初始化版本号为 0
private static AtomicStampedReference<String> stampedReference = new AtomicStampedReference<>("abc", 0);

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
stampedReference.compareAndSet("abc", "def", stampedReference.getStamp(), stampedReference.getStamp() + 1);
System.out.println(
Thread.currentThread().getName() + "第一次修改结束,当前值:" + stampedReference.getReference() +
" 当前版本号:" + stampedReference.getStamp()
);
stampedReference.compareAndSet("def", "abc", stampedReference.getStamp(), stampedReference.getStamp() + 1);
System.out.println(
Thread.currentThread().getName() + "第二次修改结束,当前值:" + stampedReference.getReference() +
" 当前版本号:" + stampedReference.getStamp()
);
});
Thread t2 = new Thread(() -> {
// 先获得旧版本号
int stamp = stampedReference.getStamp();
try {
// 延迟一下,确保t1先执行结束,将值重新置为abc
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t2修改是否成功:" + stampedReference.compareAndSet("abc", "ggg", stamp, stamp + 1));
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("最终结果:" + stampedReference.getReference());
}
}
// 运行结果
Thread-0第一次修改结束,当前值:def 当前版本号:1
Thread-0第二次修改结束,当前值:abc 当前版本号:2
t2修改是否成功:false
最终结果:abc

AtomicMarkableReference方式

AtomicMarkableReference 是 AtomicStampedReference 的简化版,不关心修改过几次,仅仅关心是否修改过。因此其标记属性 mark 是 boolean 类型,而不是数字类型,标记属性 mark 仅记录值是否有过修改。

AtomicMarkableReference 适用只要知道对象是否有被修改过,而不适用于对象被反复修改的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class AtomicMarkableReferenceTest {

public static final ExecutorService executorService = Executors.newFixedThreadPool(2);

public static void main(String[] args) throws InterruptedException {

CountDownLatch latch = new CountDownLatch(2);

AtomicMarkableReference<Integer> atomicRef =
new AtomicMarkableReference<Integer>(1, false);

executorService.submit(() -> {
boolean success;
int value = atomicRef.getReference();
boolean mark = getMark(atomicRef);
System.out.println("before sleep 500: value=" + value
+ " mark=" + mark);

//等待500毫秒
try {
// 延迟一下,确保t1先执行结束,将值重新置为abc
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
success = atomicRef.compareAndSet(1, 10,
mark, !mark);

System.out.println("after sleep 500 cas 1: success=" + success
+ " value=" + atomicRef.getReference()
+ " mark=" + getMark(atomicRef));


latch.countDown();
});

executorService.submit(() -> {
boolean success;
int value = atomicRef.getReference();
boolean mark = getMark(atomicRef);
System.out.println("before sleep 1000: value=" + atomicRef.getReference()
+ " mark=" + mark);

//等待1000毫秒
try {
// 延迟一下,确保t1先执行结束,将值重新置为abc
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after sleep 1000: mark = " + getMark(atomicRef));
success = atomicRef.compareAndSet(1, 20, mark, !mark);
System.out.println("after cas 3 1000: success=" + success
+ " value=" + atomicRef.getReference()
+ " mark=" + getMark(atomicRef));
latch.countDown();
});
latch.await();
}

private static boolean getMark(AtomicMarkableReference<Integer> atomicRef) {
boolean[] markHolder = {false};
int value = atomicRef.get(markHolder);
return markHolder[0];
}
}
// 运行结果
before sleep 500: value=1 mark=false
before sleep 1000: value=1 mark=false
after sleep 500 cas 1: success=true value=10 mark=true
after sleep 1000: mark = true
after cas 3 1000: success=false value=10 mark=true

LongAdder提升CAS 操作性能

在争用激烈的场景下,会导致大量的 CAS 空自旋。比如说,在大量的线程同时并发修改一个 AtomicInteger 时,可能有很多线程会不停的自旋,甚至有的线程会进入一个无限重复的循环中。 大量的 CAS 空自旋会浪费大量的 CPU 资源,大大降低了程序的性能。

在高并发场景下如何提升 CAS 操作性能呢?答案就是使用 LongAdder 替代 AtomicInteger及AtomicLong。(jdk >= 1.8)

  Java8 提供一个新的类 LongAdder,以空间换时间的方式提升高并发场景下 CAS 操作性能。 LongAdder 核心思想就是热点分离,和 ConcurrentHashMap 的设计思想类似:将 value 值分离成一个数组,当多线程访问时,通过 hash 算法将线程映射到数组的一个元素进行操作;而获取最终的 value 结果时,则将数组的元素求和。

LongAdder和AtomicLong性能对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class LongAdderVSAtomicLongTest {
// 每条线程的执行轮数
private static final int TURNS = 1000000000;

// 线程池,测试机为12核,可以根据自己计算机核心调整
private static final ExecutorService pool = Executors.newFixedThreadPool(12);

public static void main(String[] args) {
testAtomicLong();
// testLongAdder();
}


public static void testAtomicLong() {
// 并发任务数
final int TASK_AMOUNT = 10;

//定义一个原子对象
AtomicLong atomicLong = new AtomicLong(0);

// 线程同步倒数闩
CountDownLatch countDownLatch = new CountDownLatch(TASK_AMOUNT);
long start = System.currentTimeMillis();
for (int i = 0; i < TASK_AMOUNT; i++) {
pool.submit(() ->
{
try {
for (int j = 0; j < TURNS; j++) {
atomicLong.incrementAndGet();
}
// Print.tcfo("本线程累加完成");
} catch (Exception e) {
e.printStackTrace();
}
//倒数闩,倒数一次
countDownLatch.countDown();

});
}

try {
//等待倒数闩完成所有的倒数操作
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//输出统计结果
System.out.println("运行的时长为:" + time);
System.out.println("累加结果为:" + atomicLong.get());
}

public static void testLongAdder() {
// 并发任务数
final int TASK_AMOUNT = 10;

//定义一个LongAdder 对象
LongAdder longAdder = new LongAdder();
// 线程同步倒数闩
CountDownLatch countDownLatch = new CountDownLatch(TASK_AMOUNT);
long start = System.currentTimeMillis();
for (int i = 0; i < TASK_AMOUNT; i++) {
pool.submit(() ->
{
try {
for (int j = 0; j < TURNS; j++) {
longAdder.add(1);
}
} catch (Exception e) {
e.printStackTrace();
}
//倒数闩,倒数一次
countDownLatch.countDown();
});
}

try {
//等待倒数闩完成所有的倒数操作
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//输出统计结果
System.out.println("运行的时长为:" + time);
System.out.println("累加结果为:" + longAdder.longValue());
}
}

  将程序中的TURNS值由10000按照每次10倍的比例调整到1000000000,可以发现,LongAdder和AtomicLong在数据在并发程度低的情况下,效率相差几乎可以忽略不计,但是随着并发量的提升,AtomicLong的效率就变的异常的低了,两者诧异就很大了,多次测试,情况如下:

类型\执行次数 10000 * 10 100000 * 10 1000000 * 10 10000000 * 10 100000000 * 10 1000000000 * 10
AtomicLong 0.037 s 0.07 s 0.232 s 1.739 s 15.989 s 185.284 s
LongAdder 0.039 s 0.068 s 0.072 s 0.185 s 1.191 s 10.681 s

LongAdder 的原理

  AtomicLong 使用内部变量 value 保存着实际的 long 值,所有的操作都是针对该 value 变量进行。也就是说,高并发环境下,value 变量其实是一个热点,也就是 N 个线程竞争一个热点。重试线程越多,意味着 CAS 的失败几率更高,CAS 失败几率就越高,从而进入恶性 CAS 空自旋状态。

  LongAdder 的基本思路就是分散热点,将 value 值分散到一个数组中,不同线程会命中到数组的不同槽(元素)中,各个线程只对自己槽中的那个值进行 CAS 操作。这样热点就被分散了,冲突的概率就小很多。

  使用 LongAdder,即使线程数再多也不担心,各个线程会分配到多个元素上去更新,增加元素个数,就可以降低 value 的“热度”,AtomicLong 中的恶性 CAS 空自旋就解决了。 如果要获得完整的 LongAdder 存储的值,只要将各个槽中的变量值累加,返回最终的累加之后的值即可。

  LongAdder 的实现思路,与 ConcurrentHashMap 中分段锁基本原理非常相似,本质上,都是不同的线程在不同的单元上进行操作,这样减少了线程竞争,提高了并发效率。用空间换时间的思想,不过在实际高并发情况中消耗的空间可以忽略不计。

  LongAdder 的设计体现了空间换时间的思想,不过在实际高并发场景下,数组元素所消耗的空间可以忽略不计。

img

LongAdder源码

  LongAdder 继承于 Striped64 类,base 值和 cells 数组都在 Striped64 类定义。基类 Striped64内部三个重要的成员如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** 
* 成员一:存放 Cell 的 hash 表,大小为 2 的幂。
*/
transient volatile Cell[] cells;
/**
* 成员二:基础值,
* 1. 在没有竞争时会更新这个值;
* 2. 在 cells 初始化时 cells 不可用,也会尝试将通过 cas 操作值累加到 base。
*/
transient volatile long base;
/**
* 自旋锁,通过 CAS 操作加锁,为 0 表示 cells 数组没有处于创建、扩容阶段 * 为 1 用于表示正在创建或者扩展 Cell 数组,不能进行新 Cell 元素的设置操作。
*/
transient volatile int cellsBusy;

  Striped64 内部包含一个 base 和一个 Cell[]类型的 cells 数组,cells 数组又叫 hash 表。 在没有竞争的情况下,要累加的数通过 CAS 累加到 base 上;如果有竞争的话,会将要累加的数累加到 Cells 数组中的某个 cell 元素里面。所以 Striped64 的整体值 value 为 base+∑[0~n]cells。

Striped64 的整体值 value 的获取函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public long longValue() {
// longValue()方法调用了 sum(), 累加所有 cell 的值
return sum();
}
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
// 累加所有 cell 的值
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

  Striped64 的设计核心思路就是通过内部的分散计算来避免竞争,以空间换时间。LongAdder的 base 类似于 AtomicInteger 里面的 value,在没有竞争的情况,cells 数组为 null,这时只使用 base 做累加;而一旦发生竞争,cells 数组就上场了。

  cells 数组第一次初始化长度为 2,以后每次扩容都是变为原来的两倍,一直到 cells 数组的长度大于等于当前服务器 CPU 的核数。为什么呢?同一时刻,能持有 CPU 时间片而去并发操作同一个内存地址的最大线程数,最多也就是 CPU 的核数。

  在存在线程争用的时候,每个线程被映射到 cells[threadLocalRandomProbe & cells.length]位置的 Cell 元素,该线程对 value 所做的累加操作,就执行在对应的 Cell 元素的值上,最终相当于将线程绑定到了 cells 中的某个 cell 对象上。

1
2
3
4
5
6
7
8
9
10
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

casBase 方法

  casBase 方法就是通过 UNSAFE 类的 CAS 设置成员变量 base 的值为 base+x(要累加的值),casBase 方法的代码如下:

1
2
3
4
5
6
/**
* 使用 CAS 来更 base 值
*/
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

casCellsBusy 方法

  casCellsBusy 方法就是将 cellsBusy 成员的值改为 1,表示目前的 cells 数组在初始化或扩容中,具体的代码如下:

1
2
3
final boolean casCellsBusy() { 
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

  casCellsBusy( )方法相当于锁的功能:当线程需要 cells 数组初始化或扩容时,需要调用casCellsBusy 方法,通过 CAS 方式将 cellsBusy 成员的值改为 1,如果修改失败,表示其他的线程正在进行数组初始化、或扩容的操作。只有 CAS 操作成功,cellsBusy 成员的值被改为 1,当前线程才能执行 cells 数组初始化、或扩容的操作。在 cells 数组初始化、或扩容的操作执行完成之后,cellsBusy 成员的值被改为 0,这时候不需要进行 CAS 修改,直接修改即可,因为不存在争用。

  当在 cellsBusy 成员值为 1 时,表示 cells 数组正被某个线程执行初始化、或扩容操作,其他线程不能进行以下操作:

  • 对 cells 数组执行初始化
  • 对 cells 数组执行扩容
  • 如果 cells 数组中某个元素为 null,为该元素创建新的 Cell 对象。因为数组的结构正在修改,所以其他线程不能创建新的 Cell 对象。