6.3. 同步

虽然多线程的使用可以提高应用程序的性能,但也增加了复杂性。 如果使用线程在同一时间执行几个函数,访问共享资源时必须相应地同步。 一旦应用达到了一定规模,这涉及相当一些工作。 本段介绍了Boost.Thread提供同步线程的类。

  1. #include <boost/thread.hpp>
  2. #include <iostream>
  3.  
  4. void wait(int seconds)
  5. {
  6. boost::this_thread::sleep(boost::posix_time::seconds(seconds));
  7. }
  8.  
  9. boost::mutex mutex;
  10.  
  11. void thread()
  12. {
  13. for (int i = 0; i < 5; ++i)
  14. {
  15. wait(1);
  16. mutex.lock();
  17. std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl;
  18. mutex.unlock();
  19. }
  20. }
  21.  
  22. int main()
  23. {
  24. boost::thread t1(thread);
  25. boost::thread t2(thread);
  26. t1.join();
  27. t2.join();
  28. }

多线程程序使用所谓的互斥对象来同步。 Boost.Thread提供多个的互斥类,boost::mutex是最简单的一个。 互斥的基本原则是当一个特定的线程拥有资源的时候防止其他线程夺取其所有权。 一旦释放,其他的线程可以取得所有权。 这将导致线程等待至另一个线程完成处理一些操作,从而相应地释放互斥对象的所有权。

上面的示例使用一个类型为 boost::mutexmutex 全局互斥对象。 thread() 函数获取此对象的所有权才在 for 循环内使用 lock() 方法写入到标准输出流的。 一旦信息被写入,使用 unlock() 方法释放所有权。

main() 创建两个线程,同时执行 thread ()函数。 利用 for 循环,每个线程数到5,用一个迭代器写一条消息到标准输出流。 不幸的是,标准输出流是一个全局性的被所有线程共享的对象。 该标准不提供任何保证 std::cout 可以安全地从多个线程访问。 因此,访问标准输出流必须同步:在任何时候,只有一个线程可以访问 std::cout

由于两个线程试图在写入标准输出流前获得互斥体,实际上只能保证一次只有一个线程访问 std::cout。 不管哪个线程成功调用 lock() 方法,其他所有线程必须等待,直到 unlock() 被调用。

获取和释放互斥体是一个典型的模式,是由Boost.Thread通过不同的数据类型支持。 例如,不直接地调用 lock()unlock(),使用 boost::lock_guard 类也是可以的。

  1. #include <boost/thread.hpp>
  2. #include <iostream>
  3.  
  4. void wait(int seconds)
  5. {
  6. boost::this_thread::sleep(boost::posix_time::seconds(seconds));
  7. }
  8.  
  9. boost::mutex mutex;
  10.  
  11. void thread()
  12. {
  13. for (int i = 0; i < 5; ++i)
  14. {
  15. wait(1);
  16. boost::lock_guard<boost::mutex> lock(mutex);
  17. std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl;
  18. }
  19. }
  20.  
  21. int main()
  22. {
  23. boost::thread t1(thread);
  24. boost::thread t2(thread);
  25. t1.join();
  26. t2.join();
  27. }

boost::lockguard 在其内部构造和析构函数分别自动调用 lock()unlock() 。 访问共享资源是需要同步的,因为它显示地被两个方法调用。 boost::lock_guard 类是另一个出现在 [第 2 章 智能指针_]($1052d251e6188b67.md) 的RAII用语。

除了boost::mutexboost::lock_guard 之外,Boost.Thread也提供其他的类支持各种同步。 其中一个重要的就是 boost::unique_lock ,相比较 boost::lock_guard 而言,它提供许多有用的方法。

  1. #include <boost/thread.hpp>
  2. #include <iostream>
  3.  
  4. void wait(int seconds)
  5. {
  6. boost::this_thread::sleep(boost::posix_time::seconds(seconds));
  7. }
  8.  
  9. boost::timed_mutex mutex;
  10.  
  11. void thread()
  12. {
  13. for (int i = 0; i < 5; ++i)
  14. {
  15. wait(1);
  16. boost::unique_lock<boost::timed_mutex> lock(mutex, boost::try_to_lock);
  17. if (!lock.owns_lock())
  18. lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(1));
  19. std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl;
  20. boost::timed_mutex *m = lock.release();
  21. m->unlock();
  22. }
  23. }
  24.  
  25. int main()
  26. {
  27. boost::thread t1(thread);
  28. boost::thread t2(thread);
  29. t1.join();
  30. t2.join();
  31. }

