Pulsar
Detailed documentation on the Pulsar pubsub component
Component format
To set up Apache Pulsar pub/sub, create a component of type pubsub.pulsar
. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.
For more information on Apache Pulsar, read the official docs.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pulsar-pubsub
spec:
type: pubsub.pulsar
version: v1
metadata:
- name: host
value: "localhost:6650"
- name: enableTLS
value: "false"
- name: tenant
value: "public"
- name: token
value: "eyJrZXlJZCI6InB1bHNhci1wajU0cXd3ZHB6NGIiLCJhbGciOiJIUzI1NiJ9.eyJzd"
- name: consumerID
value: "channel1"
- name: namespace
value: "default"
- name: persistent
value: "true"
- name: disableBatching
value: "false"
- name: <topic-name>.jsonschema # sets a json schema validation for the configured topic
value: |
{
"type": "record",
"name": "Example",
"namespace": "test",
"fields": [
{"name": "ID","type": "int"},
{"name": "Name","type": "string"}
]
}
- name: <topic-name>.avroschema # sets an avro schema validation for the configured topic
value: |
{
"type": "record",
"name": "Example",
"namespace": "test",
"fields": [
{"name": "ID","type": "int"},
{"name": "Name","type": "string"}
]
}
Warning
The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets. This component supports storing the token
parameter and any other sensitive parameter and data as Kubernetes Secrets.
Spec metadata fields
Field | Required | Details | Example |
---|---|---|---|
host | Y | Address of the Pulsar broker. Default is “localhost:6650” | “localhost:6650” OR “http://pulsar-pj54qwwdpz4b-pulsar.ap-sg.public.pulsar.com:8080“ |
enableTLS | N | Enable TLS. Default: “false” | “true” , “false” |
token | N | Enable Authentication. | How to create pulsar token |
tenant | N | The topic tenant within the instance. Tenants are essential to multi-tenancy in Pulsar, and spread across clusters. Default: “public” | “public” |
consumerID | N | Used to set the subscription name or consumer ID. | “channel1” |
namespace | N | The administrative unit of the topic, which acts as a grouping mechanism for related topics. Default: “default” | “default” |
persistent | N | Pulsar supports two kinds of topics: persistent and non-persistent. With persistent topics, all messages are durably persisted on disks (if the broker is not standalone, messages are durably persisted on multiple disks), whereas data for non-persistent topics is not persisted to storage disks. | |
disableBatching | N | disable batching.When batching enabled default batch delay is set to 10 ms and default batch size is 1000 messages,Setting disableBatching: true will make the producer to send messages individually. Default: “false” | “true” , “false” |
batchingMaxPublishDelay | N | batchingMaxPublishDelay set the time period within which the messages sent will be batched,if batch messages are enabled. If set to a non zero value, messages will be queued until this time interval or batchingMaxMessages (see below) or batchingMaxSize (see below). There are two valid formats, one is the fraction with a unit suffix format, and the other is the pure digital format that is processed as milliseconds. Valid time units are “ns”, “us” (or “µs”), “ms”, “s”, “m”, “h”. Default: “10ms” | “10ms” , “10” |
batchingMaxMessages | N | batchingMaxMessages set the maximum number of messages permitted in a batch.If set to a value greater than 1, messages will be queued until this threshold is reached or batchingMaxSize (see below) has been reached or the batch interval has elapsed. Default: “1000” | “1000” |
batchingMaxSize | N | batchingMaxSize sets the maximum number of bytes permitted in a batch. If set to a value greater than 1, messages will be queued until this threshold is reached or batchingMaxMessages (see above) has been reached or the batch interval has elapsed. Default: “128KB” | “131072” |
N | Enforces JSON schema validation for the configured topic. | ||
N | Enforces Avro schema validation for the configured topic. | ||
publicKey | N | A public key to be used for publisher and consumer encryption. Value can be one of two options: file path for a local PEM cert, or the cert data string value | |
privateKey | N | A private key to be used for consumer encryption. Value can be one of two options: file path for a local PEM cert, or the cert data string value | |
keys | N | A comma delimited string containing names of Pulsar session keys. Used in conjunction with publicKey for publisher encryption | |
processMode | N | Enable processing multiple messages at once. Default: “async” | “async” , “sync” |
subscribeType | N | Pulsar supports four kinds of subscription types. Default: “shared” | “shared” , “exclusive” , “failover” , “key_shared” |
partitionKey | N | Sets the key of the message for routing policy. Default: “” |
Enabling message delivery retries
The Pulsar pub/sub component has no built-in support for retry strategies. This means that sidecar sends a message to the service only once and is not retried in case of failures. To make Dapr use more spohisticated retry policies, you can apply a retry resiliency policy to the Pulsar pub/sub component. Note that it will be the same Dapr sidecar retrying the redelivery the message to the same app instance and not other instances.
Delay queue
When invoking the Pulsar pub/sub, it’s possible to provide an optional delay queue by using the metadata
query parameters in the request url.
These optional parameter names are metadata.deliverAt
or metadata.deliverAfter
:
deliverAt
: Delay message to deliver at a specified time (RFC3339 format); for example,"2021-09-01T10:00:00Z"
deliverAfter
: Delay message to deliver after a specified amount of time; for example,"4h5m3s"
Examples:
curl -X POST http://localhost:3500/v1.0/publish/myPulsar/myTopic?metadata.deliverAt='2021-09-01T10:00:00Z' \
-H "Content-Type: application/json" \
-d '{
"data": {
"message": "Hi"
}
}'
Or
curl -X POST http://localhost:3500/v1.0/publish/myPulsar/myTopic?metadata.deliverAfter='4h5m3s' \
-H "Content-Type: application/json" \
-d '{
"data": {
"message": "Hi"
}
}'
E2E Encryption
Dapr supports setting public and private key pairs to enable Pulsar’s end-to-end encryption feature.
Enabling publisher encryption from file certs
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.pulsar
version: v1
metadata:
- name: host
value: "localhost:6650"
- name: publicKey
value: ./public.key
- name: keys
value: myapp.key
Enabling consumer encryption from file certs
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.pulsar
version: v1
metadata:
- name: host
value: "localhost:6650"
- name: publicKey
value: ./public.key
- name: privateKey
value: ./private.key
Enabling publisher encryption from value
Note: It is recommended to reference the public key from a secret.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.pulsar
version: v1
metadata:
- name: host
value: "localhost:6650"
- name: publicKey
value: "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1KDAM4L8RtJ+nLaXBrBh\nzVpvTemsKVZoAct8A+ShepOHT9lgHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDd\nruXSflvSdmYeFAw3Ypphc1A5oM53wSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/\na3golb36GYFrY0MLFTv7wZ87pmMIPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eK\njpwcg35gccvR6o/UhbKAuc60V1J9Wof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0Qi\nCdpIrXvYtANq0Id6gP8zJvUEdPIgNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ\n3QIDAQAB\n-----END PUBLIC KEY-----\n"
- name: keys
value: myapp.key
Enabling consumer encryption from value
Note: It is recommended to reference the public and private keys from a secret.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.pulsar
version: v1
metadata:
- name: host
value: "localhost:6650"
- name: publicKey
value: "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1KDAM4L8RtJ+nLaXBrBh\nzVpvTemsKVZoAct8A+ShepOHT9lgHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDd\nruXSflvSdmYeFAw3Ypphc1A5oM53wSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/\na3golb36GYFrY0MLFTv7wZ87pmMIPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eK\njpwcg35gccvR6o/UhbKAuc60V1J9Wof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0Qi\nCdpIrXvYtANq0Id6gP8zJvUEdPIgNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ\n3QIDAQAB\n-----END PUBLIC KEY-----\n"
- name: privateKey
value: "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA1KDAM4L8RtJ+nLaXBrBhzVpvTemsKVZoAct8A+ShepOHT9lg\nHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDdruXSflvSdmYeFAw3Ypphc1A5oM53\nwSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/a3golb36GYFrY0MLFTv7wZ87pmMI\nPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eKjpwcg35gccvR6o/UhbKAuc60V1J9\nWof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0QiCdpIrXvYtANq0Id6gP8zJvUEdPIg\nNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ3QIDAQABAoIBAQCKuHnM4ac/eXM7\nQPDVX1vfgyHc3hgBPCtNCHnXfGFRvFBqavKGxIElBvGOcBS0CWQ+Rg1Ca5kMx3TQ\njSweSYhH5A7pe3Sa5FK5V6MGxJvRhMSkQi/lJZUBjzaIBJA9jln7pXzdHx8ekE16\nBMPONr6g2dr4nuI9o67xKrtfViwRDGaG6eh7jIMlEqMMc6WqyhvI67rlVDSTHFKX\njlMcozJ3IT8BtTzKg2Tpy7ReVuJEpehum8yn1ZVdAnotBDJxI07DC1cbOP4M2fHM\ngfgPYWmchauZuTeTFu4hrlY5jg0/WLs6by8r/81+vX3QTNvejX9UdTHMSIfQdX82\nAfkCKUVhAoGBAOvGv+YXeTlPRcYC642x5iOyLQm+BiSX4jKtnyJiTU2s/qvvKkIu\nxAOk3OtniT9NaUAHEZE9tI71dDN6IgTLQlAcPCzkVh6Sc5eG0MObqOO7WOMCWBkI\nlaAKKBbd6cGDJkwGCJKnx0pxC9f8R4dw3fmXWgWAr8ENiekMuvjSfjZ5AoGBAObd\ns2L5uiUPTtpyh8WZ7rEvrun3djBhzi+d7rgxEGdditeiLQGKyZbDPMSMBuus/5wH\nwfi0xUq50RtYDbzQQdC3T/C20oHmZbjWK5mDaLRVzWS89YG/NT2Q8eZLBstKqxkx\ngoT77zoUDfRy+CWs1xvXzgxagD5Yg8/OrCuXOqWFAoGAPIw3r6ELknoXEvihASxU\nS4pwInZYIYGXpygLG8teyrnIVOMAWSqlT8JAsXtPNaBtjPHDwyazfZrvEmEk51JD\nX0tA8M5ah1NYt+r5JaKNxp3P/8wUT6lyszyoeubWJsnFRfSusuq/NRC+1+KDg/aq\nKnSBu7QGbm9JoT2RrmBv5RECgYBRn8Lj1I1muvHTNDkiuRj2VniOSirkUkA2/6y+\nPMKi+SS0tqcY63v4rNCYYTW1L7Yz8V44U5mJoQb4lvpMbolGhPljjxAAU3hVkItb\nvGVRlSCIZHKczADD4rJUDOS7DYxO3P1bjUN4kkyYx+lKUMDBHFzCa2D6Kgt4dobS\n5qYajQKBgQC7u7MFPkkEMqNqNGu5erytQkBq1v1Ipmf9rCi3iIj4XJLopxMgw0fx\n6jwcwNInl72KzoUBLnGQ9PKGVeBcgEgdI+a+tq+1TJo6Ta+hZSx+4AYiKY18eRKG\neNuER9NOcSVJ7Eqkcw4viCGyYDm2vgNV9HJ0VlAo3RDh8x5spEN+mg==\n-----END RSA PRIVATE KEY-----\n"
Partition Key
When invoking the Pulsar pub/sub, it’s possible to provide an optional partition key by using the metadata
query parameter in the request url.
The parameter name is partitionKey
.
Example:
curl -X POST http://localhost:3500/v1.0/publish/myPlusar/myTopic?metadata.partitionKey=key1 \
-H "Content-Type: application/json" \
-d '{
"data": {
"message": "Hi"
}
}'
Message headers
All other metadata key/value pairs (that are not partitionKey
) are set as headers in the Pulsar message. For example, set a correlationId
for the message:
curl -X POST http://localhost:3500/v1.0/publish/myPlusar/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1 \
-H "Content-Type: application/json" \
-d '{
"data": {
"message": "Hi"
}
}'
Order guarantee
To ensure that messages arrive in order for each consumer subscribed to a specific key, three conditions must be met.
subscribeType
should be set tokey_shared
.partitionKey
must be set.processMode
should be set tosync
.
Create a Pulsar instance
docker run -it \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:2.5.1 \
bin/pulsar standalone
Refer to the following Helm chart Documentation.
Related links
- Basic schema for a Dapr component
- Read this guide for instructions on configuring pub/sub components
- Pub/Sub building block
Last modified June 19, 2023: Merge pull request #3565 from dapr/aacrawfi/skip-secrets-close (b1763bf)