目录
- 一:简述
- 二:ReentrantLock类图
- 三:流程简图
- 四:源码分析
- lock()源码分析:
- 非公平实现:
- 公平锁实现:
- tryAcquire()方法
- 公平锁实现:
- 非公平锁实现:
- addWaiter()
- acquireQueued()
- shouldParkAfterFailedAcquire()
- parkAndCheckInterrupt()
- unlock()方法源码分析:
- tryRelease()
- unparkSuccessor()
- 五:总结
一:简述
ReentrantLock是java.util.concurrent包中提供的一种锁机制,它是一种可重入,互斥的锁,ReentrantLock还同时支持公平和非公平两种实现。本篇文章基于java8,对并发工具ReentrantLock的实现原理进行分析。
二:ReentrantLock类图
三:流程简图
四:源码分析
我们以lock()方法和unlock()方法为入口对ReentrantLock的源码进行分析。
lock()源码分析:
ReentrantLock在构造构造方法创建对象的时候会根据构造函数传递的fair参数 创建不同的Sync对象(默认是非公平锁的实现),reentrantLock.lock()会调用sync.lock()。在Sync类中的lock()方法是一个抽象方法,具体实现分别在FairSync(公平)和NonfairSync(非公平)中。
public ReentrantLock() { //默认是非公平锁 sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
public void lock() { sync.lock(); }
非公平实现:
final void lock() { // state表示锁重入次数 0代表无锁状态 //尝试抢占锁一次 利用cas替换state标志 替换成功代表抢占锁成功 if (compareAndSetState(0, 1)) //抢占成功将抢占到锁的线程设置为当前线程 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
公平锁实现:
final void lock() { acquire(1); }
可以看出非公平锁在lock()方法开始会尝试cas抢占一次锁,也就是在这里会插队一次。然后都会调用acquire()方法。acquire()方法中调用了三个方法,tryAcquire(),addWaiter(),acquireQueued()三个方法,而这三个方法正是ReentrantLock加锁的核心方法,接下来我们会对这三个方法进行重点的分析。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //这里acquireQueued()是一步步将线程是否中断的标志传回来的 如果为true 那么代表线程被中断过 需要设置中断标志 交给客户端处理 // 因为LockSupport.park()之后不会对中断进行响应 所以需要一步一步将中断标记传回来 selfInterrupt(); }
tryAcquire()方法
流程图:
tryAcquire()方法是抢占锁的方法,返回ture表示线程抢占到锁了,如果抢占到锁就什么都不做,直接执行同步代码,如果没有抢占到锁就需要将线程的信息保存起来,并且阻塞线程,也就是调用addWaiter()方法和acquireQueued()方法。
tryAcquire()也有公平和非公平锁两种实现。
公平锁实现:
protected final boolean tryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); //获取state的值 int c = getState(); if (c == 0) { //如果state为0 代表现在是无锁状态 那么可以抢占锁 //公平锁这里多了一个判断 只有在AQS链表中不存在元素 才去尝试抢占锁 否则就去链表中排队 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //cas替换成功 那么就代表抢占锁成功了 将获取锁的线程设置为当前线程 setExclusiveOwnerThread(current); return true; } } //判断获取锁的线程是否是当前线程 如果是的话增加重入次数 (即增加state的值) else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
非公平锁实现:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //如果state为0 代表现在是无锁状态 非公平锁这里就直接尝试抢占锁 // 公平锁这里多了一个判断 先去看AQS链表中有没有已经在等待的线程 有的话就不会尝试去抢占锁了,非公平锁这里没有这个判断,也就是这里允许插队(不公平) if (compareAndSetState(0, acquires)) { //cas替换成功 那么就代表抢占锁成功了 将获取锁的线程设置为当前线程 setExclusiveOwnerThread(current); return true; } } //判断获取锁的线程是否是当前线程 如果是的话增加重入次数 (即增加state的值) else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //重新设置state的值 setState(nextc); return true; } return false; }
接下来分析addWaiter()方法
addWaiter()
addWaiter()方法的作用就是将没有获取到锁的线程封装成一个Node对象,然后存储在AbstractQueuedSynchronizer这个队列同步器中(为了偷懒简称AQS)。我们先看一下Node对象的结构。
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
通过Node对象的结构我们可以看出这是一个双向链表的结构,保存了prev和next引用,thread成员变量用来保存阻塞的线程引用,并且node是有状态的,分别是CANCELLED,SIGNAL,CONDITION,PROPAGATE,其中ReentrantLock涉及到的状态就是SIGNAL(等待被唤醒),CANCELLED(取消)(由于相关性不强,其他的状态在后续文章用到的时候在讲吧)。而AQS又保存了链表的头结点head和尾结点tail,所以实际上AQS存储阻塞线程的数据结构是一 个Node双向链表。addWaiter()方法的作用就是将阻塞线程封装成Node并且将其保存在AQS的链表结构中。
流程图:
源码分析:
private Node addWaiter(Node mode) { //将没有获得锁的线程封装成一个node Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //如果AQS尾结点不为null 代表AQS链表已经初始化 尝试将构建好的节点添加到链表的尾部 if (pred != null) { node.prev = pred; //cas替换AQS的尾结点 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //没有初始化调用enq()方法 enq(node); return node; }
private Node enq(final Node node) { //自旋 for (;;) { Node t = tail; //尾结点为空 说明AQS链表还没有初始化 那么进行初始化 if (t == null) { // Must initialize //cas 将AQS的head节点 初始化 成功初始化head之后,将尾结点也初始化 //注意 这里我们可以看到head节点是不存储线程信息的 也就是说head节点相当于是一个虚拟节点 if (compareAndSetHead(new Node())) tail = head; } else { //尾结点不为空 那么直接添加到链表的尾部即可 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
接下来分析acquireQueued()
acquireQueued()
acquireQueued()方法的作用就是利用Locksupport.park()方法将AQS链表中存储的线程阻塞起来。
流程图:
源码分析:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //进入自旋 for (;;) { //获取当前节点的前一个节点 final Node p = node.predecessor(); // 如果前一个节点是head 而且再次尝试获取锁成功,将节点从AQS队列中去除 并替换head 同时返回中断标志 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; //注意 只有抢占到锁才会跳出这个for循环 return interrupted; } //去除状态为CANCELLED的节点 并且阻塞线程 线程被阻塞在这 //注意 线程被唤醒之后继续执行这个for循环 尝试抢占锁 没有抢占到的话又会阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //如果失败 将node状态修改为CANCELLED cancelAcquire(node); } }
在acquireQueued()方法中有两个方法比较重要shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法。
shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //如果节点是SIGNAL状态 不需要处理 直接返回 return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ //如果节点状态>0 说明节点是取消状态 这种状态的节点需要被清除 用do while循环顺便清除一下前面的连续的、状态为取消的节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //正常的情况下 利用cas将前一个节点的状态替换为 SIGNAL状态 也就是-1 //注意 这样队列中节点的状态 除了最后一个都是-1 包括head节点 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() { //挂起当前线程 并且返回中断标志 LockSupport.park(thread) 会调用UNSAFE.park()方法将线程阻塞起来(是一个native方法) LockSupport.park(this); return Thread.interrupted(); }
到这里lock()方法也就分析完了 接下来我们分析unlock()方法
unlock()方法源码分析:
流程图:
reentrantLock的unlock()方法会调用sync的release()方法。
public void unlock() { //每次调用unlock 将state减1 sync.release(1); }
release()方法有两个方法比较重要,tryRelease()方法和unparkSuccessor(),tryRelease()方法计算state的值,看线程是否已经彻底释放锁(这是因为ReentrantLock是支持重入的),如果已经彻底释放锁那么需要调用unparkSuccessor()方法来唤醒线程,否则不需要唤醒线程。
public final boolean release(int arg) { //只有tryRelease返回true 说明已经释放锁 需要将阻塞的线程唤醒 否则不需要唤醒别的线程 if (tryRelease(arg)) { Node h = head; //如果头结点不为空 而且状态不为0 代表同步队列已经初始化 且存在需要唤醒的node //注意 同步队列的头结点相当于是一个虚拟节点 这一点我们可以在构建节点的代码中很清楚的知道 //并且在shouldParkAfterFailedAcquire方法中 会把head节点的状态修改为-1 //如果head的状态为0 那么代表队列中没有需要被唤醒的元素 直接返回true if (h != null && h.waitStatus != 0) //唤醒头结点的下一个节点 unparkSuccessor(h); return true; } return false; }
tryRelease()
protected final boolean tryRelease(int releases) { //减少重入次数 int c = getState() - releases; //如果获取锁的线程不是当前线程 抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //如果state为0 说明已经彻底释放了锁 返回true if (c == 0) { free = true; //将获取锁的线程设置为null setExclusiveOwnerThread(null); } //重置state的值 setState(c); //如果释放了锁 就返回true 否则返回false return free; }
unparkSuccessor()
private void unparkSuccessor(Node node) { //获取头结点的状态 将头结点状态设置为0 代表现在正在有线程被唤醒 如果head状态为0 就不会进入这个方法了 int ws = node.waitStatus; if (ws < 0) //将头结点状态设置为0 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //唤醒头结点的下一个状态不是cancelled的节点 (因为头结点是不存储阻塞线程的) Node s = node.next; //当前节点是null 或者是cancelled状态 if (s == null || s.waitStatus > 0) { s = null; //从aqs链表的尾部开始遍历 找到离头结点最近的 不为空的 状态不是cancelled的节点 赋值给s 这里为什么从尾结点开始遍历而是头结点 应该是因为添加结点的时候是先初始化结点的prev的, 从尾结点开始遍历 不会出现prve没有赋值的情况 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //调用LockSupport.unpark()唤醒指定的线程 LockSupport.unpark(s.thread); }
五:总结
最后总结一下,有以下几点需要注意:
1.ReentrantLock有公平锁和非公平锁两种实现,其实这两种实现的差别只有两点,第一是在lock()方法开始的时候非公平锁会尝试cas抢占锁插一次队, 第二是在tryAcquire()方法发现state为0的时候,非公平锁会抢占一次锁,而公平锁会判断AQS链表中是否存在等待的线程,没有等待的线程才会去抢占锁。
2.AQS存储阻塞线程的数据结构是一个双向链表的结构,而且它是遵循先进先出的,因为它是从头结点的下一个节点开始唤醒,而添加新的节点的时候是添加到链表的尾部,所以AQS同时也是一个队列的数据结构。
3.线程被唤醒之后会继续执行acquireQueued()方法,因为它是阻塞在acquireQueued()方法的for循环中的,唤醒后尝试去获取锁,获取成功就会将节点从AQS中删除并跳出for循环,否则又会继续阻塞。(获取锁失败的原因就是因为有人插队啊。。也就是非公平锁导致的)。
以上就是java锁机制ReentrantLock源码实例分析的详细内容,更多关于java锁机制ReentrantLock的资料请关注自由互联其它相关文章!