TiCDC Canal-JSON Protocol

Canal-JSON 是由 Alibaba Canal 定义的一种数据交换格式协议。通过本文,你可以了解 TiCDC 对 Canal-JSON 数据格式的实现,包括 TiDB 扩展字段、Canal-JSON 数据格式定义,以及和官方实现进行对比等相关内容。

使用 Canal-JSON

当使用 MQ (Message Queue) 作为下游 Sink 时,你可以在 sink-uri 中指定使用 Canal-JSON,TiCDC 将以 Event 为基本单位封装构造 Canal-JSON Message,向下游发送 TiDB 的数据变更事件。

Event 分为三类:

  • DDL Event:代表 DDL 变更记录,在上游成功执行 DDL 语句后发出,DDL Event 会被发送到索引为 0 的 MQ Partition。
  • DML Event:代表一行数据变更记录,在行变更发生时该类 Event 被发出,包含变更后该行的相关信息。
  • WATERMARK Event:代表一个特殊的时间点,表示在这个时间点前收到的 Event 是完整的。仅适用于 TiDB 扩展字段,当你在 sink-uri 中设置 enable-tidb-extension=true 时生效。

使用 Canal-JSON 时的配置样例如下所示:

  1. cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-canal-json" --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&protocol=canal-json"

TiDB 扩展字段

Canal-JSON 协议本是为 MySQL 设计的,其中并不包含 TiDB 专有的 CommitTS 事务唯一标识等重要字段。为了解决这个问题,TiCDC 在 Canal-JSON 协议格式中附加了 TiDB 扩展字段。在 sink-uri 中设置 enable-tidb-extensiontrue(默认为 false)后,TiCDC 生成 Canal-JSON 消息时的行为如下:

  • TiCDC 发送的 DML Event 和 DDL Event 类型消息中,将会含有一个名为 _tidb 的字段。
  • TiCDC 将会发送 WATERMARK Event 消息。·

配置样例如下所示:

  1. cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-canal-json-enable-tidb-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&protocol=canal-json&enable-tidb-extension=true"

Message 格式定义

下面介绍 DDL Event、DML Event 和 WATERMARK Event 的格式定义,以及消费端的数据解析。

DDL Event

TiCDC 会把一个 DDL Event 编码成如下 Canal-JSON 格式:

  1. {
  2. "id": 0,
  3. "database": "test",
  4. "table": "",
  5. "pkNames": null,
  6. "isDdl": true,
  7. "type": "QUERY",
  8. "es": 1639633094670,
  9. "ts": 1639633095489,
  10. "sql": "drop database if exists test",
  11. "sqlType": null,
  12. "mysqlType": null,
  13. "data": null,
  14. "old": null,
  15. "_tidb": { // TiDB 的扩展字段
  16. "commitTs": 163963309467037594
  17. }
  18. }

以上 JSON 数据的字段解释如下:

字段类型说明
idNumberTiCDC 默认值为 0
databaseStringRow 所在的 Database 的名字
tableStringRow 所在的 Table 的名字
pkNamesArray组成 primary key 的所有列的名字
isDdlBool该条消息是否为 DDL 事件
typeStringCanal-JSON 定义的事件类型
esNumber产生该条消息的事件发生时的 13 位(毫秒级)时间戳
tsNumberTiCDC 生成该条消息时的 13 位(毫秒级)时间戳
sqlString当 isDdl 为 true 时,记录对应的 DDL 语句
sqlTypeObject当 isDdl 为 false 时,记录每一列数据类型在 Java 中的类型表示
mysqlTypeobject当 isDdl 为 false 时,记录每一列数据类型在 MySQL 中的类型表示
dataObject当 isDdl 为 false 时,记录每一列的名字及其数据值
oldObject仅当该条消息由 Update 类型事件产生时,记录每一列的名字,和 Update 之前的数据值
_tidbObjectTiDB 扩展字段,仅当 enable-tidb-extension 为 true 时才会存在。其中的 commitTs 值为造成 Row 变更的事务的 TSO

DML Event

