消息传递框架与完整的ATM示例

ATM:自动取款机。

1回到第4章,我举了一个使用消息传递框架在线程间发送信息的例子。这里就会使用这个实现来完成ATM功能。下面完整代码就是功能的实现,包括消息传递框架。

清单C.1实现了一个消息队列。其可以将消息以指针(指向基类)的方式存储在列表中;指定消息类型会由基类派生模板进行处理。推送包装类的构造实例,以及存储指向这个实例的指针;弹出实例的时候,将会返回指向其的指针。因为message_base类没有任何成员函数,在访问存储消息之前,弹出线程就需要将指针转为wrapped_message指针。

清单C.1 简单的消息队列

  1. #include <mutex>
  2. #include <condition_variable>
  3. #include <queue>
  4. #include <memory>
  5. namespace messaging
  6. {
  7. struct message_base // 队列项的基础类
  8. {
  9. virtual ~message_base()
  10. {}
  11. };
  12. template<typename Msg>
  13. struct wrapped_message: // 每个消息类型都需要特化
  14. message_base
  15. {
  16. Msg contents;
  17. explicit wrapped_message(Msg const& contents_):
  18. contents(contents_)
  19. {}
  20. };
  21. class queue // 我们的队列
  22. {
  23. std::mutex m;
  24. std::condition_variable c;
  25. std::queue<std::shared_ptr<message_base> > q; // 实际存储指向message_base类指针的队列
  26. public:
  27. template<typename T>
  28. void push(T const& msg)
  29. {
  30. std::lock_guard<std::mutex> lk(m);
  31. q.push(std::make_shared<wrapped_message<T> >(msg)); // 包装已传递的信息,存储指针
  32. c.notify_all();
  33. }
  34. std::shared_ptr<message_base> wait_and_pop()
  35. {
  36. std::unique_lock<std::mutex> lk(m);
  37. c.wait(lk,[&]{return !q.empty();}); // 当队列为空时阻塞
  38. auto res=q.front();
  39. q.pop();
  40. return res;
  41. }
  42. };
  43. }

发送通过sender类(见清单C.2)实例处理过的消息。只能对已推送到队列中的消息进行包装。对sender实例的拷贝,只是拷贝了指向队列的指针,而非队列本身。

清单C.2 sender类

  1. namespace messaging
  2. {
  3. class sender
  4. {
  5. queue*q; // sender是一个队列指针的包装类
  6. public:
  7. sender(): // sender无队列(默认构造函数)
  8. q(nullptr)
  9. {}
  10. explicit sender(queue*q_): // 从指向队列的指针进行构造
  11. q(q_)
  12. {}
  13. template<typename Message>
  14. void send(Message const& msg)
  15. {
  16. if(q)
  17. {
  18. q->push(msg); // 将发送信息推送给队列
  19. }
  20. }
  21. };
  22. }

接收信息部分有些麻烦。不仅要等待队列中的消息,还要检查消息类型是否与所等待的消息类型匹配,并调用处理函数进行处理。那么就从receiver类的实现开始吧。

清单C.3 receiver类

  1. namespace messaging
  2. {
  3. class receiver
  4. {
  5. queue q; // 接受者拥有对应队列
  6. public:
  7. operator sender() // 允许将类中队列隐式转化为一个sender队列
  8. {
  9. return sender(&q);
  10. }
  11. dispatcher wait() // 等待对队列进行调度
  12. {
  13. return dispatcher(&q);
  14. }
  15. };
  16. }

sender只是引用一个消息队列,而receiver是拥有一个队列。可以使用隐式转换的方式获取sender引用的类。难点在于wait()中的调度。这里创建了一个dispatcher对象引用receiver中的队列。dispatcher类实现会在下一个清单中看到;如你所见,任务是在析构函数中完成的。在这个例子中,所要做的工作是对消息进行等待,以及对其进行调度。

