07-JUC进阶-CAS

一、没有CAS之前

  • 多线程环境不使用原子类保证线程安全(基本数据类型)

    public class T1 {
        volatile int number = 0;
    
        //读取
        public int getNumber() {
            return number;
        }
    
        //写入加锁保证原子性
        public synchronized void setNumber() {
            number++;
        }
    }
  • 多线程环境使用原子类保证线程安全(基本数据类型)

    public class T2 {
        AtomicInteger atomicInteger = new AtomicInteger();
    
        public int getAtomicInteger() {
            return atomicInteger.get();
        }
    
        public void setAtomicInteger() {
            atomicInteger.getAndIncrement();
        }
    }

二、什么是CAS

1. 概念

CAS 即: compare and swap的缩写,中文翻译成比较并交换,实现并发算法时常用到的一种技术。它包含三个操作数——内存位置、预期原值及更新值。

执行CAS操作的时候,将内存位置的值与预期原值比较:

  • 如果相匹配,那么处理器会自动将该位置值更新为新值,
  • 如果不匹配,处理器不做任何操作,多个线程同时执行CAS操作只有一个会成功。

2. 原理

2.1 概述

CAS有3个操作数,位置内存值V,旧的预期值A,要修改的更新值B。

当且仅当旧的预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做或重来

CAS原理

2.2 硬件级别保证

CAS是JDK提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性。

它是非阻塞的且自身原子性,也就是说它效率更高且通过硬件保证,更可靠。

CAS是一条CPU的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe提供的CAS方法(如 compareAndSwapXXX )底层实现即为CPU指令cmpxchg

执行 cmpxchg 指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行cas操作,也就是说CAS的原子性实际上是CPU实现的, 其实在这一点上还是有排他锁的,只是比起用 synchronized, 这里的排他时间要短的多, 所以在多线程情况下性能会比较好

3. CAS代码示例

public class CASDemo {
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(5);
        System.out.println(atomicInteger.get());
        
        System.out.println(atomicInteger.compareAndSet(5, 308) + "\t" + atomicInteger.get());

        System.out.println(atomicInteger.compareAndSet(5, 3333) + "\t" + atomicInteger.get());
    }
}

执行结果:

5
true	308
false	308

4. 源码解析

  • compareAndSet(int expect, int update)

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
    • unsafe.compareAndSwapInt(this, valueOffset, expect, update)

      public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
      
      public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
      
      public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

    上面三个方法都是类似的,主要对4个参数做一下说明。

    • var1:表示要操作的对象
    • var2:表示要操作对象中属性地址的偏移量
    • var4:表示需要修改数据的期望的值
    • var5/var6:表示需要修改为的新值

那么什么是 unsafe 呢?

三、关于 UnSafe 的理解

1. UnSafe

UnSafe

Unsafe 是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe 相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe 类存在于 sun.misc 包中,其内部方法操作可以像C的指针一样直接操作内存,因为Java中CAS操作的执行依赖于Unsafe类的方法。

注意Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务

AtomicInteger

变量valueOffset

变量 valueOffset,表示该变量值在内存中的偏移地址,因为 Unsafe 就是根据内存偏移地址获取数据的。

/**
 * Atomically increments by one the current value.
 *
 * @return the previous value
 */
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

用volatile修饰,保证了多线程之间的内存可见性。

变量value

2. CPU并发原语

我们知道i++线程不安全的,那 atomicInteger.getAndIncrement() 是如何保证线程安全的呢?

CAS的全称为Compare-And-Swap,它是一条CPU并发原语。它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。
AtomicInteger 类主要利用 CAS (compare and swap) + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升。

Compare-And-Swap

CAS并发原语体现在JAVA语言中就是 sun.misc.Unsafe 类中的各个方法。调用 UnSafe 类中的CAS方法,JVM会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。再次强调,由于CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。

