8.5 在实践中设计并发代码

当为一个特殊的任务设计并发代码时,需要根据任务本身来考虑之前所提到的问题。为了展示以上的注意事项是如何应用的,我们将看一下在C++标准库中三个标准函数的并行实现。当你遇到问题时,这里的例子可以作为很好的参照。在有较大的并发任务进行辅助下,我们也将实现一些函数。

我主要演示这些实现使用的技术,不过可能这些技术并不是最先进的;更多优秀的实现可以更好的利用硬件并发,不过这些实现可能需要到与并行算法相关的学术文献,或者是多线程的专家库中(比如:Inter的TBB[4])才能看到。

并行版的std::for_each可以看作为能最直观体现并行概念,就让我们从并行版的std::for_each开始吧!

8.5.1 并行实现:std::for_each

std::for_each的原理很简单:其对某个范围中的元素,依次调用用户提供的函数。并行和串行调用的最大区别就是函数的调用顺序。std::for_each是对范围中的第一个元素调用用户函数,接着是第二个,以此类推,而在并行实现中对于每个元素的处理顺序就不能保证了,并且它们可能(我们希望如此)被并发的处理。

为了实现这个函数的并行版本,需要对每个线程上处理的元素进行划分。你事先知道元素数量,所以可以处理前对数据进行划分(详见8.1.1节)。假设只有并行任务运行,就可以使用std::thread::hardware_concurrency()来决定线程的数量。同样,这些元素都能被独立的处理,所以可以使用连续的数据块来避免伪共享(详见8.2.3节)。

这里的算法有点类似于并行版的std::accumulate(详见8.4.1节),不过比起计算每一个元素的加和,这里对每个元素仅仅使用了一个指定功能的函数。因为不需要返回结果,可以假设这可能会对简化代码,不过想要将异常传递给调用者,就需要使用std::packaged_taskstd::future机制对线程中的异常进行转移。这里展示一个样本实现。

清单8.7 并行版std::for_each

  1. template<typename Iterator,typename Func>
  2. void parallel_for_each(Iterator first,Iterator last,Func f)
  3. {
  4. unsigned long const length=std::distance(first,last);
  5. if(!length)
  6. return;
  7. unsigned long const min_per_thread=25;
  8. unsigned long const max_threads=
  9. (length+min_per_thread-1)/min_per_thread;
  10. unsigned long const hardware_threads=
  11. std::thread::hardware_concurrency();
  12. unsigned long const num_threads=
  13. std::min(hardware_threads!=0?hardware_threads:2,max_threads);
  14. unsigned long const block_size=length/num_threads;
  15. std::vector<std::future<void> > futures(num_threads-1); // 1
  16. std::vector<std::thread> threads(num_threads-1);
  17. join_threads joiner(threads);
  18. Iterator block_start=first;
  19. for(unsigned long i=0;i<(num_threads-1);++i)
  20. {
  21. Iterator block_end=block_start;
  22. std::advance(block_end,block_size);
  23. std::packaged_task<void(void)> task( // 2
  24. [=]()
  25. {
  26. std::for_each(block_start,block_end,f);
  27. });
  28. futures[i]=task.get_future();
  29. threads[i]=std::thread(std::move(task)); // 3
  30. block_start=block_end;
  31. }
  32. std::for_each(block_start,last,f);
  33. for(unsigned long i=0;i<(num_threads-1);++i)
  34. {
  35. futures[i].get(); // 4
  36. }
  37. }

代码结构与清单8.4的差不多。最重要的不同在于futures向量对std::future<void>类型①变量进行存储,因为工作线程不会返回值,并且简单的lambda函数会对block_start到block_end上的任务②执行f函数。这是为了避免传入线程的构造函数③。当工作线程不需要返回一个值时,调用futures[i].get()④只是提供检索工作线程异常的方法;如果不想把异常传递出去,就可以省略这一步。

实现并行std::accumulate的时候,使用std::async会简化代码;同样,parallel_for_each也可以使用std::async。实现如下所示。

清单8.8 使用std::async实现std::for_each

  1. template<typename Iterator,typename Func>
  2. void parallel_for_each(Iterator first,Iterator last,Func f)
  3. {
  4. unsigned long const length=std::distance(first,last);
  5. if(!length)
  6. return;
  7. unsigned long const min_per_thread=25;
  8. if(length<(2*min_per_thread))
  9. {
  10. std::for_each(first,last,f); // 1
  11. }
  12. else
  13. {
  14. Iterator const mid_point=first+length/2;
  15. std::future<void> first_half= // 2
  16. std::async(&parallel_for_each<Iterator,Func>,
  17. first,mid_point,f);
  18. parallel_for_each(mid_point,last,f); // 3
  19. first_half.get(); // 4
  20. }
  21. }

