当前位置 : 主页 > 编程语言 > 其它开发 >

AQS 源码分析

来源:互联网 收集:自由互联 发布时间:2022-05-30
一、AQS 简介 AQS,就是 AbstractQueuedSynchronizer ,在同步组件的实现中,AQS是核心部分,同步组件的实现者通过使用AQS提供的模板方法实现同步组件语义,AQS则实现了 对同步状态的管理,以
一、AQS 简介

AQS,就是 AbstractQueuedSynchronizer,在同步组件的实现中,AQS是核心部分,同步组件的实现者通过使用AQS提供的模板方法实现同步组件语义,AQS则实现了对同步状态的管理,以及对阻塞线程进行排队,等待通知等等一些底层的实现处理。AQS的核心也包括了这些方面:同步队列,独占式锁的获取和释放,共享锁的获取和释放以及可中断锁,超时等待锁获取这些特性的实现,而这些实际上则是AQS提供出来的模板方法,归纳整理如下:

AbstractQueuedSynchronizer独占式锁方法如下:

// 独占式获取同步状态,如果获取失败则插入同步队列进行等待;
public final void acquire(int arg) 
// 与acquire方法相同,但在同步队列中进行等待的时候可以检测中断;
public final void acquireInterruptibly(int arg)
// 在acquireInterruptibly基础上增加了超时等待功能,在超时时间内没有获得同步状态返回false;
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
// 释放同步状态,该方法会唤醒在同步队列中的下一个节点
public final boolean release(int arg) 

AbstractQueuedSynchronizer共享式锁方法如下:

// 共享式获取同步状态,与独占式的区别在于同一时刻有多个线程获取同步状态;
public final void acquireShared(int arg)
// 在acquireShared方法基础上增加了能响应中断的功能;
public final void acquireSharedInterruptibly(int arg)
// 在acquireSharedInterruptibly基础上增加了超时等待的功能;
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
// 共享式释放同步状态
public final boolean releaseShared(int arg)
二、同步队列

先要了解一下AQS中的同步队列。

当共享资源被某个线程占有,其他请求该资源的线程就会被阻塞,从而进入同步队列。

就数据结构而言,队列的实现就是数组或链表的形式。AQS 中的同步队列是通过链表实现的

在AQS 中有一个静态内部类:

static final class Node {
	//指示节点正在共享模式下等待的标记
    static final Node SHARED = new Node();
	//指示节点正在独占模式下等待的标记
    static final Node EXCLUSIVE = null;
	//节点从同步队列中取消
    static final int CANCELLED =  1;
	//后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,使得后继节点的线程能够运行;
    static final int SIGNAL    = -1;
	//当前节点进入等待队列中
    static final int CONDITION = -2;
	//表示下一次共享式同步状态获取将会无条件传播下去
    static final int PROPAGATE = -3;

	//节点状态
    volatile int waitStatus;
	//当前节点(线程)的前驱节点
    volatile Node prev;
	//当前节点(线程)的后继节点
    volatile Node next;
	//加入同步队列的线程引用
    volatile Thread thread;
	//等待队列中的下一个节点
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {   
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

从上面可以看出来,AQS 的同步队列有前驱结点和后续节点,它是一个双向链表。下面看一个Demo

public class Test {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        MyThread myThread = new MyThread(lock);
        for (int i = 0; i < 5; i++) {
            new Thread(myThread).start();
        }
    }
}

class MyThread implements Runnable {
    private final ReentrantLock lock;
    static int count;
    MyThread(ReentrantLock lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        lock.lock();
        System.out.println("count == " + count++);
        lock.unlock();
    }
}

实例代码中开启了5个线程,先获取锁之后再睡眠10S中,实际上这里让线程睡眠是想模拟出当线程无法获取锁时进入同步队列的情况。通过debug,当Thread-4(在本例中最后一个线程)获取锁失败后进入同步时,AQS时现在的同步队列如图所示:

Thread-0先获得锁后进行睡眠,其他线程(Thread-1,Thread-2,Thread-3,Thread-4)获取锁失败进入同步队列,同时也可以很清楚的看出来每个节点有两个域:prev(前驱)和next(后继),并且每个节点用来保存获取同步状态失败的线程引用以及等待状态等信息。另外AQS中有两个重要的成员变量:

private transient volatile Node head;
private transient volatile Node tail;

也就是说AQS实际上通过头尾指针来管理同步队列,同时实现包括获取锁失败的线程进行入队,释放锁时对同步队列中的线程进行通知等核心方法。其示意图如下:

通过对源码的理解以及做实验的方式,现在我们可以清楚的知道这样几点:

