AWS SNS/SQS

Detailed documentation on the AWS SNS/SQS pubsub component

Component format

To setup AWS SNS/SQS for pub/sub, you create a component of type pubsub.snssqs. See this guide on how to create and apply a pubsub configuration.

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: snssqs-pubsub
  5. namespace: default
  6. spec:
  7. type: pubsub.snssqs
  8. version: v1
  9. metadata:
  10. - name: accessKey
  11. value: "AKIAIOSFODNN7EXAMPLE"
  12. - name: secretKey
  13. value: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
  14. - name: region
  15. value: "us-east-1"
  16. # - name: endpoint # Optional.
  17. # value: "http://localhost:4566"
  18. # - name: sessionToken # Optional (mandatory if using AssignedRole, i.e. temporary accessKey and secretKey)
  19. # value: "TOKEN"
  20. # - name: messageVisibilityTimeout # Optional
  21. # value: 10
  22. # - name: messageRetryLimit # Optional
  23. # value: 10
  24. # - name: messageReceiveLimit # Optional
  25. # value: 10
  26. # - name: sqsDeadLettersQueueName # Optional
  27. # - value: "myapp-dlq"
  28. # - name: messageWaitTimeSeconds # Optional
  29. # value: 1
  30. # - name: messageMaxNumber # Optional
  31. # value: 10
  32. # - name: fifo # Optional
  33. # value: "true"
  34. # - name: fifoMessageGroupID # Optional
  35. # value: "app1-mgi"
  36. # - name: disableEntityManagement # Optional
  37. # value: "false"
  38. # - name: disableDeleteOnRetryLimit # Optional
  39. # value: "false"
  40. # - name: assetsManagementTimeoutSeconds # Optional
  41. # value: 5

Warning

The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.

Spec metadata fields

FieldRequiredDetailsExample
accessKeyYID of the AWS account/role with appropriate permissions to SNS and SQS (see below)“AKIAIOSFODNN7EXAMPLE”
secretKeyYSecret for the AWS user/role. If using an AssumeRole access, you will also need to provide a sessionToken“wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY”
regionYThe AWS region where the SNS/SQS assets are located or be created in. See this page for valid regions. Ensure that SNS and SQS are available in that region“us-east-1”
endpointNAWS endpoint for the component to use. Only used for local development with, for example, localstack. The endpoint is unncessary when running against production AWShttp://localhost:4566
sessionTokenNAWS session token to use. A session token is only required if you are using temporary security credentials“TOKEN”
messageReceiveLimitNNumber of times a message is received, after processing of that message fails, that once reached, results in removing of that message from the queue. If sqsDeadLettersQueueName is specified, messageReceiveLimit is the number of times a message is received, after processing of that message fails, that once reached, results in moving of the message to the SQS dead-letters queue. Default: 1010
sqsDeadLettersQueueNameNName of the dead letters queue for this application“myapp-dlq”
messageVisibilityTimeoutNAmount of time in seconds that a message is hidden from receive requests after it is sent to a subscriber. Default: 1010
messageRetryLimitNNumber of times to resend a message after processing of that message fails before removing that message from the queue. Default: 1010
messageWaitTimeSecondsNThe duration (in seconds) for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner than messageWaitTimeSeconds. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. Default: 11
messageMaxNumberNMaximum number of messages to receive from the queue at a time. Default: 10, Maximum: 1010
fifoNUse SQS FIFO queue to provide message ordering and deduplication. Default: “false”. See further details about SQS FIFO“true”, “false”
fifoMessageGroupIDNIf fifo is enabled, instructs Dapr to use a custom Message Group ID for the pubsub deployment. This is not mandatory as Dapr creates a custom Message Group ID for each producer, thus ensuring ordering of messages per a Dapr producer. Default: “”“app1-mgi”
disableEntityManagementNWhen set to true, SNS topics, SQS queues and the SQS subscriptions to SNS do not get created automatically. Default: “false”“true”, “false”
disableDeleteOnRetryLimitNWhen set to true, after retrying and failing of messageRetryLimit times processing a message, reset the message visibility timeout so that other consumers can try processing, instead of deleting the message from SQS (the default behvior). Default: “false”“true”, “false”
assetsManagementTimeoutSecondsNAmount of time in seconds, for an AWS asset management operation, before it times out and cancelled. Asset management operations are any operations performed on STS, SNS and SQS, except message publish and consume operations that implement the default Dapr component retry behavior. The value can be set to any non-negative float/integer. Default: 50.5, 10
  • Dapr created SNS topic and SQS queue names conform with AWS specifications. By default, Dapr creates an SQS queue name based on the consumer app-id, therefore Dapr might perform name standardization to meet with AWS specifications.
  • Using SQS FIFO (fifo metadata field set to "true"), per AWS specifications, provides message ordering and deduplication, but incurs a lower SQS processing throughput, among other caveats
  • Be aware that specifying fifoMessageGroupID limits the number of concurrent consumers of the FIFO queue used to only one but guarantees global ordering of messages published by the app’s Dapr sidecars. See this post to better understand the topic of Message Group IDs and FIFO queues.

Important

