Manual

Introduction to CDC Function

CDC synchronizes data only. It does not synchronize table structures, and synchronization of DDL statements is currently not supported.

Introduction to CDC Protocol

The CDC protocol uses Protobuf, and the corresponding Protobuf types are mapped based on the types in Java.

Taking openGauss as an example, the data types of the CDC protocol map to the database types as follows.

openGauss type | Java data type | CDC protobuf type | Remarks
---------------+----------------+-------------------+--------
tinyint, smallint, integer | Integer | int32 |
bigint | Long | int64 |
numeric | BigDecimal | string |
real, float4 | Float | float |
binary_double, double precision | Double | double |
boolean | Boolean | bool |
char, varchar, text, clob | String | string |
blob, bytea, raw | byte[] | bytes |
date, timestamp, timestamptz, smalldatetime | java.sql.Timestamp | Timestamp | The protobuf Timestamp type contains only seconds and nanoseconds, so it is independent of the time zone
time, timetz | java.sql.Time | int64 | Number of nanoseconds of the day, independent of the time zone
interval, reltime, abstime | String | string |
point, lseg, box, path, polygon, circle | String | string |
cidr, inet, macaddr | String | string |
tsvector | String | string |
tsquery | String | string |
uuid | String | string |
json, jsonb | String | string |
hll | String | string |
int4range, daterange, tsrange, tstzrange | String | string |
hash16, hash32 | String | string |
bit, bit varying | String | string | Returned as Boolean when the type is bit(1)
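
As a minimal sketch of how a consumer might turn three of the mapped values back into Java types, the helper below uses only the JDK. The class name CdcValueDecoder and its methods are hypothetical and not part of the CDC client API. It assumes the numeric string is a plain decimal literal and that the Timestamp seconds count from the Unix epoch, as the protobuf Timestamp type defines.

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalTime;

// Hypothetical helper for illustration; not part of the ShardingSphere CDC client API.
final class CdcValueDecoder {

    // numeric is transferred as a protobuf string, so parse it back without losing precision.
    static BigDecimal toBigDecimal(final String value) {
        return new BigDecimal(value);
    }

    // time and timetz are transferred as int64 nanoseconds of the day.
    static LocalTime toLocalTime(final long nanosOfDay) {
        return LocalTime.ofNanoOfDay(nanosOfDay);
    }

    // A protobuf Timestamp carries only seconds and nanoseconds (assumed to be epoch-based here).
    static Instant toInstant(final long seconds, final int nanos) {
        return Instant.ofEpochSecond(seconds, nanos);
    }

    public static void main(final String[] args) {
        System.out.println(toBigDecimal("12345.6789"));
        System.out.println(toLocalTime(3_600_000_000_000L));
        System.out.println(toInstant(1_698_444_087L, 0));
    }
}
```

Because neither value carries a time zone, any conversion to a zoned or local date-time is left to the consumer.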

openGauss User Manual

Environmental Requirements

Supported openGauss versions: 2.x ~ 3.x.

Permission Requirements

  1. Adjust the WAL configuration on the source end.

Example configuration for postgresql.conf:

    wal_level = logical
    max_wal_senders = 10
    max_replication_slots = 10
    wal_sender_timeout = 0
    max_connections = 600

For details, please refer to Write Ahead Log and Replication.

  2. Grant replication permission to the source end openGauss account.

Example configuration for pg_hba.conf:

    host replication repl_acct 0.0.0.0/0 md5
    # 0.0.0.0/0 allows access from any IP address; it can be adjusted to the IP address of the CDC Server according to the actual situation

For details, please refer to Configuring Client Access Authentication and Example: Logic Replication Code.

  3. Grant DDL and DML permissions to the openGauss account.

If a non-superuser account is used, the account must have CREATE and CONNECT permissions on the database in use.

Example:

    GRANT CREATE, CONNECT ON DATABASE source_ds TO cdc_user;

The account also needs access permissions on the tables and schemas to be subscribed. Taking the t_order table under the test schema as an example:

    \c source_ds
    GRANT USAGE ON SCHEMA test TO GROUP cdc_user;
    GRANT SELECT ON TABLE test.t_order TO cdc_user;

openGauss has the concept of OWNER. If the account is the OWNER of the database, schema, or table, the corresponding authorization steps can be omitted.

openGauss does not allow ordinary accounts to operate under the public schema. So if the table to be migrated is under the public schema, additional authorization is needed.

    GRANT ALL PRIVILEGES TO cdc_user;

For details, please refer to openGauss GRANT.

Complete Process Example

Prerequisites

  1. Prepare the database, table, and data of the CDC source end.

    DROP DATABASE IF EXISTS ds_0;
    CREATE DATABASE ds_0;
    DROP DATABASE IF EXISTS ds_1;
    CREATE DATABASE ds_1;

Configure CDC Server

  1. Create a logical database.

    CREATE DATABASE sharding_db;
    \c sharding_db

  2. Register storage units.

    REGISTER STORAGE UNIT ds_0 (
        URL="jdbc:opengauss://127.0.0.1:5432/ds_0",
        USER="gaussdb",
        PASSWORD="Root@123",
        PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
    ), ds_1 (
        URL="jdbc:opengauss://127.0.0.1:5432/ds_1",
        USER="gaussdb",
        PASSWORD="Root@123",
        PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
    );

  3. Create sharding rules.

    CREATE SHARDING TABLE RULE t_order(
        STORAGE_UNITS(ds_0,ds_1),
        SHARDING_COLUMN=order_id,
        TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="2")),
        KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
    );

  4. Create tables.

Execute the table creation statement in the proxy.

    CREATE TABLE t_order (order_id INT NOT NULL, user_id INT NOT NULL, status VARCHAR(45) NULL, PRIMARY KEY (order_id));

Start CDC Client

Currently, the CDC Client only provides a Java API, and users need to implement the data consumption themselves.

Below is a simple example of starting the CDC Client.

    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
    import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
    import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler;
    import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
    import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
    import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;

    import java.util.Collections;

    @Slf4j
    public final class Bootstrap {

        @SneakyThrows(InterruptedException.class)
        public static void main(final String[] args) {
            String address = "127.0.0.1";
            // Construct the CDCClient with a CDCClientConfiguration, which contains the address and port of the CDC Server and the timeout.
            try (CDCClient cdcClient = new CDCClient(new CDCClientConfiguration(address, 33071, 10000))) {
                // Connect to the CDC Server first. Pass in: 1. the data consumption logic, 2. the exception handling logic during consumption, 3. the server error handling logic.
                cdcClient.connect(records -> log.info("records: {}", records), new RetryStreamingExceptionHandler(cdcClient, 5, 5000),
                        (ctx, result) -> log.error("Server error: {}", result.getErrorMessage()));
                cdcClient.login(new CDCLoginParameter("root", "root"));
                // Start CDC data synchronization. The returned streamingId uniquely identifies this CDC task. The CDC Server derives it from the subscribed database name, the subscribed tables, and whether full synchronization is enabled.
                String streamingId = cdcClient.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("t_order").build()), true));
                log.info("Streaming id={}", streamingId);
                // Prevent the main thread from exiting.
                cdcClient.await();
            }
        }
    }

There are four main steps:

  1. Construct a CDCClient, passing in a CDCClientConfiguration.
  2. Call CDCClient.connect() to establish a connection with the CDC Server.
  3. Call CDCClient.login() to log in with the username and password configured in global.yaml.
  4. Call CDCClient.startStreaming() to start subscribing. The subscribed database and tables must exist in ShardingSphere-Proxy; otherwise an error is reported.

CDCClient.await() blocks the main thread. It is not a required step; other approaches can be used as long as the CDC thread keeps running.
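
One possible alternative for keeping the main thread alive is a CountDownLatch that is released when the application decides to stop. The sketch below uses only the JDK and no CDC classes. The class KeepAliveExample, the method awaitShutdown, and the worker thread are hypothetical stand-ins for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical example; it contains no ShardingSphere classes.
final class KeepAliveExample {

    // Blocks the calling thread until the latch is released or the timeout elapses.
    // Returns true if the latch was released, false on timeout or interruption.
    static boolean awaitShutdown(final CountDownLatch shutdownSignal, final long timeoutMillis) {
        try {
            return shutdownSignal.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(final String[] args) {
        CountDownLatch shutdownSignal = new CountDownLatch(1);
        // Stand-in for whatever decides that the application should stop.
        Thread stopper = new Thread(() -> {
            try {
                Thread.sleep(100L);
            } catch (final InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            shutdownSignal.countDown();
        });
        stopper.start();
        System.out.println("shutdown signalled: " + awaitShutdown(shutdownSignal, 5_000L));
    }
}
```

In a real program the latch would more likely be released from a shutdown hook or an administrative command, and the wait would sit inside the try-with-resources block that owns the CDCClient.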

If you need a more complex data consumption implementation, such as writing to a database, you can refer to DataSourceRecordConsumer.

Write Data

When data is written through the proxy, the CDC Client is notified of the data changes.

    INSERT INTO t_order (order_id, user_id, status) VALUES (1,1,'ok1'),(2,2,'ok2'),(3,3,'ok3');
    UPDATE t_order SET status='updated' WHERE order_id = 1;
    DELETE FROM t_order WHERE order_id = 2;

Bootstrap will output a log similar to the following.

    records: [before {
      name: "order_id"
      value {
        type_url: "type.googleapis.com/google.protobuf.Empty"
      }
    ......

View the Running Status of the CDC Task

A CDC task can only be started and stopped by the CDC Client. You can view the status of CDC tasks by executing DistSQL in the proxy.

  1. View the CDC task list

SHOW STREAMING LIST;

Running result

    sharding_db=> SHOW STREAMING LIST;
                         id                     |  database   | tables  | job_item_count | active |     create_time     | stop_time
    --------------------------------------------+-------------+---------+----------------+--------+---------------------+-----------
     j0302p0000702a83116fcee83f70419ca5e2993791 | sharding_db | t_order | 1              | true   | 2023-10-27 22:01:27 |
    (1 row)

  2. View the details of the CDC task

SHOW STREAMING STATUS j0302p0000702a83116fcee83f70419ca5e2993791;

Running result

    sharding_db=> SHOW STREAMING STATUS j0302p0000702a83116fcee83f70419ca5e2993791;
     item | data_source |          status          | active | processed_records_count | inventory_finished_percentage | incremental_idle_seconds | confirmed_position | current_position | error_message
    ------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+--------------------+------------------+---------------
     0    | ds_0        | EXECUTE_INCREMENTAL_TASK | false  | 2                       | 100                           | 115                      | 5/597E43D0         | 5/597E4810       |
     1    | ds_1        | EXECUTE_INCREMENTAL_TASK | false  | 3                       | 100                           | 115                      | 5/597E4450         | 5/597E4810       |
    (2 rows)

  3. Drop the CDC task

DROP STREAMING j0302p0000702a83116fcee83f70419ca5e2993791;

The CDC task can only be deleted when there are no subscriptions. At this time, the replication slots on the openGauss physical database will also be deleted.

    sharding_db=> DROP STREAMING j0302p0000702a83116fcee83f70419ca5e2993791;
    SUCCESS
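
The confirmed_position and current_position columns in the SHOW STREAMING STATUS output above look like PostgreSQL-style log sequence numbers, written as two hexadecimal halves separated by a slash. Assuming that format, the distance between the two positions can be estimated in bytes, which gives a rough sense of how far a job item lags behind. The class LsnLag below is a hypothetical sketch, not something ShardingSphere provides.

```java
// Hypothetical helper; assumes positions use the "high/low" hexadecimal LSN text format.
final class LsnLag {

    // Converts an LSN such as "5/597E43D0" into a single 64-bit value.
    static long parseLsn(final String lsn) {
        String[] parts = lsn.trim().split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Unexpected LSN format: " + lsn);
        }
        long high = Long.parseUnsignedLong(parts[0], 16);
        long low = Long.parseUnsignedLong(parts[1], 16);
        return (high << 32) | low;
    }

    // Number of bytes between the confirmed position and the current position.
    static long lagBytes(final String confirmedPosition, final String currentPosition) {
        return parseLsn(currentPosition) - parseLsn(confirmedPosition);
    }

    public static void main(final String[] args) {
        System.out.println("ds_0 lag bytes: " + lagBytes("5/597E43D0", "5/597E4810"));
        System.out.println("ds_1 lag bytes: " + lagBytes("5/597E4450", "5/597E4810"));
    }
}
```

For the sample rows above this yields 1088 bytes for ds_0 and 960 bytes for ds_1.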

Precautions

Explanation of incremental data push

  1. CDC incremental push is currently transactional, and transactions of a physical database are not split. Therefore, if a transaction changes data in multiple tables, these changes are pushed together. Supporting XA transactions (currently openGauss only) requires the GLT module on both openGauss and the Proxy.
  2. A push is triggered when a certain amount of data has accumulated or a certain time interval (currently 300 ms) has elapsed. When processing XA transactions, if the incremental events from multiple physical databases arrive more than 300 ms apart, the XA transaction may be split across pushes.
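
The push condition described above, either enough data or enough elapsed time, can be illustrated with a small buffer. This is only a sketch of the general technique and not the CDC Server implementation. The class PushBuffer, its thresholds, and the explicit nowMillis parameter (used so the behavior is deterministic) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer that releases a batch when a size or time threshold is met.
final class PushBuffer<T> {

    private final int maxBatchSize;
    private final long maxWaitMillis;
    private final List<T> buffer = new ArrayList<>();
    private long firstBufferedAtMillis;

    PushBuffer(final int maxBatchSize, final long maxWaitMillis) {
        if (maxBatchSize < 1 || maxWaitMillis < 0) {
            throw new IllegalArgumentException("Thresholds must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.maxWaitMillis = maxWaitMillis;
    }

    // Adds a record and returns the batch to push, or an empty list if no threshold is met yet.
    List<T> offer(final T record, final long nowMillis) {
        if (buffer.isEmpty()) {
            firstBufferedAtMillis = nowMillis;
        }
        buffer.add(record);
        return flushIfDue(nowMillis);
    }

    // Returns and clears the buffered records if the size or time threshold is met.
    List<T> flushIfDue(final long nowMillis) {
        boolean sizeReached = buffer.size() >= maxBatchSize;
        boolean intervalReached = !buffer.isEmpty() && nowMillis - firstBufferedAtMillis >= maxWaitMillis;
        if (!sizeReached && !intervalReached) {
            return new ArrayList<>();
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }

    public static void main(final String[] args) {
        PushBuffer<String> pushBuffer = new PushBuffer<>(3, 300L);
        System.out.println(pushBuffer.offer("event-from-ds_0", 0L));
        // Arrives after the interval has passed, so both buffered events are pushed now.
        System.out.println(pushBuffer.offer("event-from-ds_1", 350L));
    }
}
```

With a time threshold like this, events that arrive after the interval has already expired for an earlier batch end up in a later push, which is the splitting effect the note about XA transactions describes.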

Handling of large transactions

Currently, large transactions are parsed in full, which may cause the CDC Server process to run out of memory (OOM). Forced truncation may be considered in the future.

There is no fixed figure for CDC performance. Focus on the read/write batchSize and the size of the memory queue in the configuration, and tune them according to the actual situation.
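
To make the trade-off between batch size and memory queue size concrete, here is a generic sketch of a bounded in-memory queue drained in batches. It is not the ShardingSphere implementation. The class BoundedBatchQueue and its parameters queueCapacity and batchSize are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical bounded queue: capacity limits memory use, batchSize limits each read.
final class BoundedBatchQueue {

    private final BlockingQueue<String> queue;
    private final int batchSize;

    BoundedBatchQueue(final int queueCapacity, final int batchSize) {
        this.queue = new ArrayBlockingQueue<>(queueCapacity);
        this.batchSize = batchSize;
    }

    // Non-blocking enqueue; returns false when the queue is full so the producer can back off.
    boolean tryEnqueue(final String record) {
        return queue.offer(record);
    }

    // Removes up to batchSize records that are currently available.
    List<String> pollBatch() {
        List<String> batch = new ArrayList<>(batchSize);
        queue.drainTo(batch, batchSize);
        return batch;
    }

    public static void main(final String[] args) {
        BoundedBatchQueue batchQueue = new BoundedBatchQueue(2, 2);
        System.out.println(batchQueue.tryEnqueue("r1"));
        System.out.println(batchQueue.tryEnqueue("r2"));
        System.out.println(batchQueue.tryEnqueue("r3"));
        System.out.println(batchQueue.pollBatch());
    }
}
```

A larger capacity absorbs bursts at the cost of memory, while a larger batch size reduces per-batch overhead at the cost of latency, so both are usually tuned together against the available heap.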