前言:哇,这一章是真的费劲,以前是知其然,这次探索了如此多的同步组件的源码,终于是知其所以然了。
下一步也能开发自己的同步组件了,嘎嘎。
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
5.1
显示的获取与释放锁。
Lock接口的三大特性:
(1)非阻塞的获取锁 tryLock(); 获取了锁返回true。没有获取到就返回false。内部使用的是同步器Sync.tryAcquired(1)
(2)能中断的获取锁 获取锁的线程会相应中断。
(3)超时锁 可以在指定时间内尝试获取锁,在这个时间内是阻塞状态。
//获取锁,尝试tryAcquired(1)
lock.lock();
try {
//业务
} finally {
//释放同步状态
lock.unlock();
}
不要把获取锁的过程放到try中,因为在获取锁的时候出现了异常,可能会导致锁无故释放。也就是说,只有真实的获取了锁以后,再去调用lock.unlock()释放同步锁。
5.2
队列同步器AbstractQueuedSynchronizer,简称同步器。使用的是int类型的成员变量来表示同步状态,内部维护了一个先进先出FIFO的同步队列。
修改同步状态的三个方法。 getState()、setState()、compareAndSetState(int expect,int update)
继承同步器AQS。
同步器面向的锁的实现者。
同步器已经完成了方法的骨架,锁的开发者需要完成骨架中定义的模版方法,一般try开头的方法需要被开发者重写。
5.2.1
AQS同步器有两种模式,独占式与共享式。
同步器提供的模版方法:(1)独占式获取与释放同步状态方法(2)共享式..(3)查询同步队列中等待的线程
5.2.2
同步队列遵循FIFO,首节点是获取同步状态成功的节点,头节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。
调用同步器的acquired(int arg)方法可以获取同步状态。该方法对中断不敏感,即该线程在获取同步状态失败后,构造节点并加入了同步队列后,若进入阻塞状态,那么中断该线程并不会让该节点从同步队列中移除。
接下来是AbstractQueuedSynchronizer源码分析,大佬早就会了:
//同步器为独占式提供的acquried()方法public final void acquire(int arg) {
//tryAcquire方法由锁的开发者重写。若返回false,说明线程获取锁失败。那么进入自旋
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//线程自旋获取同步状态
//acquireQueued方法返回当前线程是否被中断过,只是记录,不抛异常。
selfInterrupt();
}
//该方法就是将当前线程构造成Node对象,并加入到AQS维护的FIFO队列的末尾
private Node addWaiter(Node mode) {
//构造节点
Node node = new Node(Thread.currentThread(), mode);
//尝试快速的加入队列尾部
Node pred = tail;
if (pred != null) {
//把当前节点的前驱节点设置为之前的尾部节点
node.prev = pred;
//使用CAS保证这个时候没有其它的节点加入到尾巴节点。
if (compareAndSetTail(pred, node)) {
//设置之前尾巴节点的后继节点是当前的节点
pred.next = node;
//返回新构造的当前线程的节点
return node;
}
}
//快速加入队列失败,说明这个时候队列还没初始化呢。
//所以就先创建一个Node对象赋给head与tail
enq(node);//方法略
return node;
}
//该方法是:节点自旋的获取同步状态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//找到当前节点的前驱节点
final Node p = node.predecessor();
//如果当前节点的前驱节点是头节点,并且又获取到了同步状态
if (p == head && tryAcquire(arg)) {
//那么就把当前节点作为头结点,并唤醒后继节点
setHead(node);
p.next = null; // help GC
failed = false;
//单纯的返回中断位,提示外层这个线程可能被中断过
return interrupted;
}
//如果自旋失败,那么就使用工具park阻塞线程。
//线程再次醒来的条件是:前驱节点被释放、或者线程被中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//只是单纯的记录一下这个线程中断过
interrupted = true;//而相应中断则throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
为什么只有前驱节点是头节点才能尝试获取同步状态?
(1)头节点是获取了同步状态的节点,释放同步状态之后会唤醒后继节点,所以被唤醒的节点要判断一下自己的前驱节点是不是头节点
(2)维护队列先进先出原则。前驱节点是头节点的节点是最接近头结点的节点,也是最早加入队列的节点。
调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。即让别的线程开始执行任务。
共享式同步状态的获取
//AQS的共享式获取同步状态public final void acquireShared(int arg) {
//tryAcquiredShared方法若返回大于等于0,表明获取同步状态成功
//tryAcquiredShared由开发者重写
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
//线程自旋获取同步状态
private void doAcquireShared(int arg) {
//构造当前线程并加入到同步队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
//自旋
for (;;) {
//获取前驱节点
final Node p = node.predecessor();
if (p == head) {
//若前驱节点是头节点
int r = tryAcquireShared(arg);
//并且成功获取到同步锁,tryAcquireShared重写的方法>=0
if (r >= 0) {
//设置当前节点是头节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
//响应外部中断该线程
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//本次自旋获取同步状态失败
if (shouldParkAfterFailedAcquire(p, node) &&
//阻塞当前节点,并且判断当前线程是否被外部中断过
parkAndCheckInterrupt())
//修改中断
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
共享式同步状态的释放
public final boolean releaseShared(int arg) {//由开发者重写的方法,若tryReleaseShared方法返回true
if (tryReleaseShared(arg)) {
//那么就释放同步状态,唤醒头节点的后继节点
doReleaseShared();
return true;
}
return false;
}
TripleSharedLock.java
package com.ssi.javase.thread2;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* Created by jay.zhou on 2018/9/27.
*/
public class TripleSharedLock implements Lock {
//创建同步器对象,共享状态为3,表示同时只有3个线程能够同时获取临界资源
private static final Sync sync = new Sync(3);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
if (count < 0) {
throw new IllegalArgumentException("synchronized state must larger than 0");
}
//设置底层同步状态为count
setState(count);
}
//返回 大于等于0 ,表示线程获取到同步状态
@Override
protected int tryAcquireShared(int reduceCount) {
//更改同步状态通过CAS保证原子性
for (; ; ) {
//获取当前同步状态
int current = getState();
//计算出最新的同步状态
int newCount = current - reduceCount;
//如果当前newCount大于等于0,那么就是获取成功,那么就进行CAS更改同步状态
//如果当前newCount小于0,那么就返回这个小于0的数值,表示获取同步状态失败
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int increaseCount) {
for (; ; ) {
//释放同步锁,相当于在同步状态state上加1
//获取当前同步状态
int current = getState();
//计算出最新的同步状态
int newCount = current + increaseCount;
//如果成功归还同步状态,那么就退出循环。
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
@Override
public void lock() {
//尝试获取同步状态,失败了AQS会用 LockSupport.park(currentThreadNode)
sync.acquireShared(1);
}
@Override
public void unlock() {
//尝试释放同步状态,死循环,直到释放成功
sync.releaseShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
//响应中断的获取同步资源
sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock() {
//返回大于等于0,表示当前线程已经获取到同步锁
return sync.tryAcquireShared(1) >= 0;
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
//尝试在指定时间内获取同步资源(本方法响应中断)
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException("暂时还不会实现");
}
}
5.3
重入锁表ReentrantLock表明可以对资源重复加锁。
公平锁与非公平锁。公平体现在:先请求锁的线程优先获取锁。
实现重入锁的关键:(1)当前线程能够再次获取锁。(2)重复获取同步状态n次,同步状态也需被释放释放n次。
判断当前现线程是否是获取锁的线程。如果是的话,那么就将同步状态自行增加。释放同步锁判断同步状态是否为0,为0 的话说明已经释放同步锁。
因为lock()与unlock()是成对出现的,所以重进入了n次,也会释放n次同步状态。
公平锁的优势:避免一个线程长时间得不到锁
非公平锁的优势:避免大量的线程切换带来的性能损耗,它能保证更大的吞吐量
RentrantLock默认调用顺序:ReentrantLock.lock(); Sync.acquire(1); Sync.tryAcquire(1); Sync.nonfairTryAcquire(1);
//ReentrantLock内部的子类AQS的nonfairTryAcquire方法//ReentrantLock获取同步状态
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取当前同步状态
int c = getState();
//如果当前没有其它线程获取了同步状态
if (c == 0) {
//那么就成功获取同步状态
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前持有同步锁的线程就是自己
else if (current == getExclusiveOwnerThread()) {
//计算最新的同步状态
int nextc = c + acquires;
//设置同步状态,已经加锁
setState(nextc);
return true;
}
return false;
}
//释放同步锁
protected final boolean tryRelease(int releases) {
//计算最新的同步状态
int c = getState() - releases;
//如果当前的线程不是占有同步状态的线程,那么抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//free表示是否成功设置最新的同步状态
boolean free = false;
//如果最新的同步状态为0,说明已经释放完同步状态
if (c == 0) {
//表示已经无锁了
free = true;
//设置占有同步状态的线程为null
setExclusiveOwnerThread(null);
}
//在同步状态下设置最新的同步状态
setState(c);
return free;
}
ReentrantLock内部还有一个公平的子类AQS。唯一不同的是,在尝试获取同步状态的时候,增加了判断是否前驱节点的方法。tryAcquired方法与nonfairTryAcquire方法的唯一区别。
所以它要等待前驱节点获取并释放锁之后才能继续尝试获取锁。
//公平锁的AQS的tryAcquire方法protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//判断当前节点是否有前驱节点,返回true,说明有比它更早的节点。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
5.4
维护了一对锁,一个读锁ReadLock接口,与一个写锁WriteLock接口。
通过分离读锁与写锁,使得并发性能比一般其它的排他锁有了很大提升。
全部阻塞。
Lock接口的三大特性:
(1)非阻塞的获取锁 tryLock(); 获取了锁返回true。没有获取到就返回false。内部使用的是同步器Sync.tryAcquired(1)
(2)能中断的获取锁 获取锁的线程会相应中断。
(3)超时锁 可以在指定时间内尝试获取锁,在这个时间内是阻塞状态。
读写锁三大特性
(1)可重入
(2)公平性选择 通过构造函数
(3)锁降级
按位切割,整形变量的高16位维护读同步状态,低16位维护写同步状态。
总结:读锁读读共享,乃共享锁。线程安全的增加和减少同步状态。
写锁,除当前线程外的所有线程阻塞,可获取读锁(锁降级),线程安全的操作同步状态。
锁降级:先获取写锁,再获取读锁,再释放写锁。这个时候线程只剩下读锁,所以写锁降级为读锁。
5.5
LockSupport.park()方法,阻塞当前线程。再次醒来的条件是:LockSupport.unpark()方法或者线程被中断。
5.6
直到被通知 signal()或者中断。
Condition接口的signal()方法唤醒一个等待在当前Condition对象上的线程。
每个Conditoin内部都有一个等待队列。
线程调用condtion.await()方法,那么该线程会释放同步锁,并构造节点加入等待队列中。