当前位置 : 主页 > 编程语言 > java >

Java并发编程——LongAdder源码分析

来源:互联网 收集:自由互联 发布时间:2023-02-04
AtomicLong 大家对AtomicLong应该比较熟悉,AtomicLong是作用是对长整形进行原子操作,显而易见,在java1.8中新加入了一个新的原子类LongAdder,该类也可以保证Long类型操作的原子性,相对于

AtomicLong

大家对AtomicLong应该比较熟悉,AtomicLong是作用是对长整形进行原子操作,显而易见,在java1.8中新加入了一个新的原子类LongAdder,该类也可以保证Long类型操作的原子性,相对于AtomicLong,LongAdder有着更高的性能和更好的表现,可以完全替代AtomicLong的来进行原子操作。

 

AtomicLong的incrementAndGet()方法在高并发场景下,多个线程竞争修改共享资源value,会造成循环耗时过长,进而导致性能问题,下面贴出源码来讲解这个问题:

public class AtomicLong extends Number implements java.io.Serializable { public final long incrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L; } }

其中,unsafe.getAndAddLong方法如下:

public final class Unsafe { private static final Unsafe theUnsafe; // 当线程竞争很激烈时,while判断条件中的CAS会连续多次返回false, // 这样就会造成无用的循环,循环中读取volatile变量的开销本来就是比较高的 // 因为这样,在高并发时,AtomicXXX并不是那么理想的计数方式 public final long getAndAddLong(Object var1, long var2, long var4) { long var6; do { var6 = this.getLongVolatile(var1, var2); } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4)); return var6; } }

可以看到,多个线程在竞争修改共享资源value值时,是在一个循环里面,高并发情况下,同一时刻只有一个线程CAS操作成功,其他大多数线程CAS失败,从而处于不断循环重试的场景,因此对性能造成影响。

LongAdder

LongAdder在高并发时比AtomicLong更高效,LongAdder是根据ConcurrentHashMap这类为并发设计的类的基本原理——锁分段,来实现的,它里面维护一组按需分配的计数单元,并发计数时,不同的线程可以在不同的计数单元上进行计数,这样减少了线程竞争,提高了并发效率。本质上是用空间换时间的思想,不过在实际高并发情况中消耗的空间可以忽略不计。

 

在处理高并发计数时,应该优先使用LongAdder,而不是继续使用AtomicLong。当然,线程竞争很低的情况下进行计数,使用Atomic还是更简单更直接,并且效率稍微高一些。

首先看LongAdder的类结构

public class LongAdder extends Striped64 implements Serializable { }

LongAdder继承了Striped64,真正发挥作用的是这个Striped64类,来看看它的类结构:

/** * A package-local class holding common representation and mechanics * for classes supporting dynamic striping on 64bit values. The class * extends Number so that concrete subclasses must publicly do so. */ @SuppressWarnings("serial") abstract class Striped64 extends Number { }

接下来看它的重要属性有哪些:

abstract class Striped64 extends Number { /** * Table of cells. When non-null, size is a power of 2. * 提升性能发挥作用的Cell数组,核心思想是通过多个线程在对应自己的Cell进行累加, * 从而减少竞争。数量为2的n次幂,和hashmap一样,为了减少冲突概率 */ transient volatile Cell[] cells; /** * Base value, used mainly when there is no contention, but also as * a fallback during table initialization races. Updated via CAS. * 多个线程没有发生竞争的时候,值累加在base上,这与AtomicLong的value作用是一样的 */ transient volatile long base; /** * Spinlock (locked via CAS) used when resizing and/or creating Cells. * Cells的锁标记,当Cells数组初始化,创建元素或者扩容的时候为1,否则为0 */ transient volatile int cellsBusy; }

可能不少同学对Cell感到不解,其实很简单,打开源码就知道究竟了

abstract class Striped64 extends Number { //@Contended注解是JDK1.8提供的字节填充方式,解决伪共享问题 @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } } }

它是Striped64的内部类,里面有个volatile修饰的value值,也是通过cas操作修改它的值,LongAdder计数器的值就是所有Cell[]的value和再加上base的值。

常用关键方法

public class LongAdder extends Striped64 implements Serializable { /** * 原子累加 1 */ public void increment() { add(1L); } /** * 原子递减 1 */ public void decrement() { add(-1L); } }

可以看到,递增递减都调用了add()方法,可见它是实现的核心。往里看:

public class LongAdder extends Striped64 implements Serializable { /** * 原子累加指定的值 x 到 LongAdder */ public void add(long x) { Cell[] as; long b, v; int m; Cell a; /** * 1)如果 cells 为 null,则尝试原子更新值到 base 中 * 2)如果 cells 不为 null,则将其累加到其中一个 cell 中。 * if (!casBase(b = base, b + x)) { * 首先尝试原子更新值到 base 中,更新失败则将其累加到指定的 cell 中? * } */ if ((as = cells) != null || !casBase(b = base, b + x)) { /** * 1)cells 为 null,并且原子更新 base 值失败,出现在第一次竞争发生时。 * 2)cells 不为 null * cell 是否发生竞争的标记 */ boolean uncontended = true; /** * cells 不为 null && * 其长度大于 1 && * 基于当前线程的探测值定位的 cell 不为 null && * 则尝试原子更新目标 cell 值 */ if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) /** * 1)cell 为 null * 2)原子更新目标 cell 值失败,即单个 cell 发生竞争 */ longAccumulate(x, null, uncontended); } } }

先来看第一个if分支if ((as = cells) != null || !casBase(b = base, b + x)) 由于初始时cells为空,第一次调用add()方法的话,(as = cells) != null不成立,转向!casBase(b = base, b + x),打开里面代码很简单,就是对base值进行CAS修改,前面说过,没有竞争的时候修改的是base值,发生竞争的时候Cellp[]才起作用。

final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); }

