声明式和编程式订阅方法

了解 Dapr 允许您订阅主题的方法。

Pub/sub API订阅方法

Dapr 应用程序可以通过两种方法订阅发布的主题,这两种方法支持相同的功能:声明式和编程式。

订阅方法说明
声明式订阅定义在一个外部文件中。 声明式方法会从您的代码中移除 Dapr 依赖,并允许现有的应用程序订阅 topics,而无需更改代码。
编程式订阅定义在应用程序代码中。 编程式方法在代码中实现订阅。

以下示例演示了checkout应用程序和orderprocessing应用程序通过orders主题进行pub/sub消息传递。 示例演示了相同的 Dapr 发布/订阅组件,首先以声明方式使用,然后以编程方式使用。

声明式订阅

您可以使用外部组件文件来声明式地订阅主题。 这个示例使用一个名为 subscription.yaml 的YAML组件文件:

  1. apiVersion: dapr.io/v2alpha1
  2. kind: Subscription
  3. metadata:
  4. name: order
  5. spec:
  6. topic: orders
  7. routes:
  8. default: /checkout
  9. pubsubname: pubsub
  10. scopes:
  11. - orderprocessing
  12. - checkout

这里的订阅称为 order:

  • 使用Pub/sub(发布/订阅)组件pubsub订阅名为orders的主题。
  • route字段设置为将所有主题消息发送到应用程序中的/checkout端点。
  • scopes 字段设置为仅限具有 orderprocessingcheckout 的应用程序访问此订阅。

在运行 Dapr 时,将 YAML 组件文件路径设置为指向组件的 Dapr。

  1. dapr run --app-id myapp --resources-path ./myComponents -- dotnet run
  1. dapr run --app-id myapp --resources-path ./myComponents -- mvn spring-boot:run
  1. dapr run --app-id myapp --resources-path ./myComponents -- python3 app.py
  1. dapr run --app-id myapp --resources-path ./myComponents -- npm start
  1. dapr run --app-id myapp --resources-path ./myComponents -- go run app.go

在 Kubernetes 中,将该组件应用到集群中:

  1. kubectl apply -f subscription.yaml

在您的应用程序代码中,订阅 Dapr pub/sub 组件中指定的主题。

  1. //Subscribe to a topic
  2. [HttpPost("checkout")]
  3. public void getCheckout([FromBody] int orderId)
  4. {
  5. Console.WriteLine("Subscriber received : " + orderId);
  6. }
  1. import io.dapr.client.domain.CloudEvent;
  2. //Subscribe to a topic
  3. @PostMapping(path = "/checkout")
  4. public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
  5. return Mono.fromRunnable(() -> {
  6. try {
  7. log.info("Subscriber received: " + cloudEvent.getData());
  8. }
  9. });
  10. }
  1. from cloudevents.sdk.event import v1
  2. #Subscribe to a topic
  3. @app.route('/checkout', methods=['POST'])
  4. def checkout(event: v1.Event) -> None:
  5. data = json.loads(event.Data())
  6. logging.info('Subscriber received: ' + str(data))
  1. const express = require('express')
  2. const bodyParser = require('body-parser')
  3. const app = express()
  4. app.use(bodyParser.json({ type: 'application/*+json' }));
  5. // listen to the declarative route
  6. app.post('/checkout', (req, res) => {
  7. console.log(req.body);
  8. res.sendStatus(200);
  9. });
  1. //Subscribe to a topic
  2. var sub = &common.Subscription{
  3. PubsubName: "pubsub",
  4. Topic: "orders",
  5. Route: "/checkout",
  6. }
  7. func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
  8. log.Printf("Subscriber received: %s", e.Data)
  9. return false, nil
  10. }

/checkout 端点与订阅中定义的 route 相匹配,这是 Dapr 将所有主题消息发送至的位置。

编程方式订阅

动态编程方法返回 routes 代码中的 JSON 结构,与声明式方法的 route YAML 结构相对应。

注意: 编程订阅只在应用程序启动时读取一次。 你不能_动态_添加新的编程订阅,仅在编译时添加新的编程订阅。