清单C.4 dispatcher类

  1. namespace messaging
  2. {
  3. class close_queue // 用于关闭队列的消息
  4. {};
  5. class dispatcher
  6. {
  7. queue* q;
  8. bool chained;
  9. dispatcher(dispatcher const&)=delete; // dispatcher实例不能被拷贝
  10. dispatcher& operator=(dispatcher const&)=delete;
  11. template<
  12. typename Dispatcher,
  13. typename Msg,
  14. typename Func> // 允许TemplateDispatcher实例访问内部成员
  15. friend class TemplateDispatcher;
  16. void wait_and_dispatch()
  17. {
  18. for(;;) // 1 循环,等待调度消息
  19. {
  20. auto msg=q->wait_and_pop();
  21. dispatch(msg);
  22. }
  23. }
  24. bool dispatch( // 2 dispatch()会检查close_queue消息,然后抛出
  25. std::shared_ptr<message_base> const& msg)
  26. {
  27. if(dynamic_cast<wrapped_message<close_queue>*>(msg.get()))
  28. {
  29. throw close_queue();
  30. }
  31. return false;
  32. }
  33. public:
  34. dispatcher(dispatcher&& other): // dispatcher实例可以移动
  35. q(other.q),chained(other.chained)
  36. {
  37. other.chained=true; // 源不能等待消息
  38. }
  39. explicit dispatcher(queue* q_):
  40. q(q_),chained(false)
  41. {}
  42. template<typename Message,typename Func>
  43. TemplateDispatcher<dispatcher,Message,Func>
  44. handle(Func&& f) // 3 使用TemplateDispatcher处理指定类型的消息
  45. {
  46. return TemplateDispatcher<dispatcher,Message,Func>(
  47. q,this,std::forward<Func>(f));
  48. }
  49. ~dispatcher() noexcept(false) // 4 析构函数可能会抛出异常
  50. {
  51. if(!chained)
  52. {
  53. wait_and_dispatch();
  54. }
  55. }
  56. };
  57. }

从wait()返回的dispatcher实例将马上被销毁,因为是临时变量,也向前文提到的,析构函数在这里做真正的工作。析构函数调用wait_and_dispatch()函数,这个函数中有一个循环①,等待消息的传入(这样才能进行弹出操作),然后将消息传递给dispatch()函数。dispatch()函数本身②很简单;会检查小时是否是一个close_queue消息,当是close_queue消息时,抛出一个异常;如果不是,函数将会返回false来表明消息没有被处理。因为会抛出close_queue异常,所以析构函数会标示为noexcept(false);在没有任何标识的情况下,一般情况下析构函数都noexcept(true)④型,这表示没有任何异常抛出,并且close_queue异常将会使程序终止。

虽然,不会经常的去调用wait()函数,不过,在大多数时间里,你都希望对一条消息进行处理。这时就需要handle()成员函数③的加入。这个函数是一个模板,并且消息类型不可推断,所以你需要指定需要处理的消息类型,并且传入函数(或可调用对象)进行处理,并将队列传入当前dispatcher对象的handle()函数。这将在清单C.5中展示。这就是为什么,在测试析构函数中的chained值前,要等待消息耳朵原因;不仅是避免“移动”类型的对象对消息进行等待,而且允许将等待状态转移到新的TemplateDispatcher实例中。

