第十八章 Fork/Join框架

18.1 什么是Fork/Join

Fork/Join框架是一个实现了ExecutorService接口的多线程处理器,它专为那些可以通过递归分解成更细小的任务而设计,最大化的利用多核处理器来提高应用程序的性能。

与其他ExecutorService相关的实现相同的是,Fork/Join框架会将任务分配给线程池中的线程。而与之不同的是,Fork/Join框架在执行任务时使用了工作窃取算法

fork在英文里有分叉的意思,join在英文里连接、结合的意思。顾名思义,fork就是要使一个大任务分解成若干个小任务,而join就是最后将各个小任务的结果结合起来得到大任务的结果。

Fork/Join的运行流程大致如下所示:

fork/join流程图

需要注意的是,图里的次级子任务可以一直分下去,一直分到子任务足够小为止。用伪代码来表示如下:

  1. solve(任务):
  2. if(任务已经划分到足够小):
  3. 顺序执行任务
  4. else:
  5. for(划分任务得到子任务)
  6. solve(子任务)
  7. 结合所有子任务的结果到上一层循环
  8. return 最终结合的结果

通过上面伪代码可以看出,我们通过递归嵌套的计算得到最终结果,这里有体现分而治之(divide and conquer) 的算法思想。

18.2 工作窃取算法

工作窃取算法指的是在多线程执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行。

工作窃取流程如下图所示:

工作窃取算法流程

值得注意的是,当一个线程窃取另一个线程的时候,为了减少两个任务线程之间的竞争,我们通常使用双端队列来存储任务。被窃取的任务线程都从双端队列的头部拿任务执行,而窃取其他任务的线程从双端队列的尾部执行任务。

另外,当一个线程在窃取任务时要是没有其他可用的任务了,这个线程会进入阻塞状态以等待再次“工作”。

18.3 Fork/Join的具体实现

前面我们说Fork/Join框架简单来讲就是对任务的分割与子任务的合并,所以要实现这个框架,先得有任务。在Fork/Join框架里提供了抽象类ForkJoinTask来实现任务。

18.3.1 ForkJoinTask

ForkJoinTask是一个类似普通线程的实体,但是比普通线程轻量得多。

fork()方法:使用线程池中的空闲线程异步提交任务

  1. // 本文所有代码都引自Java 8
  2. public final ForkJoinTask<V> fork() {
  3. Thread t;
  4. // ForkJoinWorkerThread是执行ForkJoinTask的专有线程,由ForkJoinPool管理
  5. // 先判断当前线程是否是ForkJoin专有线程,如果是,则将任务push到当前线程所负责的队列里去
  6. if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
  7. ((ForkJoinWorkerThread)t).workQueue.push(this);
  8. else
  9. // 如果不是则将线程加入队列
  10. // 没有显式创建ForkJoinPool的时候走这里,提交任务到默认的common线程池中
  11. ForkJoinPool.common.externalPush(this);
  12. return this;
  13. }

其实fork()只做了一件事,那就是把任务推入当前工作线程的工作队列里

join()方法:等待处理任务的线程处理完毕,获得返回值。

来看下join()的源码:

  1. public final V join() {
  2. int s;
  3. // doJoin()方法来获取当前任务的执行状态
  4. if ((s = doJoin() & DONE_MASK) != NORMAL)
  5. // 任务异常,抛出异常
  6. reportException(s);
  7. // 任务正常完成,获取返回值
  8. return getRawResult();
  9. }
  10. /**
  11. * doJoin()方法用来返回当前任务的执行状态
  12. **/
  13. private int doJoin() {
  14. int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
  15. // 先判断任务是否执行完毕,执行完毕直接返回结果(执行状态)
  16. return (s = status) < 0 ? s :
  17. // 如果没有执行完毕,先判断是否是ForkJoinWorkThread线程
  18. ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
  19. // 如果是,先判断任务是否处于工作队列顶端(意味着下一个就执行它)
  20. // tryUnpush()方法判断任务是否处于当前工作队列顶端,是返回true
  21. // doExec()方法执行任务
  22. (w = (wt = (ForkJoinWorkerThread)t).workQueue).
  23. // 如果是处于顶端并且任务执行完毕,返回结果
  24. tryUnpush(this) && (s = doExec()) < 0 ? s :
  25. // 如果不在顶端或者在顶端却没未执行完毕,那就调用awitJoin()执行任务
  26. // awaitJoin():使用自旋使任务执行完成,返回结果
  27. wt.pool.awaitJoin(w, this, 0L) :
  28. // 如果不是ForkJoinWorkThread线程,执行externalAwaitDone()返回任务结果
  29. externalAwaitDone();
  30. }

