当前位置 : 主页 > 编程语言 > java >

既生AtomicXXX,何生LongAdder?

来源:互联网 收集:自由互联 发布时间:2022-10-15
既生AtomicXXX,何生LongAdder? LongAdder是JDK1.8在​​java.util.concurrent.atomic​​包下新引入的 为了高并发下实现高性能统计的类。 不是有​​AtomicInteger​​ 和 ​​AtomicLong​​嘛,咋还蹦出

既生AtomicXXX,何生LongAdder?

LongAdder是JDK1.8在​​java.util.concurrent.atomic​​包下新引入的 为了高并发下实现高性能统计的类。

不是有​​AtomicInteger​​ 和 ​​AtomicLong​​嘛,咋还蹦出来个​​LongAdder​​???

首先,我们要知道的是,无论是​​AtomicInteger​​还是​​AtomicLong​​,本质上都是利用CAS的思想,来保证的变量的原子性,虽然说比较好用,但是在高并发环境下,多个线程争夺同一个资源时,如果自旋一直不成功,会一直循环,一直占用CPU,这会极大的浪费CPU资源。

public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

所以,​​LongAdder​​应运而生,其类似于分治的思想,将这种并发竞争给分散开来,分而治之,从而达到减少竞争,降低冲突的目的。


LongAdder原理解析

LongAdder内部维护一个 volatile变量​​base​​和一个​​Cell​​类型的数组,每个​​Cell​​类内部也会维护一个volatile字段。

在没有竞争的前提下,跟​​AtomicXXX​​一样,通过CAS ​​base​​​这个变量来完成数据的更新。

一旦发生冲突,即CAS失败后,将会启用​​Cell​​​数组,通过将当前线程的hash值与​​Cell​​​数组长度取模,获取到其对应的Cell类,再对此Cell类的内部变量进行CAS,从而达到数据的更新和减少竞争的目的。

图示如下:

既生AtomicXXX,何生LongAdder?_初始化


add

源码解读

public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

