TiCDC Open Protocol

TiCDC Open Protocol is a row-level data change notification protocol that provides data sources for monitoring, caching, full-text indexing, analysis engines, and primary-secondary replication between different databases. TiCDC complies with TiCDC Open Protocol and replicates data changes of TiDB to third-party data medium such as MQ (Message Queue).

TiCDC Open Protocol uses Event as the basic unit to replicate data change events to the downstream. The Event is divided into three categories:

  • Row Changed Event: Represents the data change in a row. When a row is changed, this Event is sent and contains information about the changed row.
  • DDL Event: Represents the DDL change. This Event is sent after a DDL statement is successfully executed in the upstream. The DDL Event is broadcasted to every MQ Partition.
  • Resolved Event: Represents a special time point before which the Event received is complete.

Restrictions

  • In most cases, the Row Changed Event of a version is sent only once, but in special situations such as node failure and network partition, the Row Changed Event of the same version might be sent multiple times.
  • On the same table, the Row Changed Events of each version which is first sent are incremented in the order of timestamps (TS) in the Event stream.
  • Resolved Events are periodically broadcasted to each MQ Partition. The Resolved Event means that any Event with a TS earlier than Resolved Event TS has been sent to the downstream.
  • DDL Events are broadcasted to each MQ Partition.
  • Multiple Row Changed Events of a row are sent to the same MQ Partition.

Message format

A Message contains one or more Events, arranged in the following format:

Key:

Offset(Byte)0~78~1516~(15+length1)
ParameterProtocol versionLength1Event Key1LengthNEvent KeyN

Value:

Offset(Byte)0~78~(7+length1)
ParameterLength1Event Value1LengthNEvent ValueN
  • LengthN represents the length of the Nth key/value.
  • The length and protocol version are the big-endian int64 type.
  • The version of the current protocol is 1.

Event format

This section introduces the formats of Row Changed Event, DDL Event, and Resolved Event.

Row Changed Event

  • Key:

    1. {
    2. "ts":<TS>,
    3. "scm":<Schema Name>,
    4. "tbl":<Table Name>,
    5. "t":1
    6. }
    ParameterTypeDescription
    TSNumberThe timestamp of the transaction that causes the row change.
    Schema NameStringThe name of the schema where the row is in.
    Table NameStringThe name of the table where the row is in.
  • Value:

    Insert event. The newly added row data is output.

    1. {
    2. "u":{
    3. <Column Name>:{
    4. "t":<Column Type>,
    5. "h":<Where Handle>,
    6. "f":<Flag>,
    7. "v":<Column Value>
    8. },
    9. <Column Name>:{
    10. "t":<Column Type>,
    11. "h":<Where Handle>,
    12. "f":<Flag>,
    13. "v":<Column Value>
    14. }
    15. }
    16. }

    Update event. The newly added row data (“u”) and the row data before the update (“p”) are output. The latter (“p”) is output only when the old value feature is enabled.

    1. {
    2. "u":{
    3. <Column Name>:{
    4. "t":<Column Type>,
    5. "h":<Where Handle>,
    6. "f":<Flag>,
    7. "v":<Column Value>
    8. },
    9. <Column Name>:{
    10. "t":<Column Type>,
    11. "h":<Where Handle>,
    12. "f":<Flag>,
    13. "v":<Column Value>
    14. }
    15. },
    16. "p":{
    17. <Column Name>:{
    18. "t":<Column Type>,
    19. "h":<Where Handle>,
    20. "f":<Flag>,
    21. "v":<Column Value>
    22. },
    23. <Column Name>:{
    24. "t":<Column Type>,
    25. "h":<Where Handle>,
    26. "f":<Flag>,
    27. "v":<Column Value>
    28. }
    29. }
    30. }

    Delete event. The deleted row data is output. When the old value feature is enabled, the Delete event includes all the columns of the deleted row data; when this feature is disabled, the Delete event only includes the HandleKey column.

    1. {
    2. "d":{
    3. <Column Name>:{
    4. "t":<Column Type>,
    5. "h":<Where Handle>,
    6. "f":<Flag>,
    7. "v":<Column Value>
    8. },
    9. <Column Name>:{
    10. "t":<Column Type>,
    11. "h":<Where Handle>,
    12. "f":<Flag>,
    13. "v":<Column Value>
    14. }
    15. }
    16. }
    ParameterTypeDescription
    Column NameStringThe column name.
    Column TypeNumberThe column type. For details, see Column Type Code.
    Where HandleBooleanDetermines whether this column can be the filter condition of the Where clause. When this column is unique on the table, Where Handle is true.
    FlagNumberThe bit flags of columns. For details, see Bit flags of columns.
    Column ValueAnyThe Column value.

