TiCDC Simple Protocol

TiCDC Simple Protocol 是一种行级别的数据变更通知协议,为监控、缓存、全文索引、分析引擎、异构数据库的主从复制等提供数据源。本文将介绍 TiCDC Simple Protocol 的使用方法和数据格式实现。

使用方式

当使用 Kafka 作为下游时,你可以在 changefeed 配置中指定 protocol"simple",TiCDC 会将每个行变更或者 DDL 事件 (event) 编码为一个 Message,向下游发送数据变更事件。

使用 Simple Protocol 时的配置样例如下所示:

sink-uri 配置:

  1. --sink-uri = "kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0"

changefeed 配置:

  1. [sink]
  2. protocol = "simple"
  3. # 以下为 Simple Protocol 参数,用来控制 bootstrap 消息的发送行为。
  4. # send-bootstrap-interval-in-sec 用来控制发送 bootstrap 消息的时间间隔,单位为秒。
  5. # 默认值为 120 秒,即每张表每隔 120 秒发送一次 bootstrap 消息。
  6. send-bootstrap-interval-in-sec = 120
  7. # send-bootstrap-in-msg-count 用来控制发送 bootstrap 的消息间隔,单位为消息数。
  8. # 默认值为 10000,即每张表每发送 10000 条行变更消息就发送一次 bootstrap 消息。
  9. send-bootstrap-in-msg-count = 10000
  10. # 注意:如果要关闭 bootstrap 消息的发送,则将 send-bootstrap-interval-in-sec 和 send-bootstrap-in-msg-count 均设置为 0。
  11. # send-bootstrap-to-all-partition 用来控制是否发送 bootstrap 消息到所有的 partition。
  12. # 默认值为 true,即发送 bootstrap 消息到对应表 topic 的所有的 partition。
  13. # 如果设置为 false,则只发送 bootstrap 消息到对应表 topic 的第一个 partition。
  14. send-bootstrap-to-all-partition = true
  15. [sink.kafka-config.codec-config]
  16. # encoding-format 用来控制消息的编码格式,目前支持 "json" 和 "avro" 两种格式。
  17. # 默认值为 "json"。
  18. encoding-format = "json"

Message 类型

TiCDC Simple Protocol 支持如下 Message 类型:

DDL:

  • CREATE:创建表。
  • RENAME:重命名表。
  • CINDEX:创建索引。
  • DINDEX:删除索引。
  • ERASE:删除表。
  • TRUNCATE:清空表。
  • ALTER:修改表结构,包括增加列、删除列、修改列类型和其他 TiCDC 支持的 ALTER TABLE 语句。
  • QUERY:其他 DDL 语句。

DML:

  • INSERT:插入事件。
  • UPDATE:更新事件。
  • DELETE:删除事件。

其他:

  • WATERMARK:与上游 TiDB 集群的 TSO 含义相同,包含一个 64 位的 timestamp,用于标记一个表的同步进度,所有早于 watermark 的事件都已经发送给下游。
  • BOOTSTRAP:包含了一张表的 schema 信息,用于给下游构建表的结构。

Message 格式

在 Simple Protocol 中,每一个 Message 都只会包含一个事件。当前 Simple Protocol 支持把消息编码为 JSON 格式和 Avro 格式。本文将以 JSON 格式为例进行说明。对于 Avro 格式的消息,其字段和含义与 JSON 格式的消息一致,只是编码格式不同,格式详见 Simple Protocol Avro Schema

DDL

