2.5 服务消费方接收调用结果
服务消费方在收到响应数据后,首先要做的事情是对响应数据进行解码,得到 Response 对象。然后再将该对象传递给下一个入站处理器,这个入站处理器就是 NettyHandler。接下来 NettyHandler 会将这个对象继续向下传递,最后 AllChannelHandler 的 received 方法会收到这个对象,并将这个对象派发到线程池中。这个过程和服务提供方接收请求的过程是一样的,因此这里就不重复分析了。本节我们重点分析两个方面的内容,一是响应数据的解码过程,二是 Dubbo 如何将调用结果传递给用户线程的。下面先来分析响应数据的解码过程。
2.5.1 响应数据解码
响应数据解码逻辑主要的逻辑封装在 DubboCodec 中,我们直接分析这个类的代码。如下:
public class DubboCodec extends ExchangeCodec implements Codec2 {
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// 获取请求编号
long id = Bytes.bytes2long(header, 4);
// 检测消息类型,若下面的条件成立,表明消息类型为 Response
if ((flag & FLAG_REQUEST) == 0) {
// 创建 Response 对象
Response res = new Response(id);
// 检测事件标志位
if ((flag & FLAG_EVENT) != 0) {
// 设置心跳事件
res.setEvent(Response.HEARTBEAT_EVENT);
}
// 获取响应状态
byte status = header[3];
// 设置响应状态
res.setStatus(status);
// 如果响应状态为 OK,表明调用过程正常
if (status == Response.OK) {
try {
Object data;
if (res.isHeartbeat()) {
// 反序列化心跳数据,已废弃
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (res.isEvent()) {
// 反序列化事件数据
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {
DecodeableRpcResult result;
// 根据 url 参数决定是否在 IO 线程上执行解码逻辑
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
// 创建 DecodeableRpcResult 对象
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
// 进行后续的解码工作
result.decode();
} else {
// 创建 DecodeableRpcResult 对象
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
}
// 设置 DecodeableRpcResult 对象到 Response 对象中
res.setResult(data);
} catch (Throwable t) {
// 解码过程中出现了错误,此时设置 CLIENT_ERROR 状态码到 Response 对象中
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
}
// 响应状态非 OK,表明调用过程出现了异常
else {
// 反序列化异常信息,并设置到 Response 对象中
res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
}
return res;
} else {
// 对请求数据进行解码,前面已分析过,此处忽略
}
}
}
以上就是响应数据的解码过程,上面逻辑看起来是不是似曾相识。对的,我们在前面章节分析过 DubboCodec 的 decodeBody 方法中关于请求数据的解码过程,该过程和响应数据的解码过程很相似。下面,我们继续分析调用结果的反序列化过程,如下:
public class DecodeableRpcResult extends RpcResult implements Codec, Decodeable {
private Invocation invocation;
@Override
public void decode() throws Exception {
if (!hasDecoded && channel != null && inputStream != null) {
try {
// 执行反序列化操作
decode(channel, inputStream);
} catch (Throwable e) {
// 反序列化失败,设置 CLIENT_ERROR 状态到 Response 对象中
response.setStatus(Response.CLIENT_ERROR);
// 设置异常信息
response.setErrorMessage(StringUtils.toString(e));
} finally {
hasDecoded = true;
}
}
}
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
// 反序列化响应类型
byte flag = in.readByte();
switch (flag) {
case DubboCodec.RESPONSE_NULL_VALUE:
break;
case DubboCodec.RESPONSE_VALUE:
// ...
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION:
// ...
break;
// 返回值为空,且携带了 attachments 集合
case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
try {
// 反序列化 attachments 集合,并存储起来
setAttachments((Map<String, String>) in.readObject(Map.class));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
// 返回值不为空,且携带了 attachments 集合
case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
try {
// 获取返回值类型
Type[] returnType = RpcUtils.getReturnTypes(invocation);
// 反序列化调用结果,并保存起来
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
// 反序列化 attachments 集合,并存储起来
setAttachments((Map<String, String>) in.readObject(Map.class));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
// 异常对象不为空,且携带了 attachments 集合
case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
try {
// 反序列化异常对象
Object obj = in.readObject();
if (obj instanceof Throwable == false)
throw new IOException("Response data error, expect Throwable, but get " + obj);
// 设置异常对象
setException((Throwable) obj);
// 反序列化 attachments 集合,并存储起来
setAttachments((Map<String, String>) in.readObject(Map.class));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
default:
throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
}
if (in instanceof Cleanable) {
((Cleanable) in).cleanup();
}
return this;
}
}
本篇文章所分析的源码版本为 2.6.4,该版本下的 Response 支持 attachments 集合,所以上面仅对部分 case 分支进行了注释。其他 case 分支的逻辑比被注释分支的逻辑更为简单,这里就忽略了。我们所使用的测试服务接口 DemoService 包含了一个具有返回值的方法,正常调用下,线程会进入 RESPONSE_VALUE_WITH_ATTACHMENTS 分支中。然后线程会从 invocation 变量(大家探索一下 invocation 变量的由来)中获取返回值类型,接着对调用结果进行反序列化,并将序列化后的结果存储起来。最后对 attachments 集合进行反序列化,并存到指定字段中。到此,关于响应数据的解码过程就分析完了。接下来,我们再来探索一下响应对象 Response 的去向。
2.5.2 向用户线程传递调用结果
响应数据解码完成后,Dubbo 会将响应对象派发到线程池上。要注意的是,线程池中的线程并非用户的调用线程,所以要想办法将响应对象从线程池线程传递到用户线程上。我们在 2.1 节分析过用户线程在发送完请求后的动作,即调用 DefaultFuture 的 get 方法等待响应对象的到来。当响应对象到来后,用户线程会被唤醒,并通过调用编号获取属于自己的响应对象。下面我们来看一下整个过程对应的代码。
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
@Override
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// 处理请求,前面已分析过,省略
} else if (message instanceof Response) {
// 处理响应
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
// telnet 相关,忽略
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
// 继续向下调用
DefaultFuture.received(channel, response);
}
}
}
public class DefaultFuture implements ResponseFuture {
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private volatile Response response;
public static void received(Channel channel, Response response) {
try {
// 根据调用编号从 FUTURES 集合中查找指定的 DefaultFuture 对象
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
// 继续向下调用
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at ...");
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
// 保存响应对象
response = res;
if (done != null) {
// 唤醒用户线程
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
}
以上逻辑是将响应对象保存到相应的 DefaultFuture 实例中,然后再唤醒用户线程,随后用户线程即可从 DefaultFuture 实例中获取到相应结果。
本篇文章在多个地方都强调过调用编号很重要,但一直没有解释原因,这里简单说明一下。一般情况下,服务消费方会并发调用多个服务,每个用户线程发送请求后,会调用不同 DefaultFuture 对象的 get 方法进行等待。 一段时间后,服务消费方的线程池会收到多个响应对象。这个时候要考虑一个问题,如何将每个响应对象传递给相应的 DefaultFuture 对象,且不出错。答案是通过调用编号。DefaultFuture 被创建时,会要求传入一个 Request 对象。此时 DefaultFuture 可从 Request 对象中获取调用编号,并将 <调用编号, DefaultFuture 对象> 映射关系存入到静态 Map 中,即 FUTURES。线程池中的线程在收到 Response 对象后,会根据 Response 对象中的调用编号到 FUTURES 集合中取出相应的 DefaultFuture 对象,然后再将 Response 对象设置到 DefaultFuture 对象中。最后再唤醒用户线程,这样用户线程即可从 DefaultFuture 对象中获取调用结果了。整个过程大致如下图: