RocketMQ Connect in Action 3

RocketMQ Connect Overview

Preparation

Start RocketMQ

  1. Linux/Unix/Mac
  2. 64-bit JDK 1.8+
  3. Maven 3.2.x+
  4. Start RocketMQ

Tip: ${ROCKETMQ_HOME} refers to the following location, depending on the package you downloaded:

bin-release.zip: /rocketmq-all-4.9.4-bin-release

source-release.zip: /rocketmq-all-4.9.4-source-release/distribution

Start Connect

Compiling Connector Plugin

Debezium RocketMQ Connector

  cd rocketmq-connect/connectors/rocketmq-connect-debezium/
  mvn clean package -Dmaven.test.skip=true

This builds the Debezium MySQL and PostgreSQL connectors for RocketMQ. From the rocketmq-connect-debezium directory, copy the resulting packages into the Runtime loading directory:

  mkdir -p /usr/local/connector-plugins
  cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
  cp rocketmq-connect-debezium-mysql/target/rocketmq-connect-debezium-mysql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins

JDBC Connector

Compile the JDBC Connector and copy the package into the Runtime loading directory:

  cd rocketmq-connect/connectors/rocketmq-connect-jdbc/
  mvn clean package -Dmaven.test.skip=true
  cp target/rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
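Before starting the runtime, it can help to confirm that all three connector packages actually landed in the plugin directory. The Python sketch below assumes the directory and the 0.0.1-SNAPSHOT jar names used above; `missing_plugin_jars` is a hypothetical helper for illustration, not part of RocketMQ Connect.

```python
from pathlib import Path

# Jar names produced by the build steps above (assumes 0.0.1-SNAPSHOT builds).
EXPECTED_JARS = [
    "rocketmq-connect-debezium-mysql-0.0.1-SNAPSHOT-jar-with-dependencies.jar",
    "rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar",
    "rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar",
]


def missing_plugin_jars(plugin_dir, expected=EXPECTED_JARS):
    """Hypothetical helper: return the expected jar names not found in plugin_dir."""
    # glob() on a missing directory simply yields nothing, so every jar is reported.
    present = {p.name for p in Path(plugin_dir).glob("*.jar")}
    return [name for name in expected if name not in present]


if __name__ == "__main__":
    missing = missing_plugin_jars("/usr/local/connector-plugins")
    print("all plugins present" if not missing else f"missing: {missing}")
```

If a jar is reported missing, re-run the corresponding `mvn` and `cp` step before editing the runtime configuration.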

Start Connect Runtime

  cd rocketmq-connect
  mvn -Prelease-connect -DskipTests clean install -U

Edit conf/connect-standalone.conf. The main settings are as follows:

  cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
  vim conf/connect-standalone.conf

  workerId=standalone-worker
  storePathRootDir=/tmp/storeRoot
  # HTTP port for accessing the REST API
  httpPort=8082
  # RocketMQ name server address
  namesrvAddr=localhost:9876
  # RocketMQ ACL
  aclEnable=false
  accessKey=rocketmq
  secretKey=12345678
  autoCreateGroupEnable=false
  clusterName="DefaultCluster"
  # Core setting: the plugin directory that holds the connector packages compiled above.
  # Directory of source or sink connector jar files; the default value is rocketmq-connect-sample
  pluginPaths=/usr/local/connector-plugins

From the rocketmq-connect root directory, start the runtime in standalone mode:

  cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
  sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
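connect-standalone.conf is a list of key=value lines. If the worker does not come up, a quick check that the settings this walkthrough relies on are present can save time. The Python sketch below uses a deliberately simplified parser (it ignores Java properties features such as `:` separators and line continuations), and the set of "required" keys is our own assumption, not a list taken from RocketMQ Connect.

```python
def parse_conf(text):
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    conf = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines without '='
            conf[key.strip()] = value.strip()
    return conf


# Assumption: the keys this tutorial depends on, not an official required list.
REQUIRED_KEYS = ("workerId", "namesrvAddr", "httpPort", "pluginPaths")


def missing_keys(conf, required=REQUIRED_KEYS):
    """Return the required keys that are absent or empty."""
    return [k for k in required if not conf.get(k)]


# Usage (from the runtime directory):
#   with open("conf/connect-standalone.conf") as f:
#       print(missing_keys(parse_conf(f.read())))
```

Values are kept verbatim, so `clusterName="DefaultCluster"` is returned with its quotes.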

Postgres image

Use Debezium's Postgres Docker image to set up the Postgres database:

  # Start a Postgres instance
  docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=start_data_engineer -e POSTGRES_PASSWORD=password debezium/postgres:14
  # Open a bash shell in the Postgres container
  docker exec -ti postgres /bin/bash

Postgres information

Port: 5432

Account: start_data_engineer/password

Source table to synchronize: bank.user

MySQL image

Use Debezium's MySQL Docker image to set up the MySQL database:

  docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9

MySQL information

Port: 3306

Account: root/debezium

Source table to synchronize: bank.user

Target table: bank1.user

Test data

Log in to MySQL with the root/debezium account.

Source database table: bank.user

  create database bank;
  use bank;
  create table bank.user
  (
      id bigint NOT NULL AUTO_INCREMENT,
      user_id integer,
      name varchar(8),
      age integer,
      birthday date,
      datetime_created timestamp(3),
      datetime_updated timestamp(3),
      height decimal(11, 2) null,
      PRIMARY KEY (`id`)
  );
  insert into bank.user values (1003, 1, 'lilei2', 10, now(), now(), now(), 1.72);
  update bank.user set user_id = 1003 where id = 1003;

Log in to PostgreSQL with the start_data_engineer/password account.

Source database table: bank.user

  CREATE SCHEMA bank;
  SET search_path TO bank,public;
  create table bank.user
  (
      id integer not null
          constraint user_pkey
              primary key,
      user_id integer,
      name varchar(8),
      age integer,
      birthday date,
      datetime_created timestamp(3),
      datetime_updated timestamp(3),
      height numeric(11, 2)
  );
  insert into bank.user values (1001, 1, 'lilei1', 10, now(), now(), now(), 1.72);
  update bank.user set user_id = 1001 where id = 1001;

Target database table (created in MySQL): bank1.user

  create database bank1;
  create table bank1.user
  (
      id bigint auto_increment
          primary key,
      user_id int null,
      name varchar(8) null,
      age int null,
      birthday date null,
      datetime_created timestamp(3) null,
      datetime_updated timestamp(3) null,
      height decimal(11, 2) null
  );

Start Connector

Start Debezium source connector

Source table: bank.user. Purpose: parse the MySQL binlog, wrap each change in a generic ConnectRecord object, and send it to the RocketMQ topic.

  curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/MySQLCDCSource1000 -d '{
      "connector.class": "org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
      "max.task": "1",
      "connect.topicname": "debezium-source-topic1000",
      "kafka.transforms": "Unwrap",
      "kafka.transforms.Unwrap.delete.handling.mode": "none",
      "kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
      "database.history.skip.unparseable.ddl": true,
      "database.history.name.srv.addr": "localhost:9876",
      "database.history.rocketmq.topic": "db-history-debezium-topic1000",
      "database.history.store.only.monitored.tables.ddl": true,
      "include.schema.changes": false,
      "database.server.name": "dbserver1",
      "database.port": 3306,
      "database.hostname": "database ip",
      "database.connectionTimeZone": "UTC",
      "database.user": "debezium",
      "database.password": "dbz",
      "table.include.list": "bank.user",
      "max.batch.size": 50,
      "database.include.list": "bank",
      "snapshot.mode": "when_needed",
      "database.server.id": "184054",
      "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
      "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
  }'
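The `kafka.transforms.Unwrap.*` settings apply Debezium's ExtractNewRecordState transform, which replaces the full change-event envelope (before, after, source, op) with just the new row state. The Python sketch below imitates that behaviour on a plain dict so the effect of the settings is easier to see; it is a simplified model, not Debezium's implementation. Two points are assumptions based on our reading of the Debezium documentation: `add.headers` produces headers named `__` plus the field path with dots turned into underscores, and `delete.handling.mode=none` passes a delete through with a null value.

```python
def unwrap(envelope, add_headers=("op", "source.db", "source.table")):
    """Simplified model of ExtractNewRecordState with delete.handling.mode=none.

    Returns (value, headers): value is the envelope's 'after' state (None for a
    delete), headers hold the requested envelope fields under '__'-prefixed names.
    """
    headers = {}
    for path in add_headers:
        node = envelope
        for part in path.split("."):  # walk nested fields such as source.table
            node = node.get(part) if isinstance(node, dict) else None
        headers["__" + path.replace(".", "_")] = node
    # A delete event has after == null, so the flattened value is None.
    value = envelope.get("after")
    return value, headers


# The update from the MySQL test data, trimmed to two columns:
update_event = {
    "before": {"id": 1003, "user_id": 1},
    "after": {"id": 1003, "user_id": 1003},
    "source": {"db": "bank", "table": "user"},
    "op": "u",
}
value, headers = unwrap(update_event)
```

Here `value` is `{"id": 1003, "user_id": 1003}` and `headers` is `{"__op": "u", "__source_db": "bank", "__source_table": "user"}`. The table header is what lets the sink route rows without a fixed table name.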

Source table: bank.user. Purpose: read changes from the Postgres write-ahead log (WAL) through logical decoding, wrap each change in a generic ConnectRecord object, and send it to the RocketMQ topic.

  curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector1000 -d '{
      "connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
      "max.task": "1",
      "connect.topicname": "debezium-source-topic1000",
      "kafka.transforms": "Unwrap",
      "kafka.transforms.Unwrap.delete.handling.mode": "none",
      "kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
      "database.history.skip.unparseable.ddl": true,
      "database.server.name": "bankserver1",
      "database.port": 5432,
      "database.hostname": "database ip",
      "database.connectionTimeZone": "UTC",
      "database.user": "start_data_engineer",
      "database.dbname": "start_data_engineer",
      "database.password": "password",
      "table.whitelist": "bank.user",
      "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
      "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
  }'
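If you would rather script connector creation than paste curl commands, the same request can be built with Python's standard library. This is a sketch: `connector_request` and `create_connector` are hypothetical helpers, and they assume only what the curl commands above show, namely that the runtime accepts a JSON config via POST to /connectors/<name> on port 8082. The response body is returned as text without interpreting it.

```python
import json
import urllib.request


def connector_request(name, config, host="http://127.0.0.1:8082"):
    """Build the same POST the curl commands send (nothing is sent here)."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{host}/connectors/{name}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(name, config, host="http://127.0.0.1:8082", timeout=10):
    """Send the request to the running worker and return the raw response text."""
    with urllib.request.urlopen(connector_request(name, config, host), timeout=timeout) as resp:
        return resp.read().decode("utf-8")


# Usage against a running worker (config abbreviated; use the full JSON above):
#   pg_config = {
#       "connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
#       "max.task": "1",
#       "connect.topicname": "debezium-source-topic1000",
#   }
#   print(create_connector("postgres-connector1000", pg_config))
```

Keeping the configs as Python dicts also makes it easy to reuse shared keys such as the converters across the three connectors.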

Start JDBC sink connector

Purpose: consume records from the topic and write them to the target table over JDBC.

  curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest1000 -d '{
      "connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
      "max.task": "2",
      "connect.topicnames": "debezium-source-topic1000",
      "connection.url": "jdbc:mysql://database ip:3306/bank1",
      "connection.user": "root",
      "connection.password": "debezium",
      "pk.fields": "id",
      "table.name.from.header": "true",
      "pk.mode": "record_key",
      "insert.mode": "UPSERT",
      "db.timezone": "UTC",
      "table.types": "TABLE",
      "errors.deadletterqueue.topic.name": "dlq-topic",
      "errors.log.enable": "true",
      "errors.tolerance": "ALL",
      "delete.enabled": "true",
      "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
      "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
  }'
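With `insert.mode` set to `UPSERT`, `pk.mode` set to `record_key` and `pk.fields` set to `id`, each record is written keyed on `id`, so replaying the same change does not create duplicates; with `delete.enabled`, a record whose value is null becomes a delete. On MySQL a JDBC sink commonly expresses an upsert as `INSERT ... ON DUPLICATE KEY UPDATE`. The sketch below builds such statements (with JDBC-style `?` placeholders) for one record; it illustrates the idea under that assumption and is not guaranteed to match the SQL rocketmq-connect-jdbc emits. `statement_for` and its helpers are hypothetical.

```python
def build_mysql_upsert(table, pk_fields, row):
    """INSERT ... ON DUPLICATE KEY UPDATE for one row; non-key columns are updated."""
    cols = list(row)
    col_list = ", ".join(f"`{c}`" for c in cols)
    placeholders = ", ".join("?" for _ in cols)
    updates = ", ".join(f"`{c}`=VALUES(`{c}`)" for c in cols if c not in pk_fields)
    sql = f"INSERT INTO `{table}` ({col_list}) VALUES ({placeholders})"
    if updates:
        sql += f" ON DUPLICATE KEY UPDATE {updates}"
    return sql, [row[c] for c in cols]


def build_mysql_delete(table, pk_fields, key):
    """DELETE by primary key, used when the record value is null."""
    where = " AND ".join(f"`{c}`=?" for c in pk_fields)
    return f"DELETE FROM `{table}` WHERE {where}", [key[c] for c in pk_fields]


def statement_for(table, pk_fields, key, value):
    """Upsert when the record has a value; delete when it is null (delete.enabled=true)."""
    if value is None:
        return build_mysql_delete(table, pk_fields, key)
    return build_mysql_upsert(table, pk_fields, value)


# The table name would come from the record header (table.name.from.header=true),
# resolved inside the bank1 database given in connection.url.
sql, params = statement_for("user", ["id"], {"id": 1003},
                            {"id": 1003, "user_id": 1003, "name": "lilei2"})
```

`VALUES()` inside `ON DUPLICATE KEY UPDATE` still works on MySQL 8 but is deprecated there in favour of row aliases.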

Once the three connectors above have been created successfully, log in to PostgreSQL with the start_data_engineer/password account or to MySQL with the root/debezium account.

Inserts, updates, and deletes on the source table bank.user are then synchronized to the target MySQL table bank1.user.
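To check the result, you can read both tables (for example `SELECT * FROM bank.user` on the source and `SELECT * FROM bank1.user` on the target) and compare rows by primary key. The sketch below only performs the comparison on rows already fetched as dicts; fetching them with a MySQL or PostgreSQL driver is left out, and `diff_tables` is a hypothetical helper.

```python
def diff_tables(source_rows, target_rows, key="id"):
    """Compare two lists of row dicts by primary key.

    Returns (missing, extra, changed): keys only in the source, keys only in the
    target, and keys present in both whose column values differ.
    """
    src = {row[key]: row for row in source_rows}
    dst = {row[key]: row for row in target_rows}
    missing = sorted(src.keys() - dst.keys())
    extra = sorted(dst.keys() - src.keys())
    changed = sorted(k for k in src.keys() & dst.keys() if src[k] != dst[k])
    return missing, extra, changed
```

Because the two databases return different Python types for the same column (for example `Decimal` versus `float`, or timezone-aware versus naive timestamps), normalize the values before comparing, or the comparison will report rows as changed that were synchronized correctly.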