手记

JDK成长记17:Atomic类的原理—CAS+valotile

经过volatile和synchronize关键字的底层原理的洗礼,不知道你是否有一种感觉,要想弄明白它们的原理是一个非常难的过程。为什么费这么大的力气要弄明白这些并发基础知识呢?

其实是为了之后更好的掌握并发组件、并发集合这些内容。JDK中的juc(并发包)的知识大体可以分为如下几块:

并发基础中除了volatile、synchronied、线程状态变化之外,还有很重要的两个知识CAS和AQS。而其他并发组件和集合都是基于这些知识来实现的工具而已。

这一节我们通过Atomic类来学习下它的底层原理,实际它的底层通过CAS+volatile的原理来实现的。我们来具体看看:

  • JDK中的CAS如何实现的?又是如何应用在Atomic中的?
  • CAS的自旋性能问题怎么优化?
  • CAS的ABA问题怎么解决?

在Java代码中,CAS是如何应用在Atomic中的?

在Java代码中,CAS是如何应用在Atomic中的?

之前学习synchronized的时候,就接触过CAS。CAS一般称作Compare And Swap操作。操作流程如下:

上面CAS操作简单描述可以如下:当更新一个值从E->V时,更新的时候需要读取E最新的值N,如果发生了变化,也就是当E!=N,就不会更新成功,重新尝试,否则更新值成功,变为V。

来看一个Demo:

private static AtomicInteger j = new AtomicInteger(0);

public static void main(String[] args) {
  for (int i = 0; i < 10; i++) {
    new Thread(() -> {
      System.out.println(AtomicIntegerDemo.j.incrementAndGet());
    }).start();
  }
}

这段程序有10个线程,它们可能同时更新J的值。但是输出结果是按顺序输出了0到10的数字。这是因为在AtomicInteger底层,每个线程的更新都是CAS操作,它保证了多线程修改同一个值的原子性。

首先你可以先看一下AtomicInteger 整体脉络:

根据之前分析源码的思想,先脉络后细节,你应该可以看出,它核心脉络就是一个Unsafe对象,int的value。如下

public class AtomicInteger extends Number implements java.io.Serializable {
  // setup to use Unsafe.compareAndSwapInt for updates
  private static final Unsafe unsafe = Unsafe.getUnsafe();

  private static final long valueOffset;


  static {
    try {
      valueOffset = unsafe.objectFieldOffset
        (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
  }

  private volatile int value;

}

核心变量:

  • value是一个volatile的int值。通过volatile的可见性,可以保证有一个线程更新了,其他线程可以得到最新值。
  • valueOffset,类初始化的时候,来进行执行的,valueOffset,value这个字段在AtomicInteger这个类中的偏移量,在底层,这个类是有自己对应的结构的,无论是在磁盘的.class文件里,还是在JVM内存中。大概可以理解为:value这个字段具体是在AtomicInteger这个类的哪个位置,offset,偏移量,这个是很底层的操作,是通过unsafe来实现的。刚刚在类初始化的时候,就会完成这个操作的,final的,一旦初始化完毕,就不会再变更了。
  • Usafe类,进行真正CAS操作的类

当通过new AtomicInteger(0)这句话创建的对象,实际是给value赋值了一个初始值0而已。(int默认就是0)

 public AtomicInteger(int initialValue) {
    value = initialValue;
  }

现在你可以得到AtomicInteger的核心组件图如下:

接着每个线程会调用incrementAndGet这个方法:

  public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
  }

可以看到直接调用了Unsafe的getAndAddInt方法,如下:

  public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
      v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
  }

