Dapr Java SDK

Java SDK packages for developing Dapr applications

Prerequisites

Complete initial setup and import the Java SDK into your project

Building blocks

The Java SDK allows you to interface with all of the Dapr building blocks.

Invoke a service

  1. import io.dapr.client.DaprClient;
  2. import io.dapr.client.DaprClientBuilder;
  3. try (DaprClient client = (new DaprClientBuilder()).build()) {
  4. // Invoke a 'GET' method (HTTP) skipping serialization: /say with a Mono<byte[]> return type.
  5. // For gRPC, pass HttpExtension.NONE instead of the HTTP verb below.
  6. byte[] response = client.invokeMethod(SERVICE_TO_INVOKE, METHOD_TO_INVOKE, "{\"name\":\"World!\"}", HttpExtension.GET, byte[].class).block();
  7. // Invoke a 'POST' method (HTTP) skipping serialization: /say with a Mono<byte[]> return type.
  8. response = client.invokeMethod(SERVICE_TO_INVOKE, METHOD_TO_INVOKE, "{\"id\":\"100\", \"FirstName\":\"Value\", \"LastName\":\"Value\"}", HttpExtension.POST, byte[].class).block();
  9. System.out.println(new String(response));
  10. // Invoke a 'POST' method (HTTP) with serialization: /employees with a Mono<Employee> return type.
  11. Employee newEmployee = new Employee("Nigel", "Guitarist");
  12. Employee employeeResponse = client.invokeMethod(SERVICE_TO_INVOKE, "employees", newEmployee, HttpExtension.POST, Employee.class).block();
  13. }

Save & get application state

  1. import io.dapr.client.DaprClient;
  2. import io.dapr.client.DaprClientBuilder;
  3. import io.dapr.client.domain.State;
  4. import reactor.core.publisher.Mono;
  5. try (DaprClient client = new DaprClientBuilder().build()) {
  6. // Store myClass under FIRST_KEY_NAME and wait for the call to complete
  7. Mono<Void> saveOperation = client.saveState(STATE_STORE_NAME, FIRST_KEY_NAME, myClass);
  8. saveOperation.block();
  9. // Read the same key back as a State wrapping MyClass
  10. Mono<State<MyClass>> lookup = client.getState(STATE_STORE_NAME, FIRST_KEY_NAME, MyClass.class);
  11. State<MyClass> retrievedMessage = lookup.block();
  12. // Remove the key from the state store
  13. Mono<Void> deleteOperation = client.deleteState(STATE_STORE_NAME, FIRST_KEY_NAME);
  14. deleteOperation.block();
  15. }

Publish & subscribe to messages

Publish messages
  1. import io.dapr.client.DaprClient;
  2. import io.dapr.client.DaprClientBuilder;
  3. import io.dapr.client.domain.Metadata;
  4. import static java.util.Collections.singletonMap;
  5. try (DaprClient client = new DaprClientBuilder().build()) {
  6. // Publish the message with a TTL metadata entry and block until the call completes
  7. client
  8. .publishEvent(PUBSUB_NAME, TOPIC_NAME, message, singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS))
  9. .block();
  10. }