和基于std::async的parallel_accumulate(清单8.5)一样,是在运行时对数据进行迭代划分的,而非在执行前划分好,这是因为你不知道你的库需要使用多少个线程。像之前一样,当你将每一级的数据分成两部分,异步执行另外一部分②,剩下的部分就不能再进行划分了,所以直接运行这一部分③;这样就可以直接对std::for_each①进行使用了。这里再次使用std::asyncstd::future的get()成员函数④来提供对异常的传播。

回到算法,函数需要对每一个元素执行同样的操作(这样的操作有很多种,初学者可能会想到std::countstd::replace),一个稍微复杂一些的例子就是使用std::find

8.5.2 并行实现:std::find

接下来是std::find算法,因为这是一种不需要对数据元素做任何处理的算法。比如,当第一个元素就满足查找标准,那就没有必要对其他元素进行搜索了。将会看到,算法属性对于性能具有很大的影响,并且对并行实现的设计有着直接的影响。这个算法是一个很特别的例子,数据访问模式都会对代码的设计产生影响(详见8.3.2节)。该类中的另一些算法包括std::equalstd::any_of

当你和妻子或者搭档,在一个纪念盒中找寻一张老照片,当找到这张照片时,就不会再看另外的照片了。不过,你得让其他人知道你已经找到照片了(比如,大喊一声“找到了!”),这样其他人就会停止搜索了。很多算法的特性就是要对每一个元素进行处理,所以它们没有办法像std::find一样,一旦找到合适数据就停止执行。因此,你需要设计代码对其进行使用——当得到想要的答案就中断其他任务的执行,所以不能等待线程处理对剩下的元素进行处理。

如果不中断其他线程,那么串行版本的性能可能会超越并行版,因为串行算法可以在找到匹配元素的时候,停止搜索并返回。如果系统能支持四个并发线程,那么每个线程就可以对总数据量的1/4进行检查,并且在我们的实现只需要单核完成的1/4的时间,就能完成对所有元素的查找。如果匹配的元素在第一个1/4块中,串行算法将会返回第一个,因为算法不需要对剩下的元素进行处理了。

一种办法,中断其他线程的一个办法就是使用一个原子变量作为一个标识,在处理过每一个元素后就对这个标识进行检查。如果标识被设置,那么就有线程找到了匹配元素,所以算法就可以停止并返回了。用这种方式来中断线程,就可以将那些没有处理的数据保持原样,并且在更多的情况下,相较于串行方式,性能能提升很多。缺点就是,加载原子变量是一个很慢的操作,会阻碍每个线程的运行。

如何返回值和传播异常呢?现在你有两个选择。你可以使用一个future数组,使用std::packaged_task来转移值和异常,在主线程上对返回值和异常进行处理;或者使用std::promise对工作线程上的最终结果直接进行设置。这完全依赖于你想怎么样处理工作线程上的异常。如果想停止第一个异常(即使还没有对所有元素进行处理),就可以使用std::promise对异常和最终值进行设置。另外,如果想要让其他工作线程继续查找,可以使用std::packaged_task来存储所有的异常,当线程没有找到匹配元素时,异常将再次抛出。

这种情况下,我会选择std::promise,因为其行为和std::find更为接近。这里需要注意一下搜索的元素是不是在提供的搜索范围内。因此,在所有线程结束前,获取future上的结果。如果被future阻塞住,所要查找的值不在范围内,就会持续的等待下去。实现代码如下。