上面的例子用不同的方法来演示 boost::unique_lock 的功能。 当然了,这些功能的用法对给定的情景不一定适用;boost::lock_guard 在上个例子的用法还是挺合理的。 这个例子就是为了演示 boost::unique_lock 提供的功能。

boost::unique_lock 通过多个构造函数来提供不同的方式获得互斥体。 这个期望获得互斥体的函数简单地调用了 lock() 方法,一直等到获得这个互斥体。 所以它的行为跟 boost::lock_guard 的那个是一样的。

如果第二个参数传入一个 boost::try_to_lock 类型的值,对应的构造函数就会调用 try_lock() 方法。 这个方法返回 bool 型的值:如果能够获得互斥体则返回true,否则返回 false 。 相比 lock() 函数,try_lock() 会立即返回,而且在获得互斥体之前不会被阻塞。

上面的程序向 boost::uniquelock 的构造函数的第二个参数传入_boost::try_to_lock。 然后通过 owns_lock() 可以检查是否可获得互斥体。 如果不能, owns_lock() 返回 false。 这也用到 boost::unique_lock 提供的另外一个函数: timed_lock() 等待一定的时间以获得互斥体。 给定的程序等待长达1秒,应较足够的时间来获取更多的互斥。

其实这个例子显示了三个方法获取一个互斥体:lock() 会一直等待,直到获得一个互斥体。 try_lock() 则不会等待,但如果它只会在互斥体可用的时候才能获得,否则返回 false 。 最后,timed_lock() 试图获得在一定的时间内获取互斥体。 和 try_lock() 一样,返回bool 类型的值意味着成功是否。

虽然 boost::mutex 提供了 lock()try_lock() 两个方法,但是 boost::timed_mutex 只支持 timed_lock() ,这就是上面示例那么使用的原因。 如果不用 timed_lock() 的话,也可以像以前的例子那样用 boost::mutex

就像 boost::lock_guard 一样, boost::unique_lock 的析构函数也会相应地释放互斥量。此外,可以手动地用 unlock() 释放互斥量。也可以像上面的例子那样,通过调用 release() 解除boost::unique_lock 和互斥量之间的关联。然而在这种情况下,必须显式地调用 unlock() 方法来释放互斥量,因为 boost::unique_lock 的析构函数不再做这件事情。

boost::unique_lock 这个所谓的独占锁意味着一个互斥量同时只能被一个线程获取。 其他线程必须等待,直到互斥体再次被释放。 除了独占锁,还有非独占锁。 Boost.Thread里有个 boost::shared_lock 的类提供了非独占锁。 正如下面的例子,这个类必须和 boost::shared_mutex 型的互斥量一起使用。

  1. #include <boost/thread.hpp>
  2. #include <iostream>
  3. #include <vector>
  4. #include <cstdlib>
  5. #include <ctime>
  6.  
  7. void wait(int seconds)
  8. {
  9. boost::this_thread::sleep(boost::posix_time::seconds(seconds));
  10. }
  11.  
  12. boost::shared_mutex mutex;
  13. std::vector<int> random_numbers;
  14.  
  15. void fill()
  16. {
  17. std::srand(static_cast<unsigned int>(std::time(0)));
  18. for (int i = 0; i < 3; ++i)
  19. {
  20. boost::unique_lock<boost::shared_mutex> lock(mutex);
  21. random_numbers.push_back(std::rand());
  22. lock.unlock();
  23. wait(1);
  24. }
  25. }
  26.  
  27. void print()
  28. {
  29. for (int i = 0; i < 3; ++i)
  30. {
  31. wait(1);
  32. boost::shared_lock<boost::shared_mutex> lock(mutex);
  33. std::cout << random_numbers.back() << std::endl;
  34. }
  35. }
  36.  
  37. int sum = 0;
  38.  
  39. void count()
  40. {
  41. for (int i = 0; i < 3; ++i)
  42. {
  43. wait(1);
  44. boost::shared_lock<boost::shared_mutex> lock(mutex);
  45. sum += random_numbers.back();
  46. }
  47. }
  48.  
  49. int main()
  50. {
  51. boost::thread t1(fill);
  52. boost::thread t2(print);
  53. boost::thread t3(count);
  54. t1.join();
  55. t2.join();
  56. t3.join();
  57. std::cout << "Sum: " << sum << std::endl;
  58. }