(可以下载OpenJDK源码或访问http://hg.openjdk.java.net/jdk/jdk/file/a1ee9743f4ee/jdk/src/share/classes/sun/misc/Unsafe.java)

上面这段代码从脉络上可以看出来,是经典的CAS操作,首先通过一次volatile读,读到最新的v值,之后再通过compareAndSwapInt这个方法,进行CAS操作。如果CAS操作失败,会返回false,while循环继续执行,进行自旋,重新尝试CAS操作。如果CAS更新操作成功,返回原值。

返回原值之后,incrementAndGet进行了+1操作,incrementAndGet方法返回,就会的得到更新后的值。

如下图所示:

在Java代码层面,很多Atomic原子类底层核心的原理就是CAS,有人也把这种操作称为无锁化,乐观锁,自旋锁之类的。(称呼这种东西是用来交流的,大家能明白就好,不要太过较真)。

CAS操作每次尝试修改的时候,就对比一下,有没有人修改过这个值,没有人修改,自己就修改,如果有人修改过,就重新查出来最新的值,再次重复那个过程。

而AtomicLong、AtomicLongArray、AtomicBoolean这些原子操作类和AtomicIneger底层是类似的,都是通过Unsafe类来做到CAS操作的。

也正是这种操作,可以保证多线程更新的时候的原子性,如下图所示:

在JVM中的CAS如何实现的?

在JVM中的CAS如何实现的?

具体的CAS操作方法都是native的方法,是通过C++的代码实现的。

  public native int   getIntVolatile(Object o, long offset);
  
  public final native boolean compareAndSwapInt(Object o, long offset,
                         int expected,
                         int x);

这就需要我们进一步深入JVM来看下CAS是如何实现的。我给大家找到了对应的C++代码如下:

// unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))

 UnsafeWrapper("Unsafe_CompareAndSwapInt");

 oop p = JNIHandles::resolve(obj);

 jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);

 return (jint)(Atomic::cmpxchg(x, addr, e)) == e;

UNSAFE_END

通过观察,可以发现,核心的CAS操作是通过Atomic::cmpxchg(x, addr, e)这个方法实现的。这个方法从名字上cmpxchg就可以猜到是compare and exchange和compare and swap(CAS)是一个意思。

但是你继续跟踪代码会发现,这个Atomic::cmpxchg方法有很多实现。如下图右侧:

因为不同的操作系统和CPU对应的CAS指令可能有所不同,所以有了不同的CAS实现。

这里可以看一下atomic_linux_x86.inline.hpp 93行的这个实现方法。代码如下:

#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "

inline jint   Atomic::cmpxchg  (jint   exchange_value, volatile jint*   dest, jint   compare_value) {

 int mp = os::is_MP();

 __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
          : "=a" (exchange_value)
          : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
          : "cc", "memory");

 return exchange_value;
}

上面这段代码其实就是CAS最底层的实现方式。

首先通过is_MP 这个判断表示是否是多核CPU处理的意思(Multi Processor ) 。

接着如果是的话,asm表示汇编指令,发送一条原子性的汇编指令。也就是说CAS底层实际通过类似于lock cmpxchgl 这种汇编的指令,通知CPU进行原子性的更新。这其实一个轻量的loc指令,可以让CPU保证原子性的操作,所以说CAS是自旋锁,是有道理的。

好了,到这里CAS在JVM层面的具体实现你应该已经了解了。接下来我们来看CAS的产生的两个问题。

CAS的自旋无限循环性能问题怎么优化?

CAS的自旋无限循环性能问题怎么优化?

不知道大家想到过没,CAS操作有一个很大的问题,如果在高并发,多线程更新的时候,会造成大量线程进行自旋,消耗CPU资源。这样的性能是很不好的(当然比synchronized还是要好很多的)。

为了解决这一个问题,Java 8提供的一个对AtomicLong改进后的一个类,LongAdder。它具备一个分段CAS的原子操作类。让我们来看一下它的分段CAS优化的思想

什么叫分段CAS,或者说是分段迁移呢?

意思就是,当某一个线程如果对一个值更新是,可以看对这个值进行分段更新,每一段叫做一个Cell,在更新每一个Cell的时候,发现说出现了很难更新它的值,出现了多次 CAS失败了,自旋的时候,进行自动迁移段,它会去尝试更新别的分段Cell的值,这样的话就可以让一个线程不会盲目的CAS自旋等待一个更新分段cell的值。

LongAdder正是基于这个思想来实现的。基本原理如下图所示:

LongAdder先尝试一次cas更新,如果失败会转而通过Cell[]的方式更新值,如果计算index的方式足够散列,那么在并发量大的情况下,多个线程定位到同一个cell的概率也就越低,这有点类似于分段锁的意思。