清单C.5 TemplateDispatcher类模板

  1. namespace messaging
  2. {
  3. template<typename PreviousDispatcher,typename Msg,typename Func>
  4. class TemplateDispatcher
  5. {
  6. queue* q;
  7. PreviousDispatcher* prev;
  8. Func f;
  9. bool chained;
  10. TemplateDispatcher(TemplateDispatcher const&)=delete;
  11. TemplateDispatcher& operator=(TemplateDispatcher const&)=delete;
  12. template<typename Dispatcher,typename OtherMsg,typename OtherFunc>
  13. friend class TemplateDispatcher; // 所有特化的TemplateDispatcher类型实例都是友元类
  14. void wait_and_dispatch()
  15. {
  16. for(;;)
  17. {
  18. auto msg=q->wait_and_pop();
  19. if(dispatch(msg)) // 1 如果消息处理过后,会跳出循环
  20. break;
  21. }
  22. }
  23. bool dispatch(std::shared_ptr<message_base> const& msg)
  24. {
  25. if(wrapped_message<Msg>* wrapper=
  26. dynamic_cast<wrapped_message<Msg>*>(msg.get())) // 2 检查消息类型,并且调用函数
  27. {
  28. f(wrapper->contents);
  29. return true;
  30. }
  31. else
  32. {
  33. return prev->dispatch(msg); // 3 链接到之前的调度器上
  34. }
  35. }
  36. public:
  37. TemplateDispatcher(TemplateDispatcher&& other):
  38. q(other.q),prev(other.prev),f(std::move(other.f)),
  39. chained(other.chained)
  40. {
  41. other.chained=true;
  42. }
  43. TemplateDispatcher(queue* q_,PreviousDispatcher* prev_,Func&& f_):
  44. q(q_),prev(prev_),f(std::forward<Func>(f_)),chained(false)
  45. {
  46. prev_->chained=true;
  47. }
  48. template<typename OtherMsg,typename OtherFunc>
  49. TemplateDispatcher<TemplateDispatcher,OtherMsg,OtherFunc>
  50. handle(OtherFunc&& of) // 4 可以链接其他处理器
  51. {
  52. return TemplateDispatcher<
  53. TemplateDispatcher,OtherMsg,OtherFunc>(
  54. q,this,std::forward<OtherFunc>(of));
  55. }
  56. ~TemplateDispatcher() noexcept(false) // 5 这个析构函数也是noexcept(false)的
  57. {
  58. if(!chained)
  59. {
  60. wait_and_dispatch();
  61. }
  62. }
  63. };
  64. }

TemplateDispatcher<>类模板仿照了dispatcher类,二者几乎相同。特别是在析构函数上,都是调用wait_and_dispatch()等待处理消息。

在处理消息的过程中,如果不抛出异常,就需要检查一下在循环中①,消息是否已经得到了处理。当成功的处理了一条消息,处理过程就可以停止,这样就可以等待下一组消息的传入了。当获取了一个和指定类型匹配的消息,使用函数调用的方式②,就要好于抛出异常(虽然,处理函数也可能会抛出异常)。如果消息类型不匹配,那么就可以链接前一个调度器③。在第一个实例中,dispatcher实例确实作为一个调度器,当在handle()④函数中进行链接后,就允许处理多种类型的消息。在链接了之前的TemplateDispatcher<>实例后,当消息类型和当前的调度器类型不匹配的时候,调度链会依次的向前寻找类型匹配的调度器。因为任何调度器都可能抛出异常(包括dispatcher中对close_queue消息进行处理的默认处理器),析构函数在这里会再次被声明为noexcept(false)⑤。

这种简单的架构允许你想队列推送任何类型的消息,并且调度器有选择的与接收端的消息进行匹配。同样,也允许为了推送消息,将消息队列的引用进行传递的同时,保持接收端的私有性。

为了完成第4章的例子,消息的组成将在清单C.6中给出,各种状态机将在清单C.7,C.8和C.9中给出。最后,驱动代码将在C.10给出。