清单8.9 并行find算法实现

  1. template<typename Iterator,typename MatchType>
  2. Iterator parallel_find(Iterator first,Iterator last,MatchType match)
  3. {
  4. struct find_element // 1
  5. {
  6. void operator()(Iterator begin,Iterator end,
  7. MatchType match,
  8. std::promise<Iterator>* result,
  9. std::atomic<bool>* done_flag)
  10. {
  11. try
  12. {
  13. for(;(begin!=end) && !done_flag->load();++begin) // 2
  14. {
  15. if(*begin==match)
  16. {
  17. result->set_value(begin); // 3
  18. done_flag->store(true); // 4
  19. return;
  20. }
  21. }
  22. }
  23. catch(...) // 5
  24. {
  25. try
  26. {
  27. result->set_exception(std::current_exception()); // 6
  28. done_flag->store(true);
  29. }
  30. catch(...) // 7
  31. {}
  32. }
  33. }
  34. };
  35. unsigned long const length=std::distance(first,last);
  36. if(!length)
  37. return last;
  38. unsigned long const min_per_thread=25;
  39. unsigned long const max_threads=
  40. (length+min_per_thread-1)/min_per_thread;
  41. unsigned long const hardware_threads=
  42. std::thread::hardware_concurrency();
  43. unsigned long const num_threads=
  44. std::min(hardware_threads!=0?hardware_threads:2,max_threads);
  45. unsigned long const block_size=length/num_threads;
  46. std::promise<Iterator> result; // 8
  47. std::atomic<bool> done_flag(false); // 9
  48. std::vector<std::thread> threads(num_threads-1);
  49. { // 10
  50. join_threads joiner(threads);
  51. Iterator block_start=first;
  52. for(unsigned long i=0;i<(num_threads-1);++i)
  53. {
  54. Iterator block_end=block_start;
  55. std::advance(block_end,block_size);
  56. threads[i]=std::thread(find_element(), // 11
  57. block_start,block_end,match,
  58. &result,&done_flag);
  59. block_start=block_end;
  60. }
  61. find_element()(block_start,last,match,&result,&done_flag); // 12
  62. }
  63. if(!done_flag.load()) //13
  64. {
  65. return last;
  66. }
  67. return result.get_future().get(); // 14
  68. }

清单8.9中的函数主体与之前的例子相似。这次,由find_element类①的函数调用操作实现,来完成查找工作的。循环通过在给定数据块中的元素,检查每一步上的标识②。如果匹配的元素被找到,就将最终的结果设置到promise③当中,并且在返回前对done_flag④进行设置。

如果有一个异常被抛出,那么它就会被通用处理代码⑤捕获,并且在promise⑥尝中试存储前,对done_flag进行设置。如果对应promise已经被设置,设置在promise上的值可能会抛出一个异常,所以这里⑦发生的任何异常,都可以捕获并丢弃。

这意味着,当线程调用find_element查询一个值,或者抛出一个异常时,如果其他线程看到done_flag被设置,那么其他线程将会终止。如果多线程同时找到匹配值或抛出异常,它们将会对promise产生竞争。不过,这是良性的条件竞争;因为,成功的竞争者会作为“第一个”返回线程,因此这个结果可以接受。

回到parallel_find函数本身,其拥有用来停止搜索的promise⑧和标识⑨;随着对范围内的元素的查找⑪,promise和标识会传递到新线程中。主线程也使用find_element来对剩下的元素进行查找⑫。像之前提到的,需要在全部线程结束前,对结果进行检查,因为结果可能是任意位置上的匹配元素。这里将“启动-汇入”代码放在一个块中⑩,所以所有线程都会在找到匹配元素时⑬进行汇入。如果找到匹配元素,就可以调用std::future<Iterator>(来自promise⑭)的成员函数get()来获取返回值或异常。

不过,这里假设你会使用硬件上所有可用的的并发线程,或使用其他机制对线程上的任务进行提前划分。就像之前一样,可以使用std::async,以及递归数据划分的方式来简化实现(同时使用C++标准库中提供的自动缩放工具)。使用std::async的parallel_find实现如下所示。

清单8.10 使用std::async实现的并行find算法

  1. template<typename Iterator,typename MatchType> // 1
  2. Iterator parallel_find_impl(Iterator first,Iterator last,MatchType match,
  3. std::atomic<bool>& done)
  4. {
  5. try
  6. {
  7. unsigned long const length=std::distance(first,last);
  8. unsigned long const min_per_thread=25; // 2
  9. if(length<(2*min_per_thread)) // 3
  10. {
  11. for(;(first!=last) && !done.load();++first) // 4
  12. {
  13. if(*first==match)
  14. {
  15. done=true; // 5
  16. return first;
  17. }
  18. }
  19. return last; // 6
  20. }
  21. else
  22. {
  23. Iterator const mid_point=first+(length/2); // 7
  24. std::future<Iterator> async_result=
  25. std::async(&parallel_find_impl<Iterator,MatchType>, // 8
  26. mid_point,last,match,std::ref(done));
  27. Iterator const direct_result=
  28. parallel_find_impl(first,mid_point,match,done); // 9
  29. return (direct_result==mid_point)?
  30. async_result.get():direct_result; // 10
  31. }
  32. }
  33. catch(...)
  34. {
  35. done=true; // 11
  36. throw;
  37. }
  38. }
  39. template<typename Iterator,typename MatchType>
  40. Iterator parallel_find(Iterator first,Iterator last,MatchType match)
  41. {
  42. std::atomic<bool> done(false);
  43. return parallel_find_impl(first,last,match,done); // 12
  44. }

如果想要在找到匹配项时结束,就需要在线程之间设置一个标识来表明匹配项已经被找到。因此,需要将这个标识递归的传递。通过函数①的方式来实现是最简单的办法,只需要增加一个参数——一个done标识的引用,这个表示通过程序的主入口点传入⑫。

核心实现和之前的代码一样。通常函数的实现中,会让单个线程处理最少的数据项②;如果数据块大小不足于分成两半,就要让当前线程完成所有的工作了③。实际算法在一个简单的循环当中(给定范围),直到在循环到指定范围中的最后一个,或找到匹配项,并对标识进行设置④。如果找到匹配项,标识done就会在返回前进行设置⑤。无论是因为已经查找到最后一个,还是因为其他线程对done进行了设置,都会停止查找。如果没有找到,会将最后一个元素last进行返回⑥。

如果给定范围可以进行划分,首先要在st::async在对第二部分进行查找⑧前,要找数据中点⑦,而且需要使用std::ref将done以引用的方式传递。同时,可以通过对第一部分直接进行递归查找。两部分都是异步的,并且在原始范围过大时,直接递归查找的部分可能会再细化。

如果直接查找返回的是mid_point,这就意味着没有找到匹配项,所以就要从异步查找中获取结果。如果在另一半中没有匹配项的话,返回的结果就一定是last,这个值的返回就代表了没有找到匹配的元素⑩。如果“异步”调用被延迟(非真正的异步),那么实际上这里会运行get();这种情况下,如果对下半部分的元素搜索成功,那么就不会执行对上半部分元素的搜索了。如果异步查找真实的运行在其他线程上,那么async_result变量的析构函数将会等待该线程完成,所以这里不会有线程泄露。

像之前一样,std::async可以用来提供“异常-安全”和“异常-传播”特性。如果直接递归抛出异常,future的析构函数就能让异步执行的线程提前结束;如果异步调用抛出异常,那么这个异常将会通过对get()成员函数的调用进行传播⑩。使用try/catch块只能捕捉在done发生的异常,并且当有异常抛出⑪时,所有线程都能很快的终止运行。不过,不使用try/catch的实现依旧没问题,不同的就是要等待所有线程的工作是否完成。

实现中一个重要的特性就是,不能保证所有数据都能被std::find串行处理。其他并行算法可以借鉴这个特性,因为要让一个算法并行起来这是必须具有的特性。如果有顺序问题,元素就不能并发的处理了。如果每个元素独立,虽然对于parallel_for_each不是很重要,不过对于parallel_find,即使在开始部分已经找到了匹配元素,也有可能返回范围中最后一个元素;如果在知道结果的前提下,这样的结果会让人很惊讶。

OK,现在你已经使用了并行化的std::find。如在本节开始说的那样,其他相似算法不需要对每一个数据元素进行处理,并且同样的技术可以使用到这些类似的算法上去。我们将在第9章中看到“中断线程”的问题。

为了完成我们的并行“三重奏”,我们将换一个角度来看一下std::partial_sum。对于这个算法,没有太多的文献可参考,不过让这个算法并行起来是一件很有趣的事。

8.5.3 并行实现:std::partial_sum

std::partial_sum会计算给定范围中的每个元素,并用计算后的结果将原始序列中的值替换掉。比如,有一个序列[1,2,3,4,5],在执行该算法后会成为:[1,3(1+2),6(1+2+3),10(1+2+3+4),15(1+2+3+4+5)]。让这样一个算法并行起来会很有趣,因为这里不能讲任务分块,对每一块进行独立的计算。比如,原始序列中的第一个元素需要加到后面的一个元素中去。

确定某个范围部分和的一种的方式,就是在独立块中计算部分和,然后将第一块中最后的元素的值,与下一块中的所有元素进行相加,依次类推。如果有个序列[1,2,3,4,5,6,7,8,9],然后将其分为三块,那么在第一次计算后就能得到[{1,3,6},{4,9,15},{7,15,24}]。然后将6(第一块的最后一个元素)加到第二个块中,那么就得到[{1,3,6},{10,15,21},{7,15,24}]。然后再将第二块的最后一个元素21加到第三块中去,就得到[{1,3,6},{10,15,21},{28,36,55}]。

