2.5 服务消费方接收调用结果

服务消费方在收到响应数据后,首先要做的事情是对响应数据进行解码,得到 Response 对象。然后再将该对象传递给下一个入站处理器,这个入站处理器就是 NettyHandler。接下来 NettyHandler 会将这个对象继续向下传递,最后 AllChannelHandler 的 received 方法会收到这个对象,并将这个对象派发到线程池中。这个过程和服务提供方接收请求的过程是一样的,因此这里就不重复分析了。本节我们重点分析两个方面的内容,一是响应数据的解码过程,二是 Dubbo 如何将调用结果传递给用户线程的。下面先来分析响应数据的解码过程。

2.5.1 响应数据解码

响应数据解码逻辑主要的逻辑封装在 DubboCodec 中,我们直接分析这个类的代码。如下:

  1. public class DubboCodec extends ExchangeCodec implements Codec2 {
  2. @Override
  3. protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
  4. byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
  5. Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
  6. // 获取请求编号
  7. long id = Bytes.bytes2long(header, 4);
  8. // 检测消息类型,若下面的条件成立,表明消息类型为 Response
  9. if ((flag & FLAG_REQUEST) == 0) {
  10. // 创建 Response 对象
  11. Response res = new Response(id);
  12. // 检测事件标志位
  13. if ((flag & FLAG_EVENT) != 0) {
  14. // 设置心跳事件
  15. res.setEvent(Response.HEARTBEAT_EVENT);
  16. }
  17. // 获取响应状态
  18. byte status = header[3];
  19. // 设置响应状态
  20. res.setStatus(status);
  21. // 如果响应状态为 OK,表明调用过程正常
  22. if (status == Response.OK) {
  23. try {
  24. Object data;
  25. if (res.isHeartbeat()) {
  26. // 反序列化心跳数据,已废弃
  27. data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
  28. } else if (res.isEvent()) {
  29. // 反序列化事件数据
  30. data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
  31. } else {
  32. DecodeableRpcResult result;
  33. // 根据 url 参数决定是否在 IO 线程上执行解码逻辑
  34. if (channel.getUrl().getParameter(
  35. Constants.DECODE_IN_IO_THREAD_KEY,
  36. Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
  37. // 创建 DecodeableRpcResult 对象
  38. result = new DecodeableRpcResult(channel, res, is,
  39. (Invocation) getRequestData(id), proto);
  40. // 进行后续的解码工作
  41. result.decode();
  42. } else {
  43. // 创建 DecodeableRpcResult 对象
  44. result = new DecodeableRpcResult(channel, res,
  45. new UnsafeByteArrayInputStream(readMessageData(is)),
  46. (Invocation) getRequestData(id), proto);
  47. }
  48. data = result;
  49. }
  50. // 设置 DecodeableRpcResult 对象到 Response 对象中
  51. res.setResult(data);
  52. } catch (Throwable t) {
  53. // 解码过程中出现了错误,此时设置 CLIENT_ERROR 状态码到 Response 对象中
  54. res.setStatus(Response.CLIENT_ERROR);
  55. res.setErrorMessage(StringUtils.toString(t));
  56. }
  57. }
  58. // 响应状态非 OK,表明调用过程出现了异常
  59. else {
  60. // 反序列化异常信息,并设置到 Response 对象中
  61. res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
  62. }
  63. return res;
  64. } else {
  65. // 对请求数据进行解码,前面已分析过,此处忽略
  66. }
  67. }
  68. }

以上就是响应数据的解码过程,上面逻辑看起来是不是似曾相识。对的,我们在前面章节分析过 DubboCodec 的 decodeBody 方法中关于请求数据的解码过程,该过程和响应数据的解码过程很相似。下面,我们继续分析调用结果的反序列化过程,如下:

  1. public class DecodeableRpcResult extends RpcResult implements Codec, Decodeable {
  2. private Invocation invocation;
  3. @Override
  4. public void decode() throws Exception {
  5. if (!hasDecoded && channel != null && inputStream != null) {
  6. try {
  7. // 执行反序列化操作
  8. decode(channel, inputStream);
  9. } catch (Throwable e) {
  10. // 反序列化失败,设置 CLIENT_ERROR 状态到 Response 对象中
  11. response.setStatus(Response.CLIENT_ERROR);
  12. // 设置异常信息
  13. response.setErrorMessage(StringUtils.toString(e));
  14. } finally {
  15. hasDecoded = true;
  16. }
  17. }
  18. }
  19. @Override
  20. public Object decode(Channel channel, InputStream input) throws IOException {
  21. ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
  22. .deserialize(channel.getUrl(), input);
  23. // 反序列化响应类型
  24. byte flag = in.readByte();
  25. switch (flag) {
  26. case DubboCodec.RESPONSE_NULL_VALUE:
  27. break;
  28. case DubboCodec.RESPONSE_VALUE:
  29. // ...
  30. break;
  31. case DubboCodec.RESPONSE_WITH_EXCEPTION:
  32. // ...
  33. break;
  34. // 返回值为空,且携带了 attachments 集合
  35. case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
  36. try {
  37. // 反序列化 attachments 集合,并存储起来
  38. setAttachments((Map<String, String>) in.readObject(Map.class));
  39. } catch (ClassNotFoundException e) {
  40. throw new IOException(StringUtils.toString("Read response data failed.", e));
  41. }
  42. break;
  43. // 返回值不为空,且携带了 attachments 集合
  44. case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
  45. try {
  46. // 获取返回值类型
  47. Type[] returnType = RpcUtils.getReturnTypes(invocation);
  48. // 反序列化调用结果,并保存起来
  49. setValue(returnType == null || returnType.length == 0 ? in.readObject() :
  50. (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
  51. : in.readObject((Class<?>) returnType[0], returnType[1])));
  52. // 反序列化 attachments 集合,并存储起来
  53. setAttachments((Map<String, String>) in.readObject(Map.class));
  54. } catch (ClassNotFoundException e) {
  55. throw new IOException(StringUtils.toString("Read response data failed.", e));
  56. }
  57. break;
  58. // 异常对象不为空,且携带了 attachments 集合
  59. case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
  60. try {
  61. // 反序列化异常对象
  62. Object obj = in.readObject();
  63. if (obj instanceof Throwable == false)
  64. throw new IOException("Response data error, expect Throwable, but get " + obj);
  65. // 设置异常对象
  66. setException((Throwable) obj);
  67. // 反序列化 attachments 集合,并存储起来
  68. setAttachments((Map<String, String>) in.readObject(Map.class));
  69. } catch (ClassNotFoundException e) {
  70. throw new IOException(StringUtils.toString("Read response data failed.", e));
  71. }
  72. break;
  73. default:
  74. throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
  75. }
  76. if (in instanceof Cleanable) {
  77. ((Cleanable) in).cleanup();
  78. }
  79. return this;
  80. }
  81. }

