TiCDC Simple Protocol

TiCDC Simple protocol is a row-level data change notification protocol that provides data sources for monitoring, caching, full-text indexing, analysis engines, and primary-secondary replication between heterogeneous databases. This document describes how to use the TiCDC Simple protocol and the data format implementation.

Use the TiCDC Simple protocol

When you use Kafka as the downstream, specify protocol as "simple" in the changefeed configuration. Then TiCDC encodes each row change or DDL event as a message, and sends the data change event to the downstream.

The configuration example for using the Simple protocol is as follows:

sink-uri configuration:

  --sink-uri = "kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0"

Changefeed configuration:

  [sink]
  protocol = "simple"
  # The following configuration parameters control the sending behavior of bootstrap messages.
  # send-bootstrap-interval-in-sec controls the time interval for sending bootstrap messages, in seconds.
  # The default value is 120 seconds, which means that a bootstrap message is sent every 120 seconds for each table.
  send-bootstrap-interval-in-sec = 120
  # send-bootstrap-in-msg-count controls the message interval for sending bootstrap, in message count.
  # The default value is 10000, which means that a bootstrap message is sent every 10000 row changed messages for each table.
  send-bootstrap-in-msg-count = 10000
  # Note: If you want to disable the sending of bootstrap messages, set both send-bootstrap-interval-in-sec and send-bootstrap-in-msg-count to 0.
  # send-bootstrap-to-all-partition controls whether to send bootstrap messages to all partitions.
  # The default value is true, which means that bootstrap messages are sent to all partitions of the corresponding table topic.
  # Setting it to false means bootstrap messages are sent to only the first partition of the corresponding table topic.
  send-bootstrap-to-all-partition = true

  [sink.kafka-config.codec-config]
  # encoding-format controls the encoding format of the Simple protocol messages. Currently, the Simple protocol message supports "json" and "avro" encoding formats.
  # The default value is "json".
  encoding-format = "json"

Message types

The TiCDC Simple protocol has the following message types.

DDL:

  • CREATE: the creating table event.
  • RENAME: the renaming table event.
  • CINDEX: the creating index event.
  • DINDEX: the deleting index event.
  • ERASE: the deleting table event.
  • TRUNCATE: the truncating table event.
  • ALTER: the altering table event, including adding columns, dropping columns, modifying column types, and other ALTER TABLE statements supported by TiCDC.
  • QUERY: other DDL events.

DML:

  • INSERT: the inserting event.
  • UPDATE: the updating event.
  • DELETE: the deleting event.

Other:

  • WATERMARK: containing a TSO (that is, a 64-bit timestamp) of the upstream TiDB cluster, which marks the table replication progress. All events earlier than the watermark have been sent to the downstream.
  • BOOTSTRAP: containing the schema information of a table, used to build the table schema for the downstream.
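As a rough illustration, a consumer can group the type names above into these categories before doing any further processing. The following Python sketch assumes JSON-encoded messages that carry the type name in a type field, as in the message examples in this document. The classify function and the two set names are hypothetical and are not part of TiCDC.

```python
import json

# Type names taken from the lists above.
DDL_TYPES = {"CREATE", "RENAME", "CINDEX", "DINDEX", "ERASE", "TRUNCATE", "ALTER", "QUERY"}
DML_TYPES = {"INSERT", "UPDATE", "DELETE"}


def classify(raw):
    """Return "DDL", "DML", "WATERMARK", or "BOOTSTRAP" for one JSON-encoded message."""
    msg_type = json.loads(raw)["type"]
    if msg_type in DDL_TYPES:
        return "DDL"
    if msg_type in DML_TYPES:
        return "DML"
    if msg_type in ("WATERMARK", "BOOTSTRAP"):
        return msg_type
    raise ValueError(f"unknown Simple protocol message type: {msg_type!r}")


if __name__ == "__main__":
    print(classify('{"version":1,"type":"TRUNCATE"}'))
```

Raising an error on an unknown type is a design choice of this sketch; a consumer that must tolerate future protocol versions might prefer to log and skip such messages.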

Message format

In the Simple protocol, each message contains only one event. The Simple protocol supports encoding messages in JSON and Avro formats. This document uses JSON format as an example. For Avro format messages, their fields and meanings are the same as those in JSON format messages, but the encoding format is different. For details about the Avro format, see Simple Protocol Avro Schema.

DDL

TiCDC encodes a DDL event in the following JSON format:

  {
    "version":1,
    "type":"ALTER",
    "sql":"ALTER TABLE `user` ADD COLUMN `createTime` TIMESTAMP",
    "commitTs":447987408682614795,
    "buildTs":1708936343598,
    "tableSchema":{
      "schema":"simple",
      "table":"user",
      "tableID":148,
      "version":447987408682614791,
      "columns":[
        {
          "name":"id",
          "dataType":{"mysqlType":"int","charset":"binary","collate":"binary","length":11},
          "nullable":false,
          "default":null
        },
        {
          "name":"name",
          "dataType":{"mysqlType":"varchar","charset":"utf8mb4","collate":"utf8mb4_bin","length":255},
          "nullable":true,
          "default":null
        },
        {
          "name":"age",
          "dataType":{"mysqlType":"int","charset":"binary","collate":"binary","length":11},
          "nullable":true,
          "default":null
        },
        {
          "name":"score",
          "dataType":{"mysqlType":"float","charset":"binary","collate":"binary","length":12},
          "nullable":true,
          "default":null
        },
        {
          "name":"createTime",
          "dataType":{"mysqlType":"timestamp","charset":"binary","collate":"binary","length":19},
          "nullable":true,
          "default":null
        }
      ],
      "indexes":[
        {
          "name":"primary",
          "unique":true,
          "primary":true,
          "nullable":false,
          "columns":["id"]
        }
      ]
    },
    "preTableSchema":{
      "schema":"simple",
      "table":"user",
      "tableID":148,
      "version":447984074911121426,
      "columns":[
        {
          "name":"id",
          "dataType":{"mysqlType":"int","charset":"binary","collate":"binary","length":11},
          "nullable":false,
          "default":null
        },
        {
          "name":"name",
          "dataType":{"mysqlType":"varchar","charset":"utf8mb4","collate":"utf8mb4_bin","length":255},
          "nullable":true,
          "default":null
        },
        {
          "name":"age",
          "dataType":{"mysqlType":"int","charset":"binary","collate":"binary","length":11},
          "nullable":true,
          "default":null
        },
        {
          "name":"score",
          "dataType":{"mysqlType":"float","charset":"binary","collate":"binary","length":12},
          "nullable":true,
          "default":null
        }
      ],
      "indexes":[
        {
          "name":"primary",
          "unique":true,
          "primary":true,
          "nullable":false,
          "columns":["id"]
        }
      ]
    }
  }

The fields in the preceding JSON data are explained as follows:

| Field | Type | Description |
| --- | --- | --- |
| version | Number | The version number of the protocol, which is currently 1. |
| type | String | The DDL event type, including CREATE, RENAME, CINDEX, DINDEX, ERASE, TRUNCATE, ALTER, and QUERY. |
| sql | String | The DDL statement. |
| commitTs | Number | The commit timestamp when the DDL statement execution is completed in the upstream. |
| buildTs | Number | The UNIX timestamp when the message is successfully encoded within TiCDC. |
| tableSchema | Object | The current schema information of the table. For more information, see TableSchema definition. |
| preTableSchema | Object | The schema information of the table before the DDL statement is executed. All DDL events, except the CREATE type of DDL event, have this field. |
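Because a DDL message carries the table schema both before and after the change, a consumer can work out what the DDL statement did to the columns without parsing the sql field. The following Python sketch shows one way to do this on a decoded message. The column_changes function is a hypothetical helper, and treating a missing preTableSchema (as in CREATE events) as an empty column list is an assumption of this sketch.

```python
def _columns_by_name(schema):
    """Map column name -> column object; a missing schema yields an empty map."""
    return {col["name"]: col for col in (schema or {}).get("columns", [])}


def column_changes(ddl):
    """Compare preTableSchema and tableSchema of one decoded DDL message."""
    before = _columns_by_name(ddl.get("preTableSchema"))
    after = _columns_by_name(ddl.get("tableSchema"))
    return {
        "added": [name for name in after if name not in before],
        "dropped": [name for name in before if name not in after],
        "modified": [name for name in after if name in before and after[name] != before[name]],
    }


if __name__ == "__main__":
    # Trimmed version of the ALTER message above: only column names are kept.
    ddl = {
        "type": "ALTER",
        "tableSchema": {"columns": [{"name": "id"}, {"name": "score"}, {"name": "createTime"}]},
        "preTableSchema": {"columns": [{"name": "id"}, {"name": "score"}]},
    }
    print(column_changes(ddl))
```

For the ALTER example above, the only difference between the two schemas is the added createTime column.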

DML

INSERT

TiCDC encodes an INSERT event in the following JSON format:

  {
    "version":1,
    "database":"simple",
    "table":"user",
    "tableID":148,
    "type":"INSERT",
    "commitTs":447984084414103554,
    "buildTs":1708923662983,
    "schemaVersion":447984074911121426,
    "data":{
      "age":"25",
      "id":"1",
      "name":"John Doe",
      "score":"90.5"
    }
  }

The fields in the preceding JSON data are explained as follows:

| Field | Type | Description |
| --- | --- | --- |
| version | Number | The version number of the protocol, which is currently 1. |
| database | String | The name of the database. |
| table | String | The name of the table. |
| tableID | Number | The ID of the table. |
| type | String | The DML event type, including INSERT, UPDATE, and DELETE. |
| commitTs | Number | The commit timestamp when the DML statement execution is completed in the upstream. |
| buildTs | Number | The UNIX timestamp when the message is successfully encoded within TiCDC. |
| schemaVersion | Number | The schema version number of the table when the DML message is encoded. |
| data | Object | The inserted data, where the field name is the column name and the field value is the column value. |

The INSERT event contains the data field, and does not contain the old field.

UPDATE

TiCDC encodes an UPDATE event in the following JSON format:

  {
    "version":1,
    "database":"simple",
    "table":"user",
    "tableID":148,
    "type":"UPDATE",
    "commitTs":447984099186180098,
    "buildTs":1708923719184,
    "schemaVersion":447984074911121426,
    "data":{
      "age":"25",
      "id":"1",
      "name":"John Doe",
      "score":"95"
    },
    "old":{
      "age":"25",
      "id":"1",
      "name":"John Doe",
      "score":"90.5"
    }
  }

The fields in the preceding JSON data are explained as follows:

| Field | Type | Description |
| --- | --- | --- |
| version | Number | The version number of the protocol, which is currently 1. |
| database | String | The name of the database. |
| table | String | The name of the table. |
| tableID | Number | The ID of the table. |
| type | String | The DML event type, including INSERT, UPDATE, and DELETE. |
| commitTs | Number | The commit timestamp when the DML statement execution is completed in the upstream. |
| buildTs | Number | The UNIX timestamp when the message is successfully encoded within TiCDC. |
| schemaVersion | Number | The schema version number of the table when the DML message is encoded. |
| data | Object | The data after updating, where the field name is the column name and the field value is the column value. |
| old | Object | The data before updating, where the field name is the column name and the field value is the column value. |

The UPDATE event contains both the data and old fields, which represent the data after and before updating respectively.

DELETE

TiCDC encodes a DELETE event in the following JSON format:

  {
    "version":1,
    "database":"simple",
    "table":"user",
    "tableID":148,
    "type":"DELETE",
    "commitTs":447984114259722243,
    "buildTs":1708923776484,
    "schemaVersion":447984074911121426,
    "old":{
      "age":"25",
      "id":"1",
      "name":"John Doe",
      "score":"95"
    }
  }

The fields in the preceding JSON data are explained as follows:

| Field | Type | Description |
| --- | --- | --- |
| version | Number | The version number of the protocol, which is currently 1. |
| database | String | The name of the database. |
| table | String | The name of the table. |
| tableID | Number | The ID of the table. |
| type | String | The DML event type, including INSERT, UPDATE, and DELETE. |
| commitTs | Number | The commit timestamp when the DML statement execution is completed in the upstream. |
| buildTs | Number | The UNIX timestamp when the message is successfully encoded within TiCDC. |
| schemaVersion | Number | The schema version number of the table when the DML message is encoded. |
| old | Object | The deleted data, where the field name is the column name and the field value is the column value. |

The DELETE event contains the old field, and does not contain the data field.
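To illustrate how the data and old fields of the three DML event types fit together, the following Python sketch replays decoded DML messages into an in-memory dictionary. The apply_dml function and its key_columns parameter are hypothetical names. The sketch assumes that the caller already knows which columns identify a row, for example the primary key columns from the table schema.

```python
def apply_dml(rows, msg, key_columns):
    """Apply one decoded DML message to rows, a dict keyed by the row's key values."""

    def key_of(image):
        return tuple(image[col] for col in key_columns)

    msg_type = msg["type"]
    if msg_type == "INSERT":
        # INSERT carries only the new row image in "data".
        rows[key_of(msg["data"])] = msg["data"]
    elif msg_type == "UPDATE":
        # UPDATE carries both images; remove the old key first in case the key changed.
        rows.pop(key_of(msg["old"]), None)
        rows[key_of(msg["data"])] = msg["data"]
    elif msg_type == "DELETE":
        # DELETE carries only the removed row image in "old".
        rows.pop(key_of(msg["old"]), None)
    else:
        raise ValueError(f"not a DML message: {msg_type!r}")


if __name__ == "__main__":
    table = {}
    apply_dml(table, {"type": "INSERT", "data": {"id": "1", "score": "90.5"}}, ["id"])
    apply_dml(table, {"type": "UPDATE", "data": {"id": "1", "score": "95"},
                      "old": {"id": "1", "score": "90.5"}}, ["id"])
    print(table)
```

Removing the old key before writing the new image keeps the dictionary consistent even when an UPDATE changes a key column.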

WATERMARK

TiCDC encodes a WATERMARK event in the following JSON format:

  {
    "version":1,
    "type":"WATERMARK",
    "commitTs":447984124732375041,
    "buildTs":1708923816911
  }

The fields in the preceding JSON data are explained as follows:

| Field | Type | Description |
| --- | --- | --- |
| version | Number | The version number of the protocol, which is currently 1. |
| type | String | The WATERMARK event type. |
| commitTs | Number | The commit timestamp of the WATERMARK. |
| buildTs | Number | The UNIX timestamp when the message is successfully encoded within TiCDC. |
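The commitTs of a WATERMARK is a TSO rather than a wall-clock timestamp, so it cannot be compared with buildTs directly. The following Python sketch converts a TSO to a physical time. It relies on the TiDB TSO layout, which is not described in this document and is therefore an assumption here: the low 18 bits hold a logical counter and the remaining high bits hold a physical time in milliseconds. The function names are hypothetical.

```python
from datetime import datetime, timezone

LOGICAL_BITS = 18  # assumed width of the logical counter in a TSO


def split_tso(tso):
    """Split a TSO into (physical time in milliseconds, logical counter)."""
    return tso >> LOGICAL_BITS, tso & ((1 << LOGICAL_BITS) - 1)


def tso_to_datetime(tso):
    """Return the physical part of a TSO as a timezone-aware UTC datetime."""
    physical_ms, _ = split_tso(tso)
    return datetime.fromtimestamp(physical_ms / 1000, tz=timezone.utc)


if __name__ == "__main__":
    commit_ts = 447984124732375041  # commitTs of the WATERMARK example above
    print(split_tso(commit_ts))     # (1708923815660, 1)
    print(tso_to_datetime(commit_ts))
```

Under this assumption, the physical part of the example commitTs is 1708923815660, which is 1251 milliseconds earlier than the buildTs of the same message.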

BOOTSTRAP

TiCDC encodes a BOOTSTRAP event in the following JSON format:

  {
    "version":1,
    "type":"BOOTSTRAP",
    "commitTs":0,
    "buildTs":1708924603278,
    "tableSchema":{
      "schema":"simple",
      "table":"new_user",
      "tableID":148,
      "version":447984074911121426,
      "columns":[
        {
          "name":"id",
          "dataType":{"mysqlType":"int","charset":"binary","collate":"binary","length":11},
          "nullable":false,
          "default":null
        },
        {
          "name":"name",
          "dataType":{"mysqlType":"varchar","charset":"utf8mb4","collate":"utf8mb4_bin","length":255},
          "nullable":true,
          "default":null
        },
        {
          "name":"age",
          "dataType":{"mysqlType":"int","charset":"binary","collate":"binary","length":11},
          "nullable":true,
          "default":null
        },
        {
          "name":"score",
          "dataType":{"mysqlType":"float","charset":"binary","collate":"binary","length":12},
          "nullable":true,
          "default":null
        }
      ],
      "indexes":[
        {
          "name":"primary",
          "unique":true,
          "primary":true,
          "nullable":false,
          "columns":["id"]
        }
      ]
    }
  }

The fields in the preceding JSON data are explained as follows:

| Field | Type | Description |
| --- | --- | --- |
| version | Number | The version number of the protocol, which is currently 1. |
| type | String | The BOOTSTRAP event type. |
| commitTs | Number | The commitTs of the BOOTSTRAP is 0. Because it is generated internally by TiCDC, its commitTs is meaningless. |
| buildTs | Number | The UNIX timestamp when the message is successfully encoded within TiCDC. |
| tableSchema | Object | The schema information of the table. For more information, see TableSchema definition. |

Message generation and sending rules

DDL

  • Generation time: TiCDC sends a DDL event after all transactions before this DDL event have been sent.
  • Destination: TiCDC sends DDL events to all partitions of the corresponding topic.

DML

  • Generation time: TiCDC sends DML events in the order of the commitTs of the transaction.
  • Destination: TiCDC sends DML events to the corresponding partition of the corresponding topic according to the user-configured dispatch rules.

WATERMARK

  • Generation time: TiCDC sends WATERMARK events periodically to mark the replication progress of a changefeed. The current interval is 1 second.
  • Destination: TiCDC sends WATERMARK events to all partitions of the corresponding topic.

BOOTSTRAP

  • Generation time:
    • After creating a new changefeed, before the first DML event of a table is sent, TiCDC sends a BOOTSTRAP event to the downstream to build the table schema.
    • Additionally, TiCDC sends BOOTSTRAP events periodically to allow newly joined consumers to build the table schema. The default interval is 120 seconds or every 10000 messages. You can adjust the sending interval by configuring the send-bootstrap-interval-in-sec and send-bootstrap-in-msg-count parameters in the sink configuration.
    • If a table does not receive any new DML messages within 30 minutes, the table is considered inactive. TiCDC stops sending BOOTSTRAP events for the table until new DML events are received.
  • Destination: By default, TiCDC sends BOOTSTRAP events to all partitions of the corresponding topic. You can adjust the sending strategy by configuring the send-bootstrap-to-all-partition parameter in the sink configuration.
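The BOOTSTRAP generation rules above can be modeled per table as a small state machine. The following Python sketch is only an illustrative model of those rules and is not TiCDC's implementation. The BootstrapTracker class and its method names are hypothetical, times are plain seconds supplied by the caller, and treating a single parameter set to 0 as disabling only that trigger is an assumption (the configuration comments only state that setting both to 0 disables bootstrap messages).

```python
class BootstrapTracker:
    """Per-table model of when a BOOTSTRAP message is due (illustrative only)."""

    def __init__(self, interval_sec=120, msg_count=10000, inactive_sec=30 * 60):
        self.interval_sec = interval_sec  # models send-bootstrap-interval-in-sec
        self.msg_count = msg_count        # models send-bootstrap-in-msg-count
        self.inactive_sec = inactive_sec  # 30 minutes without DML means inactive
        self.last_sent = None             # time of the last BOOTSTRAP, None if never sent
        self.last_dml = None              # time of the last DML event
        self.dml_since_sent = 0           # DML events since the last BOOTSTRAP

    def record_dml(self, now):
        self.last_dml = now
        self.dml_since_sent += 1

    def record_sent(self, now):
        self.last_sent = now
        self.dml_since_sent = 0

    def should_send(self, now):
        if self.interval_sec == 0 and self.msg_count == 0:
            return False  # both parameters are 0: bootstrap messages are disabled
        if self.last_sent is None:
            return True   # nothing sent yet: send before the first DML event
        if self.last_dml is None or now - self.last_dml >= self.inactive_sec:
            return False  # inactive table: wait for new DML events
        by_time = self.interval_sec > 0 and now - self.last_sent >= self.interval_sec
        by_count = self.msg_count > 0 and self.dml_since_sent >= self.msg_count
        return by_time or by_count


if __name__ == "__main__":
    tracker = BootstrapTracker()
    print(tracker.should_send(now=0))    # first message for the table
    tracker.record_sent(now=0)
    tracker.record_dml(now=1)
    print(tracker.should_send(now=60))   # neither trigger reached yet
    print(tracker.should_send(now=121))  # 120-second interval elapsed
```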

Message consumption methods

Because the TiCDC Simple protocol does not include the schema information of the table when sending a DML message, the downstream needs to receive the DDL or BOOTSTRAP message and cache the schema information of the table before consuming a DML message. When receiving a DML message, the downstream obtains the corresponding table schema information from the cache by searching the table name and schemaVersion fields of the DML message, and then correctly consumes the DML message.

The following describes how the downstream consumes DML messages based on DDL or BOOTSTRAP messages. According to the preceding descriptions, the following information is known:

  • Each DML message contains a schemaVersion field to mark the schema version number of the table corresponding to the DML message.
  • Each DDL message contains the tableSchema and preTableSchema fields, which mark the schema information of the table after and before the DDL event respectively. A CREATE event contains only the tableSchema field.
  • Each BOOTSTRAP message contains a tableSchema field to mark the schema information of the table corresponding to the BOOTSTRAP message.

The consumption methods are introduced in the following two scenarios.

Scenario 1: The consumer starts consuming from the beginning

In this scenario, the consumer starts consuming from the creation of a table, so the consumer can receive all DDL and BOOTSTRAP messages of the table. In this case, the consumer can obtain the schema information of the table through the table name and schemaVersion field of the DML message. The detailed process is as follows:

[Figure: TiCDC Simple protocol consumer, scenario 1]

Scenario 2: The consumer starts consuming from the middle

When a new consumer joins the consumer group, it might start consuming from the middle, so it might miss earlier DDL and BOOTSTRAP messages of the table. In this case, the consumer might receive some DML messages before obtaining the schema information of the table. Therefore, the consumer needs to wait for a period of time until it receives the DDL or BOOTSTRAP message to obtain the schema information of the table. Because TiCDC sends BOOTSTRAP messages periodically, the consumer can always obtain the schema information of the table within a period of time. The detailed process is as follows:

[Figure: TiCDC Simple protocol consumer, scenario 2]
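Both scenarios can be handled by the same schema cache. The following Python sketch works on decoded messages: it stores every tableSchema it sees under the database name, table name, and schema version, and looks up the schema for each DML message with the same key. The SchemaCache class is hypothetical. Holding DML messages in memory until their schema arrives is one possible way to implement the waiting described in scenario 2, and is an assumption of this sketch rather than a documented requirement.

```python
from collections import defaultdict


class SchemaCache:
    """Cache table schemas and pair each DML message with its schema."""

    def __init__(self):
        self.schemas = {}                  # (database, table, version) -> tableSchema
        self.pending = defaultdict(list)   # same key -> DML messages waiting for a schema

    def handle(self, msg):
        """Process one decoded message; return the (dml, schema) pairs that are ready."""
        if "tableSchema" in msg:
            # DDL and BOOTSTRAP messages carry the schema to cache.
            schema = msg["tableSchema"]
            key = (schema["schema"], schema["table"], schema["version"])
            self.schemas[key] = schema
            return [(dml, schema) for dml in self.pending.pop(key, [])]
        if msg["type"] in ("INSERT", "UPDATE", "DELETE"):
            key = (msg["database"], msg["table"], msg["schemaVersion"])
            schema = self.schemas.get(key)
            if schema is None:
                self.pending[key].append(msg)  # scenario 2: schema not seen yet
                return []
            return [(msg, schema)]
        return []  # for example, WATERMARK messages carry no row or schema data


if __name__ == "__main__":
    cache = SchemaCache()
    dml = {"type": "INSERT", "database": "simple", "table": "user",
           "schemaVersion": 447984074911121426, "data": {"id": "1"}}
    bootstrap = {"type": "BOOTSTRAP", "tableSchema": {
        "schema": "simple", "table": "user", "version": 447984074911121426,
        "columns": [], "indexes": []}}
    print(len(cache.handle(dml)))        # DML arrives first and is held back
    print(len(cache.handle(bootstrap)))  # the schema arrives and releases it
```

A production consumer would also need to bound the pending buffer and evict schema versions that are no longer referenced. Both concerns are outside the scope of this sketch.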

Reference

TableSchema definition

TableSchema is a JSON object that contains the schema information of the table, including the table name, table ID, table version number, column information, and index information. The JSON message format is as follows:

  {
    "schema":"simple",
    "table":"user",
    "tableID":148,
    "version":447984074911121426,
    "columns":[
      {
        "name":"id",
        "dataType":{"mysqlType":"int","charset":"binary","collate":"binary","length":11},
        "nullable":false,
        "default":null
      },
      {
        "name":"name",
        "dataType":{"mysqlType":"varchar","charset":"utf8mb4","collate":"utf8mb4_bin","length":255},
        "nullable":true,
        "default":null
      },
      {
        "name":"age",
        "dataType":{"mysqlType":"int","charset":"binary","collate":"binary","length":11},
        "nullable":true,
        "default":null
      },
      {
        "name":"score",
        "dataType":{"mysqlType":"float","charset":"binary","collate":"binary","length":12},
        "nullable":true,
        "default":null
      }
    ],
    "indexes":[
      {
        "name":"primary",
        "unique":true,
        "primary":true,
        "nullable":false,
        "columns":["id"]
      }
    ]
  }

The preceding JSON data is explained as follows:

| Field | Type | Description |
| --- | --- | --- |
| schema | String | The name of the database. |
| table | String | The name of the table. |
| tableID | Number | The ID of the table. |
| version | Number | The schema version number of the table. |
| columns | Array | The column information, including the column name, data type, whether it can be null, and the default value. |
| indexes | Array | The index information, including the index name, whether it is unique, whether it is a primary key, and the index columns. |

You can uniquely identify the schema information of a table by the table name and the schema version number.

Note

Due to the implementation limitations of TiDB, the schema version number of a table does not change when the RENAME TABLE DDL operation is executed.

Column definition

Column is a JSON object that contains the schema information of the column, including the column name, data type, whether it can be null, and the default value.

  {
    "name":"id",
    "dataType":{
      "mysqlType":"int",
      "charset":"binary",
      "collate":"binary",
      "length":11
    },
    "nullable":false,
    "default":null
  }

The preceding JSON data is explained as follows:

| Field | Type | Description |
| --- | --- | --- |
| name | String | The name of the column. |
| dataType | Object | The data type information, including the MySQL data type, character set, collation, and field length. |
| nullable | Boolean | Whether the column can be null. |
| default | String | The default value of the column. |

Index definition

Index is a JSON object that contains the schema information of the index, including the index name, whether it is unique, whether it is a primary key, whether it can be null, and the index columns.

  {
    "name":"primary",
    "unique":true,
    "primary":true,
    "nullable":false,
    "columns":[
      "id"
    ]
  }

The preceding JSON data is explained as follows:

| Field | Type | Description |
| --- | --- | --- |
| name | String | The name of the index. |
| unique | Boolean | Whether the index is unique. |
| primary | Boolean | Whether the index is a primary key. |
| nullable | Boolean | Whether the index can be null. |
| columns | Array | The column names included in the index. |
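A consumer that needs to identify individual rows can derive the identifying columns from the indexes array. The following Python sketch prefers the primary key and otherwise falls back to the first unique index whose nullable field is false. This fallback order is an assumption of the sketch, not a rule defined by the protocol, and row_key_columns is a hypothetical helper name.

```python
def row_key_columns(table_schema):
    """Return the column names that identify a row, or an empty list if none exist."""
    indexes = table_schema.get("indexes", [])
    for index in indexes:
        if index["primary"]:
            return list(index["columns"])
    for index in indexes:
        # Assumed fallback: a unique index that cannot contain NULL values.
        if index["unique"] and not index["nullable"]:
            return list(index["columns"])
    return []


if __name__ == "__main__":
    schema = {"indexes": [{"name": "primary", "unique": True, "primary": True,
                           "nullable": False, "columns": ["id"]}]}
    print(row_key_columns(schema))
```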

mysqlType reference table

The following table describes the value range of the mysqlType field in the TiCDC Simple protocol and its type in TiDB (Golang) and Avro (Java). When you need to parse DML messages, you can correctly parse the data according to this table and the mysqlType field in the DML message, depending on the protocol and language you use.

TiDB type (Golang) represents the type of the corresponding mysqlType when it is processed in TiDB and TiCDC (Golang). Avro type (Java) represents the type of the corresponding mysqlType when it is encoded into Avro format messages.

| mysqlType | Value range | TiDB type (Golang) | Avro type (Java) |
| --- | --- | --- | --- |
| tinyint | [-128, 127] | int64 | long |
| tinyint unsigned | [0, 255] | uint64 | long |
| smallint | [-32768, 32767] | int64 | long |
| smallint unsigned | [0, 65535] | uint64 | long |
| mediumint | [-8388608, 8388607] | int64 | long |
| mediumint unsigned | [0, 16777215] | uint64 | long |
| int | [-2147483648, 2147483647] | int64 | long |
| int unsigned | [0, 4294967295] | uint64 | long |
| bigint | [-9223372036854775808, 9223372036854775807] | int64 | long |
| bigint unsigned | [0, 9223372036854775807] | uint64 | long |
| bigint unsigned | [9223372036854775808, 18446744073709551615] | uint64 | string |
| float | / | float32 | float |
| double | / | float64 | double |
| decimal | / | string | string |
| varchar | / | []uint8 | string |
| char | / | []uint8 | string |
| varbinary | / | []uint8 | bytes |
| binary | / | []uint8 | bytes |
| tinytext | / | []uint8 | string |
| text | / | []uint8 | string |
| mediumtext | / | []uint8 | string |
| longtext | / | []uint8 | string |
| tinyblob | / | []uint8 | bytes |
| blob | / | []uint8 | bytes |
| mediumblob | / | []uint8 | bytes |
| longblob | / | []uint8 | bytes |
| date | / | string | string |
| datetime | / | string | string |
| timestamp | / | string | string |
| time | / | string | string |
| year | / | int64 | long |
| enum | / | uint64 | long |
| set | / | uint64 | long |
| bit | / | uint64 | long |
| json | / | string | string |
| bool | / | int64 | long |
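For JSON-encoded messages, a consumer can use the mysqlType of each column to turn the values in the data and old fields into native values. The following Python sketch assumes that column values arrive as JSON strings, as in the DML examples in this document, and converts only integer, floating-point, and decimal columns. The choice of Python target types and the decode_value and decode_row names are assumptions of this sketch. All other types, including enum, set, bit, and the binary types, are returned unchanged because their JSON representation is not described here.

```python
from decimal import Decimal

INTEGER_TYPES = {"tinyint", "smallint", "mediumint", "int", "bigint", "year", "bool"}
FLOAT_TYPES = {"float", "double"}


def decode_value(mysql_type, raw):
    """Convert one string-encoded column value according to its mysqlType."""
    if raw is None:
        return None
    base_type = mysql_type.split(" ")[0]  # "int unsigned" -> "int"
    if base_type in INTEGER_TYPES:
        return int(raw)       # Python integers cover the full bigint unsigned range
    if base_type in FLOAT_TYPES:
        return float(raw)
    if base_type == "decimal":
        return Decimal(raw)   # keeps the exact decimal digits
    return raw                # strings, dates, and anything this sketch does not handle


def decode_row(table_schema, image):
    """Decode a data or old object using the columns of the matching tableSchema."""
    types = {col["name"]: col["dataType"]["mysqlType"] for col in table_schema["columns"]}
    return {name: decode_value(types[name], value) for name, value in image.items()}


if __name__ == "__main__":
    schema = {"columns": [
        {"name": "id", "dataType": {"mysqlType": "int"}},
        {"name": "name", "dataType": {"mysqlType": "varchar"}},
        {"name": "score", "dataType": {"mysqlType": "float"}},
    ]}
    print(decode_row(schema, {"id": "1", "name": "John Doe", "score": "90.5"}))
```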

Avro schema definition

The Simple protocol supports outputting messages in Avro format. For details about the Avro format, see Simple Protocol Avro Schema.