final boolean casBase(long cmp, long val) {
// CAS更新base值
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

通过对上面源码的解读,我们可以了解到

  • 在没有冲突的情况下,也就是​​casBase​​​一直成功的时候,会直接返回
  • 一旦CAS失败返回false,会去使用​​Cell​​​数组
  • 在​​if​​​语句中我们可以看到一个小的优化点,就是如果在​​Cell​​数组合法及计算出当前线程对应得​​​Cell​​​不为null的情况下,会再次去尝试一次CAS,如果能成功的话就不去调用​​​longAccumulate​​​方法了

  • longAccumulate

    进入时机:

  • 第一次发生竞争时,​​Cell​​数组为null的情况下
  • 之后发生竞争时,当前线程CAS对应​​Cell​​类失败的情况下
  • 源码解读

    /**
    * 扩容和初始化Cell数组时,通过CAS实现的一个自旋锁
    */
    transient volatile int cellsBusy;

    /**
    * Cell数组最大长度,为核数
    */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    final boolean casCellsBusy() {
    // 获取锁,将cellsBusy置为1
    return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }

    final void longAccumulate(long x, LongBinaryOperator fn,
    boolean wasUncontended) {
    int h;
    // 对当前线程的hash值做一次校验,如果hash值没有被初始化,则强制初始化一下
    // 线程hash默认是0,也就是说当前线程是第一次进入该方法
    if ((h = getProbe()) == 0) {
    ThreadLocalRandom.current(); // force initialization
    h = getProbe();
    // 当前线程第一次进入该方法,表示当前线程还未参与竞争,参与竞争的线程hash都不为0
    wasUncontended = true;
    }
    boolean collide = false; // True if last slot nonempty
    for (;;) {
    Cell[] as; Cell a; int n; long v;
    // Cell数组已经初始化过
    if ((as = cells) != null && (n = as.length) > 0) {
    // 当前线程对应的Cell还未初始化
    if ((a = as[(n - 1) & h]) == null) {
    // 获取锁
    if (cellsBusy == 0) {
    // 创建一个新的Cell
    Cell r = new Cell(x);
    if (cellsBusy == 0 && casCellsBusy()) {
    boolean created = false;
    try {
    Cell[] rs; int m, j;
    // 再次校验当前线程对应的Cell是否为null
    if ((rs = cells) != null &&
    (m = rs.length) > 0 &&
    rs[j = (m - 1) & h] == null) {
    rs[j] = r;
    created = true;
    }
    } finally {
    // 释放锁,重新置为0
    cellsBusy = 0;
    }
    // 创建成功,跳出循环
    if (created)
    break;
    continue; // Slot is now non-empty
    }
    }
    // 获取锁失败,说明发生冲突,将collide置为false
    collide = false;
    }
    else if (!wasUncontended)
    // 调用add方法时,在if里面的cas操作失败了
    // 已经知道cas会失败,下面会为当前线程重新计算hash值
    wasUncontended = true;
    else if (a.cas(v = a.value, ((fn == null) ? v + x :
    fn.applyAsLong(v, x))))
    // 当前线程对应的Cell不为null,CAS成功,直接返回
    break;
    else if (n >= NCPU || cells != as)
    // 如果Cell数组长度已经到达了最大值,或者当前Cell数组已经扩容过了,则需要重试
    collide = false;
    else if (!collide)
    // 需要重试
    collide = true;
    else if (cellsBusy == 0 && casCellsBusy()) {
    try {
    // 检查是否已经扩容
    if (cells == as) {
    // Cell数组扩容,每次扩容为原来的2倍
    Cell[] rs = new Cell[n << 1];
    for (int i = 0; i < n; ++i)
    rs[i] = as[i];
    cells = rs;
    }
    } finally {
    cellsBusy = 0;
    }
    collide = false;
    continue; // Retry with expanded table
    }
    //为当前线程重新计算hash值
    h = advanceProbe(h);
    }
    else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    // 第一次竞争,Cell数组还未初始化,需要进行初始化
    boolean init = false;
    try { // Initialize table
    if (cells == as) {
    // 默认容量为2
    Cell[] rs = new Cell[2];
    rs[h & 1] = new Cell(x);
    cells = rs;
    init = true;
    }
    } finally {
    cellsBusy = 0;
    }
    // 初始化成功,跳出循环
    if (init)
    break;
    }
    else if (casBase(v = base, ((fn == null) ? v + x :
    fn.applyAsLong(v, x))))
    // 兜底策略,再次尝试使用cas base
    break;
    }
    }

    sum

    通过以上的源码了解,相信各位小伙伴已经知道了

    LongAdder获取当前value值的方式,就是通过遍历​​Cell​​​数组,累加其中的​​value​​值,最后再加上​​​base​​​值,就是当前的​​​value​​​值了。源码如下:

    public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
    for (int i = 0; i < as.length; ++i) {
    if ((a = as[i]) != null)
    sum += a.value;
    }
    }
    return sum;
    }

    但是,这样获取的​​value​​值可能是不准确的,因为再遍历​​Cell​​数组的过程中,可能有别的线程将已经遍历过的​​Cell​​又进行数据更新,这样一来,当前线程获取的​​value​​值就是不准确的了。


    LongAdder流程分析

    案例分析

    base基础值是:10,Cell数组已经初始化过

    此时有三个线程并发来进行自增操作

    线程1 CAS成功,也代表着线程2,线程3 CAS失败,此时,base = 11

    线程2,线程3,通过hash值计算得到不同的Cell数组下标,分别对当前Cell类内部value值CAS自增

    最终统计sum时,遍历Cell数组,累加得到2,再加上base=11,则sum = 13

    既生AtomicXXX,何生LongAdder?_ide_02


    扩展性知识


    LongAccumulator

    ​​LongAdder​​,只能对​​value​​进行加减操作,可能在一些特殊场景下下无法满足我们的需求,这时候就有了​​LongAccumulator​​

    ​​LongAccumulator​​原理本质上跟​​LongAdder​​一致,只是提供了自定义修改value的方式。

    public class LongAdderTest {

    public static void main(String[] args) {
    LongAccumulator accumulator = new LongAccumulator((x, y) -> x * y, 10);
    accumulator.accumulate(2);

    System.out.println(accumulator.longValue());
    }

    }

    原理也很简单,传入一个自定义计算的表达式,在CAS更新的时候,将计算好的值作为要更新的值就ok啦

    既生AtomicXXX,何生LongAdder?_ide_03

    当然啦,因为​​LongAccumulator​​计算当前值跟​​LongAdder​​原理一致,所以不太精准。

    但是没关系,​​AtomicInteger​​也有类似的方法

    public class LongAdderTest {

    public static void main(String[] args) {
    AtomicInteger atomicInteger = new AtomicInteger(1);
    atomicInteger.getAndUpdate(x -> x * 3);

    System.out.println(atomicInteger.get());
    }

    }

    Cell类使用Contended注解来消除伪共享

    有关伪共享的知识,大家可以去了解一下。

    @sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
    try {
    UNSAFE = sun.misc.Unsafe.getUnsafe();
    Class<?> ak = Cell.class;
    valueOffset = UNSAFE.objectFieldOffset
    (ak.getDeclaredField("value"));
    } catch (Exception e) {
    throw new Error(e);
    }
    }
    }

    @sun.misc.Contended。加上这个注解的类会自动补齐缓存行,需要注意的是此注解默认是无效的,需要在jvm启动时设置-XX:-RestrictContended才会生效。


    小彩蛋

    在写这篇文章的时候,知道​​LongAdder​​为减少竞争使用Cell数组,内部通过hash值取模定位数组,有想到这做法不是跟Map一样嘛,哈哈

    后来又了解了,​​LongAdder​​统计当前元素值可能不太精准的时候,心想,​​ConcurrentHashMap​​是咋统计的,于是我跑去瞅了瞅,这不看不知道,一看吓一跳,哈哈

    不能说是有点类似,只能说是一摸一样

    既生AtomicXXX,何生LongAdder?_初始化_04

    既生AtomicXXX,何生LongAdder?_初始化_05

    ​​ConcurrentHashMap​​每次​​put​​ 完元素后,会更新​​size​​,就会去调用​​addCount​​方法

    这里面其实跟​​LongAdder​​一样,通过数组 + base值来处理,最终统计也是遍历数组累加 + base值得到的结果,所以​​ConcurrentHashMap​​的​​size​​也不准确

    惭愧啊~,要不是今天看了,我都忘记了,明明记得之前看过​​ConcurrentHashMap​​源码的

    网友评论