Subscribe to messages
  1. import com.fasterxml.jackson.databind.ObjectMapper;
  2. import io.dapr.Topic;
  3. import io.dapr.client.domain.BulkSubscribeAppResponse;
  4. import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
  5. import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
  6. import io.dapr.client.domain.BulkSubscribeMessage;
  7. import io.dapr.client.domain.BulkSubscribeMessageEntry;
  8. import io.dapr.client.domain.CloudEvent;
  9. import io.dapr.springboot.annotations.BulkSubscribe;
  10. import org.springframework.web.bind.annotation.PostMapping;
  11. import org.springframework.web.bind.annotation.RequestBody;
  12. import org.springframework.web.bind.annotation.RestController;
  13. import reactor.core.publisher.Mono;
  14. /**
  15. * REST controller whose POST endpoints are annotated as Dapr topic subscriptions.
  16. */
  17. @RestController
  18. public class SubscriberController {
  19. private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
  20. /**
  21. * Prints each event received on "testingtopic".
  22. *
  23. * @param cloudEvent event from the request body; the body is declared optional
  24. * @return a Mono that prints the event data and its JSON form when subscribed
  25. */
  26. @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}")
  27. @PostMapping(path = "/testingtopic")
  28. public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<?> cloudEvent) {
  29. return Mono.fromRunnable(() -> {
  30. try {
  31. System.out.println("Subscriber got: " + cloudEvent.getData());
  32. System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
  33. } catch (Exception e) {
  34. throw new RuntimeException(e);
  35. }
  36. });
  37. }
  38. /**
  39. * Prints "testingtopic" events selected by the routing rule event.type == 'myevent.v2'.
  40. *
  41. * @param envelope event from the request body; the body is declared optional
  42. * @return a Mono that prints the event data and its JSON form when subscribed
  43. */
  44. @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
  45. rule = @Rule(match = "event.type == 'myevent.v2'", priority = 1))
  46. @PostMapping(path = "/testingtopicV2")
  47. public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent<?> envelope) {
  48. return Mono.fromRunnable(() -> {
  49. try {
  50. System.out.println("Subscriber got: " + envelope.getData());
  51. System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(envelope));
  52. } catch (Exception e) {
  53. throw new RuntimeException(e);
  54. }
  55. });
  56. }
  57. /**
  58. * Processes a batch of events from "testingtopicbulk" and reports a status per entry.
  59. *
  60. * @param bulkMessage batch from the request body; the body is declared optional
  61. * @return a Mono producing one SUCCESS or RETRY response entry for every entry received
  62. */
  63. @BulkSubscribe()
  64. @Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
  65. @PostMapping(path = "/testingtopicbulk")
  66. public Mono<BulkSubscribeAppResponse> handleBulkMessage(
  67. @RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
  68. return Mono.fromCallable(() -> {
  69. // A missing body or an empty batch yields an empty response.
  70. if (bulkMessage == null || bulkMessage.getEntries().isEmpty()) {
  71. return new BulkSubscribeAppResponse(new ArrayList<>());
  72. }
  73. System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");
  74. List<BulkSubscribeAppResponseEntry> entries = new ArrayList<>();
  75. for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
  76. try {
  77. System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
  78. CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
  79. System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
  80. entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
  81. } catch (Exception e) {
  82. // A failure on one entry marks only that entry as RETRY; the loop continues.
  83. e.printStackTrace();
  84. entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
  85. }
  86. }
  87. return new BulkSubscribeAppResponse(entries);
  88. });
  89. }
  90. }
Bulk Publish Messages

Note: This API is in the Alpha stage.

  1. import io.dapr.client.DaprClientBuilder;
  2. import io.dapr.client.DaprPreviewClient;
  3. import io.dapr.client.domain.BulkPublishResponse;
  4. import io.dapr.client.domain.BulkPublishResponseFailedEntry;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. /**
  8. * Example of publishing several messages with one call to the bulk publish API.
  9. */
  10. class Solution {
  11. /**
  12. * Builds NUM_MESSAGES text messages and publishes them to TOPIC_NAME on PUBSUB_NAME in a single request.
  13. *
  14. * @throws IllegalStateException if a checked exception is raised while using or closing the client
  15. */
  16. public void publishMessages() {
  17. try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
  18. // Create a list of messages to publish
  19. List<String> messages = new ArrayList<>();
  20. for (int i = 0; i < NUM_MESSAGES; i++) {
  21. String message = String.format("This is message #%d", i);
  22. messages.add(message);
  23. System.out.println("Going to publish message : " + message);
  24. }
  25. // Publish list of messages using the bulk publish API
  26. // NOTE(review): res is never read here; presumably it should be inspected for failed entries. Confirm against the BulkPublishResponse API.
  27. BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block();
  28. } catch (RuntimeException e) {
  29. // Runtime failures (for example from block()) propagate unchanged.
  30. throw e;
  31. } catch (Exception e) {
  32. // AutoCloseable.close() may declare a checked exception; this method declares none, so rethrow unchecked.
  33. throw new IllegalStateException("Bulk publish failed", e);
  34. }
  35. }
  36. }