TiCDC 会把一个 DDL 事件编码成如下的 JSON 格式:

  1. {
  2. "version":1,
  3. "type":"ALTER",
  4. "sql":"ALTER TABLE `user` ADD COLUMN `createTime` TIMESTAMP",
  5. "commitTs":447987408682614795,
  6. "buildTs":1708936343598,
  7. "tableSchema":{
  8. "schema":"simple",
  9. "table":"user",
  10. "tableID":148,
  11. "version":447987408682614791,
  12. "columns":[
  13. {
  14. "name":"id",
  15. "dataType":{
  16. "mysqlType":"int",
  17. "charset":"binary",
  18. "collate":"binary",
  19. "length":11
  20. },
  21. "nullable":false,
  22. "default":null
  23. },
  24. {
  25. "name":"name",
  26. "dataType":{
  27. "mysqlType":"varchar",
  28. "charset":"utf8mb4",
  29. "collate":"utf8mb4_bin",
  30. "length":255
  31. },
  32. "nullable":true,
  33. "default":null
  34. },
  35. {
  36. "name":"age",
  37. "dataType":{
  38. "mysqlType":"int",
  39. "charset":"binary",
  40. "collate":"binary",
  41. "length":11
  42. },
  43. "nullable":true,
  44. "default":null
  45. },
  46. {
  47. "name":"score",
  48. "dataType":{
  49. "mysqlType":"float",
  50. "charset":"binary",
  51. "collate":"binary",
  52. "length":12
  53. },
  54. "nullable":true,
  55. "default":null
  56. },
  57. {
  58. "name":"createTime",
  59. "dataType":{
  60. "mysqlType":"timestamp",
  61. "charset":"binary",
  62. "collate":"binary",
  63. "length":19
  64. },
  65. "nullable":true,
  66. "default":null
  67. }
  68. ],
  69. "indexes":[
  70. {
  71. "name":"primary",
  72. "unique":true,
  73. "primary":true,
  74. "nullable":false,
  75. "columns":[
  76. "id"
  77. ]
  78. }
  79. ]
  80. },
  81. "preTableSchema":{
  82. "schema":"simple",
  83. "table":"user",
  84. "tableID":148,
  85. "version":447984074911121426,
  86. "columns":[
  87. {
  88. "name":"id",
  89. "dataType":{
  90. "mysqlType":"int",
  91. "charset":"binary",
  92. "collate":"binary",
  93. "length":11
  94. },
  95. "nullable":false,
  96. "default":null
  97. },
  98. {
  99. "name":"name",
  100. "dataType":{
  101. "mysqlType":"varchar",
  102. "charset":"utf8mb4",
  103. "collate":"utf8mb4_bin",
  104. "length":255
  105. },
  106. "nullable":true,
  107. "default":null
  108. },
  109. {
  110. "name":"age",
  111. "dataType":{
  112. "mysqlType":"int",
  113. "charset":"binary",
  114. "collate":"binary",
  115. "length":11
  116. },
  117. "nullable":true,
  118. "default":null
  119. },
  120. {
  121. "name":"score",
  122. "dataType":{
  123. "mysqlType":"float",
  124. "charset":"binary",
  125. "collate":"binary",
  126. "length":12
  127. },
  128. "nullable":true,
  129. "default":null
  130. }
  131. ],
  132. "indexes":[
  133. {
  134. "name":"primary",
  135. "unique":true,
  136. "primary":true,
  137. "nullable":false,
  138. "columns":[
  139. "id"
  140. ]
  141. }
  142. ]
  143. }
  144. }

以上 JSON 数据的字段解释如下:

字段类型说明
versionNumber协议版本号,目前为 1
typeStringDDL 事件类型,包括 CREATERENAMECINDEXDINDEXERASETRUNCATEALTERQUERY
sqlStringDDL 语句。
commitTsNumber该 DDL 在上游执行结束时的 commitTs
buildTsNumber该消息在 TiCDC 内部被编码成功时的 UNIX 时间戳。
tableSchemaObject表的当前 schema 信息,详见 TableSchema 定义
preTableSchemaObjectDDL 执行前的表的 schema 信息。除了 CREATE 类型的 DDL 事件外,其他类型的 DDL 事件都会包含该字段。

DML

INSERT