清单C.6 ATM消息

  1. struct withdraw
  2. {
  3. std::string account;
  4. unsigned amount;
  5. mutable messaging::sender atm_queue;
  6. withdraw(std::string const& account_,
  7. unsigned amount_,
  8. messaging::sender atm_queue_):
  9. account(account_),amount(amount_),
  10. atm_queue(atm_queue_)
  11. {}
  12. };
  13. struct withdraw_ok
  14. {};
  15. struct withdraw_denied
  16. {};
  17. struct cancel_withdrawal
  18. {
  19. std::string account;
  20. unsigned amount;
  21. cancel_withdrawal(std::string const& account_,
  22. unsigned amount_):
  23. account(account_),amount(amount_)
  24. {}
  25. };
  26. struct withdrawal_processed
  27. {
  28. std::string account;
  29. unsigned amount;
  30. withdrawal_processed(std::string const& account_,
  31. unsigned amount_):
  32. account(account_),amount(amount_)
  33. {}
  34. };
  35. struct card_inserted
  36. {
  37. std::string account;
  38. explicit card_inserted(std::string const& account_):
  39. account(account_)
  40. {}
  41. };
  42. struct digit_pressed
  43. {
  44. char digit;
  45. explicit digit_pressed(char digit_):
  46. digit(digit_)
  47. {}
  48. };
  49. struct clear_last_pressed
  50. {};
  51. struct eject_card
  52. {};
  53. struct withdraw_pressed
  54. {
  55. unsigned amount;
  56. explicit withdraw_pressed(unsigned amount_):
  57. amount(amount_)
  58. {}
  59. };
  60. struct cancel_pressed
  61. {};
  62. struct issue_money
  63. {
  64. unsigned amount;
  65. issue_money(unsigned amount_):
  66. amount(amount_)
  67. {}
  68. };
  69. struct verify_pin
  70. {
  71. std::string account;
  72. std::string pin;
  73. mutable messaging::sender atm_queue;
  74. verify_pin(std::string const& account_,std::string const& pin_,
  75. messaging::sender atm_queue_):
  76. account(account_),pin(pin_),atm_queue(atm_queue_)
  77. {}
  78. };
  79. struct pin_verified
  80. {};
  81. struct pin_incorrect
  82. {};
  83. struct display_enter_pin
  84. {};
  85. struct display_enter_card
  86. {};
  87. struct display_insufficient_funds
  88. {};
  89. struct display_withdrawal_cancelled
  90. {};
  91. struct display_pin_incorrect_message
  92. {};
  93. struct display_withdrawal_options
  94. {};
  95. struct get_balance
  96. {
  97. std::string account;
  98. mutable messaging::sender atm_queue;
  99. get_balance(std::string const& account_,messaging::sender atm_queue_):
  100. account(account_),atm_queue(atm_queue_)
  101. {}
  102. };
  103. struct balance
  104. {
  105. unsigned amount;
  106. explicit balance(unsigned amount_):
  107. amount(amount_)
  108. {}
  109. };
  110. struct display_balance
  111. {
  112. unsigned amount;
  113. explicit display_balance(unsigned amount_):
  114. amount(amount_)
  115. {}
  116. };
  117. struct balance_pressed
  118. {};

