Kafka Producer
Description
The Kafka Producer transform publishes messages in near real time across worker nodes, where multiple subscribed members have access to them.
A Kafka Producer transform publishes a stream of records to one Kafka topic.
Options
Option | Description |
---|---|
Transform name | The name for this transform. |
Bootstrap servers | A comma-separated list of bootstrap servers in a Kafka cluster. |
Client ID | The unique Client identifier, used to identify and set up a durable connection path to the server to make requests and to distinguish between different clients. |
Topic | The category to which records are published. |
Key Field | The field that holds the message key. In Kafka, all messages can be keyed, so that messages are distributed to partitions based on their keys in a default routing scheme. If no key is present, the producer's default partitioner spreads messages across partitions without regard to their content. |
Message Field | The field that holds the message, that is, the individual record published to the topic. |
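The key-based routing described for Key Field can be illustrated with a minimal sketch. It is not the transform's or Kafka's actual code: the function name `choose_partition` is hypothetical, and CRC32 is used as a stand-in hash (Kafka's Java client uses murmur2 for keyed messages). The point it shows is only that the same key always maps to the same partition.

```python
import zlib


def choose_partition(key, num_partitions):
    """Map a message key to a partition index (illustrative only)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    if key is None:
        # No key: the real producer chooses a partition on its own,
        # so this sketch makes no decision.
        return None
    # Stand-in hash: Kafka's Java client uses murmur2, not CRC32.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical keys; two records share the key "customer-1".
keys = ["customer-1", "customer-2", "customer-1"]
partitions = [choose_partition(k, 6) for k in keys]
same_partition = partitions[0] == partitions[2]
```

Because records with the same key land in the same partition, their relative order is preserved for consumers of that partition.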
Options
Use this tab to configure the properties of the Kafka producer. A few of the most common properties have been included for your convenience. You can enter any desired Kafka property. For further information on these property names, see the Apache Kafka documentation site: https://kafka.apache.org/documentation/.
The options that are included by default are:
Option | Value |
---|---|
auto.offset.reset | latest |
ssl.key.password | |
ssl.keystore.location | |
ssl.keystore.password | |
ssl.truststore.location | |
ssl.truststore.password | |
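The SSL entries in the default list start out empty. As a small sketch, the helper below reports which of them are still blank in a set of options. The function name `blank_ssl_options`, the truststore path and the password are hypothetical, and which settings a cluster actually requires depends on how its brokers are configured.

```python
SSL_KEYS = [
    "ssl.key.password",
    "ssl.keystore.location",
    "ssl.keystore.password",
    "ssl.truststore.location",
    "ssl.truststore.password",
]


def blank_ssl_options(options):
    """Return the default SSL option names that have no value yet."""
    return [k for k in SSL_KEYS if not options.get(k, "").strip()]


# Hypothetical values, standing in for what is entered on the Options tab.
options = {
    "ssl.truststore.location": "/path/to/truststore.jks",
    "ssl.truststore.password": "example-password",
    "ssl.keystore.location": "",
}
still_blank = blank_ssl_options(options)
```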
Avro and Schema registry
To send Avro Record values to a Kafka server, you need to set a few extra options. The schema of an Avro value is not sent to Kafka but to a schema registry, so you need to have one available. The options below make this work on a Confluent Cloud Kafka instance. Several parts of the software stack need authentication, hence the bit of redundancy. We recommend that you put the values of these options in variables in your environment configuration file.
Option | Example |
---|---|
schema.registry.url | |
value.converter.schema.registry.url | |
auto.register.schemas | true |
security.protocol | SASL_SSL |
sasl.jaas.config | org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER_API_KEY" password="CLUSTER_API_SECRET"; |
username | CLUSTER_API_KEY |
password | CLUSTER_API_SECRET |
sasl.mechanism | PLAIN |
client.dns.lookup | use_all_dns_ips |
acks | all |
basic.auth.credentials.source | USER_INFO |
basic.auth.user.info | CLUSTER_API_KEY:CLUSTER_API_SECRET |
schema.registry.basic.auth.user.info | SCHEMA_REGISTRY_API_KEY:SCHEMA_REGISTRY_API_SECRET |
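As a rough sketch of how the credential-bearing values in this table fit together, the function below composes them from a set of variables. The function name `confluent_options` and the variable name `SCHEMA_REGISTRY_URL` are assumptions made for illustration; the other placeholder names come from the table. It covers only a subset of the listed options and is not how the transform itself resolves variables.

```python
def confluent_options(variables):
    """Compose the credential-related options from variable values."""
    cluster_key = variables["CLUSTER_API_KEY"]
    cluster_secret = variables["CLUSTER_API_SECRET"]
    registry_key = variables["SCHEMA_REGISTRY_API_KEY"]
    registry_secret = variables["SCHEMA_REGISTRY_API_SECRET"]

    # The JAAS entry needs straight double quotes and a trailing semicolon.
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{cluster_key}" password="{cluster_secret}";'
    )
    return {
        # SCHEMA_REGISTRY_URL is a hypothetical variable name.
        "schema.registry.url": variables["SCHEMA_REGISTRY_URL"],
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
        "basic.auth.credentials.source": "USER_INFO",
        "basic.auth.user.info": f"{cluster_key}:{cluster_secret}",
        "schema.registry.basic.auth.user.info": f"{registry_key}:{registry_secret}",
    }


# Placeholder values only; real keys and secrets belong in the
# environment configuration, not in source files.
example = confluent_options({
    "CLUSTER_API_KEY": "key",
    "CLUSTER_API_SECRET": "secret",
    "SCHEMA_REGISTRY_API_KEY": "sr-key",
    "SCHEMA_REGISTRY_API_SECRET": "sr-secret",
    "SCHEMA_REGISTRY_URL": "https://registry.example.invalid",
})
```

Keeping the key and secret in variables means the same pipeline can move between environments without editing the option values directly.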