将原始数据分割成块,加上之前块的部分和就能够并行了。如果每个块中的末尾元素都是第一个被更新的,那么块中其他的元素就能被其他线程所更新,同时另一个线程对下一块进行更新,等等。当处理的元素比处理核心的个数多的时候,这样完成工作没问题,因为每一个核芯在每一个阶段都有合适的数据可以进行处理。

如果有很多的处理器(就是要比处理的元素个数多),那么之前的方式就无法正常工作了。如果还是将工作划分给每个处理器,那么在第一步就没必要去做了。这种情况下,传递结果就意味着让处理器进行等待,这时需要给这些处于等待中的处理器一些工作。所以,可以采用完全不同的方式来处理这个问题。比起将数据块中的最后一个元素的结果向后面的元素块传递,可以对部分结果进行传播:第一次与相邻的元素(距离为1)相加和(和之前一样),之后和距离为2的元素相加,在后来和距离为4的元素相加,以此类推。比如,初始序列为[1,2,3,4,5,6,7,8,9],第一次后为[1,3,5,7,9,11,13,15,17],第二次后为[1,3,6,10,14,18, 22,26,30],下一次就要隔4个元素了。第三次后[1, 3, 6, 10, 15, 21, 28, 36, 44],下一次就要隔8个元素了。第四次后[1, 3, 6, 10, 15, 21, 28, 36, 45],这就是最终的结果。虽然,比起第一种方法多了很多步骤,不过在可并发平台下,这种方法提高了并行的可行性;每个处理器可在每一步中处理一个数据项。

总体来说,当有N个操作时(每步使用一个处理器)第二种方法需要log(N)[底为2]步;在本节中,N就相当于数据链表的长度。比起第一种,每个线程对分配块做N/k个操作,然后在做N/k次结果传递(这里的k是线程的数量)。因此,第一种方法的时间复杂度为O(N),不过第二种方法的时间复杂度为Q(Nlog(N))。当数据量和处理器数量相近时,第二种方法需要每个处理器上log(N)个操作,第一种方法中每个处理器上执行的操作数会随着k的增加而增多,因为需要对结果进行传递。对于处理单元较少的情况,第一种方法会比较合适;对于大规模并行系统,第二种方法比较合适。

不管怎么样,先将效率问题放一边,让我们来看一些代码。下面清单实现的,就是第一种方法。

清单8.11 使用划分的方式来并行的计算部分和

  1. template<typename Iterator>
  2. void parallel_partial_sum(Iterator first,Iterator last)
  3. {
  4. typedef typename Iterator::value_type value_type;
  5. struct process_chunk // 1
  6. {
  7. void operator()(Iterator begin,Iterator last,
  8. std::future<value_type>* previous_end_value,
  9. std::promise<value_type>* end_value)
  10. {
  11. try
  12. {
  13. Iterator end=last;
  14. ++end;
  15. std::partial_sum(begin,end,begin); // 2
  16. if(previous_end_value) // 3
  17. {
  18. value_type& addend=previous_end_value->get(); // 4
  19. *last+=addend; // 5
  20. if(end_value)
  21. {
  22. end_value->set_value(*last); // 6
  23. }
  24. std::for_each(begin,last,[addend](value_type& item) // 7
  25. {
  26. item+=addend;
  27. });
  28. }
  29. else if(end_value)
  30. {
  31. end_value->set_value(*last); // 8
  32. }
  33. }
  34. catch(...) // 9
  35. {
  36. if(end_value)
  37. {
  38. end_value->set_exception(std::current_exception()); // 10
  39. }
  40. else
  41. {
  42. throw; // 11
  43. }
  44. }
  45. }
  46. };
  47. unsigned long const length=std::distance(first,last);
  48. if(!length)
  49. return last;
  50. unsigned long const min_per_thread=25; // 12
  51. unsigned long const max_threads=
  52. (length+min_per_thread-1)/min_per_thread;
  53. unsigned long const hardware_threads=
  54. std::thread::hardware_concurrency();
  55. unsigned long const num_threads=
  56. std::min(hardware_threads!=0?hardware_threads:2,max_threads);
  57. unsigned long const block_size=length/num_threads;
  58. typedef typename Iterator::value_type value_type;
  59. std::vector<std::thread> threads(num_threads-1); // 13
  60. std::vector<std::promise<value_type> >
  61. end_values(num_threads-1); // 14
  62. std::vector<std::future<value_type> >
  63. previous_end_values; // 15
  64. previous_end_values.reserve(num_threads-1); // 16
  65. join_threads joiner(threads);
  66. Iterator block_start=first;
  67. for(unsigned long i=0;i<(num_threads-1);++i)
  68. {
  69. Iterator block_last=block_start;
  70. std::advance(block_last,block_size-1); // 17
  71. threads[i]=std::thread(process_chunk(), // 18
  72. block_start,block_last,
  73. (i!=0)?&previous_end_values[i-1]:0,
  74. &end_values[i]);
  75. block_start=block_last;
  76. ++block_start; // 19
  77. previous_end_values.push_back(end_values[i].get_future()); // 20
  78. }
  79. Iterator final_element=block_start;
  80. std::advance(final_element,std::distance(block_start,last)-1); // 21
  81. process_chunk()(block_start,final_element, // 22
  82. (num_threads>1)?&previous_end_values.back():0,
  83. 0);
  84. }