对于一行 DML 数据变更事件,TiCDC 会将其编码成如下形式:

  1. {
  2. "id": 0,
  3. "database": "test",
  4. "table": "tp_int",
  5. "pkNames": [
  6. "id"
  7. ],
  8. "isDdl": false,
  9. "type": "INSERT",
  10. "es": 1639633141221,
  11. "ts": 1639633142960,
  12. "sql": "",
  13. "sqlType": {
  14. "c_bigint": -5,
  15. "c_int": 4,
  16. "c_mediumint": 4,
  17. "c_smallint": 5,
  18. "c_tinyint": -6,
  19. "id": 4
  20. },
  21. "mysqlType": {
  22. "c_bigint": "bigint",
  23. "c_int": "int",
  24. "c_mediumint": "mediumint",
  25. "c_smallint": "smallint",
  26. "c_tinyint": "tinyint",
  27. "id": "int"
  28. },
  29. "data": [
  30. {
  31. "c_bigint": "9223372036854775807",
  32. "c_int": "2147483647",
  33. "c_mediumint": "8388607",
  34. "c_smallint": "32767",
  35. "c_tinyint": "127",
  36. "id": "2"
  37. }
  38. ],
  39. "old": null,
  40. "_tidb": { // TiDB 的扩展字段
  41. "commitTs": 163963314122145239
  42. }
  43. }

WATERMARK Event

仅当 enable-tidb-extensiontrue 时,TiCDC 才会发送 WATERMARK Event,其 type 字段值为 TIDB_WATERMARK。该类型事件具有 _tidb 字段,当前只含有 watermarkTs,其值为该 Event 发送时的 TSO。

当你收到一个该类型的事件,所有 commitTs 小于 watermarkTs 的事件均已发送完毕。因为 TiCDC 提供 At Least Once 语义,可能出现重复发送数据的情况。如果后续收到有 commitTs 小于 watermarkTs 的事件,可以忽略。

WATERMARK Event 的示例如下:

  1. {
  2. "id": 0,
  3. "database": "",
  4. "table": "",
  5. "pkNames": null,
  6. "isDdl": false,
  7. "type": "TIDB_WATERMARK",
  8. "es": 1640007049196,
  9. "ts": 1640007050284,
  10. "sql": "",
  11. "sqlType": null,
  12. "mysqlType": null,
  13. "data": null,
  14. "old": null,
  15. "_tidb": { // TiDB 的扩展字段
  16. "watermarkTs": 429918007904436226
  17. }
  18. }

消费端数据解析

从上面的示例中可知,Canal-JSON 具有统一的数据格式,针对不同的事件类型,有不同的字段填充规则。消费者可以使用统一的方法对该 JSON 格式的数据进行解析,然后通过判断字段值的方式,来确定具体事件类型:

  • isDdl 为 true 时,该消息含有一条 DDL Event。
  • isDdl 为 false 时,需要对 type 字段加以判断。如果 typeTIDB_WATERMARK,可得知其为 WATERMARK Event,否则就是 DML Event。

字段说明

Canal-JSON 格式会在 mysqlType 字段和 sqlType 字段中记录对应的数据类型。

MySQL Type 字段

Canal-JSON 格式会在 mysqlType 字段中记录每一列的 MySQL Type 的字符串表示。相关详情可以参考 TiDB Data Types

SQL Type 字段

Canal-JSON 格式会在 sqlType 字段中记录每一列的 Java SQL Type,即每条数据在 JDBC 中对应的数据类型,其值可以通过 MySQL Type 和具体数据值计算得到。具体对应关系如下:

MySQL TypeJava SQL Type Code
Boolean-6
Float7
Double8
Decimal3
Char1
Varchar12
Binary2004
Varbinary2004
Tinytext2005
Text2005
Mediumtext2005
Longtext2005
Tinyblob2004
Blob2004
Mediumblob2004
Longblob2004
Date91
Datetime93
Timestamp93
Time92
Year12
Enum4
Set-7
Bit-7
JSON12

整数类型

你需要考虑整数类型是否有 Unsigned 约束,以及当前取值大小,分别对应不同的 Java SQL Type Code。如下表所示。

