08-JUC进阶-常用的原子操作类(18个)

一、常用原子操作类

JDK文档:https://www.runoob.com/manual/jdk11api/java.base/java/util/concurrent/atomic/package-summary.html

1. 为什么需要原子操作类

对于多线程,在此之前已经了解到了一个关键字 volatile, volatile 解决多线程内存不可见问题。对于一写多读,是可以解决变量同步问题,但是如果多写,同样无法解决线程安全问题。

说明:如果是 count++操作,使用如下类实现:

AtomicInteger count = new AtomicInteger(0);
count.addAndGet(1);

如果是 JDK8,推荐使用 LongAdder 对象,比 AtomicLong 性能更好(减少乐观锁的重试次数)。

2. 有哪些原子操作类

原子操作类

  1. AtomicBoolean
  2. AtomicInteger
  3. AtomicIntegerArray
  4. AtomicIntegerFieldUpdater
  5. AtomicLong
  6. AtomicLongArray
  7. AtomicLongFieldUpdater
  8. AtomicMarkableReference
  9. AtomicReference
  10. AtomicReferenceArray
  11. AtomicReferenceFieldUpdater
  12. AtomicStampedReference
  13. DoubleAccumulator
  14. DoubleAdder
  15. LongAccumulator
  16. LongAdder

二、分类学习

1. 基本类型原子类

  • AtomicInteger
  • AtomicBoolean
  • AtomicLong

1.1 常用API简介

  • public final int get() //获取当前的值
  • public final int getAndSet(int newValue) // 获取当前的值,并设置新的值
  • public final int getAndIncrement() // 获取当前的值,并自增
  • public final int getAndDecrement() // 获取当前的值,并自减
  • public final int getAndAdd(int delta) // 获取当前的值,并加上预期的值
  • boolean compareAndSet(int expect, int update) // 如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)

1.2 测试案例

1.2.1 错误案例

开启50个线程,对一个原子类变量进行操作

public class AtomicIntegerTest {

    private static int size = 50; // 50个线程
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(0);

        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    atomicInteger.getAndIncrement();
                }
            }, "t" + i).start();
        }

        System.out.println("运行结果:" + atomicInteger.get());
    }
}

运行结果:

运行结果:31056

AtomicInteger 是一个线程安全的原子操作类,但是执行结果却跟预期的不一致,这是为什么?

原因:在打印结果的时候,50个线程并没有完全运行结束,导致最后获得的结果跟预期不一致。

1.2.2 解决方式

  • 方式一:在代码中添加 sleep,等待其它线程结束后再打印主线程
    • 此方式并不推荐,因为在实际开发中并不确定要等待多久,sleep操作影响系统的执行效率
  • 方式二:使用 CountDownLatch
public class AtomicIntegerTest {

    private static int size = 50; // 50个线程

    private static CountDownLatch countDownLatch = new CountDownLatch(50);
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger(0);

        for (int i = 0; i < size; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < 1000; j++) {
                        atomicInteger.getAndIncrement();
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }, "t" + i).start();
        }

        // 方式一:等待线程运行结果后,再打印
//        TimeUnit.SECONDS.sleep(2);

        // 方式二:使用 countDownLatch
        countDownLatch.await();

        System.out.println("运行结果:" + atomicInteger.get());
    }
}

2. 数组类型原子类

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

2.1 使用案例

public class AtomicIntegerArrayDemo {
    public static void main(String[] args) {
        // 方式一:指定数组长度
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
        // 方式二:指定数组长度
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
        // 方式三:指定数组长度并初始化内容
        //AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1,2,3,4,5});

        for (int i = 0; i < atomicIntegerArray.length(); i++) {
            System.out.println(atomicIntegerArray.get(i));
        }
        System.out.println();
        System.out.println();
        System.out.println();
        int tmpInt = 0;

        // 指定下标设置内容
        tmpInt = atomicIntegerArray.getAndSet(0, 1122);
        System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0)); // 0 1122
        // 指定下标元素自增
        atomicIntegerArray.getAndIncrement(1);
        atomicIntegerArray.getAndIncrement(1);
        tmpInt = atomicIntegerArray.getAndIncrement(1);
        // 打印下标元素
        System.out.println(tmpInt + "\t" + atomicIntegerArray.get(1)); // 2 3

    }
}