这个实现中,使用的结构体和之前算法中的一样,将问题进行分块解决,每个线程处理最小的数据块⑫。其中,有一组线程⑬和一组promise⑭,用来存储每块中的最后一个值;并且实现中还有一组future⑮,用来对前一块中的最后一个值进行检索。可以为future⑯做些储备,以避免生成新线程时,再分配内存。

主循环和之前一样,不过这次是让迭代器指向了每个数据块的最后一个元素,而不是作为一个普通值传递到最后⑰,这样就方便向其他块传递当前块的最后一个元素了。实际处理是在process_chunk函数对象中完成的,这个结构体看上去不是很长;当前块的开始和结束迭代器和前块中最后一个值的future一起,作为参数进行传递,并且promise用来保留当前范围内最后一个值的原始值⑱。

生成新的线程后,就对开始块的ID进行更新,别忘了传递最后一个元素⑲,并且将当前块的最后一个元素存储到future,上面的数据将在循环中再次使用到⑳。

在处理最后一个数据块前,需要获取之前数据块中最后一个元素的迭代器(21),这样就可以将其作为参数传入process_chunk(22)中了。std::partial_sum不会返回一个值,所以在最后一个数据块被处理后,就不用再做任何事情了。当所有线程的操作完成时,求部分和的操作也就算完成了。

OK,现在来看一下process_chunk函数对象①。对于整块的处理是始于对std::partial_sum的调用,包括对于最后一个值的处理②,不过得要知道当前块是否是第一块③。如果当前块不是第一块,就会有一个previous_end_value值从前面的块传过来,所以这里需要等待这个值的产生④。为了将算法最大程度的并行,首先需要对最后一个元素进行更新⑤,这样你就能将这个值传递给下一个数据块(如果有下一个数据块的话)⑥。当完成这个操作,就可以使用std::for_each和简单的lambda函数⑦对剩余的数据项进行更新。

如果previous_end_value值为空,当前数据块就是第一个数据块,所以只需要为下一个数据块更新end_value⑧(如果有下一个数据块的话——当前数据块可能是唯一的数据块)。

最后,如果有任意一个操作抛出异常,就可以将其捕获⑨,并且存入promise⑩,如果下一个数据块尝试获取前一个数据块的最后一个值④时,异常会再次抛出。处理最后一个数据块时,异常会全部重新抛出⑪,因为抛出动作一定会在主线程上进行。

因为线程间需要同步,这里的代码就不容易使用std::async重写。任务等待会让线程中途去执行其他的任务,所以所有的任务必须同时执行。

基于块,以传递末尾元素值的方法就介绍到这里,让我们来看一下第二种计算方式。

实现以2的幂级数为距离部分和算法

第二种算法通过增加距离的方式,让更多的处理器充分发挥作用。在这种情况下,没有进一步同步的必要了,因为所有中间结果都直接传递到下一个处理器上去了。不过,在实际中我们很少见到,单个处理器处理对一定数量的元素执行同一条指令,这种方式成为单指令-多数据流(SIMD)。因此,代码必须能处理通用情况,并且需要在每步上对线程进行显式同步。

完成这种功能的一种方式是使用栅栏(barrier)——一种同步机制:只有所有线程都到达栅栏处,才能进行之后的操作;先到达的线程必须等待未到达的线程。C++11标准库没有直接提供这样的工具,所以你得自行设计一个。