boost::shared_lock 类型的非独占锁可以在线程只对某个资源读访问的情况下使用。 一个线程修改的资源需要写访问,因此需要一个独占锁。 这样做也很明显:只需要读访问的线程不需要知道同一时间其他线程是否访问。 因此非独占锁可以共享一个互斥体。

在给定的例子, print()count() 都可以只读访问 random_numbers 。 虽然 print() 函数把 random_numbers 里的最后一个数写到标准输出,count() 函数把它统计到 sum 变量。 由于没有函数修改 random_numbers,所有的都可以在同一时间用 boost::shared_lock 类型的非独占锁访问它。

fill() 函数里,需要用一个 boost::uniquelock 类型的非独占锁,因为它插入了一个新的随机数到 _random_numbers。 在 unlock() 显式地调用 unlock() 来释放互斥量之后, fill() 等待了一秒。 相比于之前的那个样子, 在 for 循环的尾部调用 wait() 以保证容器里至少存在一个随机数,可以被print() 或者 count() 访问。 对应地,这两个函数在 for 循环的开始调用了 wait()

考虑到在不同的地方每个单独地调用 wait() ,一个潜在的问题变得很明显:函数调用的顺序直接受CPU执行每个独立进程的顺序决定。 利用所谓的条件变量,可以同步哪些独立的线程,使数组的每个元素都被不同的线程立即添加到 random_numbers

  1. #include <boost/thread.hpp>
  2. #include <iostream>
  3. #include <vector>
  4. #include <cstdlib>
  5. #include <ctime>
  6.  
  7. boost::mutex mutex;
  8. boost::condition_variable_any cond;
  9. std::vector<int> random_numbers;
  10.  
  11. void fill()
  12. {
  13. std::srand(static_cast<unsigned int>(std::time(0)));
  14. for (int i = 0; i < 3; ++i)
  15. {
  16. boost::unique_lock<boost::mutex> lock(mutex);
  17. random_numbers.push_back(std::rand());
  18. cond.notify_all();
  19. cond.wait(mutex);
  20. }
  21. }
  22.  
  23. void print()
  24. {
  25. std::size_t next_size = 1;
  26. for (int i = 0; i < 3; ++i)
  27. {
  28. boost::unique_lock<boost::mutex> lock(mutex);
  29. while (random_numbers.size() != next_size)
  30. cond.wait(mutex);
  31. std::cout << random_numbers.back() << std::endl;
  32. ++next_size;
  33. cond.notify_all();
  34. }
  35. }
  36.  
  37. int main()
  38. {
  39. boost::thread t1(fill);
  40. boost::thread t2(print);
  41. t1.join();
  42. t2.join();
  43. }

这个例子的程序删除了 wait()count() 。线程不用在每个循环迭代中等待一秒,而是尽可能快地执行。此外,没有计算总额;数字完全写入标准输出流。

为确保正确地处理随机数,需要一个允许检查多个线程之间特定条件的条件变量来同步不每个独立的线程。

正如上面所说, fill() 函数用在每个迭代产生一个随机数,然后放在 random_numbers 容器中。 为了防止其他线程同时访问这个容器,就要相应得使用一个排它锁。 不是等待一秒,实际上这个例子却用了一个条件变量。 调用 notify_all() 会唤醒每个哪些正在分别通过调用wait() 等待此通知的线程。

通过查看 print() 函数里的 for 循环,可以看到相同的条件变量被 wait() 函数调用了。 如果这个线程被 notify_all() 唤醒,它就会试图这个互斥量,但只有在 fill() 函数完全释放之后才能成功。

这里的窍门就是调用 wait() 会释放相应的被参数传入的互斥量。 在调用 notify_all()后, fill() 函数会通过 wait() 相应地释放线程。 然后它会阻止和等待其他的线程调用 notify_all() ,一旦随机数已写入标准输出流,这就会在 print() 里发生。

注意到在 print() 函数里调用 wait() 事实上发生在一个单独 while 循环里。 这样做的目的是为了处理在 print() 函数里第一次调用 wait() 函数之前随机数已经放到容器里。 通过比较 random_numbers 里元素的数目与预期值,发现这成功地处理了把随机数写入到标准输出流。