# Apache Kafka

Detailed documentation on the Apache Kafka pubsub component

## Component format

To set up Apache Kafka pub/sub, create a component of type `pubsub.kafka`. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

All component metadata field values can carry templated metadata values, which are resolved on Dapr sidecar startup. For example, you can choose to use `{namespace}` as the `consumerGroup` to enable using the same `appId` in different namespaces using the same topics, as described in this article.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "{namespace}"
  - name: consumerID # Optional. If not supplied, runtime will create one.
    value: "channel1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "password"
  - name: saslUsername # Required if authType is `password`.
    value: "adminuser"
  - name: saslPassword # Required if authType is `password`.
    secretKeyRef:
      name: kafka-secrets
      key: saslPasswordSecret
  - name: saslMechanism
    value: "SHA-512"
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: version # Optional.
    value: 0.10.2.0
  - name: disableTls # Optional. Disable TLS. This is not safe for production!! You should read the `Mutual TLS` section for how to use TLS.
    value: "true"
```
For details on using `secretKeyRef`, see the guide on how to reference secrets in components.
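For illustration, here is a minimal sketch of a Kubernetes secret the `saslPassword` reference above could resolve against, assuming the built-in Kubernetes secret store. The secret name and key match the sample component; the password value is a placeholder:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: kafka-secrets
type: Opaque
stringData:
  saslPasswordSecret: "KeFg23!" # Placeholder; use your real SASL password
```

With this secret applied in the namespace where the Dapr sidecar runs, the `secretKeyRef` in the component resolves to the stored password at startup.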
## Spec metadata fields

| Field | Required | Details | Example |
|-------|:--------:|---------|---------|
| brokers | Y | A comma-separated list of Kafka brokers. | `"localhost:9092,dapr-kafka.myapp.svc.cluster.local:9093"` |
| consumerGroup | N | A kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic. | `"group1"` |
| consumerID | N | Consumer ID (consumer tag) organizes one or more consumers into a group. Consumers with the same consumer ID work as one virtual consumer; for example, a message is processed only once by one of the consumers in the group. If the `consumerID` is not provided, the Dapr runtime sets it to the Dapr application ID (`appID`) value. | `"channel1"` |
| clientID | N | A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. Defaults to `"namespace.appID"` for Kubernetes mode or `"appID"` for Self-Hosted mode. | `"my-namespace.my-dapr-app"`, `"my-dapr-app"` |
| authRequired | N | *Deprecated.* Enable SASL authentication with the Kafka brokers. | `"true"`, `"false"` |
| authType | Y | Configure or disable authentication. Supported values: `none`, `password`, `certificate`, `mtls`, or `oidc` | `"password"`, `"none"` |
| saslUsername | N | The SASL username used for authentication. Only required if `authType` is set to `"password"`. | `"adminuser"` |
| saslPassword | N | The SASL password used for authentication. Can be `secretKeyRef` to use a secret reference. Only required if `authType` is set to `"password"`. | `""`, `"KeFg23!"` |
| saslMechanism | N | The SASL authentication mechanism you wish to use. Only required if `authType` is set to `"password"`. Defaults to `PLAINTEXT` | `"SHA-512"`, `"SHA-256"`, `"PLAINTEXT"` |
| initialOffset | N | The initial offset to use if no offset was previously committed. Should be `"newest"` or `"oldest"`. Defaults to `"newest"`. | `"oldest"` |
| maxMessageBytes | N | The maximum size in bytes allowed for a single Kafka message. Defaults to 1024. | `2048` |
| consumeRetryInterval | N | The interval between retries when attempting to consume topics. Treats numbers without suffix as milliseconds. Defaults to `100ms`. | `200ms` |
| consumeRetryEnabled | N | Disable consume retry by setting `"false"` | `"true"`, `"false"` |
| version | N | Kafka cluster version. Defaults to `2.0.0`. Note that this must be set to `1.0.0` if you are using Azure Event Hubs with Kafka. | `0.10.2.0` |
| caCert | N | Certificate authority certificate, required for using TLS. Can be `secretKeyRef` to use a secret reference | `"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"` |
| clientCert | N | Client certificate, required for `authType` `mtls`. Can be `secretKeyRef` to use a secret reference | `"-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"` |
| clientKey | N | Client key, required for `authType` `mtls`. Can be `secretKeyRef` to use a secret reference | `"-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded PKCS8>\n-----END RSA PRIVATE KEY-----"` |
| skipVerify | N | Skip TLS verification; this is not recommended for use in production. Defaults to `"false"` | `"true"`, `"false"` |
| disableTls | N | Disable TLS for transport security. To disable, you're now required to set the value to `"true"`. This is not recommended for use in production. Defaults to `"false"`. | `"true"`, `"false"` |
| oidcTokenEndpoint | N | Full URL to an OAuth2 identity provider access token endpoint. Required when `authType` is set to `oidc` | `"https://identity.example.com/v1/token"` |
| oidcClientID | N | The OAuth2 client ID that has been provisioned in the identity provider. Required when `authType` is set to `oidc` | `dapr-kafka` |
| oidcClientSecret | N | The OAuth2 client secret that has been provisioned in the identity provider. Required when `authType` is set to `oidc` | `"KeFg23!"` |
| oidcScopes | N | Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when `authType` is set to `oidc`. Defaults to `"openid"` | `"openid,kafka-prod"` |
| oidcExtensions | N | String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token | `{"cluster":"kafka","poolid":"kafkapool"}` |
The `secretKeyRef` above is referencing a Kubernetes secrets store to access the TLS information. Visit here to learn more about how to configure a secret store component.

> **Note:** The metadata `version` must be set to `1.0.0` when using Azure Event Hubs with Kafka.
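As a sketch of what such a configuration could look like, the component below targets the Azure Event Hubs Kafka endpoint. The namespace name and secret are hypothetical; Event Hubs' Kafka interface listens on port 9093 and uses SASL PLAIN with the literal username `$ConnectionString`:

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-eventhubs
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Event Hubs namespace, Kafka endpoint on port 9093
    value: "my-namespace.servicebus.windows.net:9093"
  - name: authType
    value: "password"
  - name: saslUsername # Event Hubs expects this literal username
    value: "$ConnectionString"
  - name: saslPassword # The Event Hubs connection string, stored as a secret
    secretKeyRef:
      name: eventhubs-secrets
      key: connectionString
  - name: saslMechanism # Maps to SASL PLAIN
    value: "PLAINTEXT"
  - name: version # Required for Event Hubs' Kafka interface
    value: "1.0.0"
```

Note that TLS stays enabled (the default), as Event Hubs requires an encrypted transport.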
## Authentication

Kafka supports a variety of authentication schemes and Dapr supports several: SASL password, mTLS, and OIDC/OAuth2. With the added authentication methods, the `authRequired` field was deprecated in the v1.6 release and the `authType` field should be used instead. If `authRequired` is set to `true`, Dapr will attempt to configure `authType` correctly based on the value of `saslPassword`. There are five valid values for `authType`: `none`, `password`, `certificate`, `mtls`, and `oidc`. Note this is authentication only; authorization is still configured within Kafka.
### None

Setting `authType` to `none` will disable any authentication. This is *NOT* recommended in production.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-noauth
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "none"
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: version # Optional.
    value: 0.10.2.0
  - name: disableTls
    value: "true"
```
### SASL Password

Setting `authType` to `password` enables SASL authentication. This requires setting the `saslUsername` and `saslPassword` fields.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-sasl
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "password"
  - name: saslUsername # Required if authType is `password`.
    value: "adminuser"
  - name: saslPassword # Required if authType is `password`.
    secretKeyRef:
      name: kafka-secrets
      key: saslPasswordSecret
  - name: saslMechanism
    value: "SHA-512"
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: version # Optional.
    value: 0.10.2.0
  - name: caCert
    secretKeyRef:
      name: kafka-tls
      key: caCert
```
### Mutual TLS

Setting `authType` to `mtls` uses an X.509 client certificate (the `clientCert` field) and key (the `clientKey` field) to authenticate. Note that mTLS as an authentication mechanism is distinct from using TLS to secure the transport layer via encryption. mTLS requires TLS transport (meaning `disableTls` must be `false`), but securing the transport layer does not require using mTLS. See Communication using TLS for configuring underlying TLS transport.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-mtls
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "mtls"
  - name: caCert
    secretKeyRef:
      name: kafka-tls
      key: caCert
  - name: clientCert
    secretKeyRef:
      name: kafka-tls
      key: clientCert
  - name: clientKey
    secretKeyRef:
      name: kafka-tls
      key: clientKey
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: version # Optional.
    value: 0.10.2.0
```
### OAuth2 or OpenID Connect

Setting `authType` to `oidc` enables SASL authentication via the OAUTHBEARER mechanism. This supports specifying a bearer token from an external OAuth2 or OIDC identity provider. Currently, only the `client_credentials` grant is supported.

Configure `oidcTokenEndpoint` to the full URL for the identity provider access token endpoint.

Set `oidcClientID` and `oidcClientSecret` to the client credentials provisioned in the identity provider.

If `caCert` is specified in the component configuration, the certificate is appended to the system CA trust for verifying the identity provider certificate. Similarly, if `skipVerify` is specified in the component configuration, verification will also be skipped when accessing the identity provider.

By default, the only scope requested for the token is `openid`; it is highly recommended that additional scopes be specified via `oidcScopes` in a comma-separated list and validated by the Kafka broker. If additional scopes are not used to narrow the validity of the access token, a compromised Kafka broker could replay the token to access other services as the Dapr clientID.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "oidc"
  - name: oidcTokenEndpoint # Required if authType is `oidc`.
    value: "https://identity.example.com/v1/token"
  - name: oidcClientID # Required if authType is `oidc`.
    value: "dapr-myapp"
  - name: oidcClientSecret # Required if authType is `oidc`.
    secretKeyRef:
      name: kafka-secrets
      key: oidcClientSecret
  - name: oidcScopes # Recommended if authType is `oidc`.
    value: "openid,kafka-dev"
  - name: caCert # Also applied to verifying OIDC provider certificate
    secretKeyRef:
      name: kafka-tls
      key: caCert
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: version # Optional.
    value: 0.10.2.0
```
## Communication using TLS

By default, TLS is enabled to secure the transport layer to Kafka. To disable TLS, set `disableTls` to `true`. When TLS is enabled, you can control server certificate verification using `skipVerify` to disable verification (*NOT* recommended in production environments) and `caCert` to specify a trusted TLS certificate authority (CA). If no `caCert` is specified, the system CA trust will be used. Also see the Authentication section for configuring mTLS authentication. Below is an example of a Kafka pub/sub component configured to use transport layer TLS:
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers # Required. Kafka broker connection setting
    value: "dapr-kafka.myapp.svc.cluster.local:9092"
  - name: consumerGroup # Optional. Used for input bindings.
    value: "group1"
  - name: clientID # Optional. Used as client tracing ID by Kafka brokers.
    value: "my-dapr-app-id"
  - name: authType # Required.
    value: "certificate"
  - name: consumeRetryInterval # Optional.
    value: 200ms
  - name: version # Optional.
    value: 0.10.2.0
  - name: maxMessageBytes # Optional.
    value: 1024
  - name: caCert # Certificate authority certificate.
    secretKeyRef:
      name: kafka-tls
      key: caCert
auth:
  secretStore: <SECRET_STORE_NAME>
```
## Sending and receiving multiple messages

The Apache Kafka component supports sending and receiving multiple messages in a single operation using the bulk pub/sub API.
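As a sketch, multiple messages can be published in one request through the bulk publish endpoint (an alpha API at `v1.0-alpha1`; the `kafka-pubsub` component name, `myTopic` topic, and entry IDs below are placeholders):

```bash
curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/kafka-pubsub/myTopic \
  -H "Content-Type: application/json" \
  -d '[
        {
          "entryId": "1",
          "event": "first message",
          "contentType": "text/plain"
        },
        {
          "entryId": "2",
          "event": { "message": "second message" },
          "contentType": "application/json"
        }
      ]'
```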
### Configuring bulk subscribe

When subscribing to a topic, you can configure `bulkSubscribe` options, as shown in the sketch after the table below. Refer to Subscribing messages in bulk for more details. Learn more about the bulk subscribe API.

Apache Kafka supports the following bulk metadata options:
| Configuration | Default |
|---------------|---------|
| `maxBulkAwaitDurationMs` | `10000` (10s) |
| `maxBulkSubCount` | `80` |
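For instance, these options correspond to the `bulkSubscribe` block of a declarative subscription. The subscription name, topic, and route below are hypothetical, and note that the Subscription field names differ from the metadata option names above:

```yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: kafka-bulk-subscription
spec:
  pubsubname: kafka-pubsub
  topic: mytopic
  routes:
    default: /orders
  bulkSubscribe:
    enabled: true
    maxMessagesCount: 80      # corresponds to maxBulkSubCount
    maxAwaitDurationMs: 10000 # corresponds to maxBulkAwaitDurationMs
```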
## Per-call metadata fields

### Partition Key

When invoking the Kafka pub/sub, it's possible to provide an optional partition key by using the `metadata` query parameter in the request URL.

The parameter name is `partitionKey`.
Example:
```bash
curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partitionKey=key1 \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'
```
### Message headers

All other metadata key/value pairs (that are not `partitionKey`) are set as headers in the Kafka message. Here is an example setting a `correlationId` for the message.
```bash
curl -X POST "http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1" \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'
```
## Create a Kafka instance

You can run Kafka locally using this Docker image. To run without Docker, see the getting started guide here.

To run Kafka on Kubernetes, you can use any Kafka operator, such as Strimzi.
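As one possible local setup (a sketch using the Bitnami image in single-node KRaft mode; any Kafka distribution works), you could use a Compose file like the following:

```yaml
# docker-compose.yaml - single-node Kafka in KRaft mode (no ZooKeeper)
version: "3"
services:
  kafka:
    image: bitnami/kafka:3.6
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
```

Point `brokers` at `localhost:9092` in the component; since this broker has neither TLS nor SASL configured, set `authType` to `none` and `disableTls` to `true` for local development.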
## Related links

- Basic schema for a Dapr component
- Read this guide for instructions on configuring pub/sub components
- Pub/Sub building block