  • 节点的数据结构,即AQS的静态内部类Node,节点的等待状态等信息
  • 同步队列是一个双向队列,AQS通过持有头尾指针管理同步队列

那么,节点如何进行入队和出队是怎样做的了?实际上这对应着锁的获取和释放两个操作:获取锁失败进行入队操作,获取锁成功进行出队操作。

三、独占锁 1. 独占锁的获取

独占锁的获取(acquire方法)

继续通过看源码和debug的方式来看,还是以上面的demo为例,调用lock()方法是获取独占式锁,获取失败就将当前线程加入同步队列,成功则线程执行。而lock()方法实际上会调用AQS的acquire()方法,源码如下:

public final void acquire(int arg) {
    //先看同步状态是否获取成功,如果成功则方法结束返回
    //若失败则先调用addWaiter()方法,再调用acquireQueued()方法
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire根据当前获得同步状态成功与否做了两件事情:

  1. 成功,则方法结束返回
  2. 失败,则先调用addWaiter()然后在调用acquireQueued()方法。
1.1 获取同步状态失败,入队操作

当线程获取独占式锁失败后就会将当前线程加入同步队列,那么加入队列的方式是怎样的了?我们接下来就应该去研究一下addWaiter()acquireQueued()addWaiter() 源码如下:

private Node addWaiter(Node mode) {
    // 1. 将当前线程构建成Node类型
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 2. 当前尾节点是否为null
    Node pred = tail;
    if (pred != null) {
        // 2.2 将当前节点尾插入的方式插入同步队列中
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 2.1. 当前同步队列尾节点为null,说明当前线程是第一个加入同步队列进行等待的线程
    enq(node);
    return node;
}

分析可以看上面的注释。程序的逻辑主要分为两个部分:

  1. 当前同步队列的尾节点为null,调用方法enq()插入;
  2. 当前队列的尾节点不为null,则采用尾插入(compareAndSetTail() 方法)的方式入队。

另外还会有另外一个问题:如果 if (compareAndSetTail(pred, node))为false怎么办?会继续执行到enq()方法,同时很明显compareAndSetTail是一个CAS操作,通常来说如果CAS操作失败会继续自旋(死循环)进行重试。

因此,经过我们这样的分析,enq()方法可能承担两个任务:

  1. 处理当前同步队列尾节点为null时进行入队操作;
  2. 如果CAS尾插入节点失败后负责自旋进行尝试。

那么是不是真的就像我们分析的一样了?只有源码会告诉我们答案,enq()源码如下:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            //1. 构造头结点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 2. 尾插入,CAS操作失败自旋尝试
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

在上面的分析中我们可以看出在第1步中会先创建头结点,说明同步队列是带头结点的链式存储结构。带头结点与不带头结点相比,会在入队和出队的操作中获得更大的便捷性,因此同步队列选择了带头结点的链式存储结构。那么带头节点的队列初始化时机是什么?自然而然是在tail为null时,即当前线程是第一次插入同步队列compareAndSetTail(t, node)方法会利用CAS操作设置尾节点,如果CAS操作失败会在for (;;)for死循环中不断尝试,直至成功return返回为止。因此,对enq()方法可以做这样的总结:

  1. 在当前线程是第一个加入同步队列时,调用compareAndSetHead(new Node())方法,完成链式队列的头结点的初始化
  2. 自旋不断尝试CAS尾插入节点直至成功为止

现在我们已经很清楚获取独占式锁失败的线程包装成Node然后插入同步队列的过程了。那么紧接着会有下一个问题:在同步队列中的节点(线程)会做什么事情来保证自己能够有机会获得独占式锁了?带着这样的问题我们就来看看acquireQueued()方法,从方法名就可以很清楚,这个方法的作用就是排队获取锁的过程,源码如下:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 1. 获得当前节点的先驱节点
            final Node p = node.predecessor();
            // 2. 当前节点能否获取独占式锁					
            // 2.1 如果当前节点的先驱节点是头结点并且成功获取同步状态,即可以获得独占式锁
            if (p == head && tryAcquire(arg)) {
                //队列头指针用指向当前节点
                setHead(node);
                //释放前驱节点
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 2.2 获取锁失败,线程进入等待状态等待获取独占式锁
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

程序逻辑通过注释已经标出,整体来看这又是一个自旋的过程(for循环),代码首先获取当前节点的先驱节点,如果先驱节点是头结点的并且成功获得同步状态的时候(if (p == head && tryAcquire(arg))),当前节点所指向的线程能够获取锁。反之,获取锁失败进入等待状态。整体示意图为下图:

1.2 获取锁成功,出队操作

获取锁的节点出队的逻辑是:

//队列头结点引用指向当前节点
setHead(node);
//释放前驱节点
p.next = null; // help GC
failed = false;
return interrupted;

setHead() 方法为:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

将当前节点通过setHead()方法设置为队列的头结点,然后将之前的头结点的next域设置为null并且pre域也为null,即与队列断开,无任何引用方便GC时能够将内存进行回收。示意图如下:

那么当获取锁失败的时候会调用shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法,看看他们做了什么事情。shouldParkAfterFailedAcquire()方法源码为:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

shouldParkAfterFailedAcquire()方法主要逻辑是使用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)使用CAS将节点状态由INITIAL设置成SIGNAL,表示当前线程阻塞。当compareAndSetWaitStatus设置失败则说明shouldParkAfterFailedAcquire方法返回false,然后会在acquireQueued()方法中for (;;)死循环中会继续重试,直至compareAndSetWaitStatus设置节点状态位为SIGNAL时shouldParkAfterFailedAcquire返回true时才会执行方法parkAndCheckInterrupt()方法,该方法的源码为:

private final boolean parkAndCheckInterrupt() {
    //使得该线程阻塞
	LockSupport.park(this);
    return Thread.interrupted();
}

该方法的关键是会调用LookSupport.park()方法(关于LookSupport会在以后的文章进行讨论),该方法是用来阻塞当前线程的。因此到这里就应该清楚了,acquireQueued()在自旋过程中主要完成了两件事情:

  1. 如果当前节点的前驱节点是头节点,并且能够获得同步状态的话,当前线程能够获得锁该方法执行结束退出
  2. 获取锁失败的话,先将节点状态设置成SIGNAL,然后调用LookSupport.park方法使得当前线程阻塞

经过上面的分析,独占式锁的获取过程也就是acquire()方法的执行流程如下图所示:

2. 独占锁的释放

独占锁的释放(release()方法)

ReentrantLockunlock() 方法,调用的是 AQS 的 release() 方法。下面看一下

独占锁的释放就相对来说比较容易理解了,先来看下源码:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

这段代码逻辑就比较容易理解了,如果同步状态释放成功(tryRelease返回true)则会执行if块中的代码,当head指向的头结点不为null,并且该节点的状态值不为0的话才会执行unparkSuccessor()方法。unparkSuccessor方法源码:

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */

	//头节点的后继节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
		//后继节点不为null时唤醒该线程
        LockSupport.unpark(s.thread);
}

源码的关键信息请看注释,首先获取头节点的后继节点,当后继节点不为空的时候会调用LookSupport.unpark()方法,该方法会唤醒该节点的后继节点所包装的线程。因此,每一次锁释放后就会唤醒队列中该节点的后继节点所引用的线程,从而进一步可以佐证获得锁的过程是一个FIFO(先进先出)的过程。

到现在我们终于啃下了一块硬骨头了,通过学习源码的方式非常深刻的学习到了独占式锁的获取和释放的过程以及同步队列。可以做一下总结:

  1. 线程获取锁失败,线程被封装成Node进行入队操作,核心方法在于addWaiter()enq(),同时enq()完成对同步队列的头结点初始化工作以及CAS操作失败的重试
  2. 线程获取锁是一个自旋的过程,当且仅当 当前节点的前驱节点是头结点并且成功获得同步状态时,节点出队即该节点引用的线程获得锁,否则,当不满足条件时就会调用LookSupport.park()方法使得线程阻塞
  3. 释放锁的时候会唤醒后继节点

总体来说:在获取同步状态时,AQS维护一个同步队列,获取同步状态失败的线程会加入到队列中进行自旋;移除队列(或停止自旋)的条件是前驱节点是头结点并且成功获得了同步状态。在释放同步状态时,同步器会调用unparkSuccessor() 方法唤醒后继节点。

3. 可中断式获取锁

可中断式获取锁(acquireInterruptibly方法)

我们知道lock相较于synchronized有一些更方便的特性,比如能响应中断以及超时等待等特性,现在我们依旧采用通过学习源码的方式来看看能够响应中断是怎么实现的。可响应中断式锁可调用方法lock.lockInterruptibly();而该方法其底层会调用AQS的acquireInterruptibly方法,源码为:

public final void acquireInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        //线程获取锁失败
        doAcquireInterruptibly(arg);
}

在获取同步状态失败后就会调用doAcquireInterruptibly方法:

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
	//将节点插入到同步队列中
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //获取锁出队
			if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
				//线程中断抛异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

关键信息请看注释,现在看这段代码就很轻松了吧,与acquire方法逻辑几乎一致,唯一的区别是当parkAndCheckInterrupt返回true时,即线程阻塞时该线程被中断,代码抛出被中断异常。

4. 超时等待式获取锁

超时等待式获取锁(tryAcquireNanos()方法)

通过调用lock.tryLock(timeout,TimeUnit)方式达到超时等待获取锁的效果,该方法会在三种情况下才会返回:

