当前位置 : 主页 > 编程语言 > java >

Java多线程(单例模式,阻塞队列,定时器,线程池)详解

来源:互联网 收集:自由互联 发布时间:2023-01-30
目录 1. 单例模式(singleton pattern) 1.1 懒汉模式 1.2 饿汉模式 2 阻塞队列(blocking queue) 2.1 阻塞队列 2.2 生产者消费者模型 2.3 标准库中的阻塞队列 2.4 实现阻塞队列 3. 定时器 3.1 标准库中的
目录
  • 1. 单例模式(singleton pattern)
    • 1.1 懒汉模式
    • 1.2 饿汉模式
  • 2 阻塞队列(blocking queue)
    • 2.1 阻塞队列
    • 2.2 生产者消费者模型
    • 2.3 标准库中的阻塞队列
    • 2.4 实现阻塞队列
  • 3. 定时器
    • 3.1 标准库中的定时器
    • 3.2 实现定时器
  • 4 线程池
    • 4.1 标准库中的线程池
    • 4.2 Executors 创建线程池的几种方式
    • 4.3 利用线程池 创建多线程计算fib 数
    • 4.4 实现线程池

1. 单例模式(singleton pattern)

单例模式是通过代码,保护一个类,使得类在整个进程(应用)运行过程中有且只有一个。

常用在配置对象、控制类。

设计模式(design pattern):对一些解决通用问题的、经常书写得代码片段的总结与归纳。

1.1 懒汉模式

一开始就初始化

public class StarvingMode {
    // 是线程安全的
    // 类加载的时候执行
    // JVM 保证了类加载的过程是线程安全的
    private static StarvingMode instance = new StarvingMode();
 
    public static StarvingMode getInstance() {
        return instance;
    }
 
    // 将构造方法私有化,防止其他线程new
    private StarvingMode() {}
}

1.2 饿汉模式

等到用的时候在进行初始化

a. 饿汉模式-单线程版

类加载的时候不创建实例,第一次使用的时候才创建实例 

public class LazyModeV1 {
    private static LazyModeV1 instance = null;
 
    public static LazyModeV1 getInstance(){
        // 第一次调用这个方法时,说明我们应该实例化对象了
        // 原子性
        if (instance == null) {
            instance = new LazyModeV1();    // 只在第一次的时候执行
        }
        return instance;
    }
    // 将构造方法私有化,防止其他线程new
    private LazyModeV1(){};
}

但是如果在多个线程中同时调用 getInstance 方法, 就可能导致创 建出多个实例,一旦实例已经创建好了, 后面再多线程环境调用 getInstance 就不再有线程安全问题了(不再修改 instance 了)

b. 饿汉模式-多线程版

加 synchronized 锁 使线程安全

public class LazyModeV2 {
    private static LazyModeV2 instance = null;
 
    // 加synchronized锁,但是这样性能太低,所以有了mode3
    public synchronized static LazyModeV2 getInstance(){
        // 第一次调用这个方法时,说明我们应该实例化对象了
        if (instance == null) {
            instance = new LazyModeV2();    // 只在第一次的时候执行
        }
         return instance;
    }
 
    private LazyModeV2(){};
}

但是显而易见,如果简单粗暴的加锁,只在第一次初始化时为保证线程安全使用一次,在后续getInstance 时也要进行加锁解锁操作,降低性能。

c. 饿汉模式-多线程改进版

1.使用双重 if 判定, 降低锁竞争的频率

2.给 instance 加上了 volatile

class LazyModeV3 {
    // volatile
    private volatile static LazyModeV3 instance = null;
 