MySQL Type StringValue RangeJava SQL Type Code
tinyint[-128, 127]-6
tinyint unsigned[0, 127]-6
tinyint unsigned[128, 255]5
smallint[-32768, 32767]5
smallint unsigned[0, 32767]5
smallint unsigned[32768, 65535]4
mediumint[-8388608, 8388607]4
mediumint unsigned[0, 8388607]4
mediumint unsigned[8388608, 16777215]4
int[-2147483648, 2147483647]4
int unsigned[0, 2147483647]4
int unsigned[2147483648, 4294967295]-5
bigint[-9223372036854775808, 9223372036854775807]-5
bigint unsigned[0, 9223372036854775807]-5
bigint unsigned[9223372036854775808, 18446744073709551615]3

TiCDC 涉及的 Java SQL Type 及其 Code 映射关系如下表所示。

Java SQL TypeJava SQL Type Code
CHAR1
DECIMAL3
INTEGER4
SMALLINT5
REAL7
DOUBLE8
VARCHAR12
DATE91
TIME92
TIMESTAMP93
BLOB2004
CLOB2005
BIGINT-5
TINYINT-6
Bit-7

想要了解 Java SQL Type 的更多信息,请参考 Java SQL Class Types

TiCDC Canal-JSON 和 Canal 官方实现对比

TiCDC 对 Canal-JSON 数据格式的实现,包括 Update 类型事件和 mysqlType 字段,和官方有些许不同。主要差异见下表。

差异点TiCDCCanal
Update 类型事件old 字段包含所有列数据old 字段仅包含被修改的列数据
mysqlType 字段对于含有参数的类型,没有类型参数信息对于含有参数的类型,会包含完整的参数信息

Update 类型事件

对于 Update 类型事件,Canal 官方实现中,old 字段仅包含被修改的列数据,而 TiCDC 的实现则包含所有列数据。