清单C.7 ATM状态机

  1. class atm
  2. {
  3. messaging::receiver incoming;
  4. messaging::sender bank;
  5. messaging::sender interface_hardware;
  6. void (atm::*state)();
  7. std::string account;
  8. unsigned withdrawal_amount;
  9. std::string pin;
  10. void process_withdrawal()
  11. {
  12. incoming.wait()
  13. .handle<withdraw_ok>(
  14. [&](withdraw_ok const& msg)
  15. {
  16. interface_hardware.send(
  17. issue_money(withdrawal_amount));
  18. bank.send(
  19. withdrawal_processed(account,withdrawal_amount));
  20. state=&atm::done_processing;
  21. })
  22. .handle<withdraw_denied>(
  23. [&](withdraw_denied const& msg)
  24. {
  25. interface_hardware.send(display_insufficient_funds());
  26. state=&atm::done_processing;
  27. })
  28. .handle<cancel_pressed>(
  29. [&](cancel_pressed const& msg)
  30. {
  31. bank.send(
  32. cancel_withdrawal(account,withdrawal_amount));
  33. interface_hardware.send(
  34. display_withdrawal_cancelled());
  35. state=&atm::done_processing;
  36. });
  37. }
  38. void process_balance()
  39. {
  40. incoming.wait()
  41. .handle<balance>(
  42. [&](balance const& msg)
  43. {
  44. interface_hardware.send(display_balance(msg.amount));
  45. state=&atm::wait_for_action;
  46. })
  47. .handle<cancel_pressed>(
  48. [&](cancel_pressed const& msg)
  49. {
  50. state=&atm::done_processing;
  51. });
  52. }
  53. void wait_for_action()
  54. {
  55. interface_hardware.send(display_withdrawal_options());
  56. incoming.wait()
  57. .handle<withdraw_pressed>(
  58. [&](withdraw_pressed const& msg)
  59. {
  60. withdrawal_amount=msg.amount;
  61. bank.send(withdraw(account,msg.amount,incoming));
  62. state=&atm::process_withdrawal;
  63. })
  64. .handle<balance_pressed>(
  65. [&](balance_pressed const& msg)
  66. {
  67. bank.send(get_balance(account,incoming));
  68. state=&atm::process_balance;
  69. })
  70. .handle<cancel_pressed>(
  71. [&](cancel_pressed const& msg)
  72. {
  73. state=&atm::done_processing;
  74. });
  75. }
  76. void verifying_pin()
  77. {
  78. incoming.wait()
  79. .handle<pin_verified>(
  80. [&](pin_verified const& msg)
  81. {
  82. state=&atm::wait_for_action;
  83. })
  84. .handle<pin_incorrect>(
  85. [&](pin_incorrect const& msg)
  86. {
  87. interface_hardware.send(
  88. display_pin_incorrect_message());
  89. state=&atm::done_processing;
  90. })
  91. .handle<cancel_pressed>(
  92. [&](cancel_pressed const& msg)
  93. {
  94. state=&atm::done_processing;
  95. });
  96. }
  97. void getting_pin()
  98. {
  99. incoming.wait()
  100. .handle<digit_pressed>(
  101. [&](digit_pressed const& msg)
  102. {
  103. unsigned const pin_length=4;
  104. pin+=msg.digit;
  105. if(pin.length()==pin_length)
  106. {
  107. bank.send(verify_pin(account,pin,incoming));
  108. state=&atm::verifying_pin;
  109. }
  110. })
  111. .handle<clear_last_pressed>(
  112. [&](clear_last_pressed const& msg)
  113. {
  114. if(!pin.empty())
  115. {
  116. pin.pop_back();
  117. }
  118. })
  119. .handle<cancel_pressed>(
  120. [&](cancel_pressed const& msg)
  121. {
  122. state=&atm::done_processing;
  123. });
  124. }
  125. void waiting_for_card()
  126. {
  127. interface_hardware.send(display_enter_card());
  128. incoming.wait()
  129. .handle<card_inserted>(
  130. [&](card_inserted const& msg)
  131. {
  132. account=msg.account;
  133. pin="";
  134. interface_hardware.send(display_enter_pin());
  135. state=&atm::getting_pin;
  136. });
  137. }
  138. void done_processing()
  139. {
  140. interface_hardware.send(eject_card());
  141. state=&atm::waiting_for_card;
  142. }
  143. atm(atm const&)=delete;
  144. atm& operator=(atm const&)=delete;
  145. public:
  146. atm(messaging::sender bank_,
  147. messaging::sender interface_hardware_):
  148. bank(bank_),interface_hardware(interface_hardware_)
  149. {}
  150. void done()
  151. {
  152. get_sender().send(messaging::close_queue());
  153. }
  154. void run()
  155. {
  156. state=&atm::waiting_for_card;
  157. try
  158. {
  159. for(;;)
  160. {
  161. (this->*state)();
  162. }
  163. }
  164. catch(messaging::close_queue const&)
  165. {
  166. }
  167. }
  168. messaging::sender get_sender()
  169. {
  170. return incoming;
  171. }
  172. };