    public static LazyModeV3 getInstance(){
        // 1. 第一次调用这个方法时,说明我们应该实例化对象了
        if (instance == null) {
            // 在第一次instance 没有初始化的时候
            // 没有锁保护,有多个线程可以走到这里 a, b, c, d
 
            // 2. **但是只有第一个线程a能加锁,a 加锁后并且实例化对象,
            //    **b, c, d 加锁进去后发现instance != null, 就不会再创建了
            synchronized (LazyModeV3.class) {
                // 3. 加锁之后才能执行
                // 第一个抢到锁的线程看instance 是 null
                // 其他第一个抢到锁的线程看instance 是 null
                // 保证instance 只实例化一次
                if (instance == null) {
                    instance = new LazyModeV3();    // 只在第一次的时候执行
 
                    // 4. 但是还可能出问题,出现重排序,变成 1 -> 3 -> 2 其他线程掉instance就出现问题,
                    // 所以定义时就加上volatile,防止重排序;
                }
            }
        }
        return instance;
    }
 
    private LazyModeV3(){};
}

2 阻塞队列(blocking queue)

2.1 阻塞队列

阻塞队列是一种特殊的队列也遵守 "先进先出" 的原则

阻塞队列能是一种线程安全的数据结构, 并且具有以下特性:

  • 当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素
  • 当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插入元素

阻塞队列的一个典型应用场景就是 "生产者消费者模型".

2.2 生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取

  • 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
  • 阻塞队列也能使生产者和消费者之间 解耦

2.3 标准库中的阻塞队列

在 Java 标准库,JUC包下的blocking queue,是Queue 的子接口

  • BlockingQueue 是一个接口. 真正实现的类是 LinkedBlockingQueue(无上限)、ArrayBlockingQueue(有上限)
  • put 方法用于阻塞式的入队列, take 用于阻塞式的出队列
  • BlockingQueue 也有 offer, poll, peek 等方法, 但是这些方法不带有阻塞特性
  • 都会抛出lnterruptedException 异常,可以被中断
public class Main0 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue b1 = new LinkedBlockingDeque();
        BlockingQueue<Integer> b2 = new ArrayBlockingQueue<>(3);
        b2.put(1);
        b2.put(2);
        b2.put(3);
        b2.put(4); // 插入第四个时就会阻塞
    }
}

2.4 实现阻塞队列

通过 "循环队列" 的方式来实现.

使用 synchronized 进行加锁控制.

put 插入元素的时候, 判定如果队列满了, 就进行 wait. (注意, 要在循环中进行 wait. 被唤醒时不一定 队列就不满了, 因为同时可能是唤醒了多个线程).

take 取出元素的时候, 判定如果队列为空, 就进行 wait. (也是循环 wait)

public class MyArrayBlockingQueue {
    private long[] array;
    private int frontIndex;
    private int rearIndex;
    private int size;
 
    public MyArrayBlockingQueue (int capacity){
        array = new long[capacity];
        frontIndex = 0;
        rearIndex = 0;
        size = 0;
    }
 
    public synchronized void put (long val) throws InterruptedException {
        // while 防止假唤醒
        while(size == array.length){
            this.wait();
        }
 
        // 预期:队列一定不是满的
        array[rearIndex] = val;
        rearIndex++;
        if(rearIndex == array.length){
            rearIndex = 0;
        }
 
        // notify();
        // 在多生产者,多消费者时用notifyAll()
        notifyAll();
    }
 
    public synchronized long take () throws InterruptedException {
        while(size == 0){
            wait();
        }
 
        long val = array[frontIndex];
        frontIndex++;
        if(frontIndex == array.length){
            frontIndex = 0;
        }
 
        // notify();
        // 在多生产者,多消费者时用notifyAll()
        notifyAll();
        return val;
    }
}

3. 定时器

定时器一种实际开发中非常常用的组件,类似于一个“闹钟”,达到特定时间执行某个特定的代码。

3.1 标准库中的定时器

标准库中提供了一个 Timer 类. Timer 类的核心方法为 schedule

schedule 包含两个参数. 第一个参数指定即将要执行的任务代码, 第二个参数指定多长时间之后执 行 (单位为毫秒)

public class UserTimer {
    public static void main(String[] args) {
        Timer timer = new Timer();
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                System.out.println("闹钟响了");
            }
        };
        //timer.schedule(task, 5000); // 5秒后执行任务
        timer.schedule(task, 2000, 3000); // 2秒后执行任务,并且之后每三秒执行一次
        while (true){} // 主线程死循环,所以之后的输出都不是主线程打印的
    }
}