3. 底层汇编

  • native修饰的方法代表是底层方法

    Unsafe类中的compareAndSwapInt,是一个本地方法,该方法的实现位于unsafe.cpp中

    UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
      UnsafeWrapper("Unsafe_CompareAndSwapInt");
      oop p = JNIHandles::resolve(obj);
    // 先想办法拿到变量value在内存中的地址,根据偏移量valueOffset,计算 value 的地址
      jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
    // 调用 Atomic 中的函数 cmpxchg来进行比较交换,其中参数x是即将更新的值,参数e是原内存的值
      return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
    UNSAFE_END

    (Atomic::cmpxchg(x, addr, e)) == e;

  • cmpxchg

    // 调用 Atomic 中的函数 cmpxchg来进行比较交换,其中参数x是即将更新的值,参数e是原内存的值
    return (jint)(Atomic::cmpxchg(x, addr, e)) == e;

    unsigned Atomic::cmpxchg(unsigned int exchange_value,volatile unsigned int* dest, unsigned int compare_value) {
        assert(sizeof(unsigned int) == sizeof(jint), "more work to do");
      /*
       * 根据操作系统类型调用不同平台下的重载函数,这个在预编译期间编译器会决定调用哪个平台下的重载函数*/
        return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest, (jint)compare_value);
    }
  • 在不同的操作系统下会调用不同的cmpxchg重载函数,本次用的是win10系统

    inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
      //判断是否是多核CPU
      int mp = os::is_MP();
      __asm {
        //三个move指令表示的是将后面的值移动到前面的寄存器上
        mov edx, dest
        mov ecx, exchange_value
        mov eax, compare_value
        //CPU原语级别,CPU触发
        LOCK_IF_MP(mp)
        //比较并交换指令
        //cmpxchg: 即“比较并交换”指令
        //dword: 全称是 double word 表示两个字,一共四个字节
        //ptr: 全称是 pointer,与前面的 dword 连起来使用,表明访问的内存单元是一个双字单元 
        //将 eax 寄存器中的值(compare_value)与 [edx] 双字内存单元中的值进行对比,
        //如果相同,则将 ecx 寄存器中的值(exchange_value)存入 [edx] 内存单元中
        cmpxchg dword ptr [edx], ecx
      }
    }

    到这里我们应该理解了CAS真正实现的机制了,它最终是由操作系统的汇编指令完成的。

总结:

你只需要记住:CAS是靠硬件实现的从而在硬件层面提升效率,最底层还是交给硬件来保证原子性和可见性。实现方式是基于硬件平台的汇编指令,在intel的CPU中(X86机器上),使用的是汇编指令cmpxchg指令。

核心思想就是:比较要更新变量的值V和预期值E(compare),相等才会将V的值设为新值N(swap)如果不相等自旋再来。

四、原子引用

除了 AtomicInteger 原子整型,可否有其它原子类型?比如 AtomicBook 、AtomicOrder。

JDK 除了提供了原子整型之外,还提供了原子引用类:AtomicReference

原子引用

public class AtomicReferenceTest {

    public static void main(String[] args) {
        User1 user1 = new User1("张三", 20);
        User1 user2 = new User1("george", 22);

        AtomicReference<User1> atomicUser1 = new AtomicReference<>();
        atomicUser1.set(user1);
        System.out.println("当前人员:" + atomicUser1.get());
        // 第一次比较替换,张三 换成 george, 结果成功
        System.out.println(atomicUser1.compareAndSet(user1, user2) + "\t" + atomicUser1.get());
        // 第二次比较替换,张三 换成 george, 结果失败
        System.out.println(atomicUser1.compareAndSet(user1, user2) + "\t" + atomicUser1.get());
    }
}


@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
class User1 {
    private String name;
    private Integer age;
}

运行结果:

当前人员:User1(name=张三, age=20)
true	User1(name=george, age=22)
false	User1(name=george, age=22)

五、自旋锁

1. 自旋锁概念

自旋锁(spinlock)是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,当线程发现锁被占用时,会不断循环判断锁的状态,直到获取。这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU

OpenJDK源码里面查看下

Unsafe.java

2. 实现一个自旋锁

