当前位置 : 主页 > 编程语言 > java >

Java并发编程——Semaphore

来源:互联网 收集:自由互联 发布时间:2023-02-04
一、Semaphore Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。Se

一、Semaphore

Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。

 

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。很多年以来,我都觉得从字面上很难理解Semaphore所表达的含义,只能把它比作是控制流量的红绿灯,比如XX马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入XX马路,但是如果前一百辆中有五辆车已经离开了XX马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。

 

Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的。

 

信号量通过一组许可证来控制对共享资源的访问。

 

如果需要,可以用acquire()方法获取许可,如果许可为0,那么会进行阻塞,通过使用release()方法释放许可,把许可归还给Semaphore,归还之后,阻塞的线程就会醒来尝试获取许可。

 

Semaphore提供给了若干个api对应不同的功能:

  • Semaphore(int permits):非公平模式创建;
  • Semaphore(int permits, boolean fair):可以指定是否公平模式创建;
  • acquire():尝试获取1个许可,如果没有许可则阻塞,可以被中断停止等待;
  • acquire(int permits):跟上一个方法类型,尝试获取permits个许可;
  • acquireUninterruptibly():尝试获取一个许可,不可中断;
  • acquireUninterruptibly(int permits):尝试获取permits个许可,不可中断;
  • tryAcquire():尝试获取一个许可,获取不到则直接返回失败;
  • tryAcquire(int permits):尝试获取permits个许可,获取不到则直接返回失败;
  • tryAcquire(int permits, long timeout, TimeUnit unit):尝试在timeout时间内获取permits个许可,超时则返回false,可被中断;
  • tryAcquire(long timeout, TimeUnit unit):尝试在timeout时间内获取1个许可,超时则返回false,可被中断;
  • release():释放一个许可;
  • release(int permits):释放n个许可;

下面演示基于公平锁的Semaphore,获取锁使用acquireUninterruptibly():

这里设置的许可为2,可以发现,同一时刻最多只能有两个线程获得许可。

二、执行原理

Semaphore的执行原理相对来说比较简单。下面描述了可中断非公平的信号量实现原理,ASQ中的state值就相当于许可的数量:

  • 执行acquire的时候,会尝试让state - acquires,如果发现许可足够,则进行cas更新,扣减许可,否则线程进入等待队列;
  • 执行release的时候,state + releases,把许可加回去。

三、Semaphore用法

/** * @Description: 演示Semaphore用法 */ public class SemaphoreDemo { public static Semaphore semaphore = new Semaphore(3,true); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(50); for (int i = 0; i < 100; i++) { executorService.execute(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"拿到了许可证"); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(Thread.currentThread().getName()+"释放了许可证"); semaphore.release(); } }); } executorService.shutdown(); } }

注意,如果使用的是tryAcquire失败之后直接返回,线程不会进入AQS等待队列。

四、源码

公平信号量 和 非公平信号量 的区别

"公平信号量"和"非公平信号量"的释放信号量的机制是一样的!

 

不同的是它们获取信号量的机制:线程在尝试获取信号量许可时,对于公平信号量而言,如果当前线程不在CLH队列的头部,则排队等候;而对于非公平信号量而言,无论当前线程是不是在CLH队列的头部,它都会直接获取信号量。该差异具体的体现在,它们的tryAcquireShared()函数的实现不同。

4.1 Semaphore构造方法

public class Semaphore implements java.io.Serializable { private final Sync sync; public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } }
  • 1、Semaphore 构造器,permits 为传入的许可证数,默认非公平构造器;

  • 2、Semaphore 构造器,permits 为传入的许可证数,fair 是 boolean 型的,如果传入 true,则公平,否则不公平;

4.2 NonfairSync 和 FairSync源码

public class Semaphore implements java.io.Serializable { private final Sync sync; static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } }

两者都继承了 Sync 同步器,初始化时都调用了父类构造器,同时都有一个获取信号的方法,稍后再分析获取信号的区别。

4.3 acquire(获取信号量)

  • 这个方法是从信号量获取一个许可,在获取到许可,或线程中断之前,当前线程阻塞;获取许可后立即返回并将许可数减一
public class Semaphore implements java.io.Serializable { private final Sync sync; /** * 如果没有许可可用,则会休眠,直到发生以下两种情况 * 1、其他调用release方法释放许可,并且当前线程获取到许可 * 2、其他线程中断了当前线程 * 1)当前线程在进入这个方法时设置了中断标志位 * 2)等待许可时发生了中断,则抛出中断异常 */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } }
  • acquireSharedInterruptibly 这个方法是直接调用AQS的acquireSharedInterruptibly(int ard)方法;
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** * 首先检测是否中断.中断后抛出异常 * 尝试获取许可,成功退出;失败则进入AQS队列,直至成功获取或中断 */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取锁,返回剩余共享锁的数量;小于0则加入同步队列,自旋 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } }

tryAcquireShared(arg)则会调用Semaphore中两个同步器的tryAcquireShared实现方法; 如果获取失败则加入队列等待唤醒;

4.4 非公平模式的实现

非公平实现都是首先查看是否有可获取的许可,如果有则获取成功,没有则进队列等待;利用此可以提高并发量

public class Semaphore implements java.io.Serializable { private final Sync sync; static final class NonfairSync extends Sync { protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } }
  • 直接调用其父类Sync中非公平共享获取
