Pulsar

Detailed documentation on the Pulsar pubsub component

Component format

To set up Apache Pulsar pubsub, create a component of type pubsub.pulsar. See this guide on how to create and apply a pubsub configuration. For more information on Apache Pulsar, read the docs.

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: pulsar-pubsub
    spec:
      type: pubsub.pulsar
      version: v1
      metadata:
      - name: host
        value: "localhost:6650"
      - name: enableTLS
        value: "false"
      - name: tenant
        value: "public"
      - name: token
        value: "eyJrZXlJZCI6InB1bHNhci1wajU0cXd3ZHB6NGIiLCJhbGciOiJIUzI1NiJ9.eyJzd"
      - name: namespace
        value: "default"
      - name: persistent
        value: "true"
      - name: backOffPolicy
        value: "constant"
      - name: backOffMaxRetries
        value: "-1"
      - name: disableBatching
        value: "false"

Spec metadata fields

| Field | Required | Details | Example |
|---|---|---|---|
| host | Y | Address of the Pulsar broker. Default: “localhost:6650” | “localhost:6650” OR “http://pulsar-pj54qwwdpz4b-pulsar.ap-sg.public.pulsar.com:8080” |
| enableTLS | N | Enable TLS. Default: “false” | “true”, “false” |
| token | N | Enable authentication. | How to create pulsar token |
| tenant | N | The topic tenant within the instance. Tenants are essential to multi-tenancy in Pulsar and can be spread across clusters. Default: “public” | “public” |
| namespace | N | The administrative unit of the topic, which acts as a grouping mechanism for related topics. Default: “default” | “default” |
| persistent | N | Pulsar supports two kinds of topics: persistent and non-persistent. With persistent topics, all messages are durably persisted on disk (on multiple disks if the broker is not standalone), whereas data for non-persistent topics is not persisted to storage disks. Note: the default retry behavior is to retry until it succeeds, so when you use a non-persistent topic, you can reduce or prohibit retries by setting backOffMaxRetries to 0. Default: “true” | “true”, “false” |
| backOffPolicy | N | Retry policy. “constant” is a backoff policy that always returns the same backoff delay. “exponential” is a backoff policy that increases the backoff period for each retry attempt using a randomization function that grows exponentially. Defaults to “constant”. | “constant”, “exponential” |
| backOffDuration | N | The fixed retry interval. Only takes effect when backOffPolicy is “constant”. Two formats are valid: a number (optionally fractional) with a unit suffix, or a plain number that is interpreted as milliseconds. Valid time units are “ns”, “us” (or “µs”), “ms”, “s”, “m”, “h”. Defaults to “5s”. | “5s”, “5000” |
| backOffInitialInterval | N | The backoff initial interval on retry. Only takes effect when backOffPolicy is “exponential”. Two formats are valid: a number (optionally fractional) with a unit suffix, or a plain number that is interpreted as milliseconds. Valid time units are “ns”, “us” (or “µs”), “ms”, “s”, “m”, “h”. Defaults to “500”. | “50” |
| backOffMaxInterval | N | The backoff maximum interval on retry. Only takes effect when backOffPolicy is “exponential”. Two formats are valid: a number (optionally fractional) with a unit suffix, or a plain number that is interpreted as milliseconds. Valid time units are “ns”, “us” (or “µs”), “ms”, “s”, “m”, “h”. Defaults to “60s”. | “60000” |
| backOffMaxRetries | N | The maximum number of retries to process the message before returning an error. Defaults to “0”, which means the component will not retry processing the message. “-1” will retry indefinitely until the message is processed or the application is shut down. Any positive number is treated as the maximum retry count. | “3” |
| backOffRandomizationFactor | N | Randomization factor, between 0 and 1, including 0 but not 1. Randomized interval = RetryInterval * (1 ± backOffRandomizationFactor). Defaults to “0.5”. | “0.5” |
| backOffMultiplier | N | Backoff multiplier for the policy. Increments the interval by multiplying it with the multiplier. Defaults to “1.5”. | “1.5” |
| backOffMaxElapsedTime | N | After MaxElapsedTime the ExponentialBackOff returns Stop. Two formats are valid: a number (optionally fractional) with a unit suffix, or a plain number that is interpreted as milliseconds. Valid time units are “ns”, “us” (or “µs”), “ms”, “s”, “m”, “h”. Defaults to “15m”. | “15m” |
| disableBatching | N | Disable batching. When batching is enabled, the default batch delay is 10 ms and the default batch size is 1000 messages. Setting disableBatching: true makes the producer send messages individually. Default: “false” | “true”, “false” |
| batchingMaxPublishDelay | N | Sets the time period within which sent messages are batched, if batching is enabled. If set to a non-zero value, messages are queued until this interval has elapsed or until batchingMaxMessages (see below) or batchingMaxSize (see below) is reached. Two formats are valid: a number (optionally fractional) with a unit suffix, or a plain number that is interpreted as milliseconds. Valid time units are “ns”, “us” (or “µs”), “ms”, “s”, “m”, “h”. Default: “10ms” | “10ms”, “10” |
| batchingMaxMessages | N | Sets the maximum number of messages permitted in a batch. If set to a value greater than 1, messages are queued until this threshold is reached, batchingMaxSize (see below) is reached, or the batch interval has elapsed. Default: “1000” | “1000” |
| batchingMaxSize | N | Sets the maximum number of bytes permitted in a batch. If set to a value greater than 1, messages are queued until this threshold is reached, batchingMaxMessages (see above) is reached, or the batch interval has elapsed. Default: “128KB” | “131072” |
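
To make the duration formats and the exponential backoff settings above more concrete, here is a minimal Python sketch. It is an illustration under stated assumptions, not the component's actual code: the helper names `parse_duration_ms` and `backoff_bounds` are hypothetical, and the sketch assumes the interval is multiplied by backOffMultiplier after each attempt and capped at backOffMaxInterval.

```python
import re

# Milliseconds per unit, for the unit suffixes listed in the table.
_UNIT_MS = {"ns": 1e-6, "us": 1e-3, "µs": 1e-3, "ms": 1.0,
            "s": 1000.0, "m": 60_000.0, "h": 3_600_000.0}
_TOKEN = re.compile(r"(\d+(?:\.\d+)?)(ns|us|µs|ms|s|m|h)")


def parse_duration_ms(value: str) -> float:
    """Convert a value such as "5s", "1.5s" or "5000" to milliseconds.

    A plain number is interpreted as milliseconds, as the table describes.
    """
    if value.isdecimal():
        return float(value)
    total, pos = 0.0, 0
    for match in _TOKEN.finditer(value):
        if match.start() != pos:  # unparsed text between tokens
            raise ValueError(f"invalid duration: {value!r}")
        total += float(match.group(1)) * _UNIT_MS[match.group(2)]
        pos = match.end()
    if pos == 0 or pos != len(value):
        raise ValueError(f"invalid duration: {value!r}")
    return total


def backoff_bounds(initial_ms=500.0, multiplier=1.5, max_interval_ms=60_000.0,
                   randomization=0.5, retries=5):
    """Return the (low, high) range of the randomized interval per retry.

    Applies: randomized interval = RetryInterval * (1 ± randomization).
    Growth by `multiplier` and the cap at `max_interval_ms` are assumptions
    of this sketch.
    """
    bounds = []
    interval = initial_ms
    for _ in range(retries):
        bounds.append((interval * (1 - randomization),
                       interval * (1 + randomization)))
        interval = min(interval * multiplier, max_interval_ms)
    return bounds
```

With the defaults from the table (initial interval “500”, multiplier “1.5”, randomization factor “0.5”), the first retry under this sketch waits between 250 ms and 750 ms and the second between 375 ms and 1125 ms.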

Delay queue

When invoking the Pulsar pub/sub, it’s possible to provide an optional delay queue by using the metadata query parameters in the request URL.

These optional parameter names are metadata.deliverAt or metadata.deliverAfter:

  • deliverAt: Deliver the message at a specified time (RFC3339 format), e.g. "2021-09-01T10:00:00Z"
  • deliverAfter: Deliver the message after a specified amount of time, e.g. "4h5m3s"

Examples:

    curl -X POST http://localhost:3500/v1.0/publish/myPulsar/myTopic?metadata.deliverAt='2021-09-01T10:00:00Z' \
      -H "Content-Type: application/json" \
      -d '{
            "data": {
              "message": "Hi"
            }
          }'

Or

    curl -X POST http://localhost:3500/v1.0/publish/myPulsar/myTopic?metadata.deliverAfter='4h5m3s' \
      -H "Content-Type: application/json" \
      -d '{
            "data": {
              "message": "Hi"
            }
          }'
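
When the publish request is built in application code rather than typed by hand, the delay parameters have to be URL-encoded. The following Python sketch shows one way to assemble the same URLs as the curl examples. The function name `delayed_publish_url` and its parameters are hypothetical, and requiring exactly one of the two delay parameters is an assumption of this sketch, not a documented rule.

```python
from datetime import datetime, timezone
from urllib.parse import quote, urlencode


def delayed_publish_url(pubsub, topic, *, deliver_at=None, deliver_after=None,
                        base="http://localhost:3500"):
    """Build a publish URL carrying metadata.deliverAt or metadata.deliverAfter.

    deliver_at:    a timezone-aware datetime, sent as an RFC3339 UTC timestamp.
    deliver_after: a duration string such as "4h5m3s", passed through unchanged.
    """
    # Assumption of this sketch: exactly one delay parameter is supplied.
    if (deliver_at is None) == (deliver_after is None):
        raise ValueError("set exactly one of deliver_at or deliver_after")
    if deliver_at is not None:
        stamp = deliver_at.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        params = {"metadata.deliverAt": stamp}
    else:
        params = {"metadata.deliverAfter": deliver_after}
    path = f"/v1.0/publish/{quote(pubsub, safe='')}/{quote(topic, safe='')}"
    return f"{base}{path}?{urlencode(params)}"


url = delayed_publish_url(
    "myPulsar", "myTopic",
    deliver_at=datetime(2021, 9, 1, 10, 0, 0, tzinfo=timezone.utc),
)
print(url)
```

The colons in the timestamp are percent-encoded as `%3A` by `urlencode`, which is the standard encoding for query string values.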

Create a Pulsar instance

    docker run -it \
      -p 6650:6650 \
      -p 8080:8080 \
      --mount source=pulsardata,target=/pulsar/data \
      --mount source=pulsarconf,target=/pulsar/conf \
      apachepulsar/pulsar:2.5.1 \
      bin/pulsar standalone
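
Once the container is running, a quick way to confirm that the two published ports accept connections is a plain TCP check. This Python sketch is only a reachability test under the assumption that the ports are mapped to localhost as in the command above. It does not speak the Pulsar protocol, and the helper name `port_open` is hypothetical.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False
```

For example, `port_open("localhost", 6650)` checks the broker port and `port_open("localhost", 8080)` checks the second published port.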

To deploy Pulsar on Kubernetes, refer to the Helm chart documentation.

Last modified September 28, 2022: Upmerge v1.8 —> v1.9 - 9/28 (#2839) (9286e093)