清单C.8 银行状态机

  1. class bank_machine
  2. {
  3. messaging::receiver incoming;
  4. unsigned balance;
  5. public:
  6. bank_machine():
  7. balance(199)
  8. {}
  9. void done()
  10. {
  11. get_sender().send(messaging::close_queue());
  12. }
  13. void run()
  14. {
  15. try
  16. {
  17. for(;;)
  18. {
  19. incoming.wait()
  20. .handle<verify_pin>(
  21. [&](verify_pin const& msg)
  22. {
  23. if(msg.pin=="1937")
  24. {
  25. msg.atm_queue.send(pin_verified());
  26. }
  27. else
  28. {
  29. msg.atm_queue.send(pin_incorrect());
  30. }
  31. })
  32. .handle<withdraw>(
  33. [&](withdraw const& msg)
  34. {
  35. if(balance>=msg.amount)
  36. {
  37. msg.atm_queue.send(withdraw_ok());
  38. balance-=msg.amount;
  39. }
  40. else
  41. {
  42. msg.atm_queue.send(withdraw_denied());
  43. }
  44. })
  45. .handle<get_balance>(
  46. [&](get_balance const& msg)
  47. {
  48. msg.atm_queue.send(::balance(balance));
  49. })
  50. .handle<withdrawal_processed>(
  51. [&](withdrawal_processed const& msg)
  52. {
  53. })
  54. .handle<cancel_withdrawal>(
  55. [&](cancel_withdrawal const& msg)
  56. {
  57. });
  58. }
  59. }
  60. catch(messaging::close_queue const&)
  61. {
  62. }
  63. }
  64. messaging::sender get_sender()
  65. {
  66. return incoming;
  67. }
  68. };

清单C.9 用户状态机

class interface_machine
{
  messaging::receiver incoming;
public:
  void done()
  {
    get_sender().send(messaging::close_queue());
  }

  void run()
  {
    try
    {
      for(;;)
      {
        incoming.wait()
          .handle<issue_money>(
           [&](issue_money const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"Issuing "
                 <<msg.amount<<std::endl;
             }
           })
          .handle<display_insufficient_funds>(
           [&](display_insufficient_funds const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"Insufficient funds"<<std::endl;
             }
           })
          .handle<display_enter_pin>(
           [&](display_enter_pin const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"Please enter your PIN (0-9)"<<std::endl;
             }
           })
          .handle<display_enter_card>(
           [&](display_enter_card const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"Please enter your card (I)"
                 <<std::endl;
             }
           })
          .handle<display_balance>(
           [&](display_balance const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout
                 <<"The balance of your account is "
                 <<msg.amount<<std::endl;
             }
           })
          .handle<display_withdrawal_options>(
           [&](display_withdrawal_options const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"Withdraw 50? (w)"<<std::endl;
               std::cout<<"Display Balance? (b)"
                 <<std::endl;
               std::cout<<"Cancel? (c)"<<std::endl;
             }
           })
          .handle<display_withdrawal_cancelled>(
           [&](display_withdrawal_cancelled const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"Withdrawal cancelled"
                 <<std::endl;
             }
           })
          .handle<display_pin_incorrect_message>(
           [&](display_pin_incorrect_message const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"PIN incorrect"<<std::endl;
             }
           })
          .handle<eject_card>(
           [&](eject_card const& msg)
           {
             {
               std::lock_guard<std::mutex> lk(iom);
               std::cout<<"Ejecting card"<<std::endl;
             }
           });
      }
    }
    catch(messaging::close_queue&)
    {
    }
  }

  messaging::sender get_sender()
  {
    return incoming;
  }
};

清单C.10 驱动代码

int main()
{
  bank_machine bank;
  interface_machine interface_hardware;

  atm machine(bank.get_sender(),interface_hardware.get_sender());

  std::thread bank_thread(&bank_machine::run,&bank);
  std::thread if_thread(&interface_machine::run,&interface_hardware);
  std::thread atm_thread(&atm::run,&machine);

  messaging::sender atmqueue(machine.get_sender());

  bool quit_pressed=false;

  while(!quit_pressed)
  {
    char c=getchar();
    switch(c)
    {
    case '0':
    case '1':
    case '2':
    case '3':
    case '4':
    case '5':
    case '6':
    case '7':
    case '8':
    case '9':
      atmqueue.send(digit_pressed(c));
      break;
    case 'b':
      atmqueue.send(balance_pressed());
      break;
    case 'w':
      atmqueue.send(withdraw_pressed(50));
      break;
    case 'c':
      atmqueue.send(cancel_pressed());
      break;
    case 'q':
      quit_pressed=true;
      break;
    case 'i':
      atmqueue.send(card_inserted("acc1234"));
      break;
    }
  }

  bank.done();
  machine.done();
  interface_hardware.done();

  atm_thread.join();
  bank_thread.join();
  if_thread.join();
}