当前位置 : 主页 > 编程语言 > java >

java锁机制ReentrantLock源码实例分析

来源:互联网 收集:自由互联 发布时间:2023-01-30
目录 一:简述 二:ReentrantLock类图 三:流程简图 四:源码分析 lock()源码分析: 非公平实现: 公平锁实现: tryAcquire()方法 公平锁实现: 非公平锁实现: addWaiter() acquireQueued() shouldPa
目录
  • 一:简述
  • 二:ReentrantLock类图
  • 三:流程简图
  • 四:源码分析
    • lock()源码分析:
      • 非公平实现:
      • 公平锁实现:
    • tryAcquire()方法
      • 公平锁实现:
      • 非公平锁实现:
    • addWaiter()
      • acquireQueued()
        • shouldParkAfterFailedAcquire()
          • parkAndCheckInterrupt()
            • unlock()方法源码分析:
              • tryRelease()
                • unparkSuccessor()
                • 五:总结

                  一:简述

                  ReentrantLock是java.util.concurrent包中提供的一种锁机制,它是一种可重入,互斥的锁,ReentrantLock还同时支持公平和非公平两种实现。本篇文章基于java8,对并发工具ReentrantLock的实现原理进行分析。

                  二:ReentrantLock类图

                  三:流程简图

                  四:源码分析

                  我们以lock()方法和unlock()方法为入口对ReentrantLock的源码进行分析。

                  lock()源码分析:

                  ReentrantLock在构造构造方法创建对象的时候会根据构造函数传递的fair参数 创建不同的Sync对象(默认是非公平锁的实现),reentrantLock.lock()会调用sync.lock()。在Sync类中的lock()方法是一个抽象方法,具体实现分别在FairSync(公平)和NonfairSync(非公平)中。

                      public ReentrantLock() {
                          //默认是非公平锁
                          sync = new NonfairSync();
                      }
                      public ReentrantLock(boolean fair) {
                          sync = fair ? new FairSync() : new NonfairSync();
                      }
                  
                      public void lock() {
                          sync.lock();
                      }
                  

                  非公平实现:

                         final void lock() {
                              // state表示锁重入次数 0代表无锁状态
                              //尝试抢占锁一次  利用cas替换state标志 替换成功代表抢占锁成功
                              if (compareAndSetState(0, 1))
                  		//抢占成功将抢占到锁的线程设置为当前线程
                                  setExclusiveOwnerThread(Thread.currentThread());
                              else
                                  acquire(1);
                          }
                  

                  公平锁实现:

                  	final void lock() {
                              acquire(1);
                          }
                  

                  可以看出非公平锁在lock()方法开始会尝试cas抢占一次锁,也就是在这里会插队一次。然后都会调用acquire()方法。acquire()方法中调用了三个方法,tryAcquire(),addWaiter(),acquireQueued()三个方法,而这三个方法正是ReentrantLock加锁的核心方法,接下来我们会对这三个方法进行重点的分析。

                  public final void acquire(int arg) {
                          if (!tryAcquire(arg) &&
                              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))		
                  	//这里acquireQueued()是一步步将线程是否中断的标志传回来的 如果为true 那么代表线程被中断过 需要设置中断标志 交给客户端处理
                  	// 因为LockSupport.park()之后不会对中断进行响应 所以需要一步一步将中断标记传回来
                              selfInterrupt();
                          }
                  

                  tryAcquire()方法

                  流程图:

                  tryAcquire()方法是抢占锁的方法,返回ture表示线程抢占到锁了,如果抢占到锁就什么都不做,直接执行同步代码,如果没有抢占到锁就需要将线程的信息保存起来,并且阻塞线程,也就是调用addWaiter()方法和acquireQueued()方法。

                  tryAcquire()也有公平和非公平锁两种实现。

                  公平锁实现:

                  	protected final boolean tryAcquire(int acquires) {
                  	    //获取当前线程
                              final Thread current = Thread.currentThread();
                  	    //获取state的值
                              int c = getState();
                              if (c == 0) {
                  	    //如果state为0 代表现在是无锁状态 那么可以抢占锁
                  	    //公平锁这里多了一个判断 只有在AQS链表中不存在元素 才去尝试抢占锁 否则就去链表中排队
                                  if (!hasQueuedPredecessors() &&
                                      compareAndSetState(0, acquires)) {
                  		//cas替换成功 那么就代表抢占锁成功了 将获取锁的线程设置为当前线程
                                      setExclusiveOwnerThread(current);
                                      return true;
                                  }
                              }
                  	//判断获取锁的线程是否是当前线程 如果是的话增加重入次数 (即增加state的值)
                              else if (current == getExclusiveOwnerThread()) {
                                  int nextc = c + acquires;
                                  if (nextc < 0)
                                      throw new Error("Maximum lock count exceeded");
                                  setState(nextc);
                                  return true;
                              }
                              return false;
                          }
                      }
                  

                  非公平锁实现:

                  protected final boolean tryAcquire(int acquires) {
                              return nonfairTryAcquire(acquires);
                      }
                  
                  	final boolean nonfairTryAcquire(int acquires) {
                              final Thread current = Thread.currentThread();
                              int c = getState();
                              if (c == 0) {
                  		//如果state为0 代表现在是无锁状态 非公平锁这里就直接尝试抢占锁
                  		// 公平锁这里多了一个判断 先去看AQS链表中有没有已经在等待的线程 有的话就不会尝试去抢占锁了,非公平锁这里没有这个判断,也就是这里允许插队(不公平)
                                  if (compareAndSetState(0, acquires)) {	
                  		//cas替换成功 那么就代表抢占锁成功了 将获取锁的线程设置为当前线程
                                      setExclusiveOwnerThread(current);
                                      return true;
                                  }
                              }
                  	   //判断获取锁的线程是否是当前线程 如果是的话增加重入次数 (即增加state的值)
                              else if (current == getExclusiveOwnerThread()) {
                                  int nextc = c + acquires;
                                  if (nextc < 0) // overflow
                                      throw new Error("Maximum lock count exceeded");
                                  //重新设置state的值    
                                  setState(nextc);
                                  return true;
                              }
                              return false;
                          }
                  

                  接下来分析addWaiter()方法

                  addWaiter()

                  addWaiter()方法的作用就是将没有获取到锁的线程封装成一个Node对象,然后存储在AbstractQueuedSynchronizer这个队列同步器中(为了偷懒简称AQS)。我们先看一下Node对象的结构。

                      static final class Node {
                          static final Node SHARED = new Node();
                          static final Node EXCLUSIVE = null;
                          static final int CANCELLED =  1;
                          static final int SIGNAL    = -1;
                          static final int CONDITION = -2;
                          static final int PROPAGATE = -3;
                          volatile int waitStatus;
                          volatile Node prev;
                          volatile Node next;
                          volatile Thread thread;
                          Node nextWaiter;
                          final boolean isShared() {
                              return nextWaiter == SHARED;
                          }
                          final Node predecessor() throws NullPointerException {
                              Node p = prev;
                              if (p == null)
                                  throw new NullPointerException();
                              else
                                  return p;
                          }
                          Node() {    // Used to establish initial head or SHARED marker
                          }
                          Node(Thread thread, Node mode) {     // Used by addWaiter
                              this.nextWaiter = mode;
                              this.thread = thread;
                          }
                          Node(Thread thread, int waitStatus) { // Used by Condition
                              this.waitStatus = waitStatus;
                              this.thread = thread;
                          }
                      }
                  

                  通过Node对象的结构我们可以看出这是一个双向链表的结构,保存了prev和next引用,thread成员变量用来保存阻塞的线程引用,并且node是有状态的,分别是CANCELLED,SIGNAL,CONDITION,PROPAGATE,其中ReentrantLock涉及到的状态就是SIGNAL(等待被唤醒),CANCELLED(取消)(由于相关性不强,其他的状态在后续文章用到的时候在讲吧)。而AQS又保存了链表的头结点head和尾结点tail,所以实际上AQS存储阻塞线程的数据结构是一 个Node双向链表。addWaiter()方法的作用就是将阻塞线程封装成Node并且将其保存在AQS的链表结构中。

                  流程图:

                  源码分析:

                  private Node addWaiter(Node mode) {
                  	//将没有获得锁的线程封装成一个node 
                          Node node = new Node(Thread.currentThread(), mode);
                          // Try the fast path of enq; backup to full enq on failure
                          Node pred = tail;
                  	//如果AQS尾结点不为null 代表AQS链表已经初始化 尝试将构建好的节点添加到链表的尾部
                          if (pred != null) {
                              node.prev = pred;
                  	    //cas替换AQS的尾结点 
                              if (compareAndSetTail(pred, node)) {
                                  pred.next = node;
                                  return node;
                              }
                          }
                  	//没有初始化调用enq()方法
                          enq(node);
                          return node;
                      }
                  
                  private Node enq(final Node node) {
                  	    //自旋
                          for (;;) {
                              Node t = tail;
                  	    //尾结点为空 说明AQS链表还没有初始化 那么进行初始化
                              if (t == null) { // Must initialize
                  	    //cas 将AQS的head节点 初始化 成功初始化head之后,将尾结点也初始化
                              //注意 这里我们可以看到head节点是不存储线程信息的 也就是说head节点相当于是一个虚拟节点
                                  if (compareAndSetHead(new Node()))
                                      tail = head;
                              } else {
                  		//尾结点不为空 那么直接添加到链表的尾部即可
                                  node.prev = t;
                                  if (compareAndSetTail(t, node)) {
                                      t.next = node;
                                      return t;
                                  }
                              }
                          }
                      }
                  

                  接下来分析acquireQueued()

                  acquireQueued()

                  acquireQueued()方法的作用就是利用Locksupport.park()方法将AQS链表中存储的线程阻塞起来。

                  流程图:

                  源码分析:

                  final boolean acquireQueued(final Node node, int arg) {
                          boolean failed = true;
                          try {
                              boolean interrupted = false;
                  	    //进入自旋
                              for (;;) {
                  		//获取当前节点的前一个节点
                                  final Node p = node.predecessor();
                  		// 如果前一个节点是head 而且再次尝试获取锁成功,将节点从AQS队列中去除 并替换head 同时返回中断标志
                                  if (p == head && tryAcquire(arg)) {
                                      setHead(node);
                                      p.next = null; // help GC
                                      failed = false;
                                      //注意 只有抢占到锁才会跳出这个for循环
                                      return interrupted;
                                  }
                                  //去除状态为CANCELLED的节点 并且阻塞线程 线程被阻塞在这
                                  //注意 线程被唤醒之后继续执行这个for循环 尝试抢占锁 没有抢占到的话又会阻塞
                                  if (shouldParkAfterFailedAcquire(p, node) &&
                                      parkAndCheckInterrupt())
                                      interrupted = true;
                              }
                          } finally {
                              if (failed)
                                  //如果失败 将node状态修改为CANCELLED
                                  cancelAcquire(node);
                          }
                      }
                  

                  在acquireQueued()方法中有两个方法比较重要shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法。

                  shouldParkAfterFailedAcquire()

                  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
                          int ws = pred.waitStatus;
                          if (ws == Node.SIGNAL)
                  	    //如果节点是SIGNAL状态 不需要处理 直接返回
                              return true;
                          if (ws > 0) {
                              /*
                               * Predecessor was cancelled. Skip over predecessors and
                               * indicate retry.
                               */
                  	   //如果节点状态>0 说明节点是取消状态 这种状态的节点需要被清除 用do while循环顺便清除一下前面的连续的、状态为取消的节点
                              do {
                                  node.prev = pred = pred.prev;
                              } while (pred.waitStatus > 0);
                              pred.next = node;
                          } else {
                              /*
                               * waitStatus must be 0 or PROPAGATE.  Indicate that we
                               * need a signal, but don't park yet.  Caller will need to
                               * retry to make sure it cannot acquire before parking.
                               */
                  	    //正常的情况下 利用cas将前一个节点的状态替换为 SIGNAL状态 也就是-1
                  	    //注意 这样队列中节点的状态 除了最后一个都是-1 包括head节点
                              compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
                          }
                          return false;
                      }
                  

                  parkAndCheckInterrupt()

                  private final boolean parkAndCheckInterrupt() {
                    //挂起当前线程 并且返回中断标志  LockSupport.park(thread) 会调用UNSAFE.park()方法将线程阻塞起来(是一个native方法)
                          LockSupport.park(this);
                          return Thread.interrupted();
                      }
                  

                  到这里lock()方法也就分析完了 接下来我们分析unlock()方法

                  unlock()方法源码分析:

                  流程图:

                  reentrantLock的unlock()方法会调用sync的release()方法。

                  public void unlock() {
                  	    //每次调用unlock 将state减1
                          sync.release(1);
                  }
                  

                  release()方法有两个方法比较重要,tryRelease()方法和unparkSuccessor(),tryRelease()方法计算state的值,看线程是否已经彻底释放锁(这是因为ReentrantLock是支持重入的),如果已经彻底释放锁那么需要调用unparkSuccessor()方法来唤醒线程,否则不需要唤醒线程。

                  public final boolean release(int arg) {
                  	//只有tryRelease返回true 说明已经释放锁 需要将阻塞的线程唤醒 否则不需要唤醒别的线程
                          if (tryRelease(arg)) {
                              Node h = head;
                  	//如果头结点不为空 而且状态不为0 代表同步队列已经初始化 且存在需要唤醒的node
                  	//注意 同步队列的头结点相当于是一个虚拟节点  这一点我们可以在构建节点的代码中很清楚的知道
                  	//并且在shouldParkAfterFailedAcquire方法中 会把head节点的状态修改为-1
                  	//如果head的状态为0 那么代表队列中没有需要被唤醒的元素 直接返回true
                              if (h != null && h.waitStatus != 0)
                  		//唤醒头结点的下一个节点
                                  unparkSuccessor(h);
                              return true;
                          }
                          return false;
                      }
                  

                  tryRelease()

                  protected final boolean tryRelease(int releases) {
                  	     //减少重入次数
                              int c = getState() - releases;
                  	    //如果获取锁的线程不是当前线程 抛出异常
                              if (Thread.currentThread() != getExclusiveOwnerThread())
                                  throw new IllegalMonitorStateException();
                              boolean free = false;
                  	    //如果state为0 说明已经彻底释放了锁 返回true
                              if (c == 0) {
                                  free = true;
                  		//将获取锁的线程设置为null
                                  setExclusiveOwnerThread(null);
                              }
                  	    //重置state的值
                              setState(c);
                  	    //如果释放了锁 就返回true 否则返回false
                              return free;
                          }
                  

                  unparkSuccessor()

                  private void unparkSuccessor(Node node) {
                          //获取头结点的状态 将头结点状态设置为0 代表现在正在有线程被唤醒 如果head状态为0 就不会进入这个方法了
                          int ws = node.waitStatus;
                          if (ws < 0)
                              //将头结点状态设置为0
                              compareAndSetWaitStatus(node, ws, 0);
                          /*
                           * Thread to unpark is held in successor, which is normally
                           * just the next node.  But if cancelled or apparently null,
                           * traverse backwards from tail to find the actual
                           * non-cancelled successor.
                           */
                  	//唤醒头结点的下一个状态不是cancelled的节点 (因为头结点是不存储阻塞线程的)
                          Node s = node.next;
                  	//当前节点是null 或者是cancelled状态
                          if (s == null || s.waitStatus > 0) {
                              s = null;
                  	 //从aqs链表的尾部开始遍历 找到离头结点最近的 不为空的 状态不是cancelled的节点 赋值给s 这里为什么从尾结点开始遍历而是头结点 应该是因为添加结点的时候是先初始化结点的prev的, 从尾结点开始遍历 不会出现prve没有赋值的情况 
                              for (Node t = tail; t != null && t != node; t = t.prev)
                                  if (t.waitStatus <= 0)
                                      s = t;
                          }
                          if (s != null)
                  	    //调用LockSupport.unpark()唤醒指定的线程
                              LockSupport.unpark(s.thread);
                      }	
                  

                  五:总结

                  最后总结一下,有以下几点需要注意:

                  1.ReentrantLock有公平锁和非公平锁两种实现,其实这两种实现的差别只有两点,第一是在lock()方法开始的时候非公平锁会尝试cas抢占锁插一次队, 第二是在tryAcquire()方法发现state为0的时候,非公平锁会抢占一次锁,而公平锁会判断AQS链表中是否存在等待的线程,没有等待的线程才会去抢占锁。

                  2.AQS存储阻塞线程的数据结构是一个双向链表的结构,而且它是遵循先进先出的,因为它是从头结点的下一个节点开始唤醒,而添加新的节点的时候是添加到链表的尾部,所以AQS同时也是一个队列的数据结构。

                  3.线程被唤醒之后会继续执行acquireQueued()方法,因为它是阻塞在acquireQueued()方法的for循环中的,唤醒后尝试去获取锁,获取成功就会将节点从AQS中删除并跳出for循环,否则又会继续阻塞。(获取锁失败的原因就是因为有人插队啊。。也就是非公平锁导致的)。

                  以上就是java锁机制ReentrantLock源码实例分析的详细内容,更多关于java锁机制ReentrantLock的资料请关注自由互联其它相关文章!

                  上一篇:java并发包工具CountDownLatch源码分析
                  下一篇:没有了
                  网友评论