When running the Dapr sidecar (daprd) with your application on EKS (AWS Kubernetes), if you’re using a node/pod that has already been attached to an IAM policy defining access to AWS resources, you must not provide AWS access-key, secret-key, and tokens in the definition of the component spec you’re using.

Create an SNS/SQS instance

For local development the localstack project is used to integrate AWS SNS/SQS. Follow the instructions here to run localstack.

To run localstack locally from the command line using Docker, apply the following cmd:

  1. docker run --rm -it -p 4566:4566 -p 4571:4571 -e SERVICES="sts,sns,sqs" -e AWS_DEFAULT_REGION="us-east-1" localstack/localstack

In order to use localstack with your pubsub binding, you need to provide the endpoint configuration in the component metadata. The endpoint is unncessary when running against production AWS.

See Authenticating to AWS for information about authentication-related attributes

  1. apiVersion: dapr.io/v1alpha1
  2. kind: Component
  3. metadata:
  4. name: snssqs-pubsub
  5. spec:
  6. type: pubsub.snssqs
  7. version: v1
  8. metadata:
  9. - name: accessKey
  10. value: "anyString"
  11. - name: secretKey
  12. value: "anyString"
  13. - name: endpoint
  14. value: http://localhost:4566
  15. # Use us-east-1 or any other region if provided to localstack as defined by "AWS_DEFAULT_REGION" envvar
  16. - name: region
  17. value: us-east-1

To run localstack on Kubernetes, you can apply the configuration below. Localstack is then reachable at the DNS name http://localstack.default.svc.cluster.local:4566 (assuming this was applied to the default namespace) and this should be used as the endpoint

  1. apiVersion: apps/v1
  2. kind: Deployment
  3. metadata:
  4. name: localstack
  5. namespace: default
  6. spec:
  7. # using the selector, we will expose the running deployments
  8. # this is how Kubernetes knows, that a given service belongs to a deployment
  9. selector:
  10. matchLabels:
  11. app: localstack
  12. replicas: 1
  13. template:
  14. metadata:
  15. labels:
  16. app: localstack
  17. spec:
  18. containers:
  19. - name: localstack
  20. image: localstack/localstack:latest
  21. ports:
  22. # Expose the edge endpoint
  23. - containerPort: 4566
  24. ---
  25. kind: Service
  26. apiVersion: v1
  27. metadata:
  28. name: localstack
  29. labels:
  30. app: localstack
  31. spec:
  32. selector:
  33. app: localstack
  34. ports:
  35. - protocol: TCP
  36. port: 4566
  37. targetPort: 4566
  38. type: LoadBalancer

In order to run in AWS, you should create or assign an IAM user with permissions to the SNS and SQS services having a Policy such as:

  1. {
  2. "Version": "2012-10-17",
  3. "Statement": [
  4. {
  5. "Sid": "YOUR_POLICY_NAME",
  6. "Effect": "Allow",
  7. "Action": [
  8. "sns:CreateTopic",
  9. "sns:GetTopicAttributes",
  10. "sns:ListSubscriptionsByTopic",
  11. "sns:Publish",
  12. "sns:Subscribe",
  13. "sns:TagResource",
  14. "sqs:ChangeMessageVisibility",
  15. "sqs:CreateQueue",
  16. "sqs:DeleteMessage",
  17. "sqs:GetQueueAttributes",
  18. "sqs:GetQueueUrl",
  19. "sqs:ReceiveMessage",
  20. "sqs:SetQueueAttributes",
  21. "sqs:TagQueue"
  22. ],
  23. "Resource": [
  24. "arn:aws:sns:AWS_REGION:AWS_ACCOUNT_ID:*",
  25. "arn:aws:sqs:AWS_REGION:AWS_ACCOUNT_ID:*"
  26. ]
  27. }
  28. ]
  29. }

Use the AWS account ID and AWS account secret and plug them into the accessKey and secretKey in the component metadata using Kubernetes secrets and secretKeyRef.

Alternatively, if you want to provision the SNS and SQS assets using your own tool of choice (e.g. Terraform), while preventing Dapr from doing so dynamically, you need to enable disableEntityManagement and assign your Dapr-using application with an IAM Role having a Policy such as:

  1. {
  2. "Version": "2012-10-17",
  3. "Statement": [
  4. {
  5. "Sid": "YOUR_POLICY_NAME",
  6. "Effect": "Allow",
  7. "Action": [
  8. "sqs:DeleteMessage",
  9. "sqs:ReceiveMessage",
  10. "sqs:ChangeMessageVisibility",
  11. "sqs:GetQueueUrl",
  12. "sqs:GetQueueAttributes",
  13. "sns:Publish",
  14. "sns:ListSubscriptionsByTopic",
  15. "sns:GetTopicAttributes"
  16. ],
  17. "Resource": [
  18. "arn:aws:sns:AWS_REGION:AWS_ACCOUNT_ID:APP_TOPIC_NAME",
  19. "arn:aws:sqs:AWS_REGION:AWS_ACCOUNT_ID:APP_ID"
  20. ]
  21. }
  22. ]
  23. }

If you are running your applications on an EKS cluster with dynamic assets creation (the default Dapr behavior)

Last modified June 23, 2022: Merge pull request #2550 from ItalyPaleAle/cosmosdb-harcoded-dapr-version (cf03237)