当前位置 : 主页 > 编程语言 > java >

java线程安全锁ReentrantReadWriteLock原理分析readLock

来源:互联网 收集:自由互联 发布时间:2023-01-30
目录 前言 ReentrantReadWriteLock的简单使用 readLock源码分析 lock() acquireShared() tryAcquireShared() fullTryAcquireShared() readerShouldBlock() 公平锁实现: 非公平锁实现: doAcquireShared() setHeadAndPropagate() s
目录
  • 前言
  • ReentrantReadWriteLock的简单使用
  • readLock源码分析
    • lock()
    • acquireShared()
    • tryAcquireShared()
    • fullTryAcquireShared()
    • readerShouldBlock()
      • 公平锁实现:
      • 非公平锁实现:
    • doAcquireShared()
      • setHeadAndPropagate()
        • shouldParkAfterFailedAcquire()
          • parkAndCheckInterrupt()
            • unlock()
              • releaseShared()
                • tryReleaseShared()
                  • doReleaseShared()
                    • unparkSuccessor()
                    • 小结

                      前言

                      很多时候,我们为了保证线程安全,会对一段代码加锁,但是加锁就意味着程序效率的下降,所以,我们经常会对锁进行一些优化,例如严格控制加锁的粒度,利用cas来代替加锁等。而今天我们介绍的读写锁,也是对锁的一种优化方案的实现。试想一下,如果我们的线程大部分时候都是读操作,那么读操作与读操作直接有必要互斥吗?答案是没有必要的,只有读写操作,写写操作才需要通过互斥来保证线程安全。今天我们通过ReentrantReadWriteLock来看看读写锁是如何实现的。

                      ReentrantReadWriteLock的简单使用

                      public void test1(){
                              ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
                              new Thread(()->{
                                  Lock readLock = readWriteLock.readLock();
                                  readLock.lock();
                                  try{
                                      //读操作
                                  }catch (Exception e){
                                      e.printStackTrace();
                                  }finally {
                                      readLock.unlock();
                                  }
                              }).start();
                              new Thread(()->{
                                  Lock writeLock = readWriteLock.writeLock();
                                  writeLock.lock();
                                  try{
                                      //写操作
                                  }catch (Exception e){
                                      e.printStackTrace();
                                  }finally {
                                      writeLock.unlock();
                                  }
                              }).start();
                          }
                      

                      readLock源码分析

                      首先是lock()方法。

                      lock()

                      readLock是通过共享锁来实现的。lock()方法会调用acquireShared()方法。所以我们直接分析acquireShared()方法。

                      public void lock() {
                           sync.acquireShared(1);
                      }
                      

                      acquireShared()

                      acquireShared()主要的逻辑就在tryAcquireShared()方法和doAcquireShared()方法,首先调用tryAcquireShared()方法,如果返回的值小于0,那么就需要调用doAcquireShared()方法进行阻塞,否则就会直接去执行业务代码。接下来我们重点分析tryAcquireShared()方法和doAcquireShared()方法。

                      if (tryAcquireShared(arg) < 0)
                                  doAcquireShared(arg);
                      

                      tryAcquireShared()

                      这个方法的逻辑其实比较简单。可以分为三个部分来看

                      1.首先通过state(state的高16位表示读锁的获取次数,低16位表示写锁的次数)获取到写锁的count,会判断写锁的count是否大于0(大于0意味着有线程获取了写锁)并且获取写锁的线程不是当前线程,那么返回-1,需要执行doAcquireShared()方法去阻塞当前线程。

                      2.通过cas修改读锁的count(保证线程安全)。如果当前线程是第一个获取读锁的线程,那么修改firstReader(第一个获取读锁的线程),firstReaderHoldCount(第一个获取读锁的线程的重入次数),否则通过当前线程获取读锁的次数构建成一个HoldCounter对象,并且放入到readHolds中(readHolds是一个ThreadLocal),同时维护一个cachedHoldCounter的缓存(当前线程获取读锁的重入次数的缓存)。

                      3.cas失败,读锁的数量达到最大值或者readerShouldBlock()方法判断需要等待的话,就调用fullTryAcquireShared()方法。

                              protected final int tryAcquireShared(int unused) {
                                  Thread current = Thread.currentThread();
                                  int c = getState();
                                  //获取写锁的数量,如果有线程获取了写锁,
                                  //并且获取写锁的线程不是当前线程 直接返回-1
                                  if (exclusiveCount(c) != 0 &&
                                      getExclusiveOwnerThread() != current)
                                      return -1;
                                  //获取读锁的数量    
                                  int r = sharedCount(c);
                                  //readerShouldBlock()判断是否需要等待,如果不需要等待并且当前获取读锁的数量小于最大值的限制,cas也成功替换了读锁数量
                                  if (!readerShouldBlock() &&
                                      r < MAX_COUNT &&
                                      compareAndSetState(c, c + SHARED_UNIT)) {
                                      //如果当前读锁数量为0 那么当前线程就是第一个获取读锁的线程
                                      if (r == 0) {
                                          //将当前线程赋值给firstReader 
                                          firstReader = current;
                                          firstReaderHoldCount = 1;
                                      } else if (firstReader == current) {
                                          //如果当前线程是第一个获取读锁的线程,那么只需要将firstReaderHoldCount +1 
                                          //从这里我们就可以知道读锁是支持重入的
                                          firstReaderHoldCount++;
                                      } else {
                                          //HoldCounter用来保存当前线程获取读锁的次数,因为读锁是支持重入的,
                                          //readHolds是一个ThreadLocal,用来保存当前线程的HoldCounter
                                          HoldCounter rh = cachedHoldCounter;
                                          if (rh == null || rh.tid != getThreadId(current))
                                              cachedHoldCounter = rh = readHolds.get();
                                          else if (rh.count == 0)
                                              readHolds.set(rh);
                                          rh.count++;
                                      }
                                      return 1;
                                  }
                                  //readerShouldBlock()返回true cas失败 或者已经达到最大数量
                                  return fullTryAcquireShared(current);
                              }
                      

                      fullTryAcquireShared()

                      tryAcquireShared()在cas失败的话,读锁数量达到最大值或者readerShouldBlock()方法判断需要等待的话就会进入fullTryAcquireShared()方法,而fullTryAcquireShared()方法方法就会分别针对这三种情况进行处理。

                      1.首先依旧会先判断是否有线程获取写锁,如果有,直接返回-1.

                      2.如果readerShouldBlock()方法返回true,如果返回true,表示应该阻塞等待,就将当前线程获取读锁的数量置为0,并且返回-1(返回-1就会调用doAcquireShared()方法去阻塞线程)。

                      3.如果读锁的数量已经达到最大值,那么就直接抛出异常

                      4.如果是因为cas失败,那么再进行cas一次,并且修改firstReader(第一个获取读锁的线程),firstReaderHoldCount(第一个获取读锁的线程的重入次数),cachedHoldCounter(当前线程获取读锁的重入次数的缓存)等变量的值。注意cas失败不会跳出for循环,所以这里是会自旋重试的。

                      final int fullTryAcquireShared(Thread current) {
                                  HoldCounter rh = null;
                                  for (;;) {
                                      int c = getState();
                                      if (exclusiveCount(c) != 0) {
                                          if (getExclusiveOwnerThread() != current)
                                              return -1;
                                          // else we hold the exclusive lock; blocking here
                                          // would cause deadlock.
                                      } else if (readerShouldBlock()) {
                                          // Make sure we're not acquiring read lock reentrantly
                                          if (firstReader == current) {
                                              // assert firstReaderHoldCount > 0;
                                          } else {
                                              if (rh == null) {
                                                  rh = cachedHoldCounter;
                                                  if (rh == null || rh.tid != getThreadId(current)) {
                                                      rh = readHolds.get();
                                                      if (rh.count == 0)
                                                          readHolds.remove();
                                                  }
                                              }
                                              if (rh.count == 0)
                                                  return -1;
                                          }
                                      }
                                      if (sharedCount(c) == MAX_COUNT)
                                          throw new Error("Maximum lock count exceeded");
                                      if (compareAndSetState(c, c + SHARED_UNIT)) {
                                          if (sharedCount(c) == 0) {
                                              firstReader = current;
                                              firstReaderHoldCount = 1;
                                          } else if (firstReader == current) {
                                              firstReaderHoldCount++;
                                          } else {
                                              if (rh == null)
                                                  rh = cachedHoldCounter;
                                              if (rh == null || rh.tid != getThreadId(current))
                                                  rh = readHolds.get();
                                              else if (rh.count == 0)
                                                  readHolds.set(rh);
                                              rh.count++;
                                              cachedHoldCounter = rh; // cache for release
                                          }
                                          return 1;
                                      }
                                  }
                              }
                      

                      接下来我们需要看一下readerShouldBlock()方法,readerShouldBlock()方法如果返回true那么代表线程获取读锁的时候需要阻塞,那我们就分析一下readerShouldBlock()方法,看什么时候获取读锁需要阻塞线程。

                      readerShouldBlock()

                      readerShouldBlock()方法有两个实现,一个是公平锁的实现,一个是非公平锁的实现。

                      公平锁实现:

                      判断阻塞队列中是否有节点,并且第一个节点不是当前线程。那么作为公平锁,这就代表需要等待。

                      final boolean readerShouldBlock() {
                           return hasQueuedPredecessors();
                      }
                      public final boolean hasQueuedPredecessors() {
                              // The correctness of this depends on head being initialized
                              // before tail and on head.next being accurate if the current
                              // thread is first in queue.
                              Node t = tail; // Read fields in reverse initialization order
                              Node h = head;
                              Node s;
                              return h != t &&
                                  ((s = h.next) == null || s.thread != Thread.currentThread());
                          }
                      

                      非公平锁实现:

                      判断阻塞队列中的第一个节点是否不是共享节点,如果不是,那么就需要等待,否则就代表可以插队,也就不需要阻塞。

                      final boolean readerShouldBlock() {
                           return apparentlyFirstQueuedIsExclusive();
                      }
                      final boolean apparentlyFirstQueuedIsExclusive() {
                              Node h, s;
                              return (h = head) != null &&
                                  (s = h.next)  != null &&
                                  !s.isShared()         &&
                                  s.thread != null;
                      }
                      

                      那么接下来我们分析doAcquireShared()方法,看doAcquireShared()方法是如何阻塞线程的。

                      doAcquireShared()

                      1.调用addWaiter()方法将当前线程封装成Node并且加入到阻塞队列中。

                      2.如果发现当前节点的前节点是head节点(代表当前节点是队列中第一个节点,因为head节点始终是空的,所以head的next就是实际上的第一个节点),那么就再次调用tryAcquireShared()方法尝试获取锁。如果调用tryAcquireShared()方法获取锁成功,那么就唤醒阻塞队列中所有的共享节点。

                      3.将阻塞队列中前一个节点的状态修改为SIGNAL,并且调用LockSupport.park()方法阻塞当前线程。

                      private void doAcquireShared(int arg) {
                              //将线程封装成状态为SHARED的Node节点,并且加入到阻塞队列中
                              final Node node = addWaiter(Node.SHARED);
                              boolean failed = true;
                              try {
                                  boolean interrupted = false;
                                  for (;;) {
                                      //获取当前节点的前一个节点
                                      final Node p = node.predecessor();
                                      if (p == head) {
                                          //再次调用tryAcquireShared()方法尝试获取锁
                                          int r = tryAcquireShared(arg);
                                          if (r >= 0) {
                                              //获取锁成功 因为是共享锁,那么需要唤醒所有的共享节点
                                              setHeadAndPropagate(node, r);
                                              p.next = null; // help GC
                                              if (interrupted)
                                                  selfInterrupt();
                                              failed = false;
                                              return;
                                          }
                                      }
                                      //将前节点设置为SIGNAL状态 并且调用LockSupport.park()阻塞当前线程
                                      if (shouldParkAfterFailedAcquire(p, node) &&
                                          parkAndCheckInterrupt())
                                          interrupted = true;
                                  }
                              } finally {
                                  if (failed)
                                      //如果失败 那么就将节点状态修改为
                                      cancelAcquire(node);
                              }
                          }
                      

                      接下来我们重点分析setHeadAndPropagate(),shouldParkAfterFailedAcquire(), parkAndCheckInterrupt()三个方法。

                      注:如果你看过我的另一篇文章# CountDownLatch源码分析,那么相信你对这三个方法已经很熟悉了,因为CountDownLatch也是通过AQS共享锁来实现的。

                      setHeadAndPropagate()

                      1.重新设置头节点,并且如果阻塞队列中的下一个节点是共享节点,那么就需要调用doReleaseShared()方法尝试去唤醒阻塞队列中其他的共享节点。doReleaseShared()方法我们在分析unlock()的时候再详细分析。

                      private void setHeadAndPropagate(Node node, int propagate) {
                              Node h = head; // Record old head for check below
                              setHead(node);
                              if (propagate > 0 || h == null || h.waitStatus < 0 ||
                                  (h = head) == null || h.waitStatus < 0) {
                                  Node s = node.next;
                                  if (s == null || s.isShared())
                                      doReleaseShared();
                              }
                      }
                      

                      shouldParkAfterFailedAcquire()

                      shouldParkAfterFailedAcquire()方法的作用就是将前面一个节点的状态修改为SIGNAL状态,并且将CANCELLED状态的节点去除(waitStatus大于0,只能是CANCELLED状态)。

                      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
                              int ws = pred.waitStatus;
                              if (ws == Node.SIGNAL)
                                  return true;
                              //如果前节点的状态是CANCELLED状态,那么尝试去除阻塞队列中的其他的CANCELLED状态的节点(注意这里只会从后往前遍历吗,去除连续的CANCELLED状态的节点)
                              if (ws > 0) {
                                  do {
                                      node.prev = pred = pred.prev;
                                  } while (pred.waitStatus > 0);
                                  pred.next = node;
                              } else {
                                  //cas修改前一个节点的状态为SIGNAL
                                  compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
                              }
                              return false;
                          }
                      

                      parkAndCheckInterrupt()

                      parkAndCheckInterrupt()方法会调用LockSupport.park()方法,是真正阻塞线程的地方。线程被唤醒后,会从这个地方继续执行代码。

                      private final boolean parkAndCheckInterrupt() {
                              LockSupport.park(this);
                              return Thread.interrupted();
                          }
                      

                      至此readLock的lock()方法,我们就分析完了。接下来我们继续分析readLock的unlock()方法。

                      unlock()

                      unlock()方法的逻辑都是在releaseShared()方法中完成的,所以我们具体看releaseShared()方法。

                      public void unlock() {
                          sync.releaseShared(1);
                      }
                      

                      releaseShared()

                      releaseShared()方法先调用tryReleaseShared()方法尝试是否锁,如果返回true,那么调用doReleaseShared()方法释放锁。

                      public final boolean releaseShared(int arg) {
                              if (tryReleaseShared(arg)) {
                                  doReleaseShared();
                                  return true;
                              }
                              return false;
                      }
                      

                      tryReleaseShared()

                      tryReleaseShared()方法的逻辑主要分为以下几步。

                      1.首先判断当前线程是否是第一个获取读锁的线程,如果是,那么就维护firstReader和firstReaderHoldCount变量,否则读取缓存cachedHoldCounter中的值,如果缓存的不是当前线程的值,那么就需要从readHolds中获取到当前线程的HoldCounter对象(保存了当前线程的重入次数),将当前线程的重入次数-1。

                      2.通过cas和自旋将获取总的读锁的数量-1,减完之后当前占有读锁的数量为0,那么就返回true。

                      protected final boolean tryReleaseShared(int unused) {
                                  Thread current = Thread.currentThread();
                                  if (firstReader == current) {
                                      // assert firstReaderHoldCount > 0;
                                      if (firstReaderHoldCount == 1)
                                          firstReader = null;
                                      else
                                          firstReaderHoldCount--;
                                  } else {
                                      HoldCounter rh = cachedHoldCounter;
                                      if (rh == null || rh.tid != getThreadId(current))
                                          rh = readHolds.get();
                                      int count = rh.count;
                                      if (count <= 1) {
                                          readHolds.remove();
                                          if (count <= 0)
                                              throw unmatchedUnlockException();
                                      }
                                      --rh.count;
                                  }
                                  for (;;) {
                                      int c = getState();
                                      int nextc = c - SHARED_UNIT;
                                      if (compareAndSetState(c, nextc))
                                          return nextc == 0;
                                  }
                              }
                      

                      tryReleaseShared()方法返回true之后,需要调用doReleaseShared()方法唤醒被阻塞的线程。

                      doReleaseShared()

                      doReleaseShared()方法需要唤醒阻塞队列中所有的共享节点,通过自旋和unparkSuccessor()方法不断尝试唤醒阻塞队列中的节点。

                          private void doReleaseShared() {
                              for (;;) {
                                  Node h = head;
                                  if (h != null && h != tail) {
                                      int ws = h.waitStatus;
                                      //如果头结点的状态是SIGNAL
                                      if (ws == Node.SIGNAL) {
                                          //cas修改节点的状态为0 失败的话继续自旋
                                          // 成功的话调用unparkSuccessor唤醒头结点的下一个正常节点
                                          if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                                              continue;            // loop to recheck cases
                                          unparkSuccessor(h);
                                      }
                                      //如果节点状态为0 那么cas替换为PROPAGATE 失败进入下一次自旋
                                      else if (ws == 0 &&
                                               !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                                          continue;                // loop on failed CAS
                                  }
                                  if (h == head)                   // loop if head changed
                                      break;
                              }
                          }
                      

                      unparkSuccessor()

                      unparkSuccessor()方法的作用是唤醒头节点后第一个不为null且状态不为cancelled的节点。通过LockSupport.unpark()方法唤醒阻塞的线程。

                      private void unparkSuccessor(Node node) {
                              //获取头结点的状态 将头结点状态设置为0 代表现在正在有线程被唤醒 如果head状态为0 就不会进入这个方法了
                              int ws = node.waitStatus;
                              if (ws < 0)
                                  //将头结点状态设置为0
                                  compareAndSetWaitStatus(node, ws, 0);
                      	//唤醒头结点的下一个状态不是cancelled的节点 (因为头结点是不存储阻塞线程的)
                              Node s = node.next;
                      	//当前节点是null 或者是cancelled状态
                              if (s == null || s.waitStatus > 0) {
                                  s = null;
                      	 //从aqs链表的尾部开始遍历 找到离头结点最近的 不为空的 状态不是cancelled的节点 赋值给s 
                               //这里为什么从尾结点开始遍历而不是头结点 是因为添加结点的时候是先初始化结点的prev的, 从尾结点开始遍历 不会出现prve没有赋值的情况 
                               //如果从头结点进行遍历 next为null 并不能保证链表遍历完了
                                  for (Node t = tail; t != null && t != node; t = t.prev)
                                      if (t.waitStatus <= 0)
                                          s = t;
                              }
                              if (s != null)
                      	    //调用LockSupport.unpark()唤醒指定的线程
                                  LockSupport.unpark(s.thread);
                          }	
                      

                      至此readLock的unlock()方法也就分析完了。

                      小结

                      1.ReentrantReadWriteLock的readLock是支持重入的。

                      2.ReentrantReadWriteLock的readLock是通过AQS的共享锁来实现的。

                      3.readLock中提供了firstReader,firstReaderHoldCount,cachedHoldCounter等变量来提供效率,当前线程的读锁重入次数一般都是存放在readHolds中(readHolds是一个TheadLocal),只有一个获取读锁的线程是通过firstReader,firstReaderHoldCount两个变量来维护的,而cachedHoldCounter则是一个缓存,这样通过这三个变量就可以减少从readHolds获取值的次数。(因为大部分情况下并发不高或许只有一个线程获取读锁)。

                      4.读锁与读锁之间不是互斥的,读锁和写锁是互斥的,但是如果获取写锁的线程是当前线程,那么当前线程是可以获取读锁的。

                      以上就是java线程安全锁ReentrantReadWriteLock原理分析readLock的详细内容,更多关于java锁ReentrantReadWriteLock readLock的资料请关注自由互联其它相关文章!

                      网友评论