并行流和串行流
基于尚硅谷java8教程
1. 并行流和串行流的介绍
为了适应目前多核机器的时代,提高系统CPU、内存的利用率,在jdk1.8新的stream包中针对集合的操作也提供了并行操作流和串行操作流。并行流就是把内容切割成多个数据块,并且使用多个线程分别处理每个数据块的内容。Stream api中声明可以通过parallel()与sequential()方法在并行流和串行流之间进行切换。 jdk1.8并行流使用的是fork/join框架进行并行操作。 注意:
- 使用并行流并不是一定会提高效率,因为jvm对数据进行切片和切换线程也是需要时间的。所以数据量越小,串行操作越快;数据量越大,并行操作效果越好。
2. fork/join框架简介
I. 简介
Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总。 如下图所示
II. fork/join与传统线程池的区别
采用 “工作窃取”模式(work-stealing): 当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线 程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中 相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的 处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因 无法继续运行,那么该线程会处于等待状态.而在fork/join框架实现中,如果 某个子问题由于等待另外一个子问题的完成而无法继续运行.那么处理该子 问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程 的等待时间,提高了性能.
III. fork/join核心类
ForkJoinPool:这个类实现了ExecutorService接口和工作窃取算法(Work-Stealing Algorithm)。它管理工作者线程,并提供任务的状态信息,以及任务的执行信息。 ForkJoinTask:这个类是一个将在ForkJoinPool中执行的任务的基类。Fork/Join框架提供了在一个任务里执行fork()和join()操作的机制和控制任务状态的方法。通常,为了实现Fork/Join任务,需要实现一个以下两个类之一的子类。 RecursiveAction:用于任务没有返回结果的场景。 RecursiveTask:用于任务有返回结果的场景。
IV. fork/join demo
package com.seven.jdk8;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;/** * fork/join使用demo * . */public class ForkJoinTest { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); /** * get()和join()有两个主要的区别: * join()方法同步返回,不能被中断。如果你中断调用join()方法的线程,这个方法将抛出InterruptedException异常。如果任务抛出任何未受检异常, * get()方法异步返回将返回一个ExecutionException异常,而join()方法将返回一个RuntimeException异常。 */ //同步返回结果 //Future result = pool.submit(new CountTask(0, 2000)); //System.out.println(result.get()); //异步返回结果 CountTask task = new CountTask(0, 2000); pool.execute(task); pool.shutdown(); Integer count = task.join(); System.out.println(count); }}/** * 计算1..n相加总和的简单demo * */class CountTask extends RecursiveTask { //边界值 private static final int THRESHOLD = 50; private int start; private int end; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= THRESHOLD; if (canCompute) { for (int i = start; i <= end; i++) sum += i; } else { int mid = (start + end) / 2; CountTask t1 = new CountTask(start, mid); CountTask t2 = new CountTask(mid+1, end); t1.fork(); t2.fork(); sum = t1.join() + t2.join(); } return sum; }}
3. 范例
package com.seven.jdk8;import org.junit.Test;import java.util.stream.LongStream;/** * 测试串行流和并行流 */public class IOTest { //使用串行流 @Test public void test1(){ Long start = System.currentTimeMillis(); long sum = 0L; for(long i=0L;i<=10000000000L;i++){ sum+=i; } Long end = System.currentTimeMillis(); System.out.println("计算总和为--> "+sum+", 执行耗时-->"+(end-start)); //计算总和为--> -5340232216128654848, 执行耗时-->4543 } //使用并行流 @Test public void test2(){ Long start = System.currentTimeMillis(); Long sum = LongStream.rangeClosed(0L,10000000000L).parallel().sum(); Long end = System.currentTimeMillis(); System.out.println("计算总和为--> "+sum+", 执行耗时-->"+(end-start));//计算总和为--> -5340232216128654848, 执行耗时-->4284 } //使用stream中的串行流 @Test public void test3(){ Long start = System.currentTimeMillis(); Long sum = LongStream.rangeClosed(0L,10000000000L).sequential().sum(); Long end = System.currentTimeMillis(); System.out.println("计算总和为--> "+sum+", 执行耗时-->"+(end-start));//计算总和为--> -5340232216128654848, 执行耗时-->6591 }}源代码地址:http://git.oschina.net/johnny/java_base