我们在之前介绍过说Thread.join()会使线程阻塞,而ForkJoinPool.join()会使线程免于阻塞,下面是ForkJoinPool.join()的流程图:
join流程图

RecursiveAction和RecursiveTask

通常情况下,在创建任务的时候我们一般不直接继承ForkJoinTask,而是继承它的子类RecursiveActionRecursiveTask

两个都是ForkJoinTask的子类,RecursiveAction可以看做是无返回值的ForkJoinTask,RecursiveTask是有返回值的ForkJoinTask

此外,两个子类都有执行主要计算的方法compute(),当然,RecursiveAction的compute()返回void,RecursiveTask的compute()有具体的返回值。

18.3.2 ForkJoinPool

ForkJoinPool是用于执行ForkJoinTask任务的执行(线程)池。

ForkJoinPool管理着执行池中的线程和任务队列,此外,执行池是否还接受任务,显示线程的运行状态也是在这里处理。

我们来大致看下ForkJoinPool的源码:

  1. @sun.misc.Contended
  2. public class ForkJoinPool extends AbstractExecutorService {
  3. // 任务队列
  4. volatile WorkQueue[] workQueues;
  5. // 线程的运行状态
  6. volatile int runState;
  7. // 创建ForkJoinWorkerThread的默认工厂,可以通过构造函数重写
  8. public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
  9. // 公用的线程池,其运行状态不受shutdown()和shutdownNow()的影响
  10. static final ForkJoinPool common;
  11. // 私有构造方法,没有任何安全检查和参数校验,由makeCommonPool直接调用
  12. // 其他构造方法都是源自于此方法
  13. // parallelism: 并行度,
  14. // 默认调用java.lang.Runtime.availableProcessors() 方法返回可用处理器的数量
  15. private ForkJoinPool(int parallelism,
  16. ForkJoinWorkerThreadFactory factory, // 工作线程工厂
  17. UncaughtExceptionHandler handler, // 拒绝任务的handler
  18. int mode, // 同步模式
  19. String workerNamePrefix) { // 线程名prefix
  20. this.workerNamePrefix = workerNamePrefix;
  21. this.factory = factory;
  22. this.ueh = handler;
  23. this.config = (parallelism & SMASK) | mode;
  24. long np = (long)(-parallelism); // offset ctl counts
  25. this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
  26. }
  27. }

WorkQueue

双端队列,ForkJoinTask存放在这里。

当工作线程在处理自己的工作队列时,会从队列首取任务来执行(FIFO);如果是窃取其他队列的任务时,窃取的任务位于所属任务队列的队尾(LIFO)。

ForkJoinPool与传统线程池最显著的区别就是它维护了一个工作队列数组(volatile WorkQueue[] workQueues,ForkJoinPool中的每个工作线程都维护着一个工作队列)。

runState

ForkJoinPool的运行状态。SHUTDOWN状态用负数表示,其他用2的幂次表示。

18.4 Fork/Join的使用

上面我们说ForkJoinPool负责管理线程和任务,ForkJoinTask实现fork和join操作,所以要使用Fork/Join框架就离不开这两个类了,只是在实际开发中我们常用ForkJoinTask的子类RecursiveTask 和RecursiveAction来替代ForkJoinTask。

下面我们用一个计算斐波那契数列第n项的例子来看一下Fork/Join的使用:

斐波那契数列数列是一个线性递推数列,从第三项开始,每一项的值都等于前两项之和:

1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89······