TiCDC 会把一个 INSERT 事件编码成如下的 JSON 格式:

  1. {
  2. "version":1,
  3. "database":"simple",
  4. "table":"user",
  5. "tableID":148,
  6. "type":"INSERT",
  7. "commitTs":447984084414103554,
  8. "buildTs":1708923662983,
  9. "schemaVersion":447984074911121426,
  10. "data":{
  11. "age":"25",
  12. "id":"1",
  13. "name":"John Doe",
  14. "score":"90.5"
  15. }
  16. }

以上 JSON 数据的字段解释如下:

字段类型说明
versionNumber协议版本号,目前为 1
databaseString数据库名。
tableString表名。
tableIDNumber表的 ID。
typeStringDML 事件类型,包括 INSERTUPDATEDELETE
commitTsNumber该 DML 在上游执行结束时的 commitTs
buildTsNumber该消息在 TiCDC 内部被编码成功时的 UNIX 时间戳。
schemaVersionNumber编码该 DML 消息时所使用表的 schema 版本号。
dataObject插入的数据,字段名为列名,字段值为列值。

INSERT 类型的事件只包含 data 字段,不包含 old 字段。

UPDATE

TiCDC 会把一个 UPDATE 事件编码成如下的 JSON 格式:

  1. {
  2. "version":1,
  3. "database":"simple",
  4. "table":"user",
  5. "tableID":148,
  6. "type":"UPDATE",
  7. "commitTs":447984099186180098,
  8. "buildTs":1708923719184,
  9. "schemaVersion":447984074911121426,
  10. "data":{
  11. "age":"25",
  12. "id":"1",
  13. "name":"John Doe",
  14. "score":"95"
  15. },
  16. "old":{
  17. "age":"25",
  18. "id":"1",
  19. "name":"John Doe",
  20. "score":"90.5"
  21. }
  22. }

以上 JSON 数据的字段解释如下:

字段类型说明
versionNumber协议版本号,目前为 1
databaseString数据库名。
tableString表名。
tableIDNumber表的 ID。
typeStringDML 事件类型,包括 INSERTUPDATEDELETE
commitTsNumber该 DML 在上游执行结束时的 commitTs
buildTsNumber该消息在 TiCDC 内部被编码成功时的 UNIX 时间戳。
schemaVersionNumber编码该 DML 消息时所使用表的 schema 版本号。
dataObject更新后的数据,字段名为列名,字段值为列值。
oldObject更新前的数据,字段名为列名,字段值为列值。

UPDATE 类型的事件包含 dataold 两个字段,分别表示更新后的数据和更新前的数据。

DELETE

TiCDC 会把一个 DELETE 事件编码成如下的 JSON 格式:

  1. {
  2. "version":1,
  3. "database":"simple",
  4. "table":"user",
  5. "tableID":148,
  6. "type":"DELETE",
  7. "commitTs":447984114259722243,
  8. "buildTs":1708923776484,
  9. "schemaVersion":447984074911121426,
  10. "old":{
  11. "age":"25",
  12. "id":"1",
  13. "name":"John Doe",
  14. "score":"95"
  15. }
  16. }

以上 JSON 数据的字段解释如下:

字段类型说明
versionNumber协议版本号,目前为 1
databaseString数据库名。
tableString表名。
tableIDNumber表的 ID。
typeStringDML 事件类型,包括 INSERTUPDATEDELETE
commitTsNumber该 DML 在上游执行结束的 commitTs
buildTsNumber该消息在 TiCDC 内部被编码成功时的 UNIX 时间戳。
schemaVersionNumber编码该 DML 消息时所使用表的 schema 版本号。
oldObject删除的数据,字段名为列名,字段值为列值。

DELETE 类型的事件只包含 old 字段,不包含 data 字段。

WATERMARK

TiCDC 会把一个 WATERMARK 事件编码成如下的 JSON 格式:

  1. {
  2. "version":1,
  3. "type":"WATERMARK",
  4. "commitTs":447984124732375041,
  5. "buildTs":1708923816911
  6. }

以上 JSON 数据的字段解释如下:

字段类型说明
versionNumber协议版本号,目前为 1
typeStringWATERMARK 事件类型。
commitTsNumber该 WATERMARK 的 commitTs
buildTsNumber该消息在 TiCDC 内部被编码成功时的 UNIX 时间戳。

BOOTSTRAP

TiCDC 会把一个 BOOTSTRAP 事件编码成如下的 JSON 格式:

  1. {
  2. "version":1,
  3. "type":"BOOTSTRAP",
  4. "commitTs":0,
  5. "buildTs":1708924603278,
  6. "tableSchema":{
  7. "schema":"simple",
  8. "table":"new_user",
  9. "tableID":148,
  10. "version":447984074911121426,
  11. "columns":[
  12. {
  13. "name":"id",
  14. "dataType":{
  15. "mysqlType":"int",
  16. "charset":"binary",
  17. "collate":"binary",
  18. "length":11
  19. },
  20. "nullable":false,
  21. "default":null
  22. },
  23. {
  24. "name":"name",
  25. "dataType":{
  26. "mysqlType":"varchar",
  27. "charset":"utf8mb4",
  28. "collate":"utf8mb4_bin",
  29. "length":255
  30. },
  31. "nullable":true,
  32. "default":null
  33. },
  34. {
  35. "name":"age",
  36. "dataType":{
  37. "mysqlType":"int",
  38. "charset":"binary",
  39. "collate":"binary",
  40. "length":11
  41. },
  42. "nullable":true,
  43. "default":null
  44. },
  45. {
  46. "name":"score",
  47. "dataType":{
  48. "mysqlType":"float",
  49. "charset":"binary",
  50. "collate":"binary",
  51. "length":12
  52. },
  53. "nullable":true,
  54. "default":null
  55. }
  56. ],
  57. "indexes":[
  58. {
  59. "name":"primary",
  60. "unique":true,
  61. "primary":true,
  62. "nullable":false,
  63. "columns":[
  64. "id"
  65. ]
  66. }
  67. ]
  68. }
  69. }

以上 JSON 数据的字段解释如下:

字段类型说明
versionNumber协议版本号,目前为 1
typeStringBOOTSTRAP 事件类型。
commitTsNumberBOOTSTRAP 的 commitTs0,因为它是 TiCDC 内部生成的,其 commitTs 没有意义。
buildTsNumber该消息在 TiCDC 内部被编码成功时的 UNIX 时间戳。
tableSchemaObject表的 schema 信息,详见 TableSchema 定义

Message 生成和发送规则

DDL

  • 生成时机:DDL 事件将会在该 DDL 发生之前的所有事务都被发送完毕后发送。
  • 发送目的地:DDL 事件将会被发送到对应 Topic 的所有的 Partition。

DML

  • 生成时机:DML 事件会按照事务的 commitTs 顺序被发送。
  • 发送目的地:DML 事件将会按照用户配置的 Dispatch 规则发送到对应 Topic 的对应 Partition。

WATERMARK

  • 生成时机:TiCDC 会周期性地发送 WATERMARK 事件,用于标记一个 changefeed 的同步进度,目前的周期为 1 秒。
  • 发送目的地:WATERMARK 事件将会被发送到对应 Topic 的所有 Partition。

BOOTSTRAP

  • 生成时机:
    • 创建一个新的 changefeed 后,在一张表的第一条 DML 事件发送之前,TiCDC 会发送 BOOTSTRAP 事件给下游,用于给下游构建表的结构。
    • 此外,TiCDC 会周期性地发送 BOOTSTRAP 事件,以供下游新加入的 consumer 构建表的结构。目前默认每 120 秒或者每间隔 10000 个消息发送一次,可以通过 sink 配置项 send-bootstrap-interval-in-secsend-bootstrap-in-msg-count 来调整发送周期。
    • 如果一张表在 30 分钟内没有收到任何新的 DML 消息,那么该表将被认为是不活跃的。TiCDC 将停止为该表发送 BOOTSTRAP 事件,直到该表收到新的 DML 事件。
  • 发送目的地:BOOTSTRAP 事件默认发送到对应 Topic 的所有 Partition,可以通过 sink 配置项 send-bootstrap-to-all-partition 来调整该发送策略。

