Canal Format
Changelog-Data-Capture Format Format: Deserialization Schema
Canal 是一个 CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将 MySQL 变更传输到其他系统。Canal 为变更日志提供了统一的数据格式,并支持使用 JSON 或 protobuf 序列化消息(Canal 默认使用 protobuf)。
Flink 支持将 Canal 的 JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如
- 将增量数据从数据库同步到其他系统
- 日志审计
- 数据库的实时物化视图
- 关联维度数据库的变更历史,等等。
注意:未来会支持 Canal protobuf 类型消息的解析以及输出 Canal 格式的消息。
依赖
为了设置 Canal 格式,下表提供了使用自动化构建工具(例如:Maven 或 SBT)项目和使用绑定 SQL JAR 包的 SQL Client 所需的依赖信息。
Maven 依赖 | SQL Client JAR |
---|---|
flink-json | 内置 |
注意:有关如何部署 Canal 以将变更日志同步到消息队列,请参阅 Canal 文档。
如何使用 Canal Format
Canal 为变更日志提供了统一的格式,下面是一个从 MySQL 库 products
表中捕获更新操作的简单示例:
{
"data": [
{
"id": "111",
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": "5.18"
}
],
"database": "inventory",
"es": 1589373560000,
"id": 9,
"isDdl": false,
"mysqlType": {
"id": "INTEGER",
"name": "VARCHAR(255)",
"description": "VARCHAR(512)",
"weight": "FLOAT"
},
"old": [
{
"weight": "5.15"
}
],
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"description": 12,
"weight": 7
},
"table": "products",
"ts": 1589373560798,
"type": "UPDATE"
}
注意:有关各个字段的含义,请参阅 Canal 文档
MySQL products
表有4列(id
,name
,description
和 weight
)。上面的 JSON 消息是 products
表上的一个更新事件,表示 id = 111
的行数据上 weight
字段值从5.15
变更成为 5.18
。假设消息已经同步到了一个 Kafka 主题:products_binlog
,那么就可以使用以下DDL来从这个主题消费消息并解析变更事件。
CREATE TABLE topic_products (
-- 元数据与 MySQL "products" 表完全相同
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' -- 使用 canal-json 格式
)
将 Kafka 主题注册成 Flink 表之后,就可以将 Canal 消息用作变更日志源。
-- 关于MySQL "products" 表的实时物化视图
-- 计算相同产品的最新平均重量
SELECT name, AVG(weight) FROM topic_products GROUP BY name;
-- 将 MySQL "products" 表的所有数据和增量更改同步到
-- Elasticsearch "products" 索引以供将来搜索
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
Format 参数
选项 | 要求 | 默认 | 类型 | 描述 |
---|---|---|---|---|
format | 必填 | (none) | String | 指定要使用的格式,此处应为 ‘canal-json’ . |
canal-json.ignore-parse-errors | 选填 | false | Boolean | 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null 。 |
canal-json.timestamp-format.standard | 选填 | ‘SQL’ | String | 指定输入和输出时间戳格式。 当前支持的值是 ‘SQL’ 和 ‘ISO-8601’ :
|
数据类型映射
目前,Canal Format 使用 JSON Format 进行反序列化。 有关数据类型映射的更多详细信息,请参阅 JSON Format 文档。