如果设f(n)为该数列的第n项(n∈N*),那么有:f(n) = f(n-1) + f(n-2)。

  1. public class FibonacciTest {
  2. class Fibonacci extends RecursiveTask<Integer> {
  3. int n;
  4. public Fibonacci(int n) {
  5. this.n = n;
  6. }
  7. // 主要的实现逻辑都在compute()里
  8. @Override
  9. protected Integer compute() {
  10. // 这里先假设 n >= 0
  11. if (n <= 1) {
  12. return n;
  13. } else {
  14. // f(n-1)
  15. Fibonacci f1 = new Fibonacci(n - 1);
  16. f1.fork();
  17. // f(n-2)
  18. Fibonacci f2 = new Fibonacci(n - 2);
  19. f2.fork();
  20. // f(n) = f(n-1) + f(n-2)
  21. return f1.join() + f2.join();
  22. }
  23. }
  24. }
  25. @Test
  26. public void testFib() throws ExecutionException, InterruptedException {
  27. ForkJoinPool forkJoinPool = new ForkJoinPool();
  28. System.out.println("CPU核数:" + Runtime.getRuntime().availableProcessors());
  29. long start = System.currentTimeMillis();
  30. Fibonacci fibonacci = new Fibonacci(40);
  31. Future<Integer> future = forkJoinPool.submit(fibonacci);
  32. System.out.println(future.get());
  33. long end = System.currentTimeMillis();
  34. System.out.println(String.format("耗时:%d millis", end - start));
  35. }
  36. }

上面例子在本机的输出:

  1. CPU核数:4
  2. 计算结果:102334155
  3. 耗时:9490 millis

需要注意的是,上述计算时间复杂度为O(2^n),随着n的增长计算效率会越来越低,这也是上面的例子中n不敢取太大的原因。

此外,也并不是所有的任务都适合Fork/Join框架,比如上面的例子任务划分过于细小反而体现不出效率,下面我们试试用普通的递归来求f(n)的值,看看是不是要比使用Fork/Join快:

  1. // 普通递归,复杂度为O(2^n)
  2. public int plainRecursion(int n) {
  3. if (n == 1 || n == 2) {
  4. return 1;
  5. } else {
  6. return plainRecursion(n -1) + plainRecursion(n - 2);
  7. }
  8. }
  9. @Test
  10. public void testPlain() {
  11. long start = System.currentTimeMillis();
  12. int result = plainRecursion(40);
  13. long end = System.currentTimeMillis();
  14. System.out.println("计算结果:" + result);
  15. System.out.println(String.format("耗时:%d millis", end -start));
  16. }

普通递归的例子输出:

  1. 计算结果:102334155
  2. 耗时:436 millis

通过输出可以很明显的看出来,使用普通递归的效率都要比使用Fork/Join框架要高很多。

这里我们再用另一种思路来计算:

  1. // 通过循环来计算,复杂度为O(n)
  2. private int computeFibonacci(int n) {
  3. // 假设n >= 0
  4. if (n <= 1) {
  5. return n;
  6. } else {
  7. int first = 1;
  8. int second = 1;
  9. int third = 0;
  10. for (int i = 3; i <= n; i ++) {
  11. // 第三个数是前两个数之和
  12. third = first + second;
  13. // 前两个数右移
  14. first = second;
  15. second = third;
  16. }
  17. return third;
  18. }
  19. }
  20. @Test
  21. public void testComputeFibonacci() {
  22. long start = System.currentTimeMillis();
  23. int result = computeFibonacci(40);
  24. long end = System.currentTimeMillis();
  25. System.out.println("计算结果:" + result);
  26. System.out.println(String.format("耗时:%d millis", end -start));
  27. }

上面例子在笔者所用电脑的输出为:

  1. 计算结果:102334155
  2. 耗时:0 millis

这里耗时为0不代表没有耗时,是表明这里计算的耗时几乎可以忽略不计,大家可以在自己的电脑试试,即使是n取大很多量级的数据(注意int溢出的问题)耗时也是很短的,或者可以用System.nanoTime()统计纳秒的时间。

为什么在这里普通的递归或循环效率更快呢?因为Fork/Join是使用多个线程协作来计算的,所以会有线程通信和线程切换的开销。

如果要计算的任务比较简单(比如我们案例中的斐波那契数列),那当然是直接使用单线程会更快一些。但如果要计算的东西比较复杂,计算机又是多核的情况下,就可以充分利用多核CPU来提高计算速度。

另外,Java 8 Stream的并行操作底层就是用到了Fork/Join框架,下一章我们将从源码及案例两方面介绍Java 8 Stream的并行操作。


参考资料