Getting started with the Dapr client Python SDK
How to get up and running with the Dapr Python SDK
The Dapr client package allows you to interact with other Dapr applications from a Python application.
Pre-requisites
- Dapr CLI installed
- Initialized Dapr environment
- Python 3.7+ installed
- Dapr Python module installed
Import the client package
The dapr package contains the DaprClient
which will be used to create and use a client.
from dapr.clients import DaprClient
Building blocks
The Python SDK allows you to interface with all of the Dapr building blocks.
Invoke a service
from dapr.clients import DaprClient
with DaprClient() as d:
# invoke a method (gRPC or HTTP GET)
resp = d.invoke_method('service-to-invoke', 'method-to-invoke', data='{"message":"Hello World"}')
# for other HTTP verbs the verb must be specified
# invoke a 'POST' method (HTTP only)
resp = d.invoke_method('service-to-invoke', 'method-to-invoke', data='{"id":"100", "FirstName":"Value", "LastName":"Value"}', http_verb='post')
- For a full guide on service invocation visit How-To: Invoke a service.
- Visit Python SDK examples for code samples and instructions to try out service invocation
Save & get application state
from dapr.clients import DaprClient
with DaprClient() as d:
# Save state
d.save_state(store_name="statestore", key="key1", value="value1")
# Get state
data = d.get_state(store_name="statestore", key="key1").data
# Delete state
d.delete_state(store_name="statestore", key="key1")
- For a full list of state operations visit How-To: Get & save state.
- Visit Python SDK examples for code samples and instructions to try out state management
Query application state (Alpha)
from dapr import DaprClient
query = '''
{
"filter": {
"EQ": { "state": "CA" }
},
"sort": [
{
"key": "person.id",
"order": "DESC"
}
]
}
'''
with DaprClient() as d:
resp = d.query_state(
store_name='state_store',
query=query,
states_metadata={"metakey": "metavalue"}, # optional
)
- For a full list of state store query options visit How-To: Query state.
- Visit Python SDK examples for code samples and instructions to try out state store querying.
Publish & subscribe to messages
Publish messages
from dapr.clients import DaprClient
with DaprClient() as d:
resp = d.publish_event(pubsub_name='pubsub', topic_name='TOPIC_A', data='{"message":"Hello World"}')
Subscribe to messages
from cloudevents.sdk.event import v1
from dapr.ext.grpc import App
import json
app = App()
# Default subscription for a topic
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A')
def mytopic(event: v1.Event) -> None:
data = json.loads(event.Data())
print(f'Received: id={data["id"]}, message="{data ["message"]}"'
' content_type="{event.content_type}"',flush=True)
# Specific handler using Pub/Sub routing
@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A',
rule=Rule("event.type == \"important\"", 1))
def mytopic_important(event: v1.Event) -> None:
data = json.loads(event.Data())
print(f'Received: id={data["id"]}, message="{data ["message"]}"'
' content_type="{event.content_type}"',flush=True)
- For more information about pub/sub, visit How-To: Publish & subscribe.
- Visit Python SDK examples for code samples and instructions to try out pub/sub.
Interact with output bindings
from dapr.clients import DaprClient
with DaprClient() as d:
resp = d.invoke_binding(binding_name='kafkaBinding', operation='create', data='{"message":"Hello World"}')
- For a full guide on output bindings visit How-To: Use bindings.
- Visit Python SDK examples for code samples and instructions to try out output bindings
Retrieve secrets
from dapr.clients import DaprClient
with DaprClient() as d:
resp = d.get_secret(store_name='localsecretstore', key='secretKey')
- For a full guide on secrets visit How-To: Retrieve secrets.
- Visit Python SDK examples for code samples and instructions to try out retrieving secrets
Get configuration
from dapr.clients import DaprClient
with DaprClient() as d:
# Get Configuration
configuration = d.get_configuration(store_name='configurationstore', keys=['orderId'], config_metadata={})
Subscribe to configuration
import asyncio
from time import sleep
from dapr.clients import DaprClient
async def executeConfiguration():
with DaprClient() as d:
storeName = 'configurationstore'
key = 'orderId'
# Wait for sidecar to be up within 20 seconds.
d.wait(20)
# Subscribe to configuration by key.
configuration = await d.subscribe_configuration(store_name=storeName, keys=[key], config_metadata={})
while True:
if configuration != None:
items = configuration.get_items()
for key, item in items:
print(f"Subscribe key={key} value={item.value} version={item.version}", flush=True)
else:
print("Nothing yet")
sleep(5)
asyncio.run(executeConfiguration())
- Learn more about managing configurations via the How-To: Manage configuration guide.
- Visit Python SDK examples for code samples and instructions to try out configuration
Distributed Lock
from dapr.clients import DaprClient
def main():
# Lock parameters
store_name = 'lockstore' # as defined in components/lockstore.yaml
resource_id = 'example-lock-resource'
client_id = 'example-client-id'
expiry_in_seconds = 60
with DaprClient() as dapr:
print('Will try to acquire a lock from lock store named [%s]' % store_name)
print('The lock is for a resource named [%s]' % resource_id)
print('The client identifier is [%s]' % client_id)
print('The lock will will expire in %s seconds.' % expiry_in_seconds)
with dapr.try_lock(store_name, resource_id, client_id, expiry_in_seconds) as lock_result:
assert lock_result.success, 'Failed to acquire the lock. Aborting.'
print('Lock acquired successfully!!!')
# At this point the lock was released - by magic of the `with` clause ;)
unlock_result = dapr.unlock(store_name, resource_id, client_id)
print('We already released the lock so unlocking will not work.')
print('We tried to unlock it anyway and got back [%s]' % unlock_result.status)
- Learn more about using a distributed lock: How-To: Use a lock.
- Visit Python SDK examples for code samples and instructions to try out distributed lock.
Workflow
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
from dapr.clients import DaprClient
instanceId = "exampleInstanceID"
workflowComponent = "dapr"
workflowName = "hello_world_wf"
eventName = "event1"
eventData = "eventData"
def main():
with DaprClient() as d:
host = settings.DAPR_RUNTIME_HOST
port = settings.DAPR_GRPC_PORT
workflowRuntime = WorkflowRuntime(host, port)
workflowRuntime = WorkflowRuntime()
workflowRuntime.register_workflow(hello_world_wf)
workflowRuntime.register_activity(hello_act)
workflowRuntime.start()
# Start the workflow
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
# ...
# Pause Test
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")
# Resume Test
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")
sleep(1)
# Raise event
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
event_name=eventName, event_data=eventData)
sleep(5)
# Purge Test
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")
# Kick off another workflow for termination purposes
# This will also test using the same instance ID on a new workflow after
# the old instance was purged
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
# Terminate Test
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
sleep(1)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")
# Purge Test
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")
workflowRuntime.shutdown()
- Learn more about authoring and managing workflows:
- Visit Python SDK examples for code samples and instructions to try out Dapr Workflow.