开始使用 Dapr Python gRPC 进行服务扩展

如何使用 Dapr Python gRPC 扩展启动和运行

Dapr Python SDK 提供了一个内置的 gRPC 服务器扩展,dapr.ext.grpc,用于创建 Dapr 服务。

安装

你可以通过下面的方式下载和安装 Dapr gRPC 服务器扩展模块:

  1. pip install dapr-ext-grpc

Note

  1. The development package will contain features and behavior that will be compatible with the pre-release version of the Dapr runtime. 在安装 <code>dapr-dev</code> 包之前,请务必卸载任何稳定版本的 Python SDK 扩展。
  1. pip3 install dapr-ext-grpc-dev

示例

App 对象可以用来创建服务器。

监听服务调用请求状态

InvokeMethodReqestInvokeMethodResponse 对象可用于处理传入请求。

一个将侦听和响应请求的简单服务将如下所示:

  1. from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse
  2. app = App()
  3. @app.method(name='my-method')
  4. def mymethod(request: InvokeMethodRequest) -> InvokeMethodResponse:
  5. print(request.metadata, flush=True)
  6. print(request.text(), flush=True)
  7. return InvokeMethodResponse(b'INVOKE_RECEIVED', "text/plain; charset=UTF-8")
  8. app.run(50051)

完整的示例可以在这里找到。

订阅主题

当订阅一个主题时,您可以指示 Dapr 是否接受已传递的事件,或者是否应该丢弃它,或稍后重试。

  1. from typing import Optional
  2. from cloudevents.sdk.event import v1
  3. from dapr.ext.grpc import App
  4. from dapr.clients.grpc._response import TopicEventResponse
  5. app = App()
  6. # Default subscription for a topic
  7. @app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
  8. def mytopic(event: v1.Event) -> Optional[TopicEventResponse]:
  9. print(event.Data(),flush=True)
  10. # Returning None (or not doing a return explicitly) is equivalent
  11. # to returning a TopicEventResponse("success").
  12. # You can also return TopicEventResponse("retry") for dapr to log
  13. # the message and retry delivery later, or TopicEventResponse("drop")
  14. # for it to drop the message
  15. return TopicEventResponse("success")
  16. # Specific handler using Pub/Sub routing
  17. @app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
  18. rule=Rule("event.type == \"important\"", 1))
  19. def mytopic_important(event: v1.Event) -> None:
  20. print(event.Data(),flush=True)
  21. # Handler with disabled topic validation
  22. @app.subscribe(pubsub_name='pubsub-mqtt', topic='topic/#', disable_topic_validation=True,)
  23. def mytopic_wildcard(event: v1.Event) -> None:
  24. print(event.Data(),flush=True)
  25. app.run(50051)

完整的示例可以在这里找到。

设置输入绑定触发器

  1. from dapr.ext.grpc import App, BindingRequest
  2. app = App()
  3. @app.binding('kafkaBinding')
  4. def binding(request: BindingRequest):
  5. print(request.text(), flush=True)
  6. app.run(50051)

完整的示例可以在这里找到。

相关链接