当前位置 : 主页 > 编程语言 > java >

JUC(10)线程池

来源:互联网 收集:自由互联 发布时间:2022-07-13
文章目录 ​​一、三大方法​​ ​​二、七大参数​​ ​​三、四种拒绝策略​​ ​​3.1 new ThreadPoolExecutor.AbortPolicy()​​ ​​3.2 new ThreadPoolExecutor.CallerRunsPolicy()​​ ​​3.3 new Thr


文章目录

  • ​​一、三大方法​​
  • ​​二、七大参数​​
  • ​​三、四种拒绝策略​​
  • ​​3.1 new ThreadPoolExecutor.AbortPolicy()​​
  • ​​3.2 new ThreadPoolExecutor.CallerRunsPolicy()​​
  • ​​3.3 new ThreadPoolExecutor.DiscardPolicy()​​
  • ​​3.4 new ThreadPoolExecutor.DiscardOldestPolicy():​​
  • ​​四、手动创建线程池​​
  • ​​五、小结和拓展(CPU密集型和IO密集型!)​​
  • ​​六、Fork/Join框架​​
  • ​​6.1如何使用ForkJoin?​​

此部分可以参考

  • 线程池:三大方法、7大参数、4种拒绝策略
  • 池化技术
  • 程序的运行,本质:占用系统的资源!我们需要去优化资源的使用 ===> 池化技术
  • 线程池、JDBC的连接池、内存池、对象池 等等。。。。
  • 资源的创建、销毁十分消耗资源
  • 池化技术:事先准备好一些资源,如果有人要用,就来我这里拿,用完之后还给我,以此来提高效率。
  • 线程池的好处:
  • 1、降低资源的消耗;
  • 2、提高响应的速度;
  • 3、方便管理;
  • 线程池的作用:线程复用、可以控制最大并发数、管理线程;

一、三大方法

  • ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
  • ExecutorService threadPool2 = Executors.newFixedThreadPool(5); //创建一个固定的线程池的大小
  • ExecutorService threadPool3 = Executors.newCachedThreadPool(); //可伸缩的
package com.wlw.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo01 {
public static void main(String[] args) {
//ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
//ExecutorService threadPool = Executors.newFixedThreadPool(5);//创建一个固定大小的线程池
ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸缩的

try {
for (int i = 0; i < 20; i++) {
//使用线程池来创建线程
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"ok");
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完必须要关闭线程池
threadPool.shutdown();
}
}
}

二、七大参数

  • 源码分析
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, //21亿
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • 本质:三种方法都是开启的ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
int maximumPoolSize, //最大的线程池大小
long keepAliveTime, //超时了没有人调用就会释放
TimeUnit unit, //超时单位
BlockingQueue<Runnable> workQueue, //阻塞队列
ThreadFactory threadFactory, //线程工厂 创建线程的 一般不用动
RejectedExecutionHandler handler //拒绝策略
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  • 阿里巴巴的Java操作手册中明确说明:对于Integer.MAX_VALUE初始值较大,所以一般情况我们要使用底层的ThreadPoolExecutor来创建线程池。

JUC(10)线程池_抛出异常

  • 七个参数的说明
  • 要说明的是当银行人满了的时候,就会执行拒绝策略,不再让顾客入内
  • 非核心(Core)线程,如果超过设定的时间还没有被使用,就会关闭

JUC(10)线程池_java_02

三、四种拒绝策略

  • 这是七大参数中最后一个参数:拒绝策略的类型

JUC(10)线程池_线程池_03

3.1 new ThreadPoolExecutor.AbortPolicy()

  • 该拒绝策略为:整个线程池满了,还有线程进来,不处理这个想进来的线程,并抛出异常
  • 而判断线程池满? 线程池的最大承载量为 :阻塞队列+最大的线程池大小 ,超过报java.util.concurrent.RejectedExecutionException

3.2 new ThreadPoolExecutor.CallerRunsPolicy()

  • 该拒绝策略为:哪来的去哪里 ,让 main线程去进行处理多余的线程

3.3 new ThreadPoolExecutor.DiscardPolicy()

  • 该拒绝策略为:队列满了,把多余的线程(任务)丢掉,不会抛出异常。

3.4 new ThreadPoolExecutor.DiscardOldestPolicy():

  • 该拒绝策略为:队列满了,尝试去和最早的进程竞争,不会抛出异常

四、手动创建线程池

package com.wlw.pool;

import java.util.concurrent.*;

