异步调用

某些情况下希望dubbo接口异步调用,避免不必要的等待。

Dubbo 异步调用分为 Provider 端异步调用和 Consumer 端异步两种模式。

  • Consumer 端异步是指发起 RPC 调用后立即返回,调用线程继续处理其他业务逻辑,当响应结果返回后通过回调函数通知消费端结果。
  • Provider 端异步执行将阻塞的业务从 Dubbo 内部线程池切换到业务自定义线程,避免Dubbo线程池的过度占用,有助于避免不同服务间的互相影响。

以下是消费端 consumer 异步调用的工作示例图:

/user-guide/images/future.jpg

Provider 端异步执行和 Consumer 端异步调用是相互独立的,你可以任意正交组合两端配置。

  • Consumer同步 - Provider同步
  • Consumer异步 - Provider同步
  • Consumer同步 - Provider异步
  • Consumer异步 - Provider异步

本文档演示的完整示例源码请参见:

Provider异步

1 使用CompletableFuture

接口定义:

  1. public interface AsyncService {
  2. /**
  3. * 同步调用方法
  4. */
  5. String invoke(String param);
  6. /**
  7. * 异步调用方法
  8. */
  9. CompletableFuture<String> asyncInvoke(String param);
  10. }

服务实现:

  1. @DubboService
  2. public class AsyncServiceImpl implements AsyncService {
  3. @Override
  4. public String invoke(String param) {
  5. try {
  6. long time = ThreadLocalRandom.current().nextLong(1000);
  7. Thread.sleep(time);
  8. StringBuilder s = new StringBuilder();
  9. s.append("AsyncService invoke param:").append(param).append(",sleep:").append(time);
  10. return s.toString();
  11. }
  12. catch (InterruptedException e) {
  13. Thread.currentThread().interrupt();
  14. }
  15. return null;
  16. }
  17. @Override
  18. public CompletableFuture<String> asyncInvoke(String param) {
  19. // 建议为supplyAsync提供自定义线程池
  20. return CompletableFuture.supplyAsync(() -> {
  21. try {
  22. // Do something
  23. long time = ThreadLocalRandom.current().nextLong(1000);
  24. Thread.sleep(time);
  25. StringBuilder s = new StringBuilder();
  26. s.append("AsyncService asyncInvoke param:").append(param).append(",sleep:").append(time);
  27. return s.toString();
  28. } catch (InterruptedException e) {
  29. Thread.currentThread().interrupt();
  30. }
  31. return null;
  32. });
  33. }
  34. }

通过 return CompletableFuture.supplyAsync() ,业务执行已从 Dubbo 线程切换到业务线程,避免了对 Dubbo 线程池的阻塞。

2 使用AsyncContext

Dubbo 提供了一个类似 Servlet 3.0 的异步接口AsyncContext,在没有 CompletableFuture 签名接口的情况下,也可以实现 Provider 端的异步执行。

接口定义:

  1. public interface AsyncService {
  2. String sayHello(String name);
  3. }

服务实现:

  1. public class AsyncServiceImpl implements AsyncService {
  2. public String sayHello(String name) {
  3. final AsyncContext asyncContext = RpcContext.startAsync();
  4. new Thread(() -> {
  5. // 如果要使用上下文,则必须要放在第一句执行
  6. asyncContext.signalContextSwitch();
  7. try {
  8. Thread.sleep(500);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. // 写回响应
  13. asyncContext.write("Hello " + name + ", response from provider.");
  14. }).start();
  15. return null;
  16. }
  17. }

Consumer异步

1 使用CompletableFuture

  1. @DubboReference
  2. private AsyncService asyncService;
  3. @Override
  4. public void run(String... args) throws Exception {
  5. //调用异步接口
  6. CompletableFuture<String> future1 = asyncService.asyncInvoke("async call request1");
  7. future1.whenComplete((v, t) -> {
  8. if (t != null) {
  9. t.printStackTrace();
  10. } else {
  11. System.out.println("AsyncTask Response-1: " + v);
  12. }
  13. });
  14. //两次调用并非顺序返回
  15. CompletableFuture<String> future2 = asyncService.asyncInvoke("async call request2");
  16. future2.whenComplete((v, t) -> {
  17. if (t != null) {
  18. t.printStackTrace();
  19. } else {
  20. System.out.println("AsyncTask Response-2: " + v);
  21. }
  22. });
  23. //consumer异步调用
  24. CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
  25. return asyncService.invoke("invoke call request3");
  26. });
  27. future3.whenComplete((v, t) -> {
  28. if (t != null) {
  29. t.printStackTrace();
  30. } else {
  31. System.out.println("AsyncTask Response-3: " + v);
  32. }
  33. });
  34. System.out.println("AsyncTask Executed before response return.");
  35. }

2 使用 RpcContext

在注解中配置:

  1. @DubboReference(async="true")
  2. private AsyncService asyncService;

也可以指定方法级别的异步配置:

  1. @DubboReference(methods = {@Method(name = "sayHello", timeout = 5000)})
  2. private AsyncService asyncService;

接下来的调用即会是异步的:

  1. // 此调用会立即返回null
  2. asyncService.sayHello("world");
  3. // 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
  4. CompletableFuture<String> helloFuture = RpcContext.getServiceContext().getCompletableFuture();
  5. // 为Future添加回调
  6. helloFuture.whenComplete((retValue, exception) -> {
  7. if (exception == null) {
  8. System.out.println(retValue);
  9. } else {
  10. exception.printStackTrace();
  11. }
  12. });

或者,也可以这样做异步调用

  1. CompletableFuture<String> future = RpcContext.getServiceContext().asyncCall(
  2. () -> {
  3. asyncService.sayHello("oneway call request1");
  4. }
  5. );
  6. future.get();

异步总是不等待返回,你也可以设置是否等待消息发出

  • sent="true" 等待消息发出,消息发送失败将抛出异常。
  • sent="false" 不等待消息发出,将消息放入 IO 队列,即刻返回。
  1. @DubboReference(methods = {@Method(name = "sayHello", timeout = 5000 sent = true)})
  2. private AsyncService asyncService;

如果你只是想异步,完全忽略返回值,可以配置 return="false",以减少 Future 对象的创建和管理成本

  1. @DubboReference(methods = {@Method(name = "sayHello", timeout = 5000 return = false)})
  2. private AsyncService asyncService;

最后修改 September 13, 2024: Refactor website structure (#2860) (1a4b998f54b)