Interact with output bindings

  1. import io.dapr.client.DaprClient;
  2. import io.dapr.client.DaprClientBuilder;
  3. try (DaprClient client = new DaprClientBuilder().build()) {
  4. // Send an object payload through the binding; BINDING_OPERATION="create"
  5. client
  6. .invokeBinding(BINDING_NAME, BINDING_OPERATION, myClass)
  7. .block();
  8. // Send a plain string payload through the same binding and operation
  9. client
  10. .invokeBinding(BINDING_NAME, BINDING_OPERATION, message)
  11. .block();
  12. }

Interact with input bindings

  1. import org.springframework.web.bind.annotation.*;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. /**
  5. * REST controller that accepts POST requests on "/checkout" and logs their payload.
  6. */
  7. @RestController
  8. @RequestMapping("/")
  9. public class myClass {
  10. private static final Logger log = LoggerFactory.getLogger(myClass.class);
  11. /**
  12. * Logs the body of a request posted to "/checkout".
  13. *
  14. * @param body raw request payload; may be null because the body is declared optional
  15. * @return a Mono that logs the payload when subscribed and then completes without emitting a value
  16. */
  17. @PostMapping(path = "/checkout")
  18. public Mono<String> getCheckout(@RequestBody(required = false) byte[] body) {
  19. // A missing body is logged as an empty message instead of failing inside new String(...).
  20. return Mono.fromRunnable(() ->
  21. log.info("Received Message: {}", body == null ? "" : new String(body)));
  22. }
  23. }

Retrieve secrets

  1. import com.fasterxml.jackson.databind.ObjectMapper;
  2. import io.dapr.client.DaprClient;
  3. import io.dapr.client.DaprClientBuilder;
  4. import java.util.Map;
  5. try (DaprClient client = new DaprClientBuilder().build()) {
  6. // Fetch the secret's key/value pairs, blocking until they arrive
  7. Map<String, String> secret = client
  8. .getSecret(SECRET_STORE_NAME, secretKey)
  9. .block();
  10. // Print the secret as serialized by JSON_SERIALIZER
  11. System.out.println(JSON_SERIALIZER.writeValueAsString(secret));
  12. }

Actors

An actor is an isolated, independent unit of compute and state with single-threaded execution. Dapr provides an actor implementation based on the Virtual Actor pattern, which provides a single-threaded programming model and in which actors are garbage collected when not in use. With Dapr’s implementation, you write your Dapr actors according to the Actor model, and Dapr leverages the scalability and reliability that the underlying platform provides.

  1. import io.dapr.actors.ActorMethod;
  2. import io.dapr.actors.ActorType;
  3. import reactor.core.publisher.Mono;
  4. @ActorType(name = "DemoActor") // sets the actor type name to "DemoActor"
  5. public interface DemoActor { // method contract of the demo actor
  6. void registerReminder(); // NOTE(review): presumably registers an actor reminder; confirm in the implementation
  7. @ActorMethod(name = "echo_message") // NOTE(review): presumably the name under which say() is invoked; confirm
  8. String say(String something);
  9. void clock(String message);
  10. @ActorMethod(returns = Integer.class) // names Integer as the value type of the Mono returned below
  11. Mono<Integer> incrementAndGet(int delta);
  12. }

Get & Subscribe to application configurations

