使用ForkJoin


Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。

我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:

  1. ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
  2. └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘

还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:

  1. ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
  2. └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
  3. ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
  4. └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘

如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行:

  1. ┌─┬─┬─┬─┬─┬─┐
  2. └─┴─┴─┴─┴─┴─┘
  3. ┌─┬─┬─┬─┬─┬─┐
  4. └─┴─┴─┴─┴─┴─┘
  5. ┌─┬─┬─┬─┬─┬─┐
  6. └─┴─┴─┴─┴─┴─┘
  7. ┌─┬─┬─┬─┬─┬─┐
  8. └─┴─┴─┴─┴─┴─┘

这就是Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。

我们来看如何使用Fork/Join对大数据进行并行求和:

使用ForkJoin - 图1

观察上述代码的执行过程,一个大的计算任务0~2000首先分裂为两个小任务0~1000和1000~2000,这两个小任务仍然太大,继续分裂为更小的0~500,500~1000,1000~1500,1500~2000,最后,计算结果被依次合并,得到最终结果。

因此,核心代码SumTask继承自RecursiveTask,在compute()方法中,关键是如何“分裂”出子任务并且提交子任务:

  1. class SumTask extends RecursiveTask<Long> {
  2. protected Long compute() {
  3. // “分裂”子任务:
  4. SumTask subtask1 = new SumTask(...);
  5. SumTask subtask2 = new SumTask(...);
  6. // invokeAll会并行运行两个子任务:
  7. invokeAll(subtask1, subtask2);
  8. // 获得子任务的结果:
  9. Long subresult1 = subtask1.join();
  10. Long subresult2 = subtask2.join();
  11. // 汇总结果:
  12. return subresult1 + subresult2;
  13. }
  14. }

Fork/Join线程池在Java标准库中就有应用。Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。

练习

使用ForkJoin - 图2下载练习:使用Fork/Join (推荐使用IDE练习插件快速下载)

小结

Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。

ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTaskRecursiveAction

使用Fork/Join模式可以进行并行计算以提高效率。

读后有收获可以支付宝请作者喝咖啡:

使用ForkJoin - 图3