本篇文章所分析的源码版本为 2.6.4,该版本下的 Response 支持 attachments 集合,所以上面仅对部分 case 分支进行了注释。其他 case 分支的逻辑比被注释分支的逻辑更为简单,这里就忽略了。我们所使用的测试服务接口 DemoService 包含了一个具有返回值的方法,正常调用下,线程会进入 RESPONSE_VALUE_WITH_ATTACHMENTS 分支中。然后线程会从 invocation 变量(大家探索一下 invocation 变量的由来)中获取返回值类型,接着对调用结果进行反序列化,并将序列化后的结果存储起来。最后对 attachments 集合进行反序列化,并存到指定字段中。到此,关于响应数据的解码过程就分析完了。接下来,我们再来探索一下响应对象 Response 的去向。

2.5.2 向用户线程传递调用结果

响应数据解码完成后,Dubbo 会将响应对象派发到线程池上。要注意的是,线程池中的线程并非用户的调用线程,所以要想办法将响应对象从线程池线程传递到用户线程上。我们在 2.1 节分析过用户线程在发送完请求后的动作,即调用 DefaultFuture 的 get 方法等待响应对象的到来。当响应对象到来后,用户线程会被唤醒,并通过调用编号获取属于自己的响应对象。下面我们来看一下整个过程对应的代码。

  1. public class HeaderExchangeHandler implements ChannelHandlerDelegate {
  2. @Override
  3. public void received(Channel channel, Object message) throws RemotingException {
  4. channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
  5. ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
  6. try {
  7. if (message instanceof Request) {
  8. // 处理请求,前面已分析过,省略
  9. } else if (message instanceof Response) {
  10. // 处理响应
  11. handleResponse(channel, (Response) message);
  12. } else if (message instanceof String) {
  13. // telnet 相关,忽略
  14. } else {
  15. handler.received(exchangeChannel, message);
  16. }
  17. } finally {
  18. HeaderExchangeChannel.removeChannelIfDisconnected(channel);
  19. }
  20. }
  21. static void handleResponse(Channel channel, Response response) throws RemotingException {
  22. if (response != null && !response.isHeartbeat()) {
  23. // 继续向下调用
  24. DefaultFuture.received(channel, response);
  25. }
  26. }
  27. }
  28. public class DefaultFuture implements ResponseFuture {
  29. private final Lock lock = new ReentrantLock();
  30. private final Condition done = lock.newCondition();
  31. private volatile Response response;
  32. public static void received(Channel channel, Response response) {
  33. try {
  34. // 根据调用编号从 FUTURES 集合中查找指定的 DefaultFuture 对象
  35. DefaultFuture future = FUTURES.remove(response.getId());
  36. if (future != null) {
  37. // 继续向下调用
  38. future.doReceived(response);
  39. } else {
  40. logger.warn("The timeout response finally returned at ...");
  41. }
  42. } finally {
  43. CHANNELS.remove(response.getId());
  44. }
  45. }
  46. private void doReceived(Response res) {
  47. lock.lock();
  48. try {
  49. // 保存响应对象
  50. response = res;
  51. if (done != null) {
  52. // 唤醒用户线程
  53. done.signal();
  54. }
  55. } finally {
  56. lock.unlock();
  57. }
  58. if (callback != null) {
  59. invokeCallback(callback);
  60. }
  61. }
  62. }

以上逻辑是将响应对象保存到相应的 DefaultFuture 实例中,然后再唤醒用户线程,随后用户线程即可从 DefaultFuture 实例中获取到相应结果。

本篇文章在多个地方都强调过调用编号很重要,但一直没有解释原因,这里简单说明一下。一般情况下,服务消费方会并发调用多个服务,每个用户线程发送请求后,会调用不同 DefaultFuture 对象的 get 方法进行等待。 一段时间后,服务消费方的线程池会收到多个响应对象。这个时候要考虑一个问题,如何将每个响应对象传递给相应的 DefaultFuture 对象,且不出错。答案是通过调用编号。DefaultFuture 被创建时,会要求传入一个 Request 对象。此时 DefaultFuture 可从 Request 对象中获取调用编号,并将 <调用编号, DefaultFuture 对象> 映射关系存入到静态 Map 中,即 FUTURES。线程池中的线程在收到 Response 对象后,会根据 Response 对象中的调用编号到 FUTURES 集合中取出相应的 DefaultFuture 对象,然后再将 Response 对象设置到 DefaultFuture 对象中。最后再唤醒用户线程,这样用户线程即可从 DefaultFuture 对象中获取调用结果了。整个过程大致如下图:

2.5 服务消费方接收调用结果 - 图1