与workflow异步框架的结合

Server

下面我们通过一个具体例子来呈现

  • Echo RPC在接收到请求时,向下游发起一次http请求
  • 对下游请求完成后,我们将http response的body信息填充到response的message里,回复给客户端
  • 我们不希望阻塞/占据着Handler的线程,所以对下游的请求一定是一次异步请求
  • 首先,我们通过Workflow框架的工厂WFTaskFactory::create_http_task创建一个异步任务http_task
  • 然后,我们利用RPCContext的ctx->get_series()获取到ServerTask所在的SeriesWork
  • 最后,我们使用SeriesWork的push_back接口将http_task放到SeriesWork的后面
  1. class ExampleServiceImpl : public Example::Service
  2. {
  3. public:
  4. void Echo(EchoRequest *request, EchoResponse *response, RPCContext *ctx) override
  5. {
  6. auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0,
  7. [request, response](WFHttpTask *task) {
  8. if (task->get_state() == WFT_STATE_SUCCESS)
  9. {
  10. const void *data;
  11. size_t len;
  12. task->get_resp()->get_parsed_body(&data, &len);
  13. response->mutable_message()->assign((const char *)data, len);
  14. }
  15. else
  16. response->set_message("Error: " + std::to_string(task->get_error()));
  17. printf("Server Echo()\nget_req:\n%s\nset_resp:\n%s\n",
  18. request->DebugString().c_str(),
  19. response->DebugString().c_str());
  20. });
  21. ctx->get_series()->push_back(http_task);
  22. }
  23. };

Client

下面我们通过一个具体例子来呈现

  • 我们并行发出两个请求,1个是rpc请求,1个是http请求
  • 两个请求都结束后,我们再发起一次计算任务,计算两个数的平方和
  • 首先,我们通过RPC Client的create_Echo_task创建一个rpc异步请求的网络任务rpc_task
  • 然后,我们通过Workflow框架的工厂WFTaskFactory::create_http_taskWFTaskFactory::create_go_task分别创建异步网络任务http_task,和异步计算任务calc_task
  • 最后,我们利用串并联流程图,乘号代表并行、大于号代表串行,将3个异步任务组合起来执行start
  1. void calc(int x, int y)
  2. {
  3. int z = x * x + y * y;
  4. printf("calc result: %d\n", z);
  5. }
  6. int main()
  7. {
  8. Example::SRPCClient client("127.0.0.1", 1412);
  9. auto *rpc_task = client.create_Echo_task([](EchoResponse *response, RPCContext *ctx) {
  10. if (ctx->success())
  11. printf("%s\n", response->DebugString().c_str());
  12. else
  13. printf("status[%d] error[%d] errmsg:%s\n",
  14. ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
  15. });
  16. auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0, [](WFHttpTask *task) {
  17. if (task->get_state() == WFT_STATE_SUCCESS)
  18. {
  19. std::string body;
  20. const void *data;
  21. size_t len;
  22. task->get_resp()->get_parsed_body(&data, &len);
  23. body.assign((const char *)data, len);
  24. printf("%s\n\n", body.c_str());
  25. }
  26. else
  27. printf("Http request fail\n\n");
  28. });
  29. auto *calc_task = WFTaskFactory::create_go_task(calc, 3, 4);
  30. EchoRequest req;
  31. req.set_message("Hello, sogou rpc!");
  32. req.set_name("1412");
  33. rpc_task->serialize_input(&req);
  34. ((*http_task * rpc_task) > calc_task).start();
  35. pause();
  36. return 0;
  37. }