Note: This is a preview API, so it is accessible only via the DaprPreviewClient interface, not the normal DaprClient interface.

  1. import io.dapr.client.DaprClientBuilder;
  2. import io.dapr.client.DaprPreviewClient;
  3. import io.dapr.client.domain.ConfigurationItem;
  4. import io.dapr.client.domain.GetConfigurationRequest;
  5. import io.dapr.client.domain.SubscribeConfigurationRequest;
  6. import reactor.core.publisher.Flux;
  7. import reactor.core.publisher.Mono;
  8. try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
  9. // Get configuration for a single key; block() waits for the item and returns it directly
  10. ConfigurationItem item = client.getConfiguration(CONFIG_STORE_NAME, CONFIG_KEY).block();
  11. // Get configurations for multiple keys
  12. Mono<Map<String, ConfigurationItem>> items =
  13. client.getConfiguration(CONFIG_STORE_NAME, CONFIG_KEY_1, CONFIG_KEY_2);
  14. // Subscribe to configuration changes
  15. Flux<SubscribeConfigurationResponse> outFlux = client.subscribeConfiguration(CONFIG_STORE_NAME, CONFIG_KEY_1, CONFIG_KEY_2);
  16. outFlux.subscribe(configItems -> configItems.forEach(...)); // "..." is a placeholder for your own handler
  17. // Unsubscribe from configuration changes
  18. Mono<UnsubscribeConfigurationResponse> unsubscribe = client.unsubscribeConfiguration(SUBSCRIPTION_ID, CONFIG_STORE_NAME);
  19. }

Query saved state

Note: This is a preview API, so it is accessible only via the DaprPreviewClient interface, not the normal DaprClient interface.

  1. import io.dapr.client.DaprClient;
  2. import io.dapr.client.DaprClientBuilder;
  3. import io.dapr.client.DaprPreviewClient;
  4. import io.dapr.client.domain.QueryStateItem;
  5. import io.dapr.client.domain.QueryStateRequest;
  6. import io.dapr.client.domain.QueryStateResponse;
  7. import io.dapr.client.domain.query.Query;
  8. import io.dapr.client.domain.query.Sorting;
  9. import io.dapr.client.domain.query.filters.EqFilter;
  10. try (DaprClient client = builder.build(); DaprPreviewClient previewClient = builder.buildPreviewClient()) {
  11. String searchVal = args.length == 0 ? "searchValue" : args[0];
  12. // Create JSON data ("..." marks setters omitted from this excerpt)
  13. Listing first = new Listing();
  14. first.setPropertyType("apartment");
  15. first.setId("1000");
  16. ...
  17. Listing second = new Listing();
  18. second.setPropertyType("row-house");
  19. second.setId("1002");
  20. ...
  21. Listing third = new Listing();
  22. third.setPropertyType("apartment");
  23. third.setId("1003");
  24. ...
  25. Listing fourth = new Listing();
  26. fourth.setPropertyType("apartment");
  27. fourth.setId("1001");
  28. ...
  29. Map<String, String> meta = new HashMap<>();
  30. meta.put("contentType", "application/json");
  31. // Save state: all four listings go into one bulk save request
  32. SaveStateRequest saveRequest = new SaveStateRequest(STATE_STORE_NAME).setStates(
  33. new State<>("1", first, null, meta, null),
  34. new State<>("2", second, null, meta, null),
  35. new State<>("3", third, null, meta, null),
  36. new State<>("4", fourth, null, meta, null)
  37. );
  38. client.saveBulkState(saveRequest).block();
  39. // Create query and query state request: apartments only, sorted by id descending
  40. Query query = new Query()
  41. .setFilter(new EqFilter<>("propertyType", "apartment"))
  42. .setSort(Arrays.asList(new Sorting("id", Sorting.Order.DESC)));
  43. QueryStateRequest queryRequest = new QueryStateRequest(STATE_STORE_NAME)
  44. .setQuery(query);
  45. // Use preview client to call query state API; results are read back as Listing, the type that was saved
  46. QueryStateResponse<Listing> result = previewClient.queryState(queryRequest, Listing.class).block();
  47. // View Query state response
  48. System.out.println("Found " + result.getResults().size() + " items.");
  49. for (QueryStateItem<Listing> item : result.getResults()) {
  50. System.out.println("Key: " + item.getKey());
  51. System.out.println("Data: " + item.getValue());
  52. }
  53. }