3.2 实现定时器

  • 一个带优先级的阻塞队列
  • 队列中的每个元素是一个 Task 对象.
  • Task 中带有一个时间属性, 队首元素就是即将执行的Task
  • 同时有一个 worker 线程一直扫描队首元素, 看队首元素是否需要执行

为啥要带优先级呢?

因为阻塞队列中的任务都有各自的执行时刻 (delay). 最先执行的任务一定是 delay 最小的. 使用带 优先级的队列就可以高效的把这个 delay 最小的任务找出来.  

import java.util.concurrent.PriorityBlockingQueue;
 
// 定义一个工作抽象类
abstract class MyTimerTask implements Comparable<MyTimerTask> {
    long runAt;     // 这个任务应该在何时运行(记录为 ms 为单位的时间戳)
    abstract public void run();
 
    @Override
    public int compareTo(MyTimerTask o) {
        if (runAt < o.runAt) {
            return -1;
        } else if (runAt > o.runAt) {
            return 1;
        } else {
            return 0;
        }
    }
}
 
// 定时器
public class MyTimer {
    // 这里是普通属性,不是静态属性
    // 优先级队列,要求元素具备比较能力
    private final PriorityBlockingQueue<MyTimerTask> queue = new PriorityBlockingQueue<>();
 
    private final Object newTaskComing = new Object();
 
    public MyTimer() {
        Worker worker = new Worker();
        worker.start();
    }
 
    // 不能使用静态内部类,否则看不到外部类的属性
    class Worker extends Thread {
 