试想游乐场中的过山车。如果有适量的游客在等待,那么过山车管理员就要保证,在过山车启动前,每一个位置都得坐一个游客。栅栏的工作原理也一样:你已经知道了“座位”的数量,线程就是要等待所有“座位”都坐满。当等待线程够数,那么它们可以继续运行;这时,栅栏会重置,并且会让下一拨线程开始扥带。通常,会在循环中这样做,当同一个线程再次到达栅栏处,它会再次等待。这种方法是为了让线程同步,所以不会有线程在其他未完成的情况下,就去完成下一个任务。如果有线程提前执行,对于这样一个算法,就是一场灾难,因为提前出发的线程可能会修改要被其他线程使用到的数据,后面线程获取到的数据就不是正确数据了。

下面的代码就简单的实现了一个栅栏。

清单8.12 简单的栅栏类

  1. class barrier
  2. {
  3. unsigned const count;
  4. std::atomic<unsigned> spaces;
  5. std::atomic<unsigned> generation;
  6. public:
  7. explicit barrier(unsigned count_): // 1
  8. count(count_),spaces(count),generation(0)
  9. {}
  10. void wait()
  11. {
  12. unsigned const my_generation=generation; // 2
  13. if(!--spaces) // 3
  14. {
  15. spaces=count; // 4
  16. ++generation; // 5
  17. }
  18. else
  19. {
  20. while(generation==my_generation) // 6
  21. std::this_thread::yield(); // 7
  22. }
  23. }
  24. };

这个实现中,用一定数量的“座位”构造了一个barrier①,这个数量将会存储count变量中。起初,栅栏中的spaces与count数量相当。当有线程都在等待时,spaces的数量就会减少③。当spaces的数量减到0时,spaces的值将会重置为count④,并且generation变量会增加,以向线程发出信号,让这些等待线程能够继续运行⑤。如果spaces没有到达0,那么线程会继续等待。这个实现使用了一个简单的自旋锁⑥,对generation的检查会在wait()开始的时候进行②。因为generation只会在所有线程都到达栅栏的时候更新⑤,在等待的时候使用yield()⑦就不会让CPU处于忙等待的状态。

这个实现比较“简单”的真实意义:使用自旋等待的情况下,如果让线程等待很长时间就不会很理想,并且如果超过count数量的线程对wait()进行调用,这个实现就没有办法工作了。如果想要很好的处理这样的情况,必须使用一个更加健壮(更加复杂)的实现。我依旧坚持对原子变量操作顺序的一致性,因为这会让事情更加简单,不过有时还是需要放松这样的约束。全局同步对于大规模并行架构来说是消耗巨大的,因为相关处理器会穿梭于存储栅栏状态的缓存行中(可见8.2.2中对乒乓缓存的讨论),所以需要格外的小心,来确保使用的是最佳同步方法。

不论怎么样,这些都需要你考虑到;需要有固定数量的线程执行同步循环。好吧,大多数情况下线程数量都是固定的。你可能还记得,代码起始部分的几个数据项,只需要几步就能得到其最终值。这就意味着,无论是让所有线程循环处理范围内的所有元素,还是让栅栏来同步线程,都会递减count的值。我会选择后者,因为其能避免线程做不必要的工作,仅仅是等待最终步骤完成。

这意味着你要将count改为一个原子变量,这样在多线程对其进行更新的时候,就不需要添加额外的同步:

  1. std::atomic<unsigned> count;

初始化保持不变,不过当spaces的值被重置后,你需要显式的对count进行load()操作:

  1. spaces=count.load();

这就是要对wait()函数的改动;现在需要一个新的成员函数来递减count。这个函数命名为done_waiting(),因为当一个线程完成其工作,并在等待的时候,才能对其进行调用它:

  1. void done_waiting()
  2. {
  3. --count; // 1
  4. if(!--spaces) // 2
  5. {
  6. spaces=count.load(); // 3
  7. ++generation;
  8. }
  9. }

实现中,首先要减少count①,所以下一次spaces将会被重置为一个较小的数。然后,需要递减spaces的值②。如果不做这些操作,有些线程将会持续等待,因为spaces被旧的count初始化,大于期望值。一组当中最后一个线程需要对计数器进行重置,并且递增generation的值③,就像在wait()里面做的那样。最重要的区别:最后一个线程不需要等待。当最后一个线程结束,整个等待也就随之结束!

现在就准备开始写部分和的第二个实现吧。在每一步中,每一个线程都在栅栏出调用wait(),来保证线程所处步骤一致,并且当所有线程都结束,那么最后一个线程会调用done_waiting()来减少count的值。如果使用两个缓存对原始数据进行保存,栅栏也可以提供你所需要的同步。每一步中,线程都会从原始数据或是缓存中读取数据,并且将新值写入对应位置。如果有线程先从原始数据处获取数据,那下一步就从缓存上获取数据(或相反)。这就能保证在读与写都是由独立线程完成,并不存在条件竞争。当线程结束等待循环,就能保证正确的值最终被写入到原始数据当中。下面的代码就是这样的实现。