  1. 在超时时间内,当前线程成功获取了锁;
  2. 当前线程在超时时间内被中断;
  3. 超时时间结束,仍未获得锁返回false。

我们仍然通过采取阅读源码的方式来学习底层具体是怎么实现的,该方法会调用AQS的方法tryAcquireNanos(),源码为:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
		//实现超时等待的效果
        doAcquireNanos(arg, nanosTimeout);
}

很显然这段源码最终是靠doAcquireNanos方法实现超时等待的效果,该方法源码如下:

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
	//1. 根据超时时间和当前时间计算出截止时间
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
			//2. 当前线程获得锁出队列
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
			// 3.1 重新计算超时时间
            nanosTimeout = deadline - System.nanoTime();
            // 3.2 已经超时返回false
			if (nanosTimeout <= 0L)
                return false;
			// 3.3 线程阻塞等待 
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 3.4 线程被中断抛出被中断异常
			if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

程序逻辑如图所示:

程序逻辑同独占锁和响应中断式获取基本一致,唯一的不同在于获取锁失败后,对超时时间的处理上,在第1步会先计算出按照现在时间和超时时间计算出理论上的截止时间,比如当前时间是08:10,超时时间是10min,那么根据deadline = System.nanoTime() + nanosTimeout计算出刚好达到超时时间时的系统时间就是08:10+10min = 08:20。然后根据deadline - System.nanoTime()就可以判断是否已经超时了,比如,当前系统时间是08:20很明显已经超过了理论上的系统时间08:20,deadline - System.nanoTime()计算出来就是一个负数,自然而然会在3.2步中的If判断之间返回false。如果还没有超时即3.2步中的if判断为true时就会继续执行3.3步通过LockSupport.parkNanos使得当前线程阻塞,同时在3.4步增加了对中断的检测,若检测出被中断直接抛出被中断异常。