DDL Event

  • Key:

    1. {
    2. "ts":<TS>,
    3. "scm":<Schema Name>,
    4. "tbl":<Table Name>,
    5. "t":2
    6. }
    ParameterTypeDescription
    TSNumberThe timestamp of the transaction that performs the DDL change.
    Schema NameStringThe schema name of the DDL change, which might be an empty string.
    Table NameStringThe table name of the DDL change, which might be am empty string.
  • Value:

    1. {
    2. "q":<DDL Query>,
    3. "t":<DDL Type>
    4. }
    ParameterTypeDescription
    DDL QueryStringDDL Query SQL
    DDL TypeStringThe DDL type. For details, see DDL Type Code.

Resolved Event

  • Key:

    1. {
    2. "ts":<TS>,
    3. "t":3
    4. }
    ParameterTypeDescription
    TSNumberThe Resolved timestamp. Any TS earlier than this Event has been sent.
  • Value: None

Examples of the Event stream output

This section shows and displays the output logs of the Event stream.

Suppose that you execute the following SQL statement in the upstream and the MQ Partition number is 2:

  1. CREATE TABLE test.t1(id int primary key, val varchar(16));

From the following Log 1 and Log 3, you can see that the DDL Event is broadcasted to all MQ Partitions, and that the Resolved Event is periodically broadcasted to each MQ Partition.

  1. 1. [partition=0] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"]
  2. 2. [partition=0] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]
  3. 3. [partition=1] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"]
  4. 4. [partition=1] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]

Execute the following SQL statements in the upstream:

  1. BEGIN;
  2. INSERT INTO test.t1(id, val) VALUES (1, 'aa');
  3. INSERT INTO test.t1(id, val) VALUES (2, 'aa');
  4. UPDATE test.t1 SET val = 'bb' WHERE id = 2;
  5. INSERT INTO test.t1(id, val) VALUES (3, 'cc');
  6. COMMIT;
  • From the following Log 5 and Log 6, you can see that Row Changed Events on the same table might be sent to different partitions based on the primary key, but changes to the same row are sent to the same partition so that the downstream can easily process the Event concurrently.
  • From Log 6, multiple changes to the same row in a transaction are only sent in one Row Changed Event.
  • Log 8 is a repeated event of Log 7. Row Changed Event might be repeated, but the first Event of each version is sent orderly.
  1. 5. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":1},\"val\":{\"t\":15,\"v\":\"YWE=\"}}}"]
  2. 6. [partition=1] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":2},\"val\":{\"t\":15,\"v\":\"YmI=\"}}}"]
  3. 7. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"Y2M=\"}}}"]
  4. 8. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"Y2M=\"}}}"]

Execute the following SQL statements in the upstream:

  1. BEGIN;
  2. DELETE FROM test.t1 WHERE id = 1;
  3. UPDATE test.t1 SET val = 'dd' WHERE id = 3;
  4. UPDATE test.t1 SET id = 4, val = 'ee' WHERE id = 2;
  5. COMMIT;
  • Log 9 is the Row Changed Event of the Delete type. This type of Event only contains primary key columns or unique index columns.
  • Log 13 and Log 14 are Resolved Events. The Resolved Event means that in this Partition, any events smaller than the Resolved TS (including Row Changed Event and DDL Event) have been sent.
  1. 9. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":1}}}"]
  2. 10. [partition=1] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":2}}}"]
  3. 11. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"ZGQ=\"}}}"]
  4. 12. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":4},\"val\":{\"t\":15,\"v\":\"ZWU=\"}}}"]
  5. 13. [partition=0] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]
  6. 14. [partition=1] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]

Protocol parsing for consumers

Currently, TiCDC does not provide the standard parsing library for TiCDC Open Protocol, but the Golang version and Java version of parsing examples are provided. You can refer to the data format provided in this document and the following examples to implement the protocol parsing for consumers.

Column type code

Column Type Code represents the column data type of the Row Changed Event.