清单8.13 通过两两更新对的方式实现partial_sum

  1. struct barrier
  2. {
  3. std::atomic<unsigned> count;
  4. std::atomic<unsigned> spaces;
  5. std::atomic<unsigned> generation;
  6. barrier(unsigned count_):
  7. count(count_),spaces(count_),generation(0)
  8. {}
  9. void wait()
  10. {
  11. unsigned const gen=generation.load();
  12. if(!--spaces)
  13. {
  14. spaces=count.load();
  15. ++generation;
  16. }
  17. else
  18. {
  19. while(generation.load()==gen)
  20. {
  21. std::this_thread::yield();
  22. }
  23. }
  24. }
  25. void done_waiting()
  26. {
  27. --count;
  28. if(!--spaces)
  29. {
  30. spaces=count.load();
  31. ++generation;
  32. }
  33. }
  34. };
  35. template<typename Iterator>
  36. void parallel_partial_sum(Iterator first,Iterator last)
  37. {
  38. typedef typename Iterator::value_type value_type;
  39. struct process_element // 1
  40. {
  41. void operator()(Iterator first,Iterator last,
  42. std::vector<value_type>& buffer,
  43. unsigned i,barrier& b)
  44. {
  45. value_type& ith_element=*(first+i);
  46. bool update_source=false;
  47. for(unsigned step=0,stride=1;stride<=i;++step,stride*=2)
  48. {
  49. value_type const& source=(step%2)? // 2
  50. buffer[i]:ith_element;
  51. value_type& dest=(step%2)?
  52. ith_element:buffer[i];
  53. value_type const& addend=(step%2)? // 3
  54. buffer[i-stride]:*(first+i-stride);
  55. dest=source+addend; // 4
  56. update_source=!(step%2);
  57. b.wait(); // 5
  58. }
  59. if(update_source) // 6
  60. {
  61. ith_element=buffer[i];
  62. }
  63. b.done_waiting(); // 7
  64. }
  65. };
  66. unsigned long const length=std::distance(first,last);
  67. if(length<=1)
  68. return;
  69. std::vector<value_type> buffer(length);
  70. barrier b(length);
  71. std::vector<std::thread> threads(length-1); // 8
  72. join_threads joiner(threads);
  73. Iterator block_start=first;
  74. for(unsigned long i=0;i<(length-1);++i)
  75. {
  76. threads[i]=std::thread(process_element(),first,last, // 9
  77. std::ref(buffer),i,std::ref(b));
  78. }
  79. process_element()(first,last,buffer,length-1,b); // 10
  80. }

代码的整体结构应该不用说了。process_element类有函数调用操作可以用来做具体的工作①,就是运行一组线程⑨,并将线程存储到vector中⑧,同样还需要在主线程中对其进行调用⑩。这里与之前最大的区别就是,线程的数量是根据列表中的数据量来定的,而非根据std::thread::hardware_concurrency。如我之前所说,除非你使用的是一个大规模并行的机器,因为这上面的线程都十分廉价(虽然这样的方式并不是很好),还能为我们展示了其整体结构。这个结构在有较少线程的时候,每一个线程只能处理源数据中的部分数据,当没有足够的线程支持该结构时,效率要比传递算法低。

不管怎样,主要的工作都是调用process_element的函数操作符来完成的。每一步,都会从原始数据或缓存中获取第i个元素②,并且将获取到的元素加到指定stride的元素中去③,如果从原始数据开始读取的元素,加和后的数需要存储在缓存中④。然后,在开始下一步前,会在栅栏处等待⑤。当stride超出了给定数据的范围,当最终结果已经存在缓存中时,就需要更新原始数据中的数据,同样这也意味着本次加和结束。最后,在调用栅栏中的done_waiting()函数⑦。

注意这个解决方案并不是异常安全的。如果某个线程在process_element执行时抛出一个异常,其就会终止整个应用。这里可以使用一个std::promise来存储异常,就像在清单8.9中parallel_find的实现,或仅使用一个被互斥量保护的std::exception_ptr即可。

总结下这三个例子。希望其能保证我们了解8.1、8.2、8.3和8.4节中提到的设计考量,并且证明了这些技术在真实的代码中,需要承担些什么责任。


[4] http://threadingbuildingblocks.org/