Build

Background Information

ShardingSphere CDC consists of two parts: the CDC Server and the CDC Client. The CDC Server is currently deployed together with ShardingSphere-Proxy.

Users can introduce the CDC Client into their own projects to implement data consumption logic.

Constraints

  • Developed in pure Java; JDK 1.8 or above is recommended.
  • The CDC Server requires ShardingSphere-Proxy to run in cluster mode; ZooKeeper is currently supported as the registry center.
  • CDC synchronizes data only. It does not synchronize table structure, and DDL statement synchronization is currently not supported.
  • A CDC incremental task will not split the transaction data of database shards. If you want to enable XA transaction compatibility, both openGauss and ShardingSphere-Proxy need the GLT module.

CDC Server Deployment Steps

Here, the openGauss database is used as an example to introduce the deployment steps of the CDC Server.

Since the CDC Server is built into ShardingSphere-Proxy, you need to get ShardingSphere-Proxy. For details, please refer to the proxy startup manual.

Configure GLT Module (Optional)

The binary package released on the official website does not include the GLT module by default. If you are using the openGauss database with GLT functionality, you can additionally introduce the GLT module to ensure the integrity of XA transactions.

There are currently two ways to introduce the GLT module, and corresponding configurations need to be made in global.yaml.

1. Source code compilation and installation

1.1 Prepare the code environment: download the ShardingSphere source code from GitHub in advance, or fetch it with git clone.

1.2 Delete the <scope>provided</scope> tag of the shardingsphere-global-clock-tso-provider-redis dependency in kernel/global-clock/type/tso/core/pom.xml and the <scope>provided</scope> tag of jedis in kernel/global-clock/type/tso/provider/redis/pom.xml.

1.3 Compile ShardingSphere-Proxy. For the specific compilation steps, please refer to the ShardingSphere Compilation Manual.

2. Directly introduce GLT dependencies

The following dependencies can be introduced from the Maven repository:

2.1. shardingsphere-global-clock-tso-provider-redis (use the same version as ShardingSphere-Proxy)

2.2. jedis-4.3.1

CDC Server User Manual

  1. Modify the configuration file conf/global.yaml and turn on the CDC function. Currently, mode must be Cluster, and the corresponding registry center needs to be started in advance. If the GLT provider uses Redis, Redis needs to be started in advance.

Configuration example:

Enable the CDC function in global.yaml:

    mode:
      type: Cluster
      repository:
        type: ZooKeeper
        props:
          namespace: cdc_demo
          server-lists: localhost:2181
          retryIntervalMilliseconds: 500
          timeToLiveSeconds: 60
          maxRetries: 3
          operationTimeoutMilliseconds: 500

    authority:
      users:
        - user: root@%
          password: root
      privilege:
        type: ALL_PERMITTED

    # When using GLT, you also need to enable distributed transactions. GLT is currently only supported by the openGauss database.
    #transaction:
    #  defaultType: XA
    #  providerType: Atomikos
    #
    #globalClock:
    #  enabled: true
    #  type: TSO
    #  provider: redis
    #  props:
    #    host: 127.0.0.1
    #    port: 6379

    props:
      system-log-level: INFO
      proxy-default-port: 3307 # Proxy default port.
      cdc-server-port: 33071 # CDC Server port, must be configured
      proxy-frontend-database-protocol-type: openGauss # Consistent with the type of backend database
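
Because the registry center (and Redis, when GLT is used) must be running before the Proxy starts, a quick reachability check can save a failed startup. The following is a minimal sketch in plain Java, assuming the hosts and ports from the configuration example (localhost:2181 and 127.0.0.1:6379). The class name DependencyPortCheck is hypothetical and not part of ShardingSphere. It only tests that a TCP connection can be opened, not that the service behind the port is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of ShardingSphere.
final class DependencyPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException ex) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Addresses taken from the configuration example; adjust to your environment.
        System.out.println("ZooKeeper localhost:2181 reachable: " + isReachable("localhost", 2181, 1000));
        System.out.println("Redis 127.0.0.1:6379 reachable (GLT only): " + isReachable("127.0.0.1", 6379, 1000));
    }
}
```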
  2. Introduce the JDBC driver.

The Proxy already includes the JDBC drivers for PostgreSQL and openGauss.

If the backend is connected to the following databases, download the corresponding JDBC driver jar package and put it into the ${shardingsphere-proxy}/ext-lib directory.

| Database | JDBC Driver |
| --- | --- |
| MySQL | mysql-connector-j-8.3.0.jar |

  3. Start ShardingSphere-Proxy:

    sh bin/start.sh

  4. View the proxy log logs/stdout.log. If you see the following statement, the startup was successful:

    [INFO ] [main] o.a.s.p.frontend.ShardingSphereProxy - ShardingSphere-Proxy Cluster mode started successfully
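
The log check can also be scripted. The sketch below, in plain Java, scans a log file for the success message quoted above. The class name StartupLogCheck is hypothetical, and the default path logs/stdout.log assumes the program is run from the Proxy home directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical helper, not part of ShardingSphere.
final class StartupLogCheck {

    // Marker text copied from the startup log line shown in this manual.
    static final String SUCCESS_MARKER = "ShardingSphere-Proxy Cluster mode started successfully";

    // Returns true if any of the given log lines contains the success marker.
    static boolean startedSuccessfully(List<String> logLines) {
        return logLines.stream().anyMatch(line -> line.contains(SUCCESS_MARKER));
    }

    public static void main(String[] args) throws IOException {
        // Assumed default location, relative to the Proxy home directory.
        Path log = Paths.get(args.length > 0 ? args[0] : "logs/stdout.log");
        if (!Files.exists(log)) {
            System.out.println("Log file not found: " + log);
            return;
        }
        boolean started = startedSuccessfully(Files.readAllLines(log));
        System.out.println(started ? "Proxy started in Cluster mode" : "Success message not found yet");
    }
}
```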

  5. Configure CDC on demand.

5.1. Query configuration.

    SHOW STREAMING RULE;

The default configuration is as follows:

    +--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
    | read                                                         | write                                | stream_channel                                        |
    +--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
    | {"workerThread":20,"batchSize":1000,"shardingSize":10000000} | {"workerThread":20,"batchSize":1000} | {"type":"MEMORY","props":{"block-queue-size":"2000"}} |
    +--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+

5.2. Alter configuration (optional).

Since the streaming rule has default values, there is no need to create it; only the ALTER statement is provided.

A completely configured DistSQL is as follows.

    ALTER STREAMING RULE (
    READ(
      WORKER_THREAD=20,
      BATCH_SIZE=1000,
      SHARDING_SIZE=10000000,
      RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
    ),
    WRITE(
      WORKER_THREAD=20,
      BATCH_SIZE=1000,
      RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
    ),
    STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
    );
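
When the same statement has to be produced for several environments, it can be generated from parameters. The following is a minimal sketch in plain Java, not a ShardingSphere API. The class StreamingRuleSql and its shardCount argument are hypothetical. The sketch uses one worker-thread and batch-size value for both READ and WRITE to stay short, and it rejects a worker-thread count lower than the number of database shards, following the constraint stated in the configuration item description.

```java
// Hypothetical helper, not part of ShardingSphere.
final class StreamingRuleSql {

    // Builds a complete ALTER STREAMING RULE statement.
    // shardCount is used only for validation; it does not appear in the statement.
    static String build(int shardCount, int workerThread, int batchSize, int shardingSize,
                        int qps, int tps, int blockQueueSize) {
        if (workerThread < shardCount) {
            throw new IllegalArgumentException("WORKER_THREAD (" + workerThread
                    + ") is lower than the number of database shards (" + shardCount + ")");
        }
        return String.join("\n",
                "ALTER STREAMING RULE (",
                "READ(",
                "  WORKER_THREAD=" + workerThread + ",",
                "  BATCH_SIZE=" + batchSize + ",",
                "  SHARDING_SIZE=" + shardingSize + ",",
                "  RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='" + qps + "')))",
                "),",
                "WRITE(",
                "  WORKER_THREAD=" + workerThread + ",",
                "  BATCH_SIZE=" + batchSize + ",",
                "  RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='" + tps + "')))",
                "),",
                "STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='" + blockQueueSize + "')))",
                ");");
    }

    public static void main(String[] args) {
        // Reproduces the statement above for an assumed deployment with 4 shards.
        System.out.println(build(4, 20, 1000, 10000000, 500, 2000, 2000));
    }
}
```

The generated text can then be executed against the Proxy with any SQL client.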

Configuration item description:

    ALTER STREAMING RULE (
    READ( -- Data reading configuration. If it is not configured, default values are used for some parameters.
      WORKER_THREAD=20, -- Affects full and incremental tasks. Size of the thread pool that reads data from the source side. If it is not configured, the default value is used. This value must not be lower than the number of database shards.
      BATCH_SIZE=1000, -- Affects full and incremental tasks. Maximum number of records returned by one query operation. If it is not configured, the default value is used. If a transaction contains more records than this value, an incremental batch may exceed it.
      SHARDING_SIZE=10000000, -- Affects full tasks. Sharding size of the full data. If it is not configured, the default value is used.
      RATE_LIMITER ( -- Affects full and incremental tasks. Traffic limit algorithm. If it is not configured, traffic is not limited.
      TYPE( -- Algorithm type. Option: QPS
      NAME='QPS',
      PROPERTIES( -- Algorithm property
      'qps'='500'
      )))
    ),
    WRITE( -- Data writing configuration. If it is not configured, default values are used for some parameters.
      WORKER_THREAD=20, -- Affects full and incremental tasks. Size of the thread pool that writes data to the target side. If it is not configured, the default value is used.
      BATCH_SIZE=1000, -- Affects full and incremental tasks. Maximum number of records in one batch write operation. If it is not configured, the default value is used. If a transaction contains more records than this value, an incremental batch may exceed it.
      RATE_LIMITER ( -- Traffic limit algorithm. If it is not configured, traffic is not limited.
      TYPE( -- Algorithm type. Option: TPS
      NAME='TPS',
      PROPERTIES( -- Algorithm property
      'tps'='2000'
      )))
    ),
    STREAM_CHANNEL ( -- Data channel. It connects producers and consumers and is used by the reading and writing procedures. If it is not configured, the MEMORY type is used by default.
    TYPE( -- Algorithm type. Option: MEMORY
    NAME='MEMORY',
    PROPERTIES( -- Algorithm property
    'block-queue-size'='2000' -- Property: blocking queue size.
    )))
    );
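
To make the role of the data channel and its blocking queue size more concrete, the sketch below models a bounded in-memory channel between one producer and one consumer using java.util.concurrent.ArrayBlockingQueue. It illustrates the general technique only. It is not ShardingSphere's MEMORY channel implementation, and the class name MemoryChannelSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustration of a bounded producer/consumer channel; not ShardingSphere code.
final class MemoryChannelSketch {

    // Pushes `total` records through a queue bounded by blockQueueSize
    // and returns the records in the order the consumer received them.
    static List<Integer> transfer(int total, int blockQueueSize) throws InterruptedException {
        BlockingQueue<Integer> channel = new ArrayBlockingQueue<>(blockQueueSize);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    channel.put(i); // blocks while the queue is full (back-pressure on the reader)
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<Integer> received = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            received.add(channel.take()); // blocks while the queue is empty
        }
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> received = transfer(10, 2);
        System.out.println("Received " + received.size() + " records through a queue of size 2");
    }
}
```

A smaller queue bounds memory use but makes the producer wait more often; a larger queue absorbs bursts at the cost of memory.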

CDC Client Manual

The CDC Client does not need to be deployed separately. To use it in a project, introduce the CDC Client dependency through Maven. Users can then interact with the server through the CDC Client.

If necessary, users can also implement a CDC Client themselves to consume data and ACK.

    <dependency>
        <groupId>org.apache.shardingsphere</groupId>
        <artifactId>shardingsphere-data-pipeline-cdc-client</artifactId>
        <version>${version}</version>
    </dependency>

CDC Client Introduction

org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient is the entry class of the CDC Client. Users can interact with the CDC Server through this class. The main new methods are as follows.

| Method Name | Return Value | Description |
| --- | --- | --- |
| connect(Consumer<List<Record>> dataConsumer, ExceptionHandler exceptionHandler, ServerErrorResultHandler errorResultHandler) | void | Connect to the server. When connecting, you need to specify: 1. the data consumption processing function; 2. the exception handling logic during consumption; 3. the server error handling function |
| login(CDCLoginParameter parameter) | void | CDC login. Parameters: username: username; password: password |
| startStreaming(StartStreamingParameter parameter) | streamingId | Start CDC subscription. StartStreamingParameter parameters: database: logical database name; schemaTables: subscribed table names; full: whether to subscribe to full data |
| restartStreaming(String streamingId) | void | Restart subscription |
| stopStreaming(String streamingId) | void | Stop subscription |
| dropStreaming(String streamingId) | void | Delete subscription |
| await() | void | Block the CDC thread and wait for the channel to close |
| close() | void | Close the channel; the process ends |
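
The methods above are typically called in a fixed order: connect, login, startStreaming, await, and finally close. The sketch below shows that order in plain Java against a hypothetical StreamingClient interface that mirrors a subset of the method names. It deliberately does not use the real CDCClient class, whose parameter types (for example CDCLoginParameter and StartStreamingParameter) are simplified here to strings. The in-memory RecordingClient, the database name sharding_db, the table name t_order, and the returned streaming id are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Call-order illustration only; not the real CDC Client API.
final class ClientLifecycleSketch {

    // Hypothetical interface mirroring a subset of the documented method names.
    interface StreamingClient extends AutoCloseable {
        void connect(Consumer<List<String>> dataConsumer);
        void login(String username, String password);
        String startStreaming(String database, List<String> tables, boolean full);
        void await();
        @Override
        void close();
    }

    // In-memory stand-in that records the order of calls.
    static final class RecordingClient implements StreamingClient {
        final List<String> calls = new ArrayList<>();
        private Consumer<List<String>> consumer;

        public void connect(Consumer<List<String>> dataConsumer) {
            consumer = dataConsumer;
            calls.add("connect");
        }

        public void login(String username, String password) {
            calls.add("login");
        }

        public String startStreaming(String database, List<String> tables, boolean full) {
            calls.add("startStreaming");
            consumer.accept(tables); // pretend one batch of records arrived
            return database + "-stream-1"; // made-up streaming id
        }

        public void await() {
            calls.add("await");
        }

        public void close() {
            calls.add("close");
        }
    }

    // Runs the typical lifecycle and returns the recorded call order.
    static List<String> run(RecordingClient client, Consumer<List<String>> dataConsumer) {
        try (StreamingClient c = client) {
            c.connect(dataConsumer);
            c.login("root", "root");
            c.startStreaming("sharding_db", Collections.singletonList("t_order"), true);
            c.await();
        }
        return client.calls;
    }

    public static void main(String[] args) {
        System.out.println(run(new RecordingClient(), batch -> System.out.println("batch: " + batch)));
    }
}
```

With the real client, the data consumer passed to connect is where the consumption logic lives, and the streaming id returned by startStreaming is what restartStreaming, stopStreaming, and dropStreaming expect.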