Canal Format

Changelog-Data-Capture Format Format: Serialization Schema Format: Deserialization Schema

Canal 是一个 CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将 MySQL 变更传输到其他系统。Canal 为变更日志提供了统一的数据格式,并支持使用 JSON 或 protobuf 序列化消息(Canal 默认使用 protobuf)。

Flink 支持将 Canal 的 JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如

  • 将增量数据从数据库同步到其他系统
  • 日志审计
  • 数据库的实时物化视图
  • 关联维度数据库的变更历史,等等。

Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Canal 格式的 JSON 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Canal 消息。

注意:未来会支持 Canal protobuf 类型消息的解析以及输出 Canal 格式的消息。

依赖

In order to use the Canal format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependencySQL Client
  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-json</artifactId>
  4. <version>1.20.0</version>
  5. </dependency>
Copied to clipboard!
Built-in

注意:有关如何部署 Canal 以将变更日志同步到消息队列,请参阅 Canal 文档

如何使用 Canal Format

Canal 为变更日志提供了统一的格式,下面是一个从 MySQL 库 products 表中捕获更新操作的简单示例:

  1. {
  2. "data": [
  3. {
  4. "id": "111",
  5. "name": "scooter",
  6. "description": "Big 2-wheel scooter",
  7. "weight": "5.18"
  8. }
  9. ],
  10. "database": "inventory",
  11. "es": 1589373560000,
  12. "id": 9,
  13. "isDdl": false,
  14. "mysqlType": {
  15. "id": "INTEGER",
  16. "name": "VARCHAR(255)",
  17. "description": "VARCHAR(512)",
  18. "weight": "FLOAT"
  19. },
  20. "old": [
  21. {
  22. "weight": "5.15"
  23. }
  24. ],
  25. "pkNames": [
  26. "id"
  27. ],
  28. "sql": "",
  29. "sqlType": {
  30. "id": 4,
  31. "name": 12,
  32. "description": 12,
  33. "weight": 7
  34. },
  35. "table": "products",
  36. "ts": 1589373560798,
  37. "type": "UPDATE"
  38. }

注意:有关各个字段的含义,请参阅 Canal 文档

MySQL products 表有4列(idnamedescriptionweight)。上面的 JSON 消息是 products 表上的一个更新事件,表示 id = 111 的行数据上 weight 字段值从5.15变更成为 5.18。假设消息已经同步到了一个 Kafka 主题:products_binlog,那么就可以使用以下DDL来从这个主题消费消息并解析变更事件。

  1. CREATE TABLE topic_products (
  2. -- 元数据与 MySQL "products" 表完全相同
  3. id BIGINT,
  4. name STRING,
  5. description STRING,
  6. weight DECIMAL(10, 2)
  7. ) WITH (
  8. 'connector' = 'kafka',
  9. 'topic' = 'products_binlog',
  10. 'properties.bootstrap.servers' = 'localhost:9092',
  11. 'properties.group.id' = 'testGroup',
  12. 'format' = 'canal-json' -- 使用 canal-json 格式
  13. )

将 Kafka 主题注册成 Flink 表之后,就可以将 Canal 消息用作变更日志源。

  1. -- 关于MySQL "products" 表的实时物化视图
  2. -- 计算相同产品的最新平均重量
  3. SELECT name, AVG(weight) FROM topic_products GROUP BY name;
  4. -- MySQL "products" 表的所有数据和增量更改同步到
  5. -- Elasticsearch "products" 索引以供将来搜索
  6. INSERT INTO elasticsearch_products
  7. SELECT * FROM topic_products;

Available Metadata

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

Attention Format metadata fields are only available if the corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose metadata fields for its value format.

