RocketMQ Connect in Action 2

PostgreSQL Source (CDC) -> RocketMQ Connect -> MySQL Sink (JDBC)

Preparation

Start RocketMQ

  1. Linux/Unix/Mac;
  2. 64-bit JDK 1.8+;
  3. Maven 3.2.x or later;
  4. Start RocketMQ.

Tip: where ${ROCKETMQ_HOME} points

bin-release.zip distribution: /rocketmq-all-4.9.4-bin-release

source-release.zip distribution: /rocketmq-all-4.9.4-source-release/distribution
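The "Start RocketMQ" prerequisite can be sketched with the standard mqnamesrv and mqbroker scripts shipped in the distribution. This is a minimal sketch, assuming a single local name server on localhost:9876 (the address used later in connect-standalone.conf); the function name start_rocketmq, the default ROCKETMQ_HOME value, and the namesrv.out / broker.out log file names are illustrative choices, not part of the original steps.

```shell
# Assumption: ROCKETMQ_HOME is one of the two directories listed in the tip above.
ROCKETMQ_HOME="${ROCKETMQ_HOME:-/rocketmq-all-4.9.4-bin-release}"
export ROCKETMQ_HOME

# Hypothetical helper: start a local name server and one broker in the background.
start_rocketmq() {
    if [ ! -f "$ROCKETMQ_HOME/bin/mqnamesrv" ]; then
        echo "mqnamesrv not found under $ROCKETMQ_HOME/bin" >&2
        return 1
    fi
    (
        cd "$ROCKETMQ_HOME" || exit 1
        # Name server, listening on port 9876 by default.
        nohup sh bin/mqnamesrv > namesrv.out 2>&1 &
        # Give the name server a moment before the broker registers with it.
        sleep 5
        # Broker, registered with the local name server.
        nohup sh bin/mqbroker -n localhost:9876 > broker.out 2>&1 &
    )
}
```

Calling start_rocketmq once is enough; the function returns a non-zero status when ROCKETMQ_HOME does not contain the expected scripts.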

Start Connect

Build the connector plugins

Debezium RocketMQ Connector

    $ cd rocketmq-connect/connectors/rocketmq-connect-debezium/
    $ mvn clean package -Dmaven.test.skip=true

Copy the compiled Debezium PostgreSQL RocketMQ Connector package into the directory that the Runtime loads plugins from:

    mkdir -p /usr/local/connector-plugins
    cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins

JDBC Connector

Build the JDBC Connector and copy the compiled package into the directory that the Runtime loads plugins from:

    $ cd rocketmq-connect/connectors/rocketmq-connect-jdbc/
    $ mvn clean package -Dmaven.test.skip=true
    $ cp rocketmq-connect-jdbc/target/rocketmq-connect-jdbc-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins

Start the Connect Runtime

Build the runtime:

    cd rocketmq-connect
    mvn -Prelease-connect -DskipTests clean install -U

Edit connect-standalone.conf. The key settings are shown below:

    $ cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
    $ vim conf/connect-standalone.conf

    workerId=standalone-worker
    storePathRootDir=/tmp/storeRoot
    ## Http port for user to access REST API
    httpPort=8082
    # Rocketmq namesrvAddr
    namesrvAddr=localhost:9876
    # RocketMQ acl
    aclEnable=false
    accessKey=rocketmq
    secretKey=12345678
    autoCreateGroupEnable=false
    clusterName="DefaultCluster"
    # Key setting: point this at the plugin directory that holds the connector packages built earlier
    # Source or sink connector jar file dir. The default value is rocketmq-connect-sample
    pluginPaths=/usr/local/connector-plugins

Start the runtime in standalone mode:

    cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
    sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
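Because the runtime is started in the background, it can be useful to wait until its REST port answers before creating connectors. The following is a minimal sketch, assuming the httpPort=8082 setting from connect-standalone.conf; wait_for_worker is a hypothetical helper, and it only checks that something accepts HTTP connections on the port, not that the worker is healthy.

```shell
# Hypothetical helper: poll the worker's HTTP port until it answers.
# Arguments: host (default 127.0.0.1), port (default 8082), attempts (default 30).
wait_for_worker() {
    host="${1:-127.0.0.1}"
    port="${2:-8082}"
    attempts="${3:-30}"
    i=0
    while [ "$i" -lt "$attempts" ]; do
        # Without -f, curl succeeds on any HTTP response (even 404), so a
        # failure here means the connection itself could not be made.
        if curl -s -o /dev/null --max-time 2 "http://$host:$port/"; then
            echo "worker is listening on $host:$port"
            return 0
        fi
        i=$((i + 1))
        sleep 1
    done
    echo "worker did not answer on $host:$port" >&2
    return 1
}
```

For example, `wait_for_worker 127.0.0.1 8082 60` waits up to roughly a minute and returns a non-zero status if the port never answers.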

Postgres image

Use the Debezium Postgres Docker image to set up the PostgreSQL database

    # starting a pg instance
    docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=start_data_engineer -e POSTGRES_PASSWORD=password debezium/postgres:14
    # bash into postgres instance
    docker exec -ti postgres /bin/bash

Postgres information

Port: 5432

Account: start_data_engineer/password

Source table to sync: bank.holding

Target table: bank1.holding
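Debezium's PostgreSQL connector relies on logical decoding, so it can help to confirm the server's wal_level before creating the connector. The sketch below assumes the container name postgres and the account from the docker run command above; pg_setting is a hypothetical helper, and the database name start_data_engineer is assumed to be the default database that the image creates for POSTGRES_USER.

```shell
# Hypothetical helper: print one PostgreSQL server setting from the container.
pg_setting() {
    # -A (unaligned) and -t (tuples only) make psql print just the value.
    docker exec -i postgres psql -U start_data_engineer -d start_data_engineer -At -c "SHOW $1;"
}
```

Running `pg_setting wal_level` is expected to print logical when the server is set up for logical decoding.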

MySQL image

Use the Debezium MySQL Docker image to set up the MySQL database

    docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9

MySQL information

Port: 3306

Account: root/debezium

Test data

Log in to the PostgreSQL database with the start_data_engineer/password account.

Source table: bank.holding

    CREATE SCHEMA bank;
    SET search_path TO bank,public;
    CREATE TABLE bank.holding (
        holding_id int,
        user_id int,
        holding_stock varchar(8),
        holding_quantity int,
        datetime_created timestamp,
        datetime_updated timestamp,
        primary key(holding_id)
    );
    -- FULL replica identity logs the complete old row for updates and deletes
    ALTER TABLE bank.holding replica identity FULL;
    insert into bank.holding values (1000, 1, 'VFIAX', 10, now(), now());
    insert into bank.holding values (1001, 2, 'SP500', 1, now(), now());
    insert into bank.holding values (1003, 3, 'SP500', 1, now(), now());
    update bank.holding set holding_quantity = 300 where holding_id=1000;
    \q

Target table: bank1.holding

    create database bank1;
    use bank1;
    CREATE TABLE holding (
        holding_id int,
        user_id int,
        holding_stock varchar(8),
        holding_quantity int,
        datetime_created bigint,
        datetime_updated bigint,
        primary key(holding_id)
    );
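The target columns datetime_created and datetime_updated are declared as bigint rather than timestamp. A likely reason, stated here as an assumption about Debezium's default temporal handling rather than something these steps confirm, is that PostgreSQL timestamp values reach the sink as integer microseconds since the Unix epoch. Under that assumption, a stored value can be turned back into a readable time as sketched below; both helper names are hypothetical, and micros_to_utc relies on GNU date.

```shell
# Hypothetical helper: microseconds since the Unix epoch -> whole seconds.
micros_to_epoch_seconds() {
    echo $(( $1 / 1000000 ))
}

# Hypothetical helper: format the value as a UTC timestamp (GNU date syntax).
micros_to_utc() {
    date -u -d "@$(micros_to_epoch_seconds "$1")" '+%Y-%m-%d %H:%M:%S'
}

micros_to_epoch_seconds 1660000000123456   # prints 1660000000
```

If the values in bank1.holding turn out to be in a different unit (for example milliseconds), the divisor needs to change accordingly.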

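Start the connectors

Start the Debezium source connector

This connector syncs the source table bank.holding. It parses the Postgres WAL (write-ahead log), wraps each change in a generic ConnectRecord object, and sends it to a RocketMQ topic.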

    curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/postgres-connector -d '{
      "connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
      "max.task": "1",
      "connect.topicname": "debezium-postgres-source-01",
      "kafka.transforms": "Unwrap",
      "kafka.transforms.Unwrap.delete.handling.mode": "none",
      "kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
      "database.history.skip.unparseable.ddl": true,
      "database.server.name": "bankserver1",
      "database.port": 5432,
      "database.hostname": "<database-ip>",
      "database.connectionTimeZone": "UTC",
      "database.user": "start_data_engineer",
      "database.dbname": "start_data_engineer",
      "database.password": "password",
      "table.whitelist": "bank.holding",
      "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
      "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
    }'

Start the JDBC sink connector

This connector consumes the data in the topic and writes it to the target table over JDBC.

    curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/jdbcmysqlsinktest201 -d '{
      "connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector",
      "max.task": "2",
      "connect.topicnames": "debezium-postgres-source-01",
      "connection.url": "jdbc:mysql://<database-ip>:3306/bank1",
      "connection.user": "root",
      "connection.password": "debezium",
      "pk.fields": "holding_id",
      "table.name.from.header": "true",
      "pk.mode": "record_key",
      "insert.mode": "UPSERT",
      "db.timezone": "UTC",
      "table.types": "TABLE",
      "errors.deadletterqueue.topic.name": "dlq-topic",
      "errors.log.enable": "true",
      "errors.tolerance": "ALL",
      "delete.enabled": "true",
      "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
      "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
    }'
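After both connectors have been submitted, their state can be queried over the same REST port. The sketch below assumes that the runtime exposes /connectors/<name>/status and /connectors/<name>/stop; those paths are an assumption that should be checked against the REST API of the RocketMQ Connect version in use. CONNECT_URL and the three function names are hypothetical.

```shell
# Assumption: the worker's REST API is reachable at the httpPort configured earlier.
CONNECT_URL="${CONNECT_URL:-http://127.0.0.1:8082}"

# Hypothetical helper: build the URL for a connector action (e.g. status, stop).
connector_endpoint() {
    printf '%s/connectors/%s/%s\n' "$CONNECT_URL" "$1" "$2"
}

# Query a connector's status (endpoint path is an assumption, see above).
connector_status() {
    curl -s "$(connector_endpoint "$1" status)"
}

# Stop a connector (endpoint path is an assumption, see above).
connector_stop() {
    curl -s "$(connector_endpoint "$1" stop)"
}
```

For example, `connector_status postgres-connector` and `connector_status jdbcmysqlsinktest201` query the two connectors created above.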

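Once both connector tasks have been created successfully, log in to the PostgreSQL database with the start_data_engineer/password account.

Inserts, updates, and deletes on the source table bank.holding are then synced to the target table bank1.holding.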
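The end-to-end check can be sketched as a small script that changes a row in PostgreSQL and then reads it back from MySQL. It assumes the container names postgres and mysql and the accounts from the docker run commands above; pg_exec, mysql_exec, and verify_sync are hypothetical helpers, and the 5-second pause is an arbitrary guess at the propagation delay.

```shell
# Hypothetical helper: run one SQL statement in the source PostgreSQL container.
pg_exec() {
    docker exec -i postgres psql -U start_data_engineer -d start_data_engineer -c "$1"
}

# Hypothetical helper: run one SQL statement in the target MySQL container.
mysql_exec() {
    docker exec -i mysql mysql -uroot -pdebezium -e "$1"
}

# Change a source row, wait briefly, then read the same row from the target.
verify_sync() {
    pg_exec "UPDATE bank.holding SET holding_quantity = 500, datetime_updated = now() WHERE holding_id = 1001;"
    # Arbitrary pause so the change can flow through both connectors.
    sleep 5
    mysql_exec "SELECT holding_id, holding_quantity FROM bank1.holding WHERE holding_id = 1001;"
}
```

If the pipeline is working, the row returned by verify_sync should show the updated holding_quantity; if it does not, the connector status and the dlq-topic configured on the sink are reasonable places to look first.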