Debezium Format
Changelog-Data-Capture Format Format: Deserialization Schema
Debezium 是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。
Flink 支持将 Debezium JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如
- 将增量数据从数据库同步到其他系统
- 日志审计
- 数据库的实时物化视图
- 关联维度数据库的变更历史,等等。
注意: 支持解析 Debezium Avro 消息和输出 Debezium 消息已经规划在路线图上了。
依赖
为了设置 Debezium Format,下表提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 SQL JAR 包的 SQL Client 的两个项目的依赖项信息。
Maven 依赖 | SQL Client JAR |
---|---|
flink-json | 内置 |
注意: 请参考 Debezium 文档,了解如何设置 Debezium Kafka Connect 用来将变更日志同步到 Kafka 主题。
如何使用 Debezium Format
Debezium 为变更日志提供了统一的格式,这是一个从 MySQL product 表捕获的更新操作的简单示例:
{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"source": {...},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}
注意: 请参考 Debezium 文档,了解每个字段的含义。
MySQL 产品表有4列(id
、name
、description
、weight
)。上面的 JSON 消息是 products
表上的一条更新事件,其中 id = 111
的行的 weight
值从 5.18
更改为 5.15
。假设此消息已同步到 Kafka 主题 products_binlog
,则可以使用以下 DDL 来使用此主题并解析更改事件。
CREATE TABLE topic_products (
-- schema 与 MySQL 的 products 表完全相同
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'debezium-json' -- 使用 debezium-json 作为 format
)
在某些情况下,用户在设置 Debezium Kafka Connect 时,可能会开启 Kafka 的配置 'value.converter.schemas.enable'
,用来在消息体中包含 schema 信息。然后,Debezium JSON 消息可能如下所示:
{
"schema": {...},
"payload": {
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"source": {...},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}
}
为了解析这一类信息,你需要在上述 DDL WITH 子句中添加选项 'debezium-json.schema-include' = 'true'
(默认为 false)。通常情况下,建议不要包含 schema 的描述,因为这样会使消息变得非常冗长,并降低解析性能。
在将主题注册为 Flink 表之后,可以将 Debezium 消息用作变更日志源。
-- MySQL "products" 的实时物化视图
-- 计算相同产品的最新平均重量
SELECT name, AVG(weight) FROM topic_products GROUP BY name;
-- 将 MySQL "products" 表的所有数据和增量更改同步到
-- Elasticsearch "products" 索引,供将来查找
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
Format 参数
参数 | 是否必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必选 | (none) | String | 指定要使用的格式,此处应为 ‘debezium-json’ 。 |
debezium-json.schema-include | 可选 | false | Boolean | 设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 ‘value.converter.schemas.enable’ 以在消息中包含 schema。此选项表明 Debezium JSON 消息是否包含 schema。 |
debezium-json.ignore-parse-errors | 可选 | false | Boolean | 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null 。 |
debezium-json.timestamp-format.standard | 可选 | ‘SQL’ | String | 声明输入和输出的时间戳格式。当前支持的格式为‘SQL’ 以及 ‘ISO-8601’ :
|
数据类型映射
目前,Debezium Format 使用 JSON Format 进行反序列化。有关数据类型映射的更多详细信息,请参考 JSON Format 文档。