Message 消费方法

由于 Simple Protocol 在发送 DML 消息时没有包含表的 schema 信息,因此在消费一条 DML 消息之前,下游需要先接收到 DDL 或者 BOOTSTRAP 消息,并且把表的 schema 信息缓存起来。在接收到 DML 消息时,通过 DML 消息中的 table 名和 schemaVersion 字段去缓存中查找对应的 tableSchema 信息,从而正确地消费 DML 消息。

下面介绍如何根据 DDL 或者 BOOTSTRAP 消息来消费 DML 消息。

根据上文描述,已知如下信息:

  • 每个 DML 消息都会包含一个 schemaVersion 字段,用于标记该 DML 消息对应的表的 schema 版本号。
  • 每个 DDL 消息都会包含一个 tableSchemapreTableSchema 字段,用于标记该 DDL 发生前后的表的 schema 信息。
  • 每个 BOOTSTRAP 消息都会包含一个 tableSchema 字段,用于标记该 BOOTSTRAP 对应的表的 schema 信息。

接下来介绍两种场景下的消费方法。

场景一:消费者从头开始消费

在此场景下,消费者从创建表开始消费,因此消费者能够接收到该表的所有 DDL 和 BOOTSTRAP 消息。此时,消费者可以通过一个 DML 消息中的 table 名和 schemaVersion 字段来获取对应的 tableSchema 信息。具体过程如下图所示:

TiCDC Simple Protocol consumer scene 1

场景二:消费者从中间开始消费

在一个新的消费者加入到消费者组时,它可能会从中间开始消费,因此它可能会错过之前的 DDL 和 BOOTSTRAP 消息。在这种情况下,消费者可能会先接收到一些 DML 消息,但是此时它还没有该表的 schema 信息。因此,它需要先等待一段时间,直到它接收到该表 DDL 或 BOOTSTRAP 消息,从而获取到该表的 schema 信息。由于 TiCDC 会周期性地发送 BOOTSTRAP 消息,消费者总是能够在一段时间内获取到该表的 schema 信息。具体过程如下图所示:

TiCDC Simple Protocol consumer scene 2

参考

TableSchema 定义

TableSchema 是一个 JSON 对象,包含了表的 schema 信息,包括表名、表 ID、表的版本号、列信息和索引信息。其 JSON 消息格式如下:

  1. {
  2. "schema":"simple",
  3. "table":"user",
  4. "tableID":148,
  5. "version":447984074911121426,
  6. "columns":[
  7. {
  8. "name":"id",
  9. "dataType":{
  10. "mysqlType":"int",
  11. "charset":"binary",
  12. "collate":"binary",
  13. "length":11
  14. },
  15. "nullable":false,
  16. "default":null
  17. },
  18. {
  19. "name":"name",
  20. "dataType":{
  21. "mysqlType":"varchar",
  22. "charset":"utf8mb4",
  23. "collate":"utf8mb4_bin",
  24. "length":255
  25. },
  26. "nullable":true,
  27. "default":null
  28. },
  29. {
  30. "name":"age",
  31. "dataType":{
  32. "mysqlType":"int",
  33. "charset":"binary",
  34. "collate":"binary",
  35. "length":11
  36. },
  37. "nullable":true,
  38. "default":null
  39. },
  40. {
  41. "name":"score",
  42. "dataType":{
  43. "mysqlType":"float",
  44. "charset":"binary",
  45. "collate":"binary",
  46. "length":12
  47. },
  48. "nullable":true,
  49. "default":null
  50. }
  51. ],
  52. "indexes":[
  53. {
  54. "name":"primary",
  55. "unique":true,
  56. "primary":true,
  57. "nullable":false,
  58. "columns":[
  59. "id"
  60. ]
  61. }
  62. ]
  63. }