如果casBase返回true,表示该线程修改成功,结束; 如果casBase返回false,表示该线程修改失败,产生了竞争,进入里面的if条件

if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x)))

咋看有点复杂,有4个分支,不着急,一个个来看。 第一个和第二个就是判断Cell[]有没有初始化,且元素不为空。

 

第三个和第四个就是在Cell[]已初始化的前提下,定位出当前线程应该对应的Cell元素,并尝试CAS修改里面的value值,给它加x,如果不成功,进入里面的longAccumulate(x, null, uncontended);

 

进入之前,可能有同学对uncontended和getProbe() & m有疑问。 uncontendted,翻译过来是"未发生过竞争的"意思,里面的方法会用到这个标记;而getProbe()返回的是Thread类threadLocalRandomProbe属性的值,它在ThreadLocalRandom里面发挥作用。

 

在这里我们可以把它理解成HashMap的哈希值h,然后与m=as.length - 1进行与操作,其实等效于h % as.length,即找到对应的位置,是不是和HashMap定位元素位置很类似?

static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); } Class<?> tk = Thread.class; PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe"));

现在我们可以进入longAccumulate(x, null, uncontended);方法了,打开一看,你kin你ca,这么复杂,绝望了有没有?

abstract class Striped64 extends Number { final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; // 如果没有初始化 if ((h = getProbe()) == 0) { // current()里面会初始化probe值 ThreadLocalRandom.current(); // force initialization // 重新获取probe值 h = getProbe(); // 还未初始化,肯定没有产生竞争 wasUncontended = true; } // 是否发生碰撞,即多个线程hash到同一个Cell元素位置 boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; // 如果cells数组已经初始化 if ((as = cells) != null && (n = as.length) > 0) { // hash到的数组元素位置为空 if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create // 尝试获取锁 if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; // 再次检查该位置元素是否为空 if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 将新生成的元素Cell(x)放在该位置上 rs[j] = r; created = true; } } finally { // 释放锁 cellsBusy = 0; } if (created) // (1)创建成功,退出循环 break; // 创建不成功,下一轮循环重试 continue; // Slot is now non-empty } } // 该位置元素为空,则没有发生碰撞 collide = false; } // 对应外面add()方法的第四个条件,即该位置元素不为空,且cas失败了 // 重置wasUncontended,通过下面的advanceProbe()重新hash,找到新的位置进行下一轮重试 // 之所以重置wasUncontended,是为了下一轮重试时走下面cas分支,尝试对该位置元素进行值的修改 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // 第N(N > 1)轮重试,尝试对该位置元素进行值的修改, else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // (2)修改成功退出循环 break; // 如果数组元素到达CPU个数或者已经被扩容了,则重新hash下一轮重试 else if (n >= NCPU || cells != as) collide = false; // At max size or stale // 以上条件都不满足,则发生了碰撞,且竞争失败了 else if (!collide) collide = true; // 碰撞竞争失败时,则去尝试获取锁去扩容Cell数组 else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale // 扩容为原来的2倍 Cell[] rs = new Cell[n << 1]; // 拷贝旧数组元素到新数组中 for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { // 释放锁 cellsBusy = 0; } // 扩容成功,则重置collide,表示我有新的位置去重试了,不跟你抢这个位置了 collide = false; continue; // Retry with expanded table } // 产生新的hash值,尝试去找别的数组位置 h = advanceProbe(h); } // Cell[]为空,对应外面add()的第一二个条件,则尝试获取锁去初始化数组 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { // 初始化大小为2 Cell[] rs = new Cell[2]; // 将Cell(x)放在0或1号位置上 rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { // 释放锁 cellsBusy = 0; } // (3)初始化成功,退出循环 if (init) break; } // 有别的线程正在初始化数组,则尝试累加在base变量上 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // (4)成功则退出循环 break; // Fall back on using base } } }

由上面代码可以看出,这个方法逻辑相当复杂,再来总结梳理下,可以从上面注释标记的4处退出循环的条件来看:

  • (1) Cell[]不为空,hash到的位置元素为空,那么就创建元素,并赋值为x,成功的话可以退出循环;

  • (2) Cell[]不为空,hash到的位置元素不为空,且上一轮cas修改失败了,这轮重试如果成功,可以退出循环;

  • (3) Cell[]为空,那么尝试初始化数组,并把x赋值到0或1号位置上,成功的话可以退出循环;

  • (4) Cell[]为空,且有其他线程在初始化数组,那么尝试累加到base上,成功的话可以退出循环;

  • 其他条件都是需要通过advanceProbe()进行rehash到其他位置,进行下一轮重试

三、总结

总结之前顺便提下LongAccumulator,它是把LongAdder的(v + x)操作换成一个LongBinaryOperator,即用户可以自定义累加操作的逻辑,其他地方都是一样的

public class LongAccumulator extends Striped64 implements Serializable { private final LongBinaryOperator function; private final long identity; public LongAccumulator(LongBinaryOperator accumulatorFunction, long identity) { this.function = accumulatorFunction; base = this.identity = identity; } }

整个LongAdder的源码分析就到这里结束了,其实JDK也提供了double类型的DoubleAdder和DoubleAccumulator,他们都继承了Striped64,原理是大同小异的,有兴趣的同学可以自己去看看源码。

 

关于平时开发如何选择AtomicLong,相信大家也很清楚了,并发不高的情况下用AtomicLong就行,并发很高的情况下就要选择LongAdder或者LongAccumulator了!

 

参考: 《Java并发编程之美》

https://segmentfault.com/a/1190000039055166

上一篇:Java并发编程——ReentrantLock实现原理
下一篇:没有了
网友评论