public class Semaphore implements java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { final int nonfairTryAcquireShared(int acquires) { // 自旋直到无许可或者状态位赋值成功 for (;;) { int available = getState(); int remaining = available - acquires; // 如果小于0则直接返回,否则利用CAS给AQS状态位赋值 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } }

通过自旋+CAS来一直尝试获取许可,直到获取成功或者没有许可,返回剩余的许可数

4.5 公平模式的实现

公平与非公平的区别在于始终按照AQS队列FIFO的顺序来的

public class Semaphore implements java.io.Serializable { private final Sync sync; static final class FairSync extends Sync { protected int tryAcquireShared(int acquires) { //自旋 CAS 实现线程安全 for (;;) { // 判断是否有前置任务排队 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; // 如果小于0则直接返回,否则利用CAS给AQS状态位赋值 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } }

如果等待队列不为空,则直接返回-1。 以上两种模式获取失败后都会调用doAcquireSharedInterruptibly(int arg);自旋等待获取锁

  • doAcquireSharedInterruptibly方法:会使得当前线程一直等待,直到当前线程获取到锁(或被中断)才返回
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //创建“当前线程”的Node节点,且node中记录的锁是“共享锁”类型,并将节点添加到CLH队列末尾。 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //获取前继节点,如果前继节点是等待锁队列的表头,则尝试获取共享锁 // 判断新增的节点的前一个节点是否头节点 final Node p = node.predecessor(); if (p == head) { // 是头节点,那么在此尝试获取共享锁 int r = tryAcquireShared(arg); if (r >= 0) { // 获取成功,把当前节点变为新的head节点, //并且检查后续节点是否可以在共享模式下等待, //并且允许继续传播,则调用doReleaseShared继续唤醒下一个节点尝试获取锁 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //前继节点不是头节点,当前线程一直等待,直到获取到锁 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } }
  • shouldParkAfterFailedAcquire方法:判断当前线程获取锁失败之后是否需要挂起
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /*说明:4.shouldParkAfterFailedAcquire 返回当前线程是否应该阻塞 (01) 关于waitStatus请参考下表(中扩号内为waitStatus的值) CANCELLED[1] -- 当前线程已被取消 SIGNAL[-1] -- “当前线程的后继线程需要被unpark(唤醒)”。 一般发生情况是:当前线程的后继线程处于阻塞状态, 而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。 CONDITION[-2] -- 当前线程(处在Condition休眠状态)在等待Condition唤醒 PROPAGATE[-3] -- (共享锁)其它线程获取到“共享锁” [0] -- 当前线程不属于上面的任何一种状态。 (02) shouldParkAfterFailedAcquire()通过以下规则,判断“当前线程”是否需要被阻塞。 规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。 规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。 规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前驱节点的状态 int ws = pred.waitStatus; // 如果前驱节点是SIGNAL状态,则意味着当前线程需要unpark唤醒,此时返回true if (ws == Node.SIGNAL) return true; // 如果前继节点是取消的状态即前驱节点状态为CANCELLED if (ws > 0) { // 从队尾向前寻找第一个状态不为CANCELLED的节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 将前驱节点的状态设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } }

4.6 void release()

公平和非公平使用相同的释放 释放许可

public class Semaphore implements java.io.Serializable { private final Sync sync; public void release() { sync.releaseShared(1); } }
  • 调用AQS中的releaseShared(int arg)
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //目的是让当前线程释放它所持有的共享锁,它首先会通过tryReleaseShared()去尝试释放共享锁。 //尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。 public final boolean releaseShared(int arg) { //释放共享锁 if (tryReleaseShared(arg)) { //唤醒所有共享节点线程 doReleaseShared(); return true; } return false; } }
  • tryReleaseShared()在Semaphore.Sync中被重写,释放共享锁,将锁计数器加回去
public class Semaphore implements java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { protected final boolean tryReleaseShared(int releases) { for (;;) { // 获取“锁计数器”的状态 int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // 通过CAS函数进行赋值。 if (compareAndSetState(current, next)) return true; } } } }
  • 如果释放许可成功,则调用AQS中的doReleaseShared()方法来唤醒AQS队列中等待的线程
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** * 唤醒同步队列中的一个线程 */ private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //是否需要唤醒后继节点 if (ws == Node.SIGNAL) { //修改状态为初始0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //唤醒h.nex节点线程 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } }
  • 1)获取队列的头节点元素,如果不为null,并且不为尾节点,说白了,就是不止一个人等待,进入判断。

  • 2)如果线程节点是需要唤醒的线程,则进行唤醒,获取资源使用。

  • 3)失败后重试。

  • 4)如果没有后继需要唤醒的节点,则退出,就相当于每人排队上厕所了,让出来资源就空着。

Semaphore 总结

  • 1、Semaphore 内部维护一组信号量,即一个 volatile 的整型 state 变量。

  • 2、Semaphore 分为公平或非公平两种方式,获取信号量或释放信号量的本质是对 state 进行原子的减少或增加操作。

  • 3、获取不到信号的线程放在等待队列里面,释放信号的时候会唤醒后继节点。

  • 4、Semaphore 主要用于对线程数量、公共资源(比如数据库连接池)等进行数量控制。

参考: https://www.itzhai.com/articles/graphical-several-fun-concurrent-helper-classes.html

https://www.cnblogs.com/200911/p/6060359.html

https://juejin.cn/post/6844904119547723789

https://blog.csdn.net/yhl_jxy/article/details/87279383

网友评论