打印结果:

0
0
0
0
0


0	1122
2	3

3. 引用类型原子类

  • AtomicReference
  • AtomicStampedReference
  • AtomicMarkableReference

3.1 AtomicReference

基础使用案例

public class AtomicReferenceTest {

    public static void main(String[] args) {
        User1 user1 = new User1("张三", 20);
        User1 user2 = new User1("george", 22);

        AtomicReference<User1> atomicUser1 = new AtomicReference<>();
        atomicUser1.set(user1);
        System.out.println("当前人员:" + atomicUser1.get());
        // 第一次比较替换,张三 换成 george, 结果成功
        System.out.println(atomicUser1.compareAndSet(user1, user2) + "\t" + atomicUser1.get());
        // 第二次比较替换,张三 换成 george, 结果失败
        System.out.println(atomicUser1.compareAndSet(user1, user2) + "\t" + atomicUser1.get());
    }
}


@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
class User1 {
    private String name;
    private Integer age;
}

实现自旋锁

public class MySpinLock {
    AtomicReference<Thread> atomicReference = new AtomicReference<>();

    public void lock() {
        // 当没有替换成当前线程,则表示获取锁失败,线程自旋
        while (!atomicReference.compareAndSet(null, Thread.currentThread())) {

        }
        System.out.println(Thread.currentThread().getName() + "获取锁成功");
    }

    public void unlock() {
        while (atomicReference.compareAndSet(Thread.currentThread(), null)) {
            System.out.println(Thread.currentThread().getName() + "解锁成功");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MySpinLock lock = new MySpinLock();
        new Thread(() -> {
            // 获取锁
            lock.lock();
            try {
                // 阻塞3秒
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            // 解锁
            lock.unlock();
        }, "t1").start();

        // 程序暂停1秒, 保证t1先拿到锁
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            lock.lock();
            lock.unlock();
        }, "t2").start();
    }
}

3.2 AtomicStampedReference

携带版本号的引用类型原子类,可以解决ABA问题

使用案例:

public class ABATest {

    // 初始值:100   初始版本号:1
    static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(100, 1);

    public static void main(String[] args) {
        new Thread(() -> {
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + " 默认获取到的值:" + stampedReference.getReference() + "\t默认版本号:" + stamp);

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            // 修改值,将版本号加1
            boolean flag1 = stampedReference.compareAndSet(100, 101, stampedReference.getStamp(), stampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "第一次修改:" + flag1);
            // 修改值,将版本号加1
            boolean flag2 = stampedReference.compareAndSet(101, 100, stampedReference.getStamp(), stampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "第二次修改:" + flag2);

            System.out.println("修改后的值:" + stampedReference.getReference() + "\t版本号:" + stampedReference.getStamp());
        }, "t1").start();


        new Thread(() -> {
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + " 默认获取到的值:" + stampedReference.getReference() + "\t默认版本号:" + stamp);

            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            // 修改值,将版本号加1
            boolean flag = stampedReference.compareAndSet(100, 2024, stamp, stampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "修改:" + flag);

            System.out.println("修改后的值:" + stampedReference.getReference() + "\t版本号:" + stampedReference.getStamp());
        }, "t2").start();
    }
}

3.3 AtomicMarkableReference

原子更新带有标记位的引用类型对象.

解决对象是否修改过的问题,它的定义就是将状态戳简化为 true|false。

public class AtomicMarkableReferenceDemo {
    static AtomicMarkableReference atomicMarkableReference = new AtomicMarkableReference(100, false);