        @Override
        public void run() {
            while (true) {
                MyTimerTask task = null;
                try {
                    task = queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // task 应该有个应该执行的时刻(不能记录 delay)
                long now = System.currentTimeMillis();
                long delay = task.runAt - now;
                if (delay <= 0) {
                    task.run();
                } else {
                    try {
                        // Thread.sleep(delay);    // 5s
 
                        // 应该在两种条件下醒来:
                        // 1. 有新的任务过来了(任务可能比当前最小的任务更靠前)
                        // 2. 没有新任务来,但到了该执行该任务的时候了
                        synchronized (newTaskComing) {
                            newTaskComing.wait(delay); // 最多等待delay秒
                        }
 
                        // 如果当前时间已经在要执行任务的时间之后了
                        // 说明任务的执行时间已过,所以应该去执行任务了
                        // 否则,先把这个任务放回去(因为时间还没到),再去取最小的任务
                        if (System.currentTimeMillis() >= task.runAt) {
                            task.run();
                        } else {
                            queue.put(task);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
 
    public void schedule(MyTimerTask task, long delay) {
        // 该方法非工作线程(主线程)调用
        task.runAt = System.currentTimeMillis() + delay;
        queue.put(task);
        synchronized (newTaskComing) {
            newTaskComing.notify();
        }
    }
}

4 线程池

因为创建线程 / 销毁线程 的开销较大,使用线程池就是减少每次启动、销毁线程的损耗

4.1 标准库中的线程池

Executor -> ExecutorService -> ThreadPoolExcutor() 实现类

  • corePoolSize: 正式员工的名额上限
  • maximumPoolSize: 正式+临时的名额上限
  • keepAliveTime + unit: 临时工允许空闲时间的上限
  • workQueue: 任务队列
  • handler: 拒绝(默认)、调用者允许、丢弃最老的、丢弃当前
import java.util.Scanner;
import java.util.concurrent.*;
 
public class Demo {
    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
        ExecutorService service = new ThreadPoolExecutor(
                3, // 正式员工 10
                9, // 临时员工 20
                10, TimeUnit.SECONDS,
                queue,  // 阻塞队列
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r, "饭店厨师");
                        return t;
                    }
                }, // 线程工厂
                new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
        );
 
        // 定义任务
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.DAYS.sleep(365);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
 
        // 把任务提交给线程池对象(公司)
        Scanner s = new Scanner(System.in);
        for (int i = 1; i < 100; i++) {
            s.nextLine();
            service.execute(task);
            System.out.println(i);
        }
    }
}

Executor 是接口

Executor 定义了一些固定策略的线程池

4.2 Executors 创建线程池的几种方式

  • newFixedThreadPool: 创建固定线程数的线程池(只有正式员工)
  • newCachedThreadPool: 创建线程数目动态增长的线程池(只有临时员工)
  • newSingleThreadExecutor: 创建只包含单个线程的线程池(只有一个正式员工)
  • newScheduledThreadPool: 设定 延迟时间后执行命令,或者定期执行命令. 是进阶版的 Timer
public class Demo2 {
    public static void main(String[] args) {
        // 不太建议在实际生产项目下使用
        ExecutorService service = Executors.newFixedThreadPool(10);
        ExecutorService service1 = Executors.newSingleThreadExecutor();
        ExecutorService service2 = Executors.newCachedThreadPool();
 
        Runnable task = new Runnable() {
            @Override
            public void run() {
 
            }
        };
        service.execute(task);
    }
}

4.3 利用线程池 创建多线程计算fib 数

import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class Demo3 {
    static class CalcFib implements Runnable {
        private final int n;
        CalcFib(int n) {
            this.n = n;
        }
 
        @Override
        public void run() {
            long r = fib(n);
            System.out.printf("fib(%d) = %d\n", n, r);
        }
        private long fib(int n) {
            if (n == 0 || n == 1) {
                return 1;
            }
            return fib(n - 1) + fib(n - 2);
        }
    }
 
    public static void main(String[] args) {
        Scanner scanner = new Scanner(System.in);
        ExecutorService service = Executors.newFixedThreadPool(10);
        while (true) {
            System.out.print("提交数字: ");
            int n = scanner.nextInt();
            Runnable task = new CalcFib(n);
            service.execute(task);
        }
    }
}

4.4 实现线程池

总结: 

线程中线程是按需创建:

  • 一开始一个线程都没有︰随着任务提交,创建core线程(当前线程数<corePoolSize)
  • 优先提交队列,直到队列满
  • 创建临时工去处理 > corePoolSize的线程,直到maximumPoolSize
  • 执行拒绝策略
import java.util.concurrent.*;
 
// 线程池类
public class MyThreadPoolExecutor implements Executor {
    private int currentCoreSize;      // 当前正式员工的数量
 
    private final int corePoolSize;   // 正式员工的数量上限
 
    private int currentTemporarySize; // 当前临时员工的数量
 
    private final int temporaryPoolSize;      // 临时员工的数量上限
 
    private final ThreadFactory threadFactory;// 创建线程的工厂对象
 
    // 临时工摸鱼的时间上限
    private final long keepAliveTime;
    private final TimeUnit unit;
 
    // 传递任务的阻塞队列
    private final BlockingQueue<Runnable> workQueue;
 
    public MyThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler) {
        this.corePoolSize = corePoolSize;
        this.temporaryPoolSize = maximumPoolSize - corePoolSize;
        this.workQueue = workQueue;
        this.threadFactory = threadFactory;
        this.keepAliveTime = keepAliveTime;
        this.unit = unit;
    }
 
    // 向线程池中提交任务
    @Override
    public void execute(Runnable command) {
        // 1. 如果正式员工的数量还低于正式员工的数量上限,则优先创建正式员工处理任务
        // 1.1 需要管理,当前正式员工有多少,正式员工的数量上限有多少?
        if (currentCoreSize < corePoolSize) {
            // 优先创建正式员工进行处理
            // 创建一个线程,这个线程中的任务就是不断地取任务-做任务,但是不需要考虑退出的问题
            CoreJob job = new CoreJob(workQueue, command);
//            Thread thread = new Thread(job);    // 不使用工厂创建的线程
            Thread thread = threadFactory.newThread(job);   // thread 代表的就是正式员工
            String name = String.format("正式员工-%d", currentCoreSize);
            thread.setName(name);
 
            thread.start();
 
            // 只是两种不同的策略,没有谁是正确的说法
            // 1. 把 command 放到队列中;command 的执行次序是在队列已有的任务之后
            // 2. 创建正式员工的时候,就把 command 提交给正式员工,让 command 优先执行
            // 我们这里采用第二种方案,主要原因就是 java 官方的就是使用的第二种策略
 
            currentCoreSize++;
            return;
        }
 
        // 走到这里,说明正式员工的数量 == 正式员工的上限了
        // 2. 优先把任务放入队列中,如果放入成功,execute 执行结束,否则还需要继续
        // 2.1 需要一个阻塞队列
        // workQueue.put(command); // 带阻塞的放入,是否满足这里的需求?
        // 我们这里希望的是立即得到结果
        boolean success = workQueue.offer(command);
        if (success == true) {
            // 说明放入队列成功
            return;
        }
 
        // 队列也已经放满了
        // 3. 继续判断,临时工的数量有没有到上限,如果没有到达,创建新的临时工来处理
        if (currentTemporarySize < temporaryPoolSize) {
            // 创建临时工进行处理
            TemporaryJob job = new TemporaryJob(keepAliveTime, unit, workQueue, command);
            //Thread thread = new Thread(job);    // 不使用工厂创建的线程
            Thread thread = threadFactory.newThread(job);   // thread 代表的就是临时员工
            String name = String.format("临时员工-%d", currentTemporarySize);
            thread.setName(name);
 
            thread.start();
 
            currentTemporarySize++;
            return;
        }
 
        // 4. 执行拒绝策略
        // 为了实现方便,暂时不考虑其他策略
        throw new RejectedExecutionException();
    }
 
    // 一个正式员工线程要完成的工作
    class CoreJob implements Runnable {
        // 需要阻塞队列
        private final BlockingQueue<Runnable> workQueue;
        private Runnable firstCommand;
 
        CoreJob(BlockingQueue<Runnable> workQueue, Runnable firstCommand) {
            this.workQueue = workQueue;
            this.firstCommand = firstCommand;
        }
 
        @Override
        public void run() {
            try {
                firstCommand.run();     // 优先先把刚提交的任务先做掉了
                firstCommand = null;    // 这里设置 null 的意思是,不影响 firstCommand 对象被 GC 时的回收
 
                while (!Thread.interrupted()) {
                    Runnable command = workQueue.take();
                    command.run();
                }
            } catch (InterruptedException ignored) {}
        }
    }
 
    // 一个临时员工线程要完成的工作
    class TemporaryJob implements Runnable {
        // 需要阻塞队列
        private final BlockingQueue<Runnable> workQueue;
        private final long keepAliveTime;
        private final TimeUnit unit;
        private Runnable firstCommand;
 
        TemporaryJob(long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Runnable firstCommand) {
            this.keepAliveTime = keepAliveTime;
            this.unit = unit;
            this.workQueue = workQueue;
            this.firstCommand = firstCommand;
        }
 
        @Override
        public void run() {
            try {
                firstCommand.run();     // 优先先把刚提交的任务先做掉了
                firstCommand = null;    // 这里设置 null 的意思是,不影响 firstCommand 对象被 GC 时的回收
 
                // 一旦超过一定时间没有任务,临时工是需要退出的
                // 1. keepAliveTime + unit 记录起来
                // 2. 怎么就知道超过多久没有任务了?如果一定时间内都无法从队列中取出来任务,则认为摸鱼时间够了
                while (!Thread.interrupted()) {
//                Runnable command = workQueue.take();
                    Runnable command = workQueue.poll(keepAliveTime, unit);
                    if (command == null) {
                        // 说明,没有取到任务
                        // 说明超时时间已到
                        // 说明该线程已经 keepAliveTime + unit 时间没有工作了
                        // 所以,可以退出了
                        break;
                    }
                    command.run();
                }
            } catch (InterruptedException ignored) {}
        }
    }
}

以上就是Java多线程(单例模式,阻塞队列,定时器,线程池)详解的详细内容,更多关于Java多线程的资料请关注自由互联其它相关文章!

网友评论