但是要注意的是sum方法在并发情况下sum的值不精确,reset方法不是线程安全的。

  public void reset() {
    Cell[] as = cells; Cell a;
    base = 0L;
   if (as != null) {
      for (int i = 0; i < as.length; ++i) {
        if ((a = as[i]) != null)
          a.value = 0L;
      }
    }
  }


  public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
      for (int i = 0; i < as.length; ++i) {
        if ((a = as[i]) != null)
          sum += a.value;
      }
    }
    return sum;

  }

关于sum不精确

sum方法的实现很简单,其实就是 base + sum(cells)。由上述源码可以发现,sum执行时,最终返回的是sum局部变量,初始被复制为base,而最终返回时,很可能base已经被更新了,而此时局部变量sum不会更新,造成不一致。其次,这里对cell的读取也无法保证是最后一次写入的值。

所以,sum方法在没有并发的情况下,可以获得正确的结果。

关于reset不线程安全

因为reset方法并不是原子操作,它先将base重置成0,假设此时CPU切走执行sum,此时的sum就变成了减去base后的值。也就是说,在并发量大的情况下,执行完此方法并不能保证其他线程看到的是重置后的结果。所以要慎用。

在实际工作中,可根据LongAdder和AtomicLong的特点来使用这两个工具。 当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用LongAdder(例如网站访问人数计数)。 当需要保证线程安全,可允许一些性能损耗,要求高精度时,可以使用AtomicLong。LongAdder,替代AtomicLong,完全可以对心跳计数器来使用LongAdder。

CAS的ABA问题怎么解决?

CAS的ABA问题怎么解决?

最后我们聊一下CAS产生的ABA问题。

什么是ABA问题?

如果某个值一开始是A,后来变成了B,然后又变成了A,你本来期望的是值如果是第一个A才会设置新值,结果第二个A一比较也ok,也设置了新值,跟期望是不符合的。

怎么解决呢?

解决办法也很简单:加一个类似于版本号的东西,比如邮戳int stamp之类的。记录更新的次数即可,比较的时候不光比较value也要比较邮戳。

所以atomic包里有AtomicStampedReference类,就是会比较两个值的引用是否一致,如果一致,才会设置新值

你可以自己研究一下它的的源码。它CAS核心代码如下:

  // AtomicStampedReference
  public boolean compareAndSet(V  expectedReference,
                 V  newReference,
                 int expectedStamp,
                 int newStamp) {

    Pair<V> current = pair;
    return
      expectedReference == current.reference &&
      expectedStamp == current.stamp &&
      ((newReference == current.reference &&
       newStamp == current.stamp) ||
       casPair(current, Pair.of(newReference, newStamp)));
  }

  private boolean casPair(Pair<V> cmp, Pair<V> val) {
    return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
  }

最后还要说明的一点,就是Atomic类不能保证多变量原子问题,一般的AtomicInteger,只能保证一个变量的原子性。

但是如果多个变量?你可以用AtomicReference,这个是封装自定义对象的,多个变量可以放一个自定义对象里,然后他会检查这个对象的引用是不是一个。(注意对象中的变量如果不是Atomic,操作的时候不保证原子性,只能保证操作AtomicReference泛型对应的这个对象的引用时是原子的。)

小结

小结

今天,了解Atomic的类。这里给大家小结下Atomic类底层原理都是CAS,原理都是类似的。主要分为如下几类:

1、AtomicInteger/AtomicLong/AtomicBoolean/AtomicReference是关于对变量的原子CAS更新。

2、AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray是关于对数组的原子CAS更新。

3、AtomicIntegerFieldUpdater/AtomicLongFieldUpdater/AtomicReferenceFieldUpdater<T,V>是基于反射的CAS原子更新某个类的字段。

学完这一节你应该掌握了如下知识

  • CAS的java层面原理(Unsafe+volatile value)
  • CAS的JVM层面(C++/CPU汇编指令lock cmpxchgl前缀指令)
  • ABA问题、无限循环性能问题、多变量原子更新问题
  • 分段迁移CAS思想、邮戳版本号思想、原子引用思想
0人推荐
随时随地看视频
慕课网APP