    public static void main(String[] args) {
        new Thread(() -> {
            boolean marked = atomicMarkableReference.isMarked();
            System.out.println(Thread.currentThread().getName() + "\t" + "---默认修改标识:" + marked);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            atomicMarkableReference.compareAndSet(100, 101, marked, !marked);
        }, "t1").start();

        new Thread(() -> {
            boolean marked = atomicMarkableReference.isMarked();
            System.out.println(Thread.currentThread().getName() + "\t" + "---默认修改标识:" + marked);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            boolean b = atomicMarkableReference.compareAndSet(100, 20210308, marked, !marked);

            System.out.println(Thread.currentThread().getName() + "\t" + "---操作是否成功:" + b);
            System.out.println(Thread.currentThread().getName() + "\t" + atomicMarkableReference.getReference());
            System.out.println(Thread.currentThread().getName() + "\t" + atomicMarkableReference.isMarked());

        }, "t2").start();
    }
}

运行结果:

t1	---默认修改标识:false
t2	---默认修改标识:false
t2	---操作是否成功:false
t2	101
t2	true

4. 对象的属性修改原子类

4.1 原子类

  • AtomicIntegerFieldUpdater

    原子更新对象中int类型字段的值

  • AtomicLongFieldUpdater

    原子更新对象中Long类型字段的值

  • AtomicReferenceFieldUpdater

    原子更新引用类型字段的值

4.2 使用目的

以一种线程安全的方式操作非线程安全对象内的某些字段

4.3 使用要求

更新的对象属性必须使用 public volatile 修饰符。

因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。

4.4 使用案例

AtomicIntegerFieldUpdater : Integer 类型字段的原子修改

public class AtomicIntegerFieldUpdaterTest {
    public static void main(String[] args) throws InterruptedException {
        Bank bank = new Bank();
        // 创建10个线程,每个线程操作1000次
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    bank.add(bank);
                }
            }).start();
        }

        // 主线程等待其它线程运行结束
        TimeUnit.SECONDS.sleep(3);

        System.out.println("账户还剩:" + bank.money);
    }
}

class Bank {
    //以一种线程安全的方式操作非线程安全对象内的某些字段
    //1 更新的对象属性必须使用 public volatile 修饰符。
    public volatile int money = 0;

    public String name;

    //2 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须
    // 使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
    AtomicIntegerFieldUpdater<Bank> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Bank.class, "money");

    public void add(Bank bank) {
        // 每次增加10
        fieldUpdater.getAndAdd(bank, 10);
    }
}

运行结果:

账户还剩:100000

AtomicReferenceFieldUpdater: 引用类型字段的原子修改

class MyVar {
    public volatile MyVar var = null;
    AtomicReferenceFieldUpdater<MyVar, MyVar> updater = AtomicReferenceFieldUpdater.newUpdater(MyVar.class, MyVar.class, "var");

    public void init(MyVar myVar) {

        if (updater.compareAndSet(myVar, null, new MyVar())) {
            System.out.println(Thread.currentThread().getName() + "\t" + "---start init");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "\t" + "---end init");
        } else {
            System.out.println(Thread.currentThread().getName() + "\t" + "---抢夺失败,已经有线程在修改中");
        }
    }

}


/**
 * @auther zzyy
 * @create 2021-03-22 15:20
 * 多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,要求只能初始化一次
 */
public class AtomicReferenceFieldUpdaterDemo {
    public static void main(String[] args) {
        MyVar myVar = new MyVar();

        for (int i = 1; i <= 5; i++) {
            new Thread(() -> {
                myVar.init(myVar);
            }, String.valueOf(i)).start();
        }
    }
}

运行结果:

1	---start init
4	---抢夺失败,已经有线程在修改中
3	---抢夺失败,已经有线程在修改中
2	---抢夺失败,已经有线程在修改中
5	---抢夺失败,已经有线程在修改中
1	---end init

三、JDK8新增的原子操作类

1. 原子类

  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator
  • LongAdder