假设在上游 TiDB 按顺序执行如下 SQL 语句:

  1. create table tp_int
  2. (
  3. id int auto_increment,
  4. c_tinyint tinyint null,
  5. c_smallint smallint null,
  6. c_mediumint mediumint null,
  7. c_int int null,
  8. c_bigint bigint null,
  9. constraint pk
  10. primary key (id)
  11. );
  12. insert into tp_int(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
  13. values (127, 32767, 8388607, 2147483647, 9223372036854775807);
  14. update tp_int set c_int = 0, c_tinyint = 0 where c_smallint = 32767;

对于 update 语句,TiCDC 将会输出一条 typeUPDATE 的事件消息,如下所示。该 update 语句仅对 c_intc_tinyint 两列进行了修改。输出事件消息的 old 字段,则包含所有列数据。

  1. {
  2. "id": 0,
  3. ...
  4. "type": "UPDATE",
  5. ...
  6. "sqlType": {
  7. ...
  8. },
  9. "mysqlType": {
  10. ...
  11. },
  12. "data": [
  13. {
  14. "c_bigint": "9223372036854775807",
  15. "c_int": "0",
  16. "c_mediumint": "8388607",
  17. "c_smallint": "32767",
  18. "c_tinyint": "0",
  19. "id": "2"
  20. }
  21. ],
  22. "old": [ // TiCDC 输出事件消息的 `old` 字段,则包含所有列数据。
  23. {
  24. "c_bigint": "9223372036854775807",
  25. "c_int": "2147483647", // 修改的列
  26. "c_mediumint": "8388607",
  27. "c_smallint": "32767",
  28. "c_tinyint": "127", // 修改的列
  29. "id": "2"
  30. }
  31. ]
  32. }

官方 Canal 输出事件消息的 old 字段仅包含被修改的列数据。示例如下。

  1. {
  2. "id": 0,
  3. ...
  4. "type": "UPDATE",
  5. ...
  6. "sqlType": {
  7. ...
  8. },
  9. "mysqlType": {
  10. ...
  11. },
  12. "data": [
  13. {
  14. "c_bigint": "9223372036854775807",
  15. "c_int": "0",
  16. "c_mediumint": "8388607",
  17. "c_smallint": "32767",
  18. "c_tinyint": "0",
  19. "id": "2"
  20. }
  21. ],
  22. "old": [ // Canal 输出事件消息的 `old` 字段,仅包含被修改的列的数据。
  23. {
  24. "c_int": "2147483647", // 修改的列
  25. "c_tinyint": "127", // 修改的列
  26. }
  27. ]
  28. }

mysqlType 字段

对于 mysqlType 字段,Canal 官方实现中,对于含有参数的类型,会包含完整的参数信息,TiCDC 实现则没有类型参数信息。

在下面示例的表定义 SQL 语句中,如 decimal / char / varchar / enum 等类型,都含有参数。对比 TiCDC 和 Canal 官方实现分别生成的 Canal-JSON 格式数据可知,在 mysqlType 字段中的数据,TiCDC 实现只包含基本 MySQL Type。如果业务需要类型参数信息,需要你自行通过其他方式实现。

假设在上游数据库按顺序执行如下 SQL 语句:

  1. create table t (
  2. id int auto_increment,
  3. c_decimal decimal(10, 4) null,
  4. c_char char(16) null,
  5. c_varchar varchar(16) null,
  6. c_binary binary(16) null,
  7. c_varbinary varbinary(16) null,
  8. c_enum enum('a','b','c') null,
  9. c_set set('a','b','c') null,
  10. c_bit bit(64) null,
  11. constraint pk
  12. primary key (id)
  13. );
  14. insert into t (c_decimal, c_char, c_varchar, c_binary, c_varbinary, c_enum, c_set, c_bit)
  15. values (123.456, "abc", "abc", "abc", "abc", 'a', 'a,b', b'1000001');

TiCDC 输出内容如下:

  1. {
  2. "id": 0,
  3. ...
  4. "isDdl": false,
  5. "sqlType": {
  6. ...
  7. },
  8. "mysqlType": {
  9. "c_binary": "binary",
  10. "c_bit": "bit",
  11. "c_char": "char",
  12. "c_decimal": "decimal",
  13. "c_enum": "enum",
  14. "c_set": "set",
  15. "c_varbinary": "varbinary",
  16. "c_varchar": "varchar",
  17. "id": "int"
  18. },
  19. "data": [
  20. {
  21. ...
  22. }
  23. ],
  24. "old": null,
  25. }

Canal 官方实现输出内容如下:

  1. {
  2. "id": 0,
  3. ...
  4. "isDdl": false,
  5. "sqlType": {
  6. ...
  7. },
  8. "mysqlType": {
  9. "c_binary": "binary(16)",
  10. "c_bit": "bit(64)",
  11. "c_char": "char(16)",
  12. "c_decimal": "decimal(10, 4)",
  13. "c_enum": "enum('a','b','c')",
  14. "c_set": "set('a','b','c')",
  15. "c_varbinary": "varbinary(16)",
  16. "c_varchar": "varchar(16)",
  17. "id": "int"
  18. },
  19. "data": [
  20. {
  21. ...
  22. }
  23. ],
  24. "old": null,
  25. }

TiCDC Canal-JSON 改动说明

Delete 类型事件中 Old 字段的变化说明

TiCDC 实现的 Canal-JSON 格式,v5.4.0 及以后版本的实现,和之前的有些许不同,具体如下:

  • Delete 类型事件,Old 字段的内容发生了变化。

如下是一个 DELETE 事件的数据内容,在 v5.4.0 前的实现中,”old” 的内容和 “data” 相同,在 v5.4.0 及之后的实现中,”old” 将被设为 null。你可以通过 “data” 字段获取到被删除的数据。

  1. {
  2. "id": 0,
  3. "database": "test",
  4. ...
  5. "type": "DELETE",
  6. ...
  7. "sqlType": {
  8. ...
  9. },
  10. "mysqlType": {
  11. ...
  12. },
  13. "data": [
  14. {
  15. "c_bigint": "9223372036854775807",
  16. "c_int": "0",
  17. "c_mediumint": "8388607",
  18. "c_smallint": "32767",
  19. "c_tinyint": "0",
  20. "id": "2"
  21. }
  22. ],
  23. "old": null,
  24. // 以下示例是 v5.4.0 之前的实现,`old` 内容等同于 `data` 内容
  25. "old": [
  26. {
  27. "c_bigint": "9223372036854775807",
  28. "c_int": "0",
  29. "c_mediumint": "8388607",
  30. "c_smallint": "32767",
  31. "c_tinyint": "0",
  32. "id": "2"
  33. }
  34. ]
  35. }