在下面的示例中,您将在应用程序代码中定义在声明性YAML订阅上方找到的值。

  1. [Topic("pubsub", "orders")]
  2. [HttpPost("/checkout")]
  3. public async Task<ActionResult<Order>>Checkout(Order order, [FromServices] DaprClient daprClient)
  4. {
  5. // Logic
  6. return order;
  7. }

or

  1. // Dapr subscription in [Topic] routes orders topic to this route
  2. app.MapPost("/checkout", [Topic("pubsub", "orders")] (Order order) => {
  3. Console.WriteLine("Subscriber received : " + order);
  4. return Results.Ok(order);
  5. });

上面定义的这两个处理程序还需要映射到配置 dapr/subscribe 端点。 这是在定义端点时在应用程序启动代码中完成的。

  1. app.UseEndpoints(endpoints =>
  2. {
  3. endpoints.MapSubscribeHandler();
  4. });
  1. private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
  2. @Topic(name = "checkout", pubsubName = "pubsub")
  3. @PostMapping(path = "/orders")
  4. public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
  5. return Mono.fromRunnable(() -> {
  6. try {
  7. System.out.println("Subscriber received: " + cloudEvent.getData());
  8. System.out.println("Subscriber received: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
  9. } catch (Exception e) {
  10. throw new RuntimeException(e);
  11. }
  12. });
  1. @app.route('/dapr/subscribe', methods=['GET'])
  2. def subscribe():
  3. subscriptions = [
  4. {
  5. 'pubsubname': 'pubsub',
  6. 'topic': 'checkout',
  7. 'routes': {
  8. 'rules': [
  9. {
  10. 'match': 'event.type == "order"',
  11. 'path': '/orders'
  12. },
  13. ],
  14. 'default': '/orders'
  15. }
  16. }]
  17. return jsonify(subscriptions)
  18. @app.route('/orders', methods=['POST'])
  19. def ds_subscriber():
  20. print(request.json, flush=True)
  21. return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
  22. app.run()
  1. const express = require('express')
  2. const bodyParser = require('body-parser')
  3. const app = express()
  4. app.use(bodyParser.json({ type: 'application/*+json' }));
  5. const port = 3000
  6. app.get('/dapr/subscribe', (req, res) => {
  7. res.json([
  8. {
  9. pubsubname: "pubsub",
  10. topic: "checkout",
  11. routes: {
  12. rules: [
  13. {
  14. match: 'event.type == "order"',
  15. path: '/orders'
  16. },
  17. ],
  18. default: '/products'
  19. }
  20. }
  21. ]);
  22. })
  23. app.post('/orders', (req, res) => {
  24. console.log(req.body);
  25. res.sendStatus(200);
  26. });
  27. app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "net/http"
  7. "github.com/gorilla/mux"
  8. )
  9. const appPort = 3000
  10. type subscription struct {
  11. PubsubName string `json:"pubsubname"`
  12. Topic string `json:"topic"`
  13. Metadata map[string]string `json:"metadata,omitempty"`
  14. Routes routes `json:"routes"`
  15. }
  16. type routes struct {
  17. Rules []rule `json:"rules,omitempty"`
  18. Default string `json:"default,omitempty"`
  19. }
  20. type rule struct {
  21. Match string `json:"match"`
  22. Path string `json:"path"`
  23. }
  24. // This handles /dapr/subscribe
  25. func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
  26. t := []subscription{
  27. {
  28. PubsubName: "pubsub",
  29. Topic: "checkout",
  30. Routes: routes{
  31. Rules: []rule{
  32. {
  33. Match: `event.type == "order"`,
  34. Path: "/orders",
  35. },
  36. },
  37. Default: "/orders",
  38. },
  39. },
  40. }
  41. w.WriteHeader(http.StatusOK)
  42. json.NewEncoder(w).Encode(t)
  43. }
  44. func main() {
  45. router := mux.NewRouter().StrictSlash(true)
  46. router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
  47. log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
  48. }

下一步