2. 常用API

  • void add(long x) 将当前的value加x

  • void increment() 将当前的value加1

  • void decrement() 将当前的value减1

  • long sum()

    返回当前值。特别注意,在没有并发更新value的情况下,sum会返回一个精确值,在存在并发的情况下,sum不保证返回精确值。

  • void reset()

    将value重置为0,可用于替代重新new一个 LongAdder,但此方法只可以在没有并发更新的情况下使用。

  • long sumThenReset()

    获取当前value,并将value重置为0

3. 入门讲解

LongAdder 只能用来计算加法,且从零开始计算.

LongAccumulator 提供了自定义的函数操作.

public class LongAccumulatorDemo {
    //LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y,0);
    // 初始值为10,每次调用,将传进来的参数与10相减
    LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() {
        @Override
        public long applyAsLong(long left, long right) {
            return left - right;
        }
    }, 10); // 10 是初始值

    public void add_LongAccumulator() {
        // 每次减1
        longAccumulator.accumulate(1);
    }

    public static void main(String[] args) {
        LongAccumulatorDemo demo = new LongAccumulatorDemo();
        // 第一次操作
        demo.add_LongAccumulator();
        // 第二次操作
        demo.add_LongAccumulator();
        // 打印结果
        System.out.println("执行结果:" + demo.longAccumulator.longValue()); // 结果:8
    }

}

LongAdder 使用案例

public class LongAdderAPIDemo {
    public static void main(String[] args) {
        LongAdder longAdder = new LongAdder();//只能做加法

        longAdder.increment();
        longAdder.increment();
        longAdder.increment();

        System.out.println(longAdder.longValue()); // 3
    }
}

4. LongAdder 高性能对比

对比在原子操作中,使用 synchronized、AtomicInteger、AtomicLong、LongAdder、LongAccumulator的性能差距

4.1 测试代码:

class ClickNumber {
    int number = 0;

    /**
     * 对比1:使用 synchronized
     */
    public synchronized void add_Synchronized() {
        number++;
    }

    AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * 对比2: AtomicInteger
     */
    public void add_AtomicInteger() {
        atomicInteger.incrementAndGet();
    }

    AtomicLong atomicLong = new AtomicLong();

    /**
     * 对比3:AtomicLong
     */
    public void add_AtomicLong() {
        atomicLong.incrementAndGet();
    }

    LongAdder longAdder = new LongAdder();

    /**
     * 对比4:LongAdder
     */
    public void add_LongAdder() {
        longAdder.increment();
        //longAdder.sum();
    }

    LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);

    /**
     * 对比5:LongAccumulator
     */
    public void add_LongAccumulator() {
        longAccumulator.accumulate(1);
    }

}


/**
 * 50个线程,每个线程100W次,总点赞数出来
 */
public class LongAdderCalcDemo {
    public static final int SIZE_THREAD = 50;
    public static final int _1W = 10000;

