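How-To: Save and get state

Use key/value pairs to persist state

State management is one of the most common needs of any application, whether new or legacy, monolith or microservice. Working with and testing different database libraries, and handling retries and failures, can be both difficult and time-consuming.

In this guide, you'll learn the basics of the key/value state API, which lets an application save, get, and delete state.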

Example

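The code examples below loosely describe an application that processes orders using an order processing service with a Dapr sidecar. The order processing service uses Dapr to store state in a Redis state store.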

Diagram showing state management of example service

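Set up a state store

A state store component represents a resource that Dapr uses to communicate with a database.

For the purpose of this guide, we'll use a Redis state store, but any state store from the supported list will work.

When you run dapr init in self-hosted mode, Dapr creates a default Redis statestore.yaml and runs a Redis state store on your local machine. The file is located:

  • On Windows, under %UserProfile%\.dapr\components\statestore.yaml
  • On Linux/MacOS, under ~/.dapr/components/statestore.yaml

With the statestore.yaml component, you can easily swap out the underlying state store without changing application code.

To deploy this to a Kubernetes cluster, fill in the metadata connection details of your state store component in the YAML below, save it as statestore.yaml, and run kubectl apply -f statestore.yaml.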

  apiVersion: dapr.io/v1alpha1
  kind: Component
  metadata:
    name: statestore
  spec:
    type: state.redis
    version: v1
    metadata:
    - name: redisHost
      value: localhost:6379
    - name: redisPassword
      value: ""

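See how to set up different state stores on Kubernetes.

Important

Set an app-id, because state keys are prefixed with this value. If you don't set an app-id, one is generated for you at runtime. The next time you run the command, a new app-id is generated and you will no longer have access to the previously saved state.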
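
To make the prefixing concrete, here is a minimal Python sketch of how a key might appear in the underlying store. It assumes Dapr's default key prefix strategy, in which the stored key has the form <app-id>||<key>. The helper name stored_key is hypothetical and is not part of any Dapr SDK.

```python
def stored_key(app_id: str, key: str) -> str:
    """Return the key as it would be stored under the default
    app-id prefix strategy (an assumption of this sketch)."""
    return f"{app_id}||{key}"


# The same logical key maps to different stored keys under different app-ids,
# which is why state saved under one app-id is not visible under another.
print(stored_key("orderprocessing", "order_1"))
print(stored_key("some-generated-id", "order_1"))
```

Because the prefix is part of the stored key, running the same application with a fixed --app-id keeps its state reachable across runs.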

Save and retrieve a single state

The following examples show how to save and retrieve a single key/value pair using the Dapr state management API.

  // dependencies
  using System;
  using System.Threading.Tasks;
  using Dapr.Client;

  // code
  namespace EventService
  {
      class Program
      {
          static async Task Main(string[] args)
          {
              string DAPR_STORE_NAME = "statestore";
              // Create the client once and reuse it for every iteration
              using var client = new DaprClientBuilder().Build();
              Random random = new Random();
              while (true)
              {
                  await Task.Delay(5000);
                  int orderId = random.Next(1, 1000);
                  // Use the Dapr SDK to save and get state
                  await client.SaveStateAsync(DAPR_STORE_NAME, "order_1", orderId.ToString());
                  await client.SaveStateAsync(DAPR_STORE_NAME, "order_2", orderId.ToString());
                  var result = await client.GetStateAsync<string>(DAPR_STORE_NAME, "order_1");
                  Console.WriteLine("Result after get: " + result);
              }
          }
      }
  }

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run

  // dependencies
  import io.dapr.client.DaprClient;
  import io.dapr.client.DaprClientBuilder;
  import io.dapr.client.domain.State;
  import org.springframework.boot.autoconfigure.SpringBootApplication;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import java.util.Random;
  import java.util.concurrent.TimeUnit;

  // code
  @SpringBootApplication
  public class OrderProcessingServiceApplication {

      private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);
      private static final String STATE_STORE_NAME = "statestore";

      public static void main(String[] args) throws InterruptedException {
          DaprClient client = new DaprClientBuilder().build();
          Random random = new Random();
          while (true) {
              TimeUnit.MILLISECONDS.sleep(5000);
              int orderId = random.nextInt(1000 - 1) + 1;
              // Use the Dapr SDK to save and get state
              client.saveState(STATE_STORE_NAME, "order_1", Integer.toString(orderId)).block();
              client.saveState(STATE_STORE_NAME, "order_2", Integer.toString(orderId)).block();
              // block() waits for the result; logging the Mono itself would not print the value
              State<String> result = client.getState(STATE_STORE_NAME, "order_1", String.class).block();
              log.info("Result after get: " + result.getValue());
          }
      }
  }

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run

  # dependencies
  import random
  from time import sleep
  import logging
  from dapr.clients import DaprClient

  # code
  logging.basicConfig(level=logging.INFO)
  DAPR_STORE_NAME = "statestore"
  while True:
      sleep(random.randrange(50, 5000) / 1000)
      orderId = random.randint(1, 1000)
      with DaprClient() as client:
          # Use the Dapr SDK to save and get state
          client.save_state(DAPR_STORE_NAME, "order_1", str(orderId))
          result = client.get_state(DAPR_STORE_NAME, "order_1")
          logging.info('Result after get: ' + result.data.decode('utf-8'))

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py

  // dependencies
  package main

  import (
      "context"
      "log"
      "math/rand"
      "strconv"
      "time"

      dapr "github.com/dapr/go-sdk/client"
  )

  // code
  func main() {
      const STATE_STORE_NAME = "statestore"
      rand.Seed(time.Now().UnixMicro())
      // Create the client once; a defer inside the loop would only run when main returns
      client, err := dapr.NewClient()
      if err != nil {
          panic(err)
      }
      defer client.Close()
      ctx := context.Background()
      for i := 0; i < 10; i++ {
          orderId := rand.Intn(1000-1) + 1
          // Use the Dapr SDK to save and get state
          err = client.SaveState(ctx, STATE_STORE_NAME, "order_1", []byte(strconv.Itoa(orderId)), nil)
          if err != nil {
              panic(err)
          }
          result, err := client.GetState(ctx, STATE_STORE_NAME, "order_1", nil)
          if err != nil {
              panic(err)
          }
          log.Println("Result after get:", string(result.Value))
          time.Sleep(2 * time.Second)
      }
  }

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go

  // dependencies
  import { DaprClient, CommunicationProtocolEnum } from '@dapr/dapr';

  // code
  const daprHost = "127.0.0.1";

  async function main() {
      for (let i = 0; i < 10; i++) {
          // sleep() returns a promise, so it must be awaited to actually pause
          await sleep(5000);
          const orderId = Math.floor(Math.random() * (1000 - 1) + 1);
          await start(orderId);
      }
  }

  async function start(orderId) {
      const client = new DaprClient({
          daprHost,
          daprPort: process.env.DAPR_HTTP_PORT,
          communicationProtocol: CommunicationProtocolEnum.HTTP,
      });
      const STATE_STORE_NAME = "statestore";
      // Use the Dapr SDK to save and get state
      await client.state.save(STATE_STORE_NAME, [
          {
              key: "order_1",
              value: orderId.toString()
          },
          {
              key: "order_2",
              value: orderId.toString()
          }
      ]);
      const result = await client.state.get(STATE_STORE_NAME, "order_1");
      console.log("Result after get: " + result);
  }

  function sleep(ms) {
      return new Promise(resolve => setTimeout(resolve, ms));
  }

  main().catch((e) => {
      console.error(e);
      process.exit(1);
  });

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start

Start a Dapr sidecar:

  dapr run --app-id orderprocessing --dapr-http-port 3601

In a separate terminal, use curl to save a key/value pair into your state store:

  curl -X POST -H "Content-Type: application/json" -d '[{ "key": "order_1", "value": "250"}]' http://localhost:3601/v1.0/state/statestore

Now get the state you just saved:

  curl http://localhost:3601/v1.0/state/statestore/order_1

Restart your sidecar and try retrieving the state again to confirm that the stored state persists across the restart.

If you are using PowerShell, start a Dapr sidecar:

  dapr run --app-id orderprocessing --dapr-http-port 3601

In a separate terminal, save a key/value pair into your state store:

  Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '[{"key": "order_1", "value": "250"}]' -Uri 'http://localhost:3601/v1.0/state/statestore'

Now get the state you just saved:

  Invoke-RestMethod -Uri 'http://localhost:3601/v1.0/state/statestore/order_1'

Restart your sidecar and try retrieving the state again to confirm that the stored state persists across the restart.
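
The HTTP calls above can also be assembled programmatically. The following is a minimal Python sketch that uses only the standard library and builds the same save and get requests without sending them. The port and store name are taken from the commands above; the helper names state_url, build_save_request, and build_get_request are hypothetical.

```python
import json
import urllib.request

DAPR_HTTP_PORT = 3601            # matches --dapr-http-port in the commands above
STATE_STORE_NAME = "statestore"  # name of the state store component


def state_url(key=None):
    """Build the state endpoint URL; with a key it addresses a single item."""
    base = f"http://localhost:{DAPR_HTTP_PORT}/v1.0/state/{STATE_STORE_NAME}"
    return base if key is None else f"{base}/{key}"


def build_save_request(items):
    """Build (but do not send) a POST request that saves key/value pairs."""
    body = json.dumps([{"key": k, "value": v} for k, v in items.items()])
    return urllib.request.Request(
        state_url(),
        data=body.encode("utf-8"),
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def build_get_request(key):
    """Build (but do not send) a GET request for a single key."""
    return urllib.request.Request(state_url(key), method="GET")


save_req = build_save_request({"order_1": "250"})
get_req = build_get_request("order_1")
print(save_req.get_method(), save_req.full_url, save_req.data.decode("utf-8"))
print(get_req.get_method(), get_req.full_url)
```

With a sidecar running, passing either request object to urllib.request.urlopen would send it. The sketch stops short of that so it can be run without Dapr.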

Delete state

Below are code examples that use the Dapr SDKs to delete state.

  // dependencies
  using System.Threading.Tasks;
  using Dapr.Client;

  // code
  namespace EventService
  {
      class Program
      {
          static async Task Main(string[] args)
          {
              string DAPR_STORE_NAME = "statestore";
              // Use the Dapr SDK to delete the state
              using var client = new DaprClientBuilder().Build();
              await client.DeleteStateAsync(DAPR_STORE_NAME, "order_1");
          }
      }
  }

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run

  // dependencies
  import io.dapr.client.DaprClient;
  import io.dapr.client.DaprClientBuilder;
  import org.springframework.boot.autoconfigure.SpringBootApplication;

  // code
  @SpringBootApplication
  public class OrderProcessingServiceApplication {

      public static void main(String[] args) throws InterruptedException {
          String STATE_STORE_NAME = "statestore";
          // Use the Dapr SDK to delete the state
          DaprClient client = new DaprClientBuilder().build();
          // Read the current ETag so the delete applies to the version just read
          String storedEtag = client.getState(STATE_STORE_NAME, "order_1", String.class).block().getEtag();
          client.deleteState(STATE_STORE_NAME, "order_1", storedEtag, null).block();
      }
  }

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run

  # dependencies
  import logging
  from dapr.clients import DaprClient

  # code
  logging.basicConfig(level=logging.INFO)
  DAPR_STORE_NAME = "statestore"
  # Use the Dapr SDK to delete the state
  with DaprClient() as client:
      client.delete_state(store_name=DAPR_STORE_NAME, key="order_1")

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py

  // dependencies
  package main

  import (
      "context"

      dapr "github.com/dapr/go-sdk/client"
  )

  // code
  func main() {
      STATE_STORE_NAME := "statestore"
      // Use the Dapr SDK to delete the state
      client, err := dapr.NewClient()
      if err != nil {
          panic(err)
      }
      defer client.Close()
      ctx := context.Background()
      // The final argument is optional request metadata; nil matches the SaveState and GetState calls in this guide
      if err := client.DeleteState(ctx, STATE_STORE_NAME, "order_1", nil); err != nil {
          panic(err)
      }
  }

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go

  // dependencies
  import { DaprClient, CommunicationProtocolEnum } from '@dapr/dapr';

  // code
  const daprHost = "127.0.0.1";

  // main must be async because it awaits the delete call
  async function main() {
      const STATE_STORE_NAME = "statestore";
      // Use the Dapr SDK to delete the state
      const client = new DaprClient({
          daprHost,
          daprPort: process.env.DAPR_HTTP_PORT,
          communicationProtocol: CommunicationProtocolEnum.HTTP,
      });
      await client.state.delete(STATE_STORE_NAME, "order_1");
  }

  main().catch((e) => {
      console.error(e);
      process.exit(1);
  });

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start

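With the same Dapr instance from above still running, use curl to delete the state:

  curl -X DELETE 'http://localhost:3601/v1.0/state/statestore/order_1'

Try getting the state again. Note that no value is returned.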

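With the same Dapr instance from above still running, use PowerShell to delete the state:

  Invoke-RestMethod -Method Delete -Uri 'http://localhost:3601/v1.0/state/statestore/order_1'

Try getting the state again. Note that no value is returned.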
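
Client code usually needs to tell "no value" apart from a stored value. The Python sketch below shows one way to interpret the response of a single-key get. It assumes that a missing key produces an empty body (to our understanding, with HTTP status 204) and that a stored value comes back as a JSON document. The helper name parse_get_state is hypothetical.

```python
import json


def parse_get_state(status, body):
    """Interpret the response of GET /v1.0/state/<store>/<key>.

    Assumption of this sketch: an empty body (for example HTTP 204)
    means the key does not exist, so None is returned.
    """
    if status == 204 or not body:
        return None
    return json.loads(body)


# After the delete above, a get is expected to carry no value.
print(parse_get_state(204, b""))
# Before the delete, the body would hold the JSON-encoded value.
print(parse_get_state(200, b'"250"'))
```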

Save and retrieve multiple states

Below are code examples that use the Dapr SDKs to save and retrieve multiple states.

  // dependencies
  using System.Collections.Generic;
  using System.Threading.Tasks;
  using Dapr.Client;

  // code
  namespace EventService
  {
      class Program
      {
          static async Task Main(string[] args)
          {
              string DAPR_STORE_NAME = "statestore";
              // Use the Dapr SDK to retrieve multiple states
              using var client = new DaprClientBuilder().Build();
              IReadOnlyList<BulkStateItem> multipleStateResult = await client.GetBulkStateAsync(DAPR_STORE_NAME, new List<string> { "order_1", "order_2" }, parallelism: 1);
          }
      }
  }

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run

The above example returns a list of BulkStateItem objects, each holding the serialized form of the value you saved to state. If you prefer that the SDK deserialize the value of each item in the bulk response, use the following instead:

  // dependencies
  using System.Collections.Generic;
  using System.Threading.Tasks;
  using Dapr.Client;

  // code
  namespace EventService
  {
      class Program
      {
          static async Task Main(string[] args)
          {
              string DAPR_STORE_NAME = "statestore";
              // Use the Dapr SDK to retrieve multiple states, deserialized as Widget
              using var client = new DaprClientBuilder().Build();
              IReadOnlyList<BulkStateItem<Widget>> multipleStateResult = await client.GetBulkStateAsync<Widget>(DAPR_STORE_NAME, new List<string> { "widget_1", "widget_2" }, parallelism: 1);
          }
      }

      class Widget
      {
          // Properties are public so the serializer can populate them
          public string Size { get; set; }
          public string Color { get; set; }
      }
  }

  // dependencies
  import io.dapr.client.DaprClient;
  import io.dapr.client.DaprClientBuilder;
  import io.dapr.client.domain.State;
  import org.springframework.boot.autoconfigure.SpringBootApplication;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import java.util.Arrays;
  import java.util.List;

  // code
  @SpringBootApplication
  public class OrderProcessingServiceApplication {

      private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);

      public static void main(String[] args) throws InterruptedException {
          String STATE_STORE_NAME = "statestore";
          // Use the Dapr SDK to retrieve multiple states
          DaprClient client = new DaprClientBuilder().build();
          // block() triggers the request and waits for the result
          List<State<String>> resultBulk = client.getBulkState(STATE_STORE_NAME,
                  Arrays.asList("order_1", "order_2"), String.class).block();
          log.info("Result after get bulk: " + resultBulk);
      }
  }

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run

  # dependencies
  import logging
  from dapr.clients import DaprClient
  from dapr.clients.grpc._state import StateItem

  # code
  logging.basicConfig(level=logging.INFO)
  DAPR_STORE_NAME = "statestore"
  orderId = 100
  # Use the Dapr SDK to save and retrieve multiple states
  with DaprClient() as client:
      client.save_bulk_state(store_name=DAPR_STORE_NAME, states=[StateItem(key="order_2", value=str(orderId))])
      result = client.get_bulk_state(store_name=DAPR_STORE_NAME, keys=["order_1", "order_2"], states_metadata={"metakey": "metavalue"}).items
      logging.info('Result after get bulk: ' + str(result))

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py

  // dependencies
  import { DaprClient, CommunicationProtocolEnum } from '@dapr/dapr';

  // code
  const daprHost = "127.0.0.1";

  // main must be async because it awaits the SDK calls
  async function main() {
      const STATE_STORE_NAME = "statestore";
      const orderId = 100;
      // Use the Dapr SDK to save and retrieve multiple states
      const client = new DaprClient({
          daprHost,
          daprPort: process.env.DAPR_HTTP_PORT,
          communicationProtocol: CommunicationProtocolEnum.HTTP,
      });
      await client.state.save(STATE_STORE_NAME, [
          {
              key: "order_1",
              value: orderId.toString()
          },
          {
              key: "order_2",
              value: orderId.toString()
          }
      ]);
      const result = await client.state.getBulk(STATE_STORE_NAME, ["order_1", "order_2"]);
      console.log("Result after get bulk:", result);
  }

  main().catch((e) => {
      console.error(e);
      process.exit(1);
  });

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start

With the same Dapr instance from above still running, use curl to save two key/value pairs into your state store:

  curl -X POST -H "Content-Type: application/json" -d '[{ "key": "order_1", "value": "250"}, { "key": "order_2", "value": "550"}]' http://localhost:3601/v1.0/state/statestore

Now get the states you just saved:

  curl -X POST -H "Content-Type: application/json" -d '{"keys":["order_1", "order_2"]}' http://localhost:3601/v1.0/state/statestore/bulk

With the same Dapr instance from above still running, use PowerShell to save two key/value pairs into your state store:

  Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '[{ "key": "order_1", "value": "250"}, { "key": "order_2", "value": "550"}]' -Uri 'http://localhost:3601/v1.0/state/statestore'

Now get the states you just saved:

  Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"keys":["order_1", "order_2"]}' -Uri 'http://localhost:3601/v1.0/state/statestore/bulk'
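
The body of the bulk request is a small JSON object, so it is easy to generate from a list of keys. The following Python sketch builds that body. The keys field comes from the commands above; the optional parallelism field is an assumption based on the parallelism argument used in the SDK examples, and the helper name bulk_get_body is hypothetical.

```python
import json


def bulk_get_body(keys, parallelism=None):
    """Build the JSON body for POST /v1.0/state/<store>/bulk.

    'parallelism' is only included when given; treating it as a valid
    request field is an assumption of this sketch.
    """
    body = {"keys": list(keys)}
    if parallelism is not None:
        body["parallelism"] = parallelism
    return json.dumps(body)


print(bulk_get_body(["order_1", "order_2"]))
print(bulk_get_body(["order_1", "order_2"], parallelism=1))
```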

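Perform state transactions

Note

State transactions require a state store that supports multi-item transactions. See the full list on the supported state stores page.

Below are code examples that use the Dapr SDKs to perform state transactions.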

  // dependencies
  using System;
  using System.Collections.Generic;
  using System.Text.Json;
  using System.Threading;
  using System.Threading.Tasks;
  using Dapr.Client;

  // code
  namespace EventService
  {
      class Program
      {
          static async Task Main(string[] args)
          {
              string DAPR_STORE_NAME = "statestore";
              using var client = new DaprClientBuilder().Build();
              Random random = new Random();
              while (true)
              {
                  await Task.Delay(5000);
                  int orderId = random.Next(1, 1000);
                  // Upsert order_3 and delete order_2 in a single transaction
                  var requests = new List<StateTransactionRequest>()
                  {
                      new StateTransactionRequest("order_3", JsonSerializer.SerializeToUtf8Bytes(orderId.ToString()), StateOperationType.Upsert),
                      new StateTransactionRequest("order_2", null, StateOperationType.Delete)
                  };
                  CancellationTokenSource source = new CancellationTokenSource();
                  CancellationToken cancellationToken = source.Token;
                  // Use the Dapr SDK to perform the state transaction
                  await client.ExecuteStateTransactionAsync(DAPR_STORE_NAME, requests, cancellationToken: cancellationToken);
                  Console.WriteLine("Order requested: " + orderId);
              }
          }
      }
  }

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run

  // dependencies
  import io.dapr.client.DaprClient;
  import io.dapr.client.DaprClientBuilder;
  import io.dapr.client.domain.State;
  import io.dapr.client.domain.TransactionalStateOperation;
  import org.springframework.boot.autoconfigure.SpringBootApplication;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.Random;
  import java.util.concurrent.TimeUnit;

  // code
  @SpringBootApplication
  public class OrderProcessingServiceApplication {

      private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);
      private static final String STATE_STORE_NAME = "statestore";

      public static void main(String[] args) throws InterruptedException {
          DaprClient client = new DaprClientBuilder().build();
          Random random = new Random();
          while (true) {
              TimeUnit.MILLISECONDS.sleep(5000);
              int orderId = random.nextInt(1000 - 1) + 1;
              // Upsert order_3 and delete order_2 in a single transaction
              List<TransactionalStateOperation<?>> operationList = new ArrayList<>();
              operationList.add(new TransactionalStateOperation<>(TransactionalStateOperation.OperationType.UPSERT,
                      new State<>("order_3", Integer.toString(orderId), "")));
              operationList.add(new TransactionalStateOperation<>(TransactionalStateOperation.OperationType.DELETE,
                      new State<>("order_2")));
              // Use the Dapr SDK to perform the state transaction
              client.executeStateTransaction(STATE_STORE_NAME, operationList).block();
              log.info("Order requested: " + orderId);
          }
      }
  }

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run

  # dependencies
  import random
  from time import sleep
  import logging
  from dapr.clients import DaprClient
  from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType

  # code
  logging.basicConfig(level=logging.INFO)
  DAPR_STORE_NAME = "statestore"
  while True:
      sleep(random.randrange(50, 5000) / 1000)
      orderId = random.randint(1, 1000)
      with DaprClient() as client:
          # Use the Dapr SDK to perform the state transaction:
          # upsert order_3 and delete order_2 in a single request
          client.execute_state_transaction(store_name=DAPR_STORE_NAME, operations=[
              TransactionalStateOperation(
                  operation_type=TransactionOperationType.upsert,
                  key="order_3",
                  data=str(orderId)),
              TransactionalStateOperation(
                  operation_type=TransactionOperationType.delete,
                  key="order_2",
                  data=str(orderId))
          ])
          # Delete a single key outside the transaction, while the client is still open
          client.delete_state(store_name=DAPR_STORE_NAME, key="order_1")
      logging.info('Order requested: ' + str(orderId))

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py

  // dependencies
  import { DaprClient, CommunicationProtocolEnum } from '@dapr/dapr';

  // code
  const daprHost = "127.0.0.1";

  async function main() {
      for (let i = 0; i < 10; i++) {
          // sleep() returns a promise, so it must be awaited to actually pause
          await sleep(5000);
          const orderId = Math.floor(Math.random() * (1000 - 1) + 1);
          await start(orderId);
      }
  }

  async function start(orderId) {
      const client = new DaprClient({
          daprHost,
          daprPort: process.env.DAPR_HTTP_PORT,
          communicationProtocol: CommunicationProtocolEnum.HTTP,
      });
      const STATE_STORE_NAME = "statestore";
      // Use the Dapr SDK to perform the state transaction
      await client.state.transaction(STATE_STORE_NAME, [
          {
              operation: "upsert",
              request: {
                  key: "order_3",
                  value: orderId.toString()
              }
          },
          {
              operation: "delete",
              request: {
                  key: "order_2"
              }
          }
      ]);
  }

  function sleep(ms) {
      return new Promise(resolve => setTimeout(resolve, ms));
  }

  main().catch((e) => {
      console.error(e);
      process.exit(1);
  });

To launch a Dapr sidecar for the above example application, run a command similar to the following:

  dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start

With the same Dapr instance from above still running, use curl to perform two state operations in one transaction:

  curl -X POST -H "Content-Type: application/json" -d '{"operations": [{"operation":"upsert", "request": {"key": "order_1", "value": "250"}}, {"operation":"delete", "request": {"key": "order_2"}}]}' http://localhost:3601/v1.0/state/statestore/transaction

Now see the results of your state transaction:

  curl -X POST -H "Content-Type: application/json" -d '{"keys":["order_1", "order_2"]}' http://localhost:3601/v1.0/state/statestore/bulk

With the same Dapr instance from above still running, use PowerShell to perform two state operations in one transaction:

  Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"operations": [{"operation":"upsert", "request": {"key": "order_1", "value": "250"}}, {"operation":"delete", "request": {"key": "order_2"}}]}' -Uri 'http://localhost:3601/v1.0/state/statestore/transaction'

Now see the results of your state transaction:

  Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"keys":["order_1", "order_2"]}' -Uri 'http://localhost:3601/v1.0/state/statestore/bulk'
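
The transaction payload used in the commands above has a regular shape: a list of operations, each with an operation name and a request object. A minimal Python sketch of building that payload follows. The helper names upsert, delete, and transaction_body are hypothetical and only mirror the JSON shown above.

```python
import json


def upsert(key, value):
    """Describe an upsert of one key/value pair."""
    return {"operation": "upsert", "request": {"key": key, "value": value}}


def delete(key):
    """Describe the deletion of one key."""
    return {"operation": "delete", "request": {"key": key}}


def transaction_body(operations):
    """Build the JSON body for POST /v1.0/state/<store>/transaction."""
    return json.dumps({"operations": list(operations)})


# Same two operations as in the commands above.
payload = transaction_body([upsert("order_1", "250"), delete("order_2")])
print(payload)
```

Keeping each operation in a small helper makes it harder to send an operation without its operation name, which matters because the store applies all operations in the list together.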

Next steps