一、Semaphore
Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。很多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯,比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。
Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的。
信号量通过一组许可证来控制对共享资源的访问。
如果需要,可以用acquire()方法获取许可,如果许可为0,那么会进行阻塞,通过使用release()方法释放许可,把许可归还给Semaphore,归还之后,阻塞的线程就会醒来尝试获取许可。
Semaphore提供给了若干个api对应不同的功能:
- Semaphore(int permits):非公平模式创建;
- Semaphore(int permits, boolean fair):可以指定是否公平模式创建;
- acquire():尝试获取1个许可,如果没有许可则阻塞,可以被中断停止等待;
- acquire(int permits):跟上一个方法类型,尝试获取permits个许可;
- acquireUninterruptibly():尝试获取一个许可,不可中断;
- acquireUninterruptibly(int permits):尝试获取permits个许可,不可中断;
- tryAcquire():尝试获取一个许可,获取不到则直接返回失败;
- tryAcquire(int permits):尝试获取permits个许可,获取不到则直接返回失败;
- tryAcquire(int permits, long timeout, TimeUnit unit):尝试在timeout时间内获取permits个许可,超时则返回false,可被中断;
- tryAcquire(long timeout, TimeUnit unit):尝试在timeout时间内获取1个许可,超时则返回false,可被中断;
- release():释放一个许可;
- release(int permits):释放n个许可;
下面演示基于公平锁的Semaphore,获取锁使用acquireUninterruptibly():
这里设置的许可为2,可以发现,同一时刻最多只能有两个线程获得许可。
二、执行原理
Semaphore的执行原理相对来说比较简单。下面描述了可中断非公平的信号量实现原理,ASQ中的state值就相当于许可的数量:
- 执行acquire的时候,会尝试让state - acquires,如果发现许可足够,则进行cas更新,扣减许可,否则线程进入等待队列;
- 执行release的时候,state + releases,把许可加回去。
三、Semaphore用法
/** * @Description: 演示Semaphore用法 */ public class SemaphoreDemo { public static Semaphore semaphore = new Semaphore(3,true); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(50); for (int i = 0; i < 100; i++) { executorService.execute(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"拿到了许可证"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(Thread.currentThread().getName()+"释放了许可证"); semaphore.release(); } }); } executorService.shutdown(); } }注意,如果使用的是tryAcquire失败之后直接返回,线程不会进入AQS等待队列。
四、源码
公平信号量 和 非公平信号量 的区别
"公平信号量"和"非公平信号量"的释放信号量的机制是一样的!
不同的是它们获取信号量的机制:线程在尝试获取信号量许可时,对于公平信号量而言,如果当前线程不在CLH队列的头部,则排队等候;而对于非公平信号量而言,无论当前线程是不是在CLH队列的头部,它都会直接获取信号量。该差异具体的体现在,它们的tryAcquireShared()函数的实现不同。
4.1 Semaphore构造方法
public class Semaphore implements java.io.Serializable { private final Sync sync; public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } }-
1、Semaphore 构造器,permits 为传入的许可证数,默认非公平构造器;
-
2、Semaphore 构造器,permits 为传入的许可证数,fair 是 boolean 型的,如果传入 true,则公平,否则不公平;
4.2 NonfairSync 和 FairSync源码
public class Semaphore implements java.io.Serializable { private final Sync sync; static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } }两者都继承了 Sync 同步器,初始化时都调用了父类构造器,同时都有一个获取信号的方法,稍后再分析获取信号的区别。
4.3 acquire(获取信号量)
- 这个方法是从信号量获取一个许可,在获取到许可,或线程中断之前,当前线程阻塞;获取许可后立即返回并将许可数减一
- acquireSharedInterruptibly 这个方法是直接调用AQS的acquireSharedInterruptibly(int ard)方法;
tryAcquireShared(arg)则会调用Semaphore中两个同步器的tryAcquireShared实现方法; 如果获取失败则加入队列等待唤醒;
4.4 非公平模式的实现
非公平实现都是首先查看是否有可获取的许可,如果有则获取成功,没有则进队列等待;利用此可以提高并发量
public class Semaphore implements java.io.Serializable { private final Sync sync; static final class NonfairSync extends Sync { protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } }- 直接调用其父类Sync中非公平共享获取
通过自旋+CAS来一直尝试获取许可,直到获取成功或者没有许可,返回剩余的许可数
4.5 公平模式的实现
公平与非公平的区别在于始终按照AQS队列FIFO的顺序来的
public class Semaphore implements java.io.Serializable { private final Sync sync; static final class FairSync extends Sync { protected int tryAcquireShared(int acquires) { //自旋 CAS 实现线程安全 for (;;) { // 判断是否有前置任务排队 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; // 如果小于0则直接返回,否则利用CAS给AQS状态位赋值 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } }如果等待队列不为空,则直接返回-1。 以上两种模式获取失败后都会调用doAcquireSharedInterruptibly(int arg);自旋等待获取锁
- doAcquireSharedInterruptibly方法:会使得当前线程一直等待,直到当前线程获取到锁(或被中断)才返回
- shouldParkAfterFailedAcquire方法:判断当前线程获取锁失败之后是否需要挂起
4.6 void release()
公平和非公平使用相同的释放 释放许可
public class Semaphore implements java.io.Serializable { private final Sync sync; public void release() { sync.releaseShared(1); } }- 调用AQS中的releaseShared(int arg)
- tryReleaseShared()在Semaphore.Sync中被重写,释放共享锁,将锁计数器加回去
- 如果释放许可成功,则调用AQS中的doReleaseShared()方法来唤醒AQS队列中等待的线程
-
1)获取队列的头节点元素,如果不为null,并且不为尾节点,说白了,就是不止一个人等待,进入判断。
-
2)如果线程节点是需要唤醒的线程,则进行唤醒,获取资源使用。
-
3)失败后重试。
-
4)如果没有后继需要唤醒的节点,则退出,就相当于每人排队上厕所了,让出来资源就空着。
Semaphore 总结
-
1、Semaphore 内部维护一组信号量,即一个 volatile 的整型 state 变量。
-
2、Semaphore 分为公平或非公平两种方式,获取信号量或释放信号量的本质是对 state 进行原子的减少或增加操作。
-
3、获取不到信号的线程放在等待队列里面,释放信号的时候会唤醒后继节点。
-
4、Semaphore 主要用于对线程数量、公共资源(比如数据库连接池)等进行数量控制。
参考: https://www.itzhai.com/articles/graphical-several-fun-concurrent-helper-classes.html
https://www.cnblogs.com/200911/p/6060359.html
https://juejin.cn/post/6844904119547723789
https://blog.csdn.net/yhl_jxy/article/details/87279383