    public static void main(String[] args) throws InterruptedException {
        ClickNumber clickNumber = new ClickNumber();
        long startTime;
        long endTime;

        CountDownLatch countDownLatch1 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch2 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch3 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch4 = new CountDownLatch(SIZE_THREAD);
        CountDownLatch countDownLatch5 = new CountDownLatch(SIZE_THREAD);
        //========================

        /****************** synchronized *******************/
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= SIZE_THREAD; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        clickNumber.add_Synchronized();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch1.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch1.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t add_Synchronized" + "\t" + clickNumber.number);


        /****************** AtomicInteger *******************/
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= SIZE_THREAD; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        clickNumber.add_AtomicInteger();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch2.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch2.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t add_AtomicInteger" + "\t" + clickNumber.atomicInteger.get());

        /****************** AtomicLong *******************/
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= SIZE_THREAD; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        clickNumber.add_AtomicLong();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch3.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch3.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t add_AtomicLong" + "\t" + clickNumber.atomicLong.get());

        /****************** LongAdder *******************/
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= SIZE_THREAD; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        clickNumber.add_LongAdder();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch4.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch4.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t add_LongAdder" + "\t" + clickNumber.longAdder.longValue());


        /****************** LongAccumulator *******************/
        startTime = System.currentTimeMillis();
        for (int i = 1; i <= SIZE_THREAD; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        clickNumber.add_LongAccumulator();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch5.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch5.await();
        endTime = System.currentTimeMillis();
        System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t add_LongAccumulator" + "\t" + clickNumber.longAccumulator.longValue());
    }
}

打印结果:

----costTime: 2591 毫秒	 add_Synchronized	50000000
----costTime: 702 毫秒	 add_AtomicInteger	50000000
----costTime: 622 毫秒	 add_AtomicLong	50000000
----costTime: 89 毫秒	     add_LongAdder	50000000
----costTime: 137 毫秒	 add_LongAccumulator	50000000

4.2 结论

LongAdder、LongAccumulator 在高并发程序中的性能表现要远好于其它原子操作。

四、源码、原理分析

解释:原理(LongAdder为什么这么快)?

1. 原理解析

1.1 原理概述

LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。

sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。

1.2 图示

实现原理如图:

分散热点

1.3 数学表达

$$
value = base + \sum_{i=0}^n Cell[i]
$$

内部有一个base变量,一个Cell[]数组。

