Stream Data into Apache Kafka
Apache Kafka is a popular open-source distributed event streaming platform. EMQX's integration with Apache Kafka/Confluent offers users reliable bi-directional data transport and processing capabilities in high-throughput scenarios. EMQX also supports authentication with Apache Kafka/Confluent using SASL/SCRAM or SASL/GSSAPI.
Streaming data into or from Apache Kafka involves creating data bridges to Kafka in one of two roles: producer (sends messages to Kafka) or consumer (receives messages from Kafka). EMQX enables you to create data bridges in either role.
TIP
EMQX Enterprise Edition features. EMQX Enterprise Edition provides comprehensive coverage of key business scenarios, rich data integration, product-level reliability, and 24/7 global technical support. Experience the benefits of this enterprise-ready MQTT messaging platform today.
Prerequisites
- Knowledge about EMQX data integration rules
- Knowledge about data bridges
Feature List
Quick Start Tutorial
This section introduces how to stream data into or from Kafka, covering how to set up a Kafka server, how to create a data bridge and a rule that forwards data to it, and how to test the data bridge and rule.
This tutorial assumes that you run both EMQX and Kafka on the local machine. If you have Kafka and EMQX running remotely, please adjust the settings accordingly.
Install Kafka
This section takes macOS as an example to illustrate the process. You can install and run Kafka with the commands below:
```bash
# Download and extract Kafka
wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz
tar -xzf kafka_2.13-3.3.1.tgz
cd kafka_2.13-3.3.1

# Generate a cluster ID, format the storage directory, and start the broker
# in KRaft mode (no ZooKeeper required)
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
bin/kafka-server-start.sh config/kraft/server.properties
```
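Once the broker reports that it has started, you can optionally confirm it is reachable with the bundled CLI (the address below assumes the default listener on localhost:9092):

```bash
# Query the broker's supported API versions; any response confirms the broker is up
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
```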
For detailed operation steps, you may refer to the Quick Start section in the Kafka Documentation.
Create Kafka Topics
Relevant Kafka topics should be created before creating the data bridge in EMQX. Use the commands below to create two topics in Kafka: `testtopic-in` (for the producer role) and `testtopic-out` (for the consumer role).
```bash
bin/kafka-topics.sh --create --topic testtopic-in --bootstrap-server localhost:9092
bin/kafka-topics.sh --create --topic testtopic-out --bootstrap-server localhost:9092
```
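You can optionally verify that both topics were created:

```bash
# List the topics on the broker; testtopic-in and testtopic-out should appear
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```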
Create Kafka Data Bridge
This section demonstrates how to create Kafka producer or consumer data bridges via Dashboard.
Go to EMQX Dashboard, and click Integration -> Data Bridge.
Click Create on the top right corner of the page.
In the Create Data Bridge page, click to select Kafka, and then click Next.
In the Bridge Role field, select Producer or Consumer. Click the corresponding tab for the configuration of each role.
Fill in the required fields (marked with an asterisk).
- Input a name for the data bridge. The name should be a combination of upper/lower case letters and numbers.
- Input the connection information. Input `127.0.0.1:9092` for Bootstrap Hosts and set the other fields according to your actual setup.
- Source MQTT Topic: Set the MQTT topic for the data bridge. In this example, it is set to `t/#`, indicating that all MQTT messages matching this topic will be sent to Kafka. You can also leave it blank and create a rule to specify the data to be sent to Kafka.
- Kafka Topic Name: Input `testtopic-in` (the Kafka topic created before). Note: Variables are not supported here.
- Message Key: Kafka message key. Insert a string here, either a plain string or a string containing placeholders (`${var}`).
- Message Value: Kafka message value. Insert a string here, either a plain string or a string containing placeholders (`${var}`). See the sketch after this list for an example.
- Advanced settings (optional): Set Max Batch Bytes, Compression, and Partition Strategy as your business needs require.
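As a minimal sketch of placeholder usage (assuming the bridge receives standard MQTT publish events, in which the `clientid` and `payload` fields are available), the key and value fields might be set to:

```bash
# Illustrative field values, not defaults:
# Message Key:   ${clientid}
# Message Value: ${payload}
```

With this configuration, each Kafka record would be keyed by the publishing client's ID and carry the raw MQTT payload as its value.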
Fill in the required fields (marked with an asterisk).
- Input a name for the data bridge. The name should be a combination of upper/lower case letters and numbers.
- Input the connection information. Input `127.0.0.1:9092` for Bootstrap Hosts and set the other fields according to your actual setup.
- The Topic Mapping field must contain at least one Kafka-to-MQTT topic mapping. The MQTT Payload Template subfield specifies the MQTT payload that should be used and has the following Kafka message fields available for templating:
| Field Name | Description |
| --- | --- |
| `headers` | An object containing string key-value pairs |
| `key` | Kafka message key (uses the same encoding method as the selected key) |
| `offset` | Offset for the message in Kafka's topic partition |
| `topic` | Originating Kafka topic |
| `ts` | Message timestamp |
| `ts_type` | Message timestamp type, which is one of `create`, `append`, or `undefined` |
| `value` | Kafka message value (uses the same encoding method as the selected value) |

The default value for MQTT Payload Template is `${.}`, which includes all available data encoded as a JSON object. For example, choosing `${.}` as a template will produce the following for a Kafka message:

```json
{
  "value": "value",
  "ts_type": "create",
  "ts": 1679665968238,
  "topic": "my-kafka-topic",
  "offset": 2,
  "key": "key",
  "headers": {"header_key": "header_value"}
}
```
Subfields from the Kafka message may be accessed with dot notation. For example, `${.value}` will resolve to the Kafka message value, and `${.headers.h1}` will resolve to the value of the `h1` Kafka header if such a subfield exists. Absent values will be replaced by empty strings.

Note: Each Kafka-to-MQTT topic mapping must contain a unique Kafka topic name. That is, the same Kafka topic must not be present in more than one mapping.
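As an illustrative sketch (the field names here are arbitrary choices, not defaults), a payload template that keeps only the offset and the raw value, assuming the Kafka value is a plain string, could look like this:

```json
{"kafka_offset": ${.offset}, "payload": "${.value}"}
```

Only the `${.offset}` and `${.value}` placeholders come from the table above; `kafka_offset` and `payload` are simply the names this template gives to the resulting MQTT payload fields.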
Before clicking Create, you can click Test Connection to test that the bridge can connect to the Kafka server.
Click Create. You'll then be offered the option of creating an associated rule.
- For the Kafka producer data bridge, click Create Rule to create an associated rule. For detailed operating steps, see Create Rule for Kafka Producer Data Bridge.
- For the Kafka consumer data bridge, it’s not strictly necessary to create a rule.
TIP
Creating a rule allows Kafka messages matching the rule to be further transformed and filtered if needed, and then forwarded to other rule actions, such as different bridges. Refer to Rules for more information on creating rules. The MQTT topics defined in Topic Mapping will start having messages published to them without further configuration.
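For example, a rule over a consumer bridge selects from the bridge as its source. The following is a minimal sketch assuming a consumer bridge named `my_consumer`; the exact source reference for your bridge is shown in the Dashboard's SQL editor:

```sql
SELECT
  *
FROM
  "$bridges/kafka_consumer:my_consumer"
```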
Now the Kafka data bridge should appear in the data bridge list (Integration -> Data Bridge) with Resource Status as Connected.
Create Rule for Kafka Producer Data Bridge
Go to EMQX Dashboard, and click Integration -> Rules.
Click Create on the top right corner of the page.
Input, for example, `my_rule` as the rule ID.

Input the following statement in the SQL Editor if you want to save the MQTT messages under topic `t/#` to Kafka.

Note: If you want to specify your own SQL syntax, make sure that you have included all fields required by the data bridge in the `SELECT` part.
```sql
SELECT
  *
FROM
  "t/#"
```
Click the Add Action button, select Forwarding with Data Bridge from the dropdown list, and then select the data bridge you just created under Data bridge. Then click the Add button.
Click Create at the page bottom to finish the creation.
Now you have successfully created the rule for the Kafka producer data bridge. You can click Integration -> Flows to view the topology. It can be seen that the messages under topic `t/#` are sent and saved to Kafka after being parsed by rule `my_rule`.
Test the Data Bridge and Rule
Use MQTTX to send messages to topic `t/1`:
```bash
mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Kafka" }'
```
Check the running status of the two data bridges; there should be one new incoming and one new outgoing message.
Check whether messages are written into the topic `testtopic-in` with the following Kafka command:
```bash
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic testtopic-in --from-beginning
```
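If the bridge and rule are working, the consumer should print the forwarded record. The exact value depends on your Message Value template; with a plain `${payload}` template it would be the raw MQTT payload:

```bash
# Expected consumer output, assuming Message Value is set to ${payload}
{ "msg": "Hello Kafka" }
```

To exercise the consumer direction as well, you can produce a record to `testtopic-out` and watch it arrive on the mapped MQTT topic. The MQTT topic below is a placeholder; substitute the topic you configured in Topic Mapping:

```bash
# In one terminal, subscribe to the MQTT topic mapped from testtopic-out
mqttx sub -t 'your/mapped/topic' -q 1

# In another terminal, produce a test record to the Kafka topic
echo '{ "msg": "Hello EMQX" }' | bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic testtopic-out
```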