public class MySpinLock {
    AtomicReference<Thread> atomicReference = new AtomicReference<>();

    public void lock() {
        // 当没有替换成当前线程,则表示获取锁失败,线程自旋
        while (!atomicReference.compareAndSet(null, Thread.currentThread())) {

        }
        System.out.println(Thread.currentThread().getName() + "获取锁成功");
    }

    public void unlock() {
        while (atomicReference.compareAndSet(Thread.currentThread(), null)) {
            System.out.println(Thread.currentThread().getName() + "解锁成功");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MySpinLock lock = new MySpinLock();
        new Thread(() -> {
            // 获取锁
            lock.lock();
            try {
                // 阻塞3秒
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            // 解锁
            lock.unlock();
        }, "t1").start();

        // 程序暂停1秒, 保证t1先拿到锁
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            lock.lock();
            lock.unlock();
        }, "t2").start();
    }
}

运行结果:

t1获取锁成功
t1解锁成功
t2获取锁成功
t2解锁成功

结果:t1先拿到了锁,将 atomicReference 设置成自己,t2 再去获取锁的时候就会获取失败,然后在 while 中自旋, 等待 t1 释放锁后,t2才能获取到锁继续执行。

六、CAS缺点

1. 时间开销大

循环时间长开销很大。

我们可以看到 getAndAddInt 方法执行时,有个 do while

do while

如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销。

2. ABA问题

CAS会导致“ABA问题”。

CAS算法实现一个重要前提需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差类会导致数据的变化。

比如说一个线程t1从内存位置V中取出A,这时候另一个线程t2也从内存中取出A,并且线程t2进行了一些操作将值变成了B,
然后线程t2又将V位置的数据变成A,这时候线程t1进行CAS操作发现内存中仍然是A,然后线程t1操作成功。

public static void abaProblem() {
    new Thread(() -> {
        atomicInteger.compareAndSet(100, 101);
        atomicInteger.compareAndSet(101, 100);
    }, "t1").start();

    //暂停毫秒
    try {
        TimeUnit.MILLISECONDS.sleep(10);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    new Thread(() -> {
        boolean b = atomicInteger.compareAndSet(100, 20210308);
        System.out.println(Thread.currentThread().getName() + "\t" + "修改成功否:" + b + "\t" + atomicInteger.get());
    }, "t2").start();
}

尽管线程t1的CAS操作成功,但是不代表这个过程就是没有问题的。

解决方式:使用 AtomicStampedReference, 带有版本号的原子操作类。

public class ABATest {

    // 初始值:100   初始版本号:1
    static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(100, 1);

    public static void main(String[] args) {
        new Thread(() -> {
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + " 默认获取到的值:" + stampedReference.getReference() + "\t默认版本号:" + stamp);

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            // 修改值,将版本号加1
            boolean flag1 = stampedReference.compareAndSet(100, 101, stampedReference.getStamp(), stampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "第一次修改:" + flag1);
            // 修改值,将版本号加1
            boolean flag2 = stampedReference.compareAndSet(101, 100, stampedReference.getStamp(), stampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "第二次修改:" + flag2);

            System.out.println("修改后的值:" + stampedReference.getReference() + "\t版本号:" + stampedReference.getStamp());
        }, "t1").start();


        new Thread(() -> {
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + " 默认获取到的值:" + stampedReference.getReference() + "\t默认版本号:" + stamp);

            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            // 修改值,将版本号加1
            boolean flag = stampedReference.compareAndSet(100, 2024, stamp, stampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "修改:" + flag);

            System.out.println("修改后的值:" + stampedReference.getReference() + "\t版本号:" + stampedReference.getStamp());
        }, "t2").start();
    }
}

运行结果

t1 默认获取到的值:100	默认版本号:1
t2 默认获取到的值:100	默认版本号:1
t1第一次修改:true
t1第二次修改:true
修改后的值:100	版本号:3
t2修改:false
修改后的值:100	版本号:3

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 george_95@126.com