Asynchronous Call

In certain situations, we hope Dubbo interfaces can be called asynchronously to avoid unnecessary waiting.

Dubbo asynchronous calls can be divided into Provider-side asynchronous calls and Consumer-side asynchronous calls.

  • Consumer-side asynchronous means that after initiating an RPC call, it returns immediately, allowing the calling thread to continue processing other business logic. When the response returns, a callback function is used to notify the consumer of the result.
  • Provider-side asynchronous execution moves blocking business from Dubbo’s internal thread pool to a user-defined thread, avoiding excessive occupation of the Dubbo thread pool, which helps prevent interference between different services.

Below is a working example diagram of consumer asynchronous calls:

/user-guide/images/future.jpg

Provider-side asynchronous execution and Consumer-side asynchronous calls are mutually independent, and you can combine the configurations of both sides in any orthogonal manner.

  • Consumer synchronous - Provider synchronous
  • Consumer asynchronous - Provider synchronous
  • Consumer synchronous - Provider asynchronous
  • Consumer asynchronous - Provider asynchronous

Please refer to the complete example source code demonstrated in this document:

Provider Asynchronous

1 Using CompletableFuture

Interface Definition:

  1. public interface AsyncService {
  2. /**
  3. * Synchronous call method
  4. */
  5. String invoke(String param);
  6. /**
  7. * Asynchronous call method
  8. */
  9. CompletableFuture<String> asyncInvoke(String param);
  10. }

Service Implementation:

  1. @DubboService
  2. public class AsyncServiceImpl implements AsyncService {
  3. @Override
  4. public String invoke(String param) {
  5. try {
  6. long time = ThreadLocalRandom.current().nextLong(1000);
  7. Thread.sleep(time);
  8. StringBuilder s = new StringBuilder();
  9. s.append("AsyncService invoke param:").append(param).append(",sleep:").append(time);
  10. return s.toString();
  11. }
  12. catch (InterruptedException e) {
  13. Thread.currentThread().interrupt();
  14. }
  15. return null;
  16. }
  17. @Override
  18. public CompletableFuture<String> asyncInvoke(String param) {
  19. // It is recommended to provide a custom thread pool for supplyAsync
  20. return CompletableFuture.supplyAsync(() -> {
  21. try {
  22. // Do something
  23. long time = ThreadLocalRandom.current().nextLong(1000);
  24. Thread.sleep(time);
  25. StringBuilder s = new StringBuilder();
  26. s.append("AsyncService asyncInvoke param:").append(param).append(",sleep:").append(time);
  27. return s.toString();
  28. } catch (InterruptedException e) {
  29. Thread.currentThread().interrupt();
  30. }
  31. return null;
  32. });
  33. }
  34. }

By returning CompletableFuture.supplyAsync(), the business execution has switched from the Dubbo thread to the business thread, avoiding blocking the Dubbo thread pool.

2 Using AsyncContext

Dubbo provides an asynchronous interface AsyncContext similar to Servlet 3.0, which can also achieve Provider-side asynchronous execution without a CompletableFuture signature interface.

Interface Definition:

  1. public interface AsyncService {
  2. String sayHello(String name);
  3. }

Service Implementation:

  1. public class AsyncServiceImpl implements AsyncService {
  2. public String sayHello(String name) {
  3. final AsyncContext asyncContext = RpcContext.startAsync();
  4. new Thread(() -> {
  5. // If you want to use the context, it must be executed on the first line
  6. asyncContext.signalContextSwitch();
  7. try {
  8. Thread.sleep(500);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. // Write back the response
  13. asyncContext.write("Hello " + name + ", response from provider.");
  14. }).start();
  15. return null;
  16. }
  17. }

Consumer Asynchronous

1 Using CompletableFuture

  1. @DubboReference
  2. private AsyncService asyncService;
  3. @Override
  4. public void run(String... args) throws Exception {
  5. // Call asynchronous interface
  6. CompletableFuture<String> future1 = asyncService.asyncInvoke("async call request1");
  7. future1.whenComplete((v, t) -> {
  8. if (t != null) {
  9. t.printStackTrace();
  10. } else {
  11. System.out.println("AsyncTask Response-1: " + v);
  12. }
  13. });
  14. // Two calls do not return in order
  15. CompletableFuture<String> future2 = asyncService.asyncInvoke("async call request2");
  16. future2.whenComplete((v, t) -> {
  17. if (t != null) {
  18. t.printStackTrace();
  19. } else {
  20. System.out.println("AsyncTask Response-2: " + v);
  21. }
  22. });
  23. // Consumer asynchronous call
  24. CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
  25. return asyncService.invoke("invoke call request3");
  26. });
  27. future3.whenComplete((v, t) -> {
  28. if (t != null) {
  29. t.printStackTrace();
  30. } else {
  31. System.out.println("AsyncTask Response-3: " + v);
  32. }
  33. });
  34. System.out.println("AsyncTask Executed before response return.");
  35. }

2 Using RpcContext

Configure in the annotation:

  1. @DubboReference(async="true")
  2. private AsyncService asyncService;

You can also specify method-level asynchronous configurations:

  1. @DubboReference(methods = {@Method(name = "sayHello", timeout = 5000)})
  2. private AsyncService asyncService;

The subsequent calls will be asynchronous:

  1. // This call will immediately return null
  2. asyncService.sayHello("world");
  3. // Get the Future reference of the call, which will be notified and set to this Future when the result returns
  4. CompletableFuture<String> helloFuture = RpcContext.getServiceContext().getCompletableFuture();
  5. // Add a callback to the Future
  6. helloFuture.whenComplete((retValue, exception) -> {
  7. if (exception == null) {
  8. System.out.println(retValue);
  9. } else {
  10. exception.printStackTrace();
  11. }
  12. });

Or, you can also do asynchronous calls this way

  1. CompletableFuture<String> future = RpcContext.getServiceContext().asyncCall(
  2. () -> {
  3. asyncService.sayHello("oneway call request1");
  4. }
  5. );
  6. future.get();

Asynchronous calls never wait for a return, and you can also set whether to wait for the message to be sent

  • sent="true" waits for the message to be sent; an exception will be thrown if the message sending fails.
  • sent="false" does not wait for the message to be sent, puts the message into the IO queue, and returns immediately.
  1. @DubboReference(methods = {@Method(name = "sayHello", timeout = 5000, sent = true)})
  2. private AsyncService asyncService;

If you just want to be asynchronous and completely ignore the return value, you can configure return="false" to reduce the creation and management cost of Future objects

  1. @DubboReference(methods = {@Method(name = "sayHello", timeout = 5000, return = false)})
  2. private AsyncService asyncService;

Feedback

Was this page helpful?

Yes No

Last modified September 30, 2024: Update & Translate Overview Docs (#3040) (d37ebceaea7)