以上 JSON 数据的字段解释如下:

字段类型说明
schemaString数据库名。
tableString表名。
tableIDNumber表的 ID。
versionNumber表的 schema 版本号。
columnsArray列信息,包括列名、数据类型、是否可为空、默认值等。
indexesArray索引信息,包括索引名、是否唯一、是否为主键、索引列等。

你可以通过表名和表的 schema 版本号来唯一标识一张表的 schema 信息。

TiCDC Simple Protocol - 图3

注意

由于 TiDB 的实现限制,在执行 RENAME TABLE 的 DDL 操作时,表的 schema 版本号不会发生变化。

Column 定义

Column 是一个 JSON 对象,包含了列的 schema 信息,包括列名、数据类型、是否可为空、默认值等。

  1. {
  2. "name":"id",
  3. "dataType":{
  4. "mysqlType":"int",
  5. "charset":"binary",
  6. "collate":"binary",
  7. "length":11
  8. },
  9. "nullable":false,
  10. "default":null
  11. }

以上 JSON 数据的字段解释如下:

字段类型说明
nameString列名。
dataTypeObject数据类型信息,包括 MySQL 数据类型、字符集、字符序、字段长度。
nullableBoolean是否可为空。
defaultString默认值。

Index 定义

Index 是一个 JSON 对象,包含了索引的 schema 信息,包括索引名、是否唯一、是否为主键、索引列等。

  1. {
  2. "name":"primary",
  3. "unique":true,
  4. "primary":true,
  5. "nullable":false,
  6. "columns":[
  7. "id"
  8. ]
  9. }

以上 JSON 数据的字段解释如下:

字段类型说明
nameString索引名。
uniqueBoolean是否唯一。
primaryBoolean是否为主键。
nullableBoolean是否可为空。
columnsArray索引包含的列名。

mysqlType 参考表格

以下表格描述了 TiCDC Simple Protocol 中所有的 mysqlType 字段的取值范围及其在 TiDB (Golang) 和 Avro (JAVA) 中的类型。当你需要对 DML 消息进行解析时,取决于你所使用的协议和语言,可以根据该表格和 DML 消息中的 mysqlType 字段来正确地解析数据。

其中,TiDB Type (Golang) 代表了对应 mysqlType 在 TiDB 和 TiCDC (Golang) 中处理时的类型,Avro Type (Java) 代表了对应 mysqlType 在编码为 Avro 格式消息时的类型。

mysqlType取值范围TiDB Type (Golang)Avro Type (Java)
tinyint[-128, 127]int64long
tinyint unsigned[0, 255]uint64long
smallint[-32768, 32767]int64long
smallint unsigned[0, 65535]uint64long
mediumint[-8388608, 8388607]int64long
mediumint unsigned[0, 16777215]uint64long
int[-2147483648, 2147483647]int64long
int unsigned[0, 4294967295]uint64long
bigint[-9223372036854775808, 9223372036854775807]int64long
bigint unsigned[0, 9223372036854775807]uint64long
bigint unsigned[9223372036854775808, 18446744073709551615]uint64string
float/float32float
double/float64double
decimal/stringstring
varchar/[]uint8string
char/[]uint8string
varbinary/[]uint8bytes
binary/[]uint8bytes
tinytext/[]uint8string
text/[]uint8string
mediumtext/[]uint8string
longtext/[]uint8string
tinyblob/[]uint8bytes
blob/[]uint8bytes
mediumblob/[]uint8bytes
longblob/[]uint8bytes
date/stringstring
datetime/stringstring
timestamp/stringstring
time/stringstring
year/int64long
enum/uint64long
set/uint64long
bit/uint64long
json/stringstring
bool/int64long

Avro Schema 定义

Simple Protocol 支持输出 Avro 格式的消息,Avro Schema 格式请参考 Simple Protocol Avro Schema