KeyData TypeDescription
databaseSTRING NULLThe originating database. Corresponds to the database field in the Canal record if available.
tableSTRING NULLThe originating database table. Corresponds to the table field in the Canal record if available.
sql-typeMAP<STRING, INT> NULLMap of various sql types. Corresponds to the sqlType field in the Canal record if available.
pk-namesARRAY<STRING> NULLArray of primary key names. Corresponds to the pkNames field in the Canal record if available.
ingestion-timestampTIMESTAMP_LTZ(3) NULLThe timestamp at which the connector processed the event. Corresponds to the ts field in the Canal record.

The following example shows how to access Canal metadata fields in Kafka:

  1. CREATE TABLE KafkaTable (
  2. origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  3. origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  4. origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
  5. origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
  6. origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  7. user_id BIGINT,
  8. item_id BIGINT,
  9. behavior STRING
  10. ) WITH (
  11. 'connector' = 'kafka',
  12. 'topic' = 'user_behavior',
  13. 'properties.bootstrap.servers' = 'localhost:9092',
  14. 'properties.group.id' = 'testGroup',
  15. 'scan.startup.mode' = 'earliest-offset',
  16. 'value.format' = 'canal-json'
  17. );

Format 参数

选项要求默认类型描述
format
必填(none)String指定要使用的格式,此处应为 ‘canal-json’.
canal-json.ignore-parse-errors
选填falseBoolean当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
canal-json.timestamp-format.standard
选填‘SQL’String指定输入和输出时间戳格式。当前支持的值是 ‘SQL’‘ISO-8601’:
  • 选项 ‘SQL’ 将解析 “yyyy-MM-dd HH:mm:ss.s{precision}” 格式的输入时间戳,例如 ‘2020-12-30 12:13:14.123’,并以相同格式输出时间戳。
  • 选项 ‘ISO-8601’ 将解析 “yyyy-MM-ddTHH:mm:ss.s{precision}” 格式的输入时间戳,例如 ‘2020-12-30T12:13:14.123’,并以相同的格式输出时间戳。
canal-json.map-null-key.mode
选填‘FAIL’String指定处理 Map 中 key 值为空的方法. 当前支持的值有 ‘FAIL’, ‘DROP’‘LITERAL’:
  • Option ‘FAIL’ 将抛出异常,如果遇到 Map 中 key 值为空的数据。
  • Option ‘DROP’ 将丢弃 Map 中 key 值为空的数据项。
  • Option ‘LITERAL’ 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 ‘canal-json.map-null-key.literal’ 定义。
canal-json.map-null-key.literal
选填‘null’String‘canal-json.map-null-key.mode’ 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。
canal-json.encode.decimal-as-plain-number
选填falseBoolean将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027
canal-json.database.include
optional(none)String一个可选的正则表达式,通过正则匹配 Canal 记录中的 “database” 元字段,仅读取指定数据库的 changelog 记录。正则字符串与 Java 的 Pattern 兼容。
canal-json.table.include
optional(none)String一个可选的正则表达式,通过正则匹配 Canal 记录中的 “table” 元字段,仅读取指定表的 changelog 记录。正则字符串与 Java 的 Pattern 兼容。

注意事项

重复的变更事件

在正常的操作环境下,Canal 应用能以 exactly-once 的语义投递每条变更事件。在这种情况下,Flink 消费 Canal 产生的变更事件能够工作得很好。 然而,当有故障发生时,Canal 应用只能保证 at-least-once 的投递语义。 这也意味着,在非正常情况下,Canal 可能会投递重复的变更事件到消息队列中,当 Flink 从消息队列中消费的时候就会得到重复的事件。 这可能会导致 Flink query 的运行得到错误的结果或者非预期的异常。因此,建议在这种情况下,建议在这种情况下,将作业参数 table.exec.source.cdc-events-duplicate 设置成 true,并在该 source 上定义 PRIMARY KEY。 框架会生成一个额外的有状态算子,使用该 primary key 来对变更事件去重并生成一个规范化的 changelog 流。

数据类型映射

目前,Canal Format 使用 JSON Format 进行序列化和反序列化。 有关数据类型映射的更多详细信息,请参阅 JSON Format 文档