public class Demo01 {
public static void main(String[] args) {
//ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
//ExecutorService threadPool = Executors.newFixedThreadPool(5);//创建一个固定大小的线程池
//ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸缩的

ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()); //该拒绝策略为:银行满了,还有人进来,不处理这个人的,并抛出异常

//该线程池最大承载量为: maximumPoolSize + BlockingQueue = 5+3 = 8
try {
for (int i = 0; i < 8; i++) {
//使用线程池来创建线程
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"==>ok");
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完必须要关闭线程池
threadPool.shutdown();
}
}
}

五、小结和拓展(CPU密集型和IO密集型!)

  • 如何去设置线程池的最大大小( maximumPoolSize)如何去设置?
  • 利用CPU密集型和IO密集型!:
  • CPU密集型:电脑的核数是几核就选择几;这个大小就是maximunPoolSize的大小
  • Runtime.getRuntime().availableProcessors() 获取cpu的核数
  • I/O密集型:在程序中有15个大型任务,io十分占用资源;I/O密集型就是判断我们程序中十分耗I/O的线程数量,然后将maximunPoolSize设置成大约是最大I/O数的一倍到两倍之间。
package com.wlw.pool;
import java.util.concurrent.*;

public class Demo01 {
public static void main(String[] args) {

ExecutorService threadPool = new ThreadPoolExecutor(
2,
Runtime.getRuntime().availableProcessors(),
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()); //该拒绝策略为:银行满了,还有人进来,不处理这个人的,并抛出异常

//该线程池最大承载量为: maximumPoolSize + BlockingQueue = 5+3 = 8
try {
for (int i = 0; i < 8; i++) {
//使用线程池来创建线程
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"==>ok");
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完必须要关闭线程池
threadPool.shutdown();
}
}
}

六、Fork/Join框架

  • 什么是ForkJoin? (分支合并)
  • ForkJoin 在JDK1.7,并行执行任务!提高效率~。在大数据量速率会更快!
  • 大数据中:MapReduce 核心思想->把大任务拆分为小任务!

JUC(10)线程池_java_04

  • ForkJoin 的特点:工作窃取!
  • 实现原理是:双端队列!从上面和下面都可以去拿到任务进行执行!
  • JUC(10)线程池_forkjoin_05

6.1如何使用ForkJoin?

  • 通过ForkJoinPool来执行
  • 计算任务forkjoinPool. execute(ForkJoinTask task)
  • JUC(10)线程池_forkjoin_06

package com.wlw.forkjoin;

import java.util.concurrent.RecursiveTask;

public class ForkJoinDemo extends RecursiveTask<Long> {

private Long start;
private Long end;

//临界值
private Long temp = 10000L;

public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}

//计算方法
@Override
protected Long compute() {
if((end - start) < temp){
Long sum = 0L;
for (Long i = start;i <= end ; i++){
sum = sum + i;
}
return sum;
}else {
//使用forkjoin 分而治之 计算
Long middle = (start+end)/2;
//拆分任务,把线程任务压入线程队列
ForkJoinDemo forkJoinDemoTask1 = new ForkJoinDemo(start, middle);
forkJoinDemoTask1.fork();
//拆分任务,把线程任务压入线程队列
ForkJoinDemo forkJoinDemoTask2 = new ForkJoinDemo(middle+1, end);
forkJoinDemoTask2.fork();

//获取结果
Long sum = forkJoinDemoTask1.join() + forkJoinDemoTask2.join();
return sum;
}
}
}package com.wlw.forkjoin;

import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//test1(); //sum=500000000500000000,时间:8598
//test2(); //sum=500000000500000000,时间:4835
test3(); //sum=500000000500000000时间:512
}

public static void test1(){
Long start = System.currentTimeMillis();
Long sum = 0L;
for (Long i = 0L;i <= 10_0000_0000L; i++){
sum = sum + i;
}
Long end = System.currentTimeMillis();
System.out.println("sum="+sum+",时间:"+(end-start));
}

public static void test2() throws ExecutionException, InterruptedException {
Long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinDemo forkJoinDemoTask = new ForkJoinDemo(0L,10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinDemoTask);
Long aLong = submit.get();
Long end = System.currentTimeMillis();
System.out.println("sum="+aLong+",时间:"+(end-start));
}

public static void test3(){
Long start = System.currentTimeMillis();
//使用Stream
Long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0,(x,y)->{return x+y;});

Long end = System.currentTimeMillis();
System.out.println("sum="+sum+"时间:"+(end-start));
}
}


上一篇:JVM(1)JVM简单总结
下一篇:没有了
网友评论