Distributed lock

  1. package io.dapr.examples.lock.grpc;
  2. import io.dapr.client.DaprClientBuilder;
  3. import io.dapr.client.DaprPreviewClient;
  4. import io.dapr.client.domain.LockRequest;
  5. import io.dapr.client.domain.UnlockRequest;
  6. import io.dapr.client.domain.UnlockResponseStatus;
  7. import reactor.core.publisher.Mono;
  8. /**
  9. * Demonstrates acquiring and then releasing a distributed lock through the Dapr preview client.
  10. */
  11. public class DistributedLockGrpcClient {
  12. private static final String LOCK_STORE_NAME = "lockstore";
  13. /**
  14. * Entry point: opens a preview client, attempts the lock, then attempts the unlock.
  15. *
  16. * @param args command-line arguments (not read)
  17. * @throws Exception if building or closing the preview client fails
  18. */
  19. public static void main(String[] args) throws Exception {
  20. try (DaprPreviewClient previewClient = new DaprClientBuilder().buildPreviewClient()) {
  21. System.out.println("Using preview client...");
  22. tryLock(previewClient);
  23. unlock(previewClient);
  24. }
  25. }
  26. /**
  27. * Requests the lock and prints whether it was granted; any exception is reported by message only.
  28. *
  29. * @param client DaprPreviewClient used to send the lock request
  30. */
  31. public static void tryLock(DaprPreviewClient client) {
  32. System.out.println("*******trying to get a free distributed lock********");
  33. try {
  34. Boolean acquired = client.tryLock(new LockRequest(LOCK_STORE_NAME, "resouce1", "owner1", 5)).block();
  35. String outcome;
  36. if (Boolean.TRUE.equals(acquired)) {
  37. outcome = "SUCCESS";
  38. } else {
  39. outcome = "FAIL";
  40. }
  41. System.out.println("Lock result -> " + outcome);
  42. } catch (Exception ex) {
  43. System.out.println(ex.getMessage());
  44. }
  45. }
  46. /**
  47. * Releases the lock and prints the returned status name; any exception is reported by message only.
  48. *
  49. * @param client DaprPreviewClient used to send the unlock request
  50. */
  51. public static void unlock(DaprPreviewClient client) {
  52. System.out.println("*******unlock a distributed lock********");
  53. try {
  54. UnlockResponseStatus status = client.unlock(new UnlockRequest(LOCK_STORE_NAME, "resouce1", "owner1")).block();
  55. System.out.println("Unlock result ->" + status.name());
  56. } catch (Exception ex) {
  57. System.out.println(ex.getMessage());
  58. }
  59. }
  60. }

Workflow