TypeCodeOutput ExampleDescription
TINYINT/BOOLEAN1{“t”:1,”v”:1}
SMALLINT2{“t”:2,”v”:1}
INT3{“t”:3,”v”:123}
FLOAT4{“t”:4,”v”:153.123}
DOUBLE5{“t”:5,”v”:153.123}
NULL6{“t”:6,”v”:null}
TIMESTAMP7{“t”:7,”v”:”1973-12-30 15:30:00”}
BIGINT8{“t”:8,”v”:123}
MEDIUMINT9{“t”:9,”v”:123}
DATE10/14{“t”:10,”v”:”2000-01-01”}
TIME11{“t”:11,”v”:”23:59:59”}
DATETIME12{“t”:12,”v”:”2015-12-20 23:58:58”}
YEAR13{“t”:13,”v”:1970}
VARCHAR/VARBINARY15/253{“t”:15,”v”:”test”} / {“t”:15,”v”:”\x89PNG\r\n\x1a\n”}The value is encoded in UTF-8. When the upstream type is VARBINARY, invisible characters are escaped.
BIT16{“t”:16,”v”:81}
JSON245{“t”:245,”v”:”{\”key1\”: \”value1\”}”}
DECIMAL246{“t”:246,”v”:”129012.1230000”}
ENUM247{“t”:247,”v”:1}
SET248{“t”:248,”v”:3}
TINYTEXT/TINYBLOB249{“t”:249,”v”:”5rWL6K+VdGV4dA==”}The value is encoded in Base64.
MEDIUMTEXT/MEDIUMBLOB250{“t”:250,”v”:”5rWL6K+VdGV4dA==”}The value is encoded in Base64.
LONGTEXT/LONGBLOB251{“t”:251,”v”:”5rWL6K+VdGV4dA==”}The value is encoded in Base64.
TEXT/BLOB252{“t”:252,”v”:”5rWL6K+VdGV4dA==”}The value is encoded in Base64.
CHAR/BINARY254{“t”:254,”v”:”test”} / {“t”:254,”v”:”\x89PNG\r\n\x1a\n”}The value is encoded in UTF-8. When the upstream type is BINARY, invisible characters are escaped.
GEOMETRY255Unsupported

DDL Type Code

DDL Type Code represents the DDL statement type of the DDL Event.

TypeCode
Create Schema1
Drop Schema2
Create Table3
Drop Table4
Add Column5
Drop Column6
Add Index7
Drop Index8
Add Foreign Key9
Drop Foreign Key10
Truncate Table11
Modify Column12
Rebase Auto ID13
Rename Table14
Set Default Value15
Shard RowID16
Modify Table Comment17
Rename Index18
Add Table Partition19
Drop Table Partition20
Create View21
Modify Table Charset And Collate22
Truncate Table Partition23
Drop View24
Recover Table25
Modify Schema Charset And Collate26
Lock Table27
Unlock Table28
Repair Table29
Set TiFlash Replica30
Update TiFlash Replica Status31
Add Primary Key32
Drop Primary Key33
Create Sequence34
Alter Sequence35
Drop Sequence36

Bit flags of columns

The bit flags represent specific attributes of columns.

BitValueNameDescription
10x01BinaryFlagWhether the column is a binary-encoded column.
20x02HandleKeyFlagWhether the column is a Handle index column.
30x04GeneratedColumnFlagWhether the column is a generated column.
40x08PrimaryKeyFlagWhether the column is a primary key column.
50x10UniqueKeyFlagWhether the column is a unique index column.
60x20MultipleKeyFlagWhether the column is a composite index column.
70x40NullableFlagWhether the column is a nullable column.
80x80UnsignedFlagWhether the column is an unsigned column.

Example:

If the value of a column flag is 85, the column is a nullable column, a unique index column, a generated column, and a binary-encoded column.

  1. 85 == 0b_101_0101
  2. == NullableFlag | UniqueKeyFlag | GeneratedColumnFlag | BinaryFlag

If the value of a column is 46, the column is a composite index column, a primary key column, a generated column, and a Handle key column.

  1. 46 == 0b_010_1110
  2. == MultipleKeyFlag | PrimaryKeyFlag | GeneratedColumnFlag | HandleKeyFlag

TiCDC Open Protocol - 图1

Note

  • BinaryFlag is meaningful only when the column type is BLOB/TEXT (including TINYBLOB/TINYTEXT and BINARY/CHAR). When the upstream column is the BLOB type, the BinaryFlag value is set to 1. When the upstream column is the TEXT type, the BinaryFlag value is set to 0.
  • To replicate a table from the upstream, TiCDC selects a valid index as the Handle index. The HandleKeyFlag value of the Handle index column is set to 1.