  • base变量:非竞态条件下,直接累加到该变量上
  • Cell[]数组:竞态条件下,累加个各个线程自己的槽Cell[i]中

2. LongAdder 架构

image-20240930081126088

LongAdder是Striped64的子类

LongAdder

3. Striped64

3.1 成员变量

Striped64有几个比较重要的成员变量

/** 计算当前CPU数量,Cell[] 扩容时会用到 */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
 * Table of cells. When non-null, size is a power of 2.
 */
transient volatile Cell[] cells;

/**
 * 类似于AtomicLong中全局的value值。在没有竞争情况下数据直接累加到base上,或者cells扩容时,也需要将数据写入到base上
 */
transient volatile long base;

/**
 * 初始化cells或者扩容cells需要获取锁,0:表示无锁状态 1:表示其他线程已经持有了锁
 */
transient volatile int cellsBusy;

3.2 成员方法

Striped64有几个比较重要的成员方法

/**
 * 通过CAS操作修改 cellsBusy 的值,CAS成功代表获取锁,返回true
 */
final boolean casCellsBusy() {
    return CELLSBUSY.compareAndSet(this, 0, 1);
}

/**
 * 获取当前线程的hash值
 */
static final int getProbe() {
    return (int) THREAD_PROBE.get(Thread.currentThread());
}

/**
 * 重置当前线程的hash值
 */
static final int advanceProbe(int probe) {
    probe ^= probe << 13;   // xorshift
    probe ^= probe >>> 17;
    probe ^= probe << 5;
    THREAD_PROBE.set(Thread.currentThread(), probe);
    return probe;
}

4. Cell

是 java.util.concurrent.atomic 下 Striped64 的一个内部类, 它是LongAdder分散热点实现的具体载体

Cell内部类

五、LongAdder源码解读深度分析

1. 源码解读概述

LongAdder在无竞争的情况,跟 AtomicLong 一样,对同一个base 进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。

LongAdder并发处理

数学表达
$$
value = base + \sum_{i=0}^n Cell[i]
$$

2. longAdder.increment() 方法解析

2.1 调用流程

java.util.concurrent.atomic.LongAdder#increment —> java.util.concurrent.atomic.LongAdder#add —> java.util.concurrent.atomic.Striped64#longAccumulate

2.2 add() 方法

/**
 * Adds the given value.
 *
 * @param x the value to add
 */
public void add(long x) {
    Cell[] cs; // cells 的引用
    long b; // 获取的base值
    long v; // 期望值
    int m; // cell数组的长度
    Cell c; // 当前线程命中的cell单元格

    // 首次首线程 (cs = cells) != null 一定是false,此时走casBase方法,以CAS的方式更新base值,且只有当cas失败时,才会走到if中
    // 条件1: cells不为室,说明出现过竞争,cell[]己创建
    // 条件2:cas操作base失败,说明其它线程先一步修改了base正在出现竞争
    if ((cs = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true; // true表示当前线程cas更新成功, false表示cas更新失败,线程处于竞争中
        // 条件1: 表示cells为空,且线程处于竞争状态中,因为经过casBase方法返回false
        // 条件2: cells数组长度小于0,这个应该不会出现
        // 条件3: 当前线程所在的cell为空,说明当前线程还没有更新过cell,应初始化一个cell
        // 条件4: 更新当前线程所在的cell失败,说明现在竞争很激烈,多个线程hash到了同一个cell,应扩容
        // getProbe()方法返回的是线程中的threadLocalRandomProbe字段,它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的(除非刻意修改它)
        if (cs == null || (m = cs.length - 1) < 0 ||
                (c = cs[getProbe() & m]) == null ||
                !(uncontended = c.cas(v = c.value, v + x)))
            // 调用striped64中的方法处理
            longAccumulate(x, null, uncontended);
    }
}

add方法调用流程

  1. 最初无竞争时只更新base;
  2. 如果更新base失败后,首次新建一个Cell[]数组
  3. 当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[]扩容

2.3 longAccumulate() 方法

2.3.1 longAccumulate入参说明

  • long x : 需要增加的值,一般默认都是1
  • LongBinaryOperatorfn : 默认传递的是null
  • wasUncontended : 竞争标识,如果是false则代表有竞争。只有cels初始化之后,并且当前线程CAS竞争修改失败,才会是false

2.3.2 Striped64中一些变量或者方法的定义

  • base : 类似于AtomicLong中全局的value值。在没有竞争情况下数据直接累加到base上,或者cells扩容时,也需要将数据写入到base上
  • collide : 表示扩容意向,false一定不会扩容,true可能会扩容。cellsBusy:初始化cells或者扩容cells需要获取锁,0:表示无锁状态 1:表示其他线程已经持有了锁
  • casCellsBusy() : 通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true
  • NCPU : 当前计算机CPU数量,Cell数组扩容时会使用到
  • getProbe() : 获取当前线程的hash值
  • advanceProbe() : 重置当前线程的hash值

2.3.3 线程hash值:probe

调用getProbe

定义和初始化

getProbe

2.3.4 longAccumulate 源码总体概览

longAccumulate 源码总体概览

  • CASE1:Cell[]数组已经初始化
  • CASE2:Cell[]数组未初始化(首次新建)
  • CASE3:Cell[]数组正在初始化中

2.3.5 详细解读

/**
 * long型数据原子方式增长
 * @param x 需要增加的值, 一般默认都是1
 * @param fn 默认传递是null
 * @param wasUncontended 竞争标识,如果是false表示有竞争,只有cells初始化之后,并且当前线程CAS竞争修改失败,才会是false
 */
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    // 存储线程的probe值
    int h;
    // 如果getProbe()方法返回0,说明随机数未初始化
    if ((h = getProbe()) == 0) {
        // 使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化
        ThreadLocalRandom.current(); // force initialization
        // 重新获取probe值,hash值被重置就好比一个全新的线程一样,所以设置了wasuncontended竞争状态为true。
        h = getProbe();
        // 重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈wasuncontended竞争状态为true
        wasUncontended = true;
    }
    boolean collide = false; // 用于标识是否发生碰撞
    done: for (;;) {
        Cell[] cs; // cell数组
        Cell c; // 单个cell
        int n; // cell数组长度
        long v; // cell中存储的值
        // CASE1: cells已经被初始化(cell数组不为空,且长度>0)
        if ((cs = cells) != null && (n = cs.length) > 0) {
            if ((c = cs[(n - 1) & h]) == null) { // 当前线程的hash值运算后映射得到的Cell单元为null,说明该Cell没有被使用
                if (cellsBusy == 0) {       // Cell[]数组没有正在扩容, 尝试创建一个新的cell
                    Cell r = new Cell(x);   // 创建一个Cell单元,值为x
                    if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁,成功后cellsBusy == 1
                        try {               // Recheck under lock
                            Cell[] rs; // 新的cell数组
                            int m; // 新cell数组的长度
                            int j; // 新的cell在cell数组中的索引下标
                            // rs = cells 避免了对全局变量的直接引用, 提高安全性和效率,同时rs 和 cells指向同一个数组,rs变化,cells也会同步
                            if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                rs[j] = r; // 将新创建的cell r 放入cell数组的计算位置
                                break done;
                            }
                        } finally {
                            cellsBusy = 0; // cell创建完成后,重新将cellsBusy置为0,非竞争状态
                        }
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // 如果前一次CAS更新Cell单元失败了
                wasUncontended = true;      // 重新置为true,后面会重新计算线程的hash值
            else if (c.cas(v = c.value,
                    (fn == null) ? v + x : fn.applyAsLong(v, x))) // 试CAS更新Cell单元值
                break;
            else if (n >= NCPU || cells != cs) // 当Cell数组的大小超过CPU核数后,不再进行扩容
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁进行扩容
                try {
                    if (cells == cs) // cells和局部变量cs相同,表示没有其他线程扩容过
                        cells = Arrays.copyOf(cs, n << 1); // 扩容后的大小==当前容量*2
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h); // 计算线程新的hash值,重新参与下一轮竞争中
        }
        // CASE2: cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化
        else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
            try {                           // Initialize table
                if (cells == cs) { // 进行两次校验, 由于cells是共有的对象,可能出现多线程并发修改导致对象状态发生变化,两次校验确保数据状态一致
                    Cell[] rs = new Cell[2]; // 新建一个容量为2的cell数组
                    rs[h & 1] = new Cell(x); // 找到当前线程hash到数组中的位置,并创建对应的cell
                    cells = rs;
                    break done;
                }
            } finally {
                cellsBusy = 0;
            }
        }
        //  // CASE3: cells正在进行初始化,则尝试直接在base上进行累加操作
        else if (casBase(v = base,
                (fn == null) ? v + x : fn.applyAsLong(v, x)))
            break done;
    }
}

代码流程

3. sum() 方法

sum()会将所有Cell数组中的value和base累加作为返回值。
核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。

为什么在并发情况下sum的值不精确

sum执行时,并没有限制对base和cells的更新(关键点)。所以LongAdder不是强一致性的,它是最终一致性的。

  • sum执行时,并没有限制对base和cells的更新(一句要命的话)。所以LongAdder不是强一致性的,它是最终一致性的。
  • sum执行时,并没有限制对base和cells的更新(一句要命的话)。所以LongAdder不是强一致性的,它是最终一致性的。

sum方法

六、总结

1. AtomicLong与LongAdder对比

AtomicLong

  • 线程安全,可允许一些性能损耗,要求高精度时可使用
  • 保证精度,性能代价
  • AtomicLong是多个线程针对单个热点值value进行原子操作

LongAdder

  • 当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用
  • 保证性能,精度代价
  • LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作

2. AtomicLong

原理

  • CAS+自旋
  • incrementAndGet

场景

  • 低并发下的全局计算
  • AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题。

缺陷

  • 高并发后性能急剧下降,AtomicLong的自旋会成为瓶颈

    N个线程CAS操作修改线程的值,每次只有一个成功过,其它N - 1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。

3. LongAdder

原理

  • CAS+Base+Cell数组分散
  • 空间换时间并分散了热点数据

场景

高并发下的全局计算

缺陷

sum求和后还有计算线程修改结果的话,最后结果不够准确


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 george_95@126.com