Dapr Workflow is currently in beta state.

  1. package io.dapr.examples.workflows;
  2. import io.dapr.workflows.client.DaprWorkflowClient;
  3. import io.dapr.workflows.client.WorkflowInstanceStatus;
  4. import java.time.Duration;
  5. import java.util.concurrent.TimeUnit;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * Client-side walkthrough of the workflow API: schedules DemoWorkflow instances, raises events, waits, purges and terminates. For setup instructions, see the README.
  9. */
  10. public class DemoWorkflowClient {
  11. /**
  12. * The main method: runs every step in order, printing a separator line between steps, then exits the JVM.
  13. *
  14. * @param args Input arguments (unused).
  15. * @throws InterruptedException If program has been interrupted.
  16. */
  17. public static void main(String[] args) throws InterruptedException {
  18. DaprWorkflowClient client = new DaprWorkflowClient();
  19. try (client) { // the client is closed when this block exits
  20. String separatorStr = "*******";
  21. System.out.println(separatorStr);
  22. String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data"); // start DemoWorkflow with a string input
  23. System.out.printf("Started new workflow instance with random ID: %s%n", instanceId);
  24. System.out.println(separatorStr);
  25. System.out.println("**GetInstanceMetadata:Running Workflow**");
  26. WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true); // NOTE(review): meaning of the boolean flag is not visible here; confirm in the client API
  27. System.out.printf("Result: %s%n", workflowMetadata);
  28. System.out.println(separatorStr);
  29. System.out.println("**WaitForInstanceStart**");
  30. try {
  31. WorkflowInstanceStatus waitForInstanceStartResult =
  32. client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true); // 60-second limit; a timeout is caught below
  33. System.out.printf("Result: %s%n", waitForInstanceStartResult);
  34. } catch (TimeoutException ex) {
  35. System.out.printf("waitForInstanceStart has an exception:%s%n", ex);
  36. }
  37. System.out.println(separatorStr);
  38. System.out.println("**SendExternalMessage**");
  39. client.raiseEvent(instanceId, "TestEvent", "TestEventPayload");
  40. System.out.println(separatorStr);
  41. System.out.println("** Registering parallel Events to be captured by allOf(t1,t2,t3) **");
  42. client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload");
  43. client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload");
  44. client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload");
  45. System.out.printf("Events raised for workflow with instanceId: %s\n", instanceId);
  46. System.out.println(separatorStr);
  47. System.out.println("** Registering Event to be captured by anyOf(t1,t2,t3) **");
  48. client.raiseEvent(instanceId, "e2", "event 2 Payload");
  49. System.out.printf("Event raised for workflow with instanceId: %s\n", instanceId);
  50. System.out.println(separatorStr);
  51. System.out.println("**WaitForInstanceCompletion**");
  52. try {
  53. WorkflowInstanceStatus waitForInstanceCompletionResult =
  54. client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true); // 60-second limit; a timeout is caught below
  55. System.out.printf("Result: %s%n", waitForInstanceCompletionResult);
  56. } catch (TimeoutException ex) {
  57. System.out.printf("waitForInstanceCompletion has an exception:%s%n", ex);
  58. }
  59. System.out.println(separatorStr);
  60. System.out.println("**purgeInstance**");
  61. boolean purgeResult = client.purgeInstance(instanceId);
  62. System.out.printf("purgeResult: %s%n", purgeResult);
  63. System.out.println(separatorStr);
  64. System.out.println("**raiseEvent**");
  65. String eventInstanceId = client.scheduleNewWorkflow(DemoWorkflow.class); // second instance, started without input
  66. System.out.printf("Started new workflow instance with random ID: %s%n", eventInstanceId);
  67. client.raiseEvent(eventInstanceId, "TestException", null); // event raised with a null payload
  68. System.out.printf("Event raised for workflow with instanceId: %s\n", eventInstanceId);
  69. System.out.println(separatorStr);
  70. String instanceToTerminateId = "terminateMe";
  71. client.scheduleNewWorkflow(DemoWorkflow.class, null, instanceToTerminateId); // null input, caller-chosen instance ID
  72. System.out.printf("Started new workflow instance with specified ID: %s%n", instanceToTerminateId);
  73. TimeUnit.SECONDS.sleep(5);
  74. System.out.println("Terminate this workflow instance manually before the timeout is reached");
  75. client.terminateWorkflow(instanceToTerminateId, null);
  76. System.out.println(separatorStr);
  77. String restartingInstanceId = "restarting";
  78. client.scheduleNewWorkflow(DemoWorkflow.class, null, restartingInstanceId); // null input, caller-chosen instance ID
  79. System.out.printf("Started new workflow instance with ID: %s%n", restartingInstanceId);
  80. System.out.println("Sleeping 30 seconds to restart the workflow");
  81. TimeUnit.SECONDS.sleep(30);
  82. System.out.println("**SendExternalMessage: RestartEvent**");
  83. client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload");
  84. System.out.println("Sleeping 30 seconds to terminate the eternal workflow");
  85. TimeUnit.SECONDS.sleep(30);
  86. client.terminateWorkflow(restartingInstanceId, null);
  87. }
  88. System.out.println("Exiting DemoWorkflowClient.");
  89. System.exit(0); // ends the JVM explicitly with status 0
  90. }
  91. }