5. 公平锁和非公平锁

还是看上面同步队列中的例子,以 ReentrantLocklock() 方法为例。

public void lock() {
    sync.lock();
}

可以看一下这里的 sync 是有两个实现的:公平和非公平

5.1 公平锁

进入 AQS 的 acquire() 方法,这个方法进去之后会调用 AQS 实现类的 tryAcquire() 方法。tryAcquire() 方法分为公平锁和非公平锁的实现:

下面看一下公平锁的实现(也就是 FairSync 的实现):

static final class FairSync extends Sync {
    //...
    protected final boolean tryAcquire(int acquires) {
        // 获取当前线程
        final Thread current = Thread.currentThread();
        // 获取当前线程的状态,大于0则是获取到锁的(1代表获取一次锁,2代表重入了一次),等于0就是没获取到锁
        int c = getState();
        // 如果当前状态没获取锁
        if (c == 0) {
            // 这里先判断当前队列是否为空,如果为空,当前队列就可以直接获取锁
            // 如果当前队列不为空,则后面直接返回false,让线程走上面分析的独占锁的获取流程
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果获取到锁的就是当前的线程,则直接重入一次,就是 state+1
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

这里主要是 !hasQueuedPredecessors() 判断了当前队列是否为空,为空就加入队列尾部等待,所以称之为公平锁。

5.2 非公平锁

可以看到非公平锁的实现,上来就直接尝试获取锁(就是compareAndSetState(0, 1)),如果获取不到,就调用 AQS 的 acquire() 方法,这里进入 AQS 的 acquire() 方法之后,再看非公平锁的tryAcquire(arg) 实现

static final class NonfairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    // ...
    final boolean nonfairTryAcquire(int acquires) {
        // 获取当前线程
        final Thread current = Thread.currentThread();
        // 获取当前线程的状态,大于0则是获取到锁的(1代表获取一次锁,2代表重入了一次),等于0就是没获取到锁
        int c = getState();
        // 如果当前状态没获取锁
        if (c == 0) {
            // 这里就不会像公平锁一样去判断队列是否为空了,直接回尝试抢占锁。
            // 如果抢到了就获取锁,没抢到才会加入队列尾部进行排队
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果获取到锁的就是当前的线程,则直接重入一次,就是 state+1
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
四、共享锁

共享锁就需要看一下 ReentrantReadWriteLock 类了,这个是一个读写锁,它的读锁就是共享锁,写锁就是互斥锁:读读共享、读写互斥,写写互斥。

ReentrantReadWriteLock 的 API 介绍

ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 获取读锁
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
readLock.lock();
readLock.unlock();
// 获取写锁
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
writeLock.lock();
writeLock.unlock();
1. 共享锁的获取

共享锁的获取(acquireShared()方法)

可以看看上面对于读写锁的 API 介绍,看看 readLock.lock();writeLock.lock(); 的源码可以发现:

  • readLock.lock(); 调用的是 AQS 的 acquireShared() 方法,也就是获取共享锁
  • writeLock.lock(); 调用的是 AQS 的 acquire() 方法,也就是互斥锁

下面看一下共享锁是怎么获取的,直接上 acquireShared() 方法的源码:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

这段源码的逻辑很容易理解,在该方法中会首先调用tryAcquireShared方法,tryAcquireShared返回值是一个int类型,当返回值为大于等于0的时候方法结束说明获得成功获取锁,否则,表明获取同步状态失败即所引用的线程获取锁失败,会执行doAcquireShared方法,该方法的源码为:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 当该节点的前驱节点是头结点且成功获取同步状态
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

现在来看这段代码会不会很容易了?逻辑几乎和独占式锁的获取一模一样,这里的自旋过程中能够退出的条件是当前节点的前驱节点是头结点并且tryAcquireShared(arg)返回值大于等于0即能成功获得同步状态

这里以公平锁为例,看一下 tryAcquireShared(arg) 的源码:

static final class FairSync extends Sync {
	// ...
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 队列中有等待线程则不获取锁,返回 -1
            if (hasQueuedPredecessors())
                return -1;
            // 获取状态
            int available = getState();
            int remaining = available - acquires;
            // 尝试设置状态,但是这里获取锁的时候并不会给 state 加一,因为是共享锁
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

这里和独占锁有一个很重要的区别,就是在独占锁中是调用了 AQSacquireQueued 方法去排队等待获取锁,而共享锁只是将当前线程加入了队列中,然后调用 tryAcquireShared(arg) 尝试获取锁,并且共享锁并不会给 state 加 1。

而互斥锁,就是这里的写锁会给 state 加 1 ,代表当前是有互斥锁的。

总结

  • 互斥锁会给 AQS 的 state 从 0 改为 1(代表当前线程获取到锁),如果重入锁,state 会累加 1 ,释放锁 state 就会累减 1,减到 0 后就完全释放锁了
  • 共享锁并不会给 AQS 的 state 加一或减一,并且共享锁加入队列后并不会一直自旋等待获取锁,而是直接看 state 是否为 0,为 0 代表没有写锁了,不为 0 就代表在它前面还有写锁。读写互斥
2. 共享锁的释放

共享锁的释放(releaseShared()方法)

共享锁的释放在AQS中会调用方法 releaseShared

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

当成功释放同步状态之后即tryReleaseShared会继续执行doReleaseShared方法:

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

这段方法跟独占式锁释放过程有点不同,在共享式锁的释放过程中,对于能够支持多个线程同时访问的并发组件,必须保证多个线程能够安全的释放同步状态,这里采用的CAS保证,当CAS操作失败continue,在下一次循环中进行重试。

3. 可中断和超时等待

可中断(acquireSharedInterruptibly()方法),超时等待(tryAcquireSharedNanos()方法)

关于可中断锁以及超时等待的特性其实现和独占式锁可中断获取锁以及超时等待的实现几乎一致,具体的就不再说了,如果理解了上面的内容对这部分的理解也是水到渠成的。

五、锁

锁升级和锁降级不是一回事,锁升级是synchronized关键字在jdk1.6之后做的优化,锁降级是为了保证数据的可见性在添加了写锁后再添加一道读锁

1. 锁升级

锁升级的顺序为:无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁,且锁升级的顺序是不可逆的。

因为操作唤醒阻塞的线程需从用户态切换到内存态,开销大,所以才优先使用轻量级锁进行循环判断是否可获取监视类或对象。

线程第一次获取锁获时锁的状态为偏向锁,如果下次还是这个线程获取锁,则锁的状态不变,否则会升级为CAS轻量级锁;如果还有线程竞争获取锁,如果线程获取到了轻量级锁没啥事了,如果没获取到会自旋,自旋期间获取到了锁没啥事,超过了10次还没获取到锁,锁就升级为重量级的锁,此时如果其他线程没获取到重量级锁,就会被阻塞等待唤起,此时效率就低了。

2. 锁降级

锁降级:写锁降级成为读锁。持有写锁的同时,获取到读锁,然后释放写锁。避免读到被其他线程修改的数据。

oracle官网锁降级示例:

 class CachedData {
   Object data;
   volatile boolean cacheValid;
   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
 
   void processCachedData() {
     rwl.readLock().lock();
     // 缓存无效
     if (!cacheValid) {
        // Must release read lock before acquiring write lock
        // 释放读锁
        rwl.readLock().unlock();
        // 尝试获取写锁
        rwl.writeLock().lock();
        try {
          // Recheck state because another thread might have
          // acquired write lock and changed state before we did.
          // 再次判断获取是否无效
          if (!cacheValid) {
              // 获取数据
            data = ...
            cacheValid = true;
          }
          // Downgrade by acquiring read lock before releasing write lock
          // 锁降级
          rwl.readLock().lock();
        } finally {
          rwl.writeLock().unlock(); // Unlock write, still hold read
        }
     }
     // 经过很长的时间做一些处理、而且不想我刚刚自己更新的数据被别人改了
     try {
       use(data);
     } finally {
       rwl.readLock().unlock();
     }
   }
 }
六、Synchronized 和 Lock 区别

百度抄一把!!!

存在层面

synchronized: 是Java 中的一个关键字,存在于 JVM 层面

Lock: 是 Java 中的一个接口

锁的释放条件

synchronized:1、以获取锁的线程执行完同步代码,释放锁 2、线程执行发生异常,jvm会让线程释放锁

Lock:在finally中必须释放锁,不然容易造成线程死锁

锁的获取

synchronized: 在发生异常时候会自动释放占有的锁,因此不会出现死锁

Lock: 发生异常时候,不会主动释放占有的锁,必须手动unlock来释放锁,可能引起死锁的发生

锁的状态

synchronized:无法判断

Lock:可以判断

锁的类型

synchronized: 可重入 不可中断 非公平

Lock:可重入 可判断 可公平(两者皆可)

性能

synchronized: 少量同步

Lock:大量同步

  • Lock 可以提高多个线程进行读操作的效率。(可以通过 readwritelock 实现读写分离)
  • 在资源竞争不是很激烈的情况下,Synchronized 的性能要优于 ReetrantLock,但是在资源竞争很激烈的情况下,Synchronized 的性能会下降几十倍,但是 ReetrantLock 的性能能维持常态;
  • ReentrantLock 提供了多样化的同步,比如有时间限制的同步,可以被 Interrupt 的同步(synchronized 的同步是不能 Interrupt 的)等。在资源竞争不激烈的情形下,性能稍微比synchronized 差点点。但是当同步非常激烈的时候,synchronized 的性能一下子能下降好几十倍。而ReentrantLock 确还能维持常态。

调度

synchronized:使用 Object 对象本身的 waitnotifynotifyAll调度机制

Lock:可以使用 Condition 进行线程之间的调度

用法

synchronized:在需要同步的对象中加入此控制,synchronized可以加在方法上,也可以加在特定代码块中,括号中表示需要锁的对象。

Lock:一般使用ReentrantLock类做为锁。在加锁和解锁处需要通过lock()unlock()显示指出。所以一般会在finally块中写unlock()以防死锁。

底层实现

synchronized:底层使用指令码方式来控制锁的,映射成字节码指令就是增加来两个指令:monitorentermonitorexit。当线程执行遇到 monitorenter 指令时会尝试获取内置锁,如果获取锁则锁计数器+1,如果没有获取锁则阻塞;当遇到 monitorexit 指令时锁计数器-1,如果计数器为0则释放锁。

Lock:底层是CAS乐观锁,依赖AbstractQueuedSynchronizer类,把所有的请求线程构成一个CLH队列。而对该队列的操作均通过Lock-Free(CAS)操作。

上一篇:10轮伪匹配
下一篇:没有了
网友评论