同步数据到存储服务

从 TiDB v6.5.0 开始,TiCDC 支持将行变更事件保存至存储服务,如 Amazon S3、GCS、Azure Blob Storage 和 NFS。本文介绍如何使用 TiCDC 创建同步任务 (Changefeed) 将增量数据同步到这类存储服务,并介绍数据的存储方式。具体如下:

同步变更数据至存储服务

使用以下命令来创建同步任务:

  1. cdc cli changefeed create \
  2. --server=http://10.0.10.25:8300 \
  3. --sink-uri="s3://logbucket/storage_test?protocol=canal-json" \
  4. --changefeed-id="simple-replication-task"

输出结果如下:

  1. Info: {"upstream_id":7171388873935111376,"namespace":"default","id":"simple-replication-task","sink_uri":"s3://logbucket/storage_test?protocol=canal-json","create_time":"2024-02-29T18:52:05.566016967+08:00","start_ts":437706850431664129,"engine":"unified","config":{"case_sensitive":false,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":false,"sync_point_interval":600000000000,"sync_point_retention":86400000000000,"filter":{"rules":["*.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"canal-json","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v7.5.1"}
  • --server:TiCDC 集群中任意一个 TiCDC 服务器的地址。
  • --changefeed-id:同步任务的 ID。格式需要符合正则表达式 ^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$。如果不指定该 ID,TiCDC 会自动生成一个 UUID(version 4 格式)作为 ID。
  • --sink-uri:同步任务下游的地址。具体可参考配置 Sink URI
  • --start-ts:指定 changefeed 的开始 TSO。TiCDC 集群将从这个 TSO 开始拉取数据。默认为当前时间。
  • --target-ts:指定 changefeed 的目标 TSO。TiCDC 集群拉取数据直到这个 TSO 停止。默认为空,即 TiCDC 不会自动停止。
  • --config:指定 changefeed 配置文件,详见 TiCDC Changefeed 配置参数

配置 Sink URI

本章节介绍如何在 Sink URI 中配置存储服务 Amazon S3、GCS、Azure Blob Storage 以及 NFS。Sink URI 用于指定 TiCDC 下游系统的连接信息,遵循以下格式:

  1. [scheme]://[host]/[path]?[query_parameters]

URI 的 [query_parameters] 中可配置的参数如下:

参数描述默认值取值范围
worker-count向下游存储服务保存数据变更记录的并发度16[1, 512]
flush-interval向下游存储服务保存数据变更记录的间隔5s[2s, 10m]
file-size单个数据变更文件的字节数超过 file-size 时将其保存至存储服务中67108864[1048576, 536870912]
protocol输出到存储服务的消息协议N/Acanal-jsoncsv
enable-tidb-extensionprotocol 参数为 canal-json 时,如果该值为 true,TiCDC 会发送 WATERMARK 事件,并在 canal-json 消息中添加 TiDB 扩展字段falsefalsetrue

同步数据到存储服务 - 图1

注意

flush-intervalfile-size 二者只要满足其一就会向下游写入数据变更文件。

protocol 是必选配置,如果 TiCDC 在创建 changefeed 时未解析到该配置,将会返回 CDC:ErrSinkUnknownProtocol 错误。

配置外部存储

将数据存储到云服务存储系统时,根据云服务供应商的不同,需要设置不同的鉴权参数。本节介绍使用 Amazon S3、Google Cloud Storage (GCS) 及 Azure Blob Storage 时所用存储服务的鉴权方式以及如何配置访问相应存储服务的账户。

  • Amazon S3
  • GCS
  • Azure Blob Storage

Amazon S3 配置样例如下:

  1. --sink-uri="s3://bucket/prefix?protocol=canal-json"

在同步数据之前,需要为 Amazon S3 中的目录设置相应的访问权限:

  • TiCDC 需要的最小权限是:s3:ListBuckets3:PutObjects3:GetObject
  • 如果 changefeed 的参数 sink.cloud-storage-config.flush-concurrency 大于 1,表示开启了单文件的并行上传,需要额外增加 ListParts 相关权限:
    • s3:AbortMultipartUpload
    • s3:ListMultipartUploadParts
    • s3:ListBucketMultipartUploads

如果你还没有创建同步数据保存目录,可以参考创建存储桶在指定的区域中创建一个 S3 存储桶。如果需要使用文件夹,可以参考使用文件夹在 Amazon S3 控制台中组织对象在存储桶中创建一个文件夹。

可以通过以下两种方式配置访问 Amazon S3 的账户:

  • 方式一:指定访问密钥

    如果指定访问密钥和秘密访问密钥,将按照指定的访问密钥和秘密访问密钥进行鉴权。除了在 URI 中指定密钥外,还支持以下方式:

    • 读取 $AWS_ACCESS_KEY_ID$AWS_SECRET_ACCESS_KEY 环境变量
    • 读取 $AWS_ACCESS_KEY$AWS_SECRET_KEY 环境变量
    • 读取共享凭证文件,路径由 $AWS_SHARED_CREDENTIALS_FILE 环境变量指定
    • 读取共享凭证文件,路径为 ~/.aws/credentials
  • 方式二:基于 IAM Role 进行访问

    为运行 TiCDC Server 的 EC2 实例关联一个配置了访问 S3 访问权限的 IAM role。设置成功后,TiCDC 可以直接访问对应的 S3 中的备份目录,而不需要额外的设置。

GCS 配置样例如下:

  1. --sink-uri="gcs://bucket/prefix?protocol=canal-json"

配置访问 GCS 的账户可以通过指定访问密钥的方式。如果指定了 credentials-file 参数,将按照指定的 credentials-file 进行鉴权。除了在 URI 中指定密钥文件外,还支持以下方式:

  • 读取位于 $GOOGLE_APPLICATION_CREDENTIALS 环境变量所指定路径的文件内容
  • 读取位于 ~/.config/gcloud/application_default_credentials.json 的文件内容
  • 在 GCE 或 GAE 中运行时,从元数据服务器中获取的凭证

Azure Blob Storage 配置样例如下:

  1. --sink-uri="azure://bucket/prefix?protocol=canal-json"

可以通过以下方式配置访问 Azure Blob Storage 的账户:

  • 方式一:指定共享访问签名

    在 URI 中配置 account-namesas-token,则使用该参数指定的存储账户名和共享访问签名令牌。由于共享访问签名令牌中带有 & 的字符,需要将其编码为 %26 后再添加到 URI 中。你也可以直接对整个 sas-token 进行一次百分号编码。

  • 方式二:指定访问密钥

    在 URI 中配置 account-nameaccount-key,则使用该参数指定的存储账户名和密钥。除了在 URI 中指定密钥文件外,还支持读取 $AZURE_STORAGE_KEY 的方式。

  • 方式三:使用 Azure AD 备份恢复

    运行环境配置环境变量 $AZURE_CLIENT_ID$AZURE_TENANT_ID$AZURE_CLIENT_SECRET

同步数据到存储服务 - 图2

小贴士

关于 Amazon S3、GCS 以及 Azure Blob Storage 的 URI 参数的详细参数说明,请参考外部存储服务的 URI 格式

配置 NFS

NFS 配置样例如下:

  1. --sink-uri="file:///my-directory/prefix?protocol=canal-json"

存储路径组织结构

本章节详细介绍数据变更记录、元数据与 DDL 事件的存储路径组织结构。

数据变更记录

数据变更记录将会存储到以下路径:

  1. {scheme}://{prefix}/{schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/CDC{num}.{extension}
  • scheme:存储服务类型。例如:s3gcsazurefile
  • prefix:用户指定的父目录。例如:s3://**bucket/bbb/ccc**
  • schema:表所属的库名。例如:s3://bucket/bbb/ccc/**test**
  • table:表名。例如:s3://bucket/bbb/ccc/test/**table1**
  • table-version-separator:将文件路径按照表的版本进行分隔。例如:s3://bucket/bbb/ccc/test/table1/**9999**
  • partition-separator:将文件路径按照表的分区号进行分隔。例如:s3://bucket/bbb/ccc/test/table1/9999/**20**
  • date-separator:将文件路径按照事务提交的日期进行分隔,默认值为 day,可选值如下:
    • none:不以 date-separator 分隔文件路径。例如:test.table1 版本号为 9999 的所有文件都存到 s3://bucket/bbb/ccc/test/table1/9999 路径下。
    • year:以事务提交的年份分隔文件路径。例如:s3://bucket/bbb/ccc/test/table1/9999/**2022**
    • month:以事务提交的年份和月份分隔文件路径。例如:s3://bucket/bbb/ccc/test/table1/9999/**2022-01**
    • day:以事务提交的年月日来分隔文件路径。例如:s3://bucket/bbb/ccc/test/table1/9999/**2022-01-02**
  • num:存储数据变更记录的目录下文件的序号。例如:s3://bucket/bbb/ccc/test/table1/9999/2022-01-02/CDC**000005**.csv
  • extension:文件的扩展名。v6.5.0 支持 CSV 和 Canal-JSON 格式。

同步数据到存储服务 - 图3

注意

表的版本仅在上游表发生 DDL 操作后才改变:表的版本为该 DDL 在上游 TiDB 执行结束的 TSO。但是,表版本的变化并不意味着表结构的变化。例如,在表中的某一列添加注释,不会导致 schema 文件内容发生变化。

Index 文件

Index 文件用于防止已写入的数据被错误覆盖,与数据变更记录存储在相同路径:

  1. {scheme}://{prefix}/{schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/meta/CDC.index

Index 文件记录了当前目录下所使用到的最大文件名,比如:

  1. CDC000005.csv

上述内容表明该目录下 CDC000001.csvCDC000004.csv 文件已被占用,当 TiCDC 集群中发生表调度或者节点重启时,新的节点会读取 Index 文件,并判断 CDC000005.csv 是否被占用。如果未被占用,则新节点会从 CDC000005.csv 开始写文件。如果已被占用,则从 CDC000006.csv 开始写文件,这样可防止覆盖其他节点写入的数据。

元数据

元数据信息将会存储到以下路径:

  1. {scheme}://{prefix}/metadata

元数据信息以 JSON 格式存储到如下的文件中:

  1. {
  2. "checkpoint-ts":433305438660591626
  3. }
  • checkpoint-ts:commit-ts 小于等于此 checkpoint-ts 的事务都被写入下游存储当中。

DDL 事件

表级 DDL 事件

当上游表的 DDL 事件引起表的版本变更时,TiCDC 将会自动进行以下操作:

  • 切换到新的路径下写入数据变更记录。例如,当 test.table1 的版本变更为 441349361156227074 时,TiCDC 将会在 s3://bucket/bbb/ccc/test/table1/441349361156227074/2022-01-02/ 路径下写入数据。

  • 生成一个 schema 文件存储表结构信息,文件路径如下:

    1. {scheme}://{prefix}/{schema}/{table}/meta/schema_{table-version}_{hash}.json

schema_441349361156227074_3131721815.json 为例,表结构信息文件的内容如下:

  1. {
  2. "Table":"table1",
  3. "Schema":"test",
  4. "Version":1,
  5. "TableVersion":441349361156227074,
  6. "Query":"ALTER TABLE test.table1 ADD OfficeLocation blob(20)",
  7. "Type":5,
  8. "TableColumns":[
  9. {
  10. "ColumnName":"Id",
  11. "ColumnType":"INT",
  12. "ColumnNullable":"false",
  13. "ColumnIsPk":"true"
  14. },
  15. {
  16. "ColumnName":"LastName",
  17. "ColumnType":"CHAR",
  18. "ColumnLength":"20"
  19. },
  20. {
  21. "ColumnName":"FirstName",
  22. "ColumnType":"VARCHAR",
  23. "ColumnLength":"30"
  24. },
  25. {
  26. "ColumnName":"HireDate",
  27. "ColumnType":"DATETIME"
  28. },
  29. {
  30. "ColumnName":"OfficeLocation",
  31. "ColumnType":"BLOB",
  32. "ColumnLength":"20"
  33. }
  34. ],
  35. "TableColumnsTotal":"5"
  36. }
  • Table:表名。
  • Schema:表所属的库名。
  • Version:Storage sink 协议版本号。
  • TableVersion:表的版本号。
  • Query:DDL 语句。
  • Type:DDL 类型。
  • TableColumns:该数组表示表中每一列的详细信息。
    • ColumnName:列名。
    • ColumnType:该列的类型。详见数据类型
    • ColumnLength:该列的长度。详见数据类型
    • ColumnPrecision:该列的精度。详见数据类型
    • ColumnScale:该列小数位的长度。详见数据类型
    • ColumnNullable:值为 true 时表示该列可以含 NULL 值。
    • ColumnIsPk:值为 true 时表示该列是主键的一部分。
  • TableColumnsTotalTableColumns 数组的大小。

库级 DDL 事件

当上游数据库发生库级 DDL 事件时,TiCDC 将会自动生成一个 schema 文件存储数据库结构信息,文件路径如下:

  1. {scheme}://{prefix}/{schema}/meta/schema_{table-version}_{hash}.json

schema_441349361156227000_3131721815.json 为例,数据库结构信息文件的内容如下:

  1. {
  2. "Table": "",
  3. "Schema": "schema1",
  4. "Version": 1,
  5. "TableVersion": 441349361156227000,
  6. "Query": "CREATE DATABASE `schema1`",
  7. "Type": 1,
  8. "TableColumns": null,
  9. "TableColumnsTotal": 0
  10. }

数据类型

本章节主要介绍 schema_{table-version}_{hash}.json 文件(以下简称为 schema 文件)中使用的各种数据类型。数据类型定义为 T(M[, D]),详见数据类型概述

整数类型

TiDB 中整数类型可被定义为 IT[(M)] [UNSIGNED],其中:

  • IT 为整数类型,包括 TINYINTSMALLINTMEDIUMINTINTBIGINTBIT
  • M 为该类型的显示宽度。

schema 文件中对整数类型定义如下:

  1. {
  2. "ColumnName":"COL1",
  3. "ColumnType":"{IT} [UNSIGNED]",
  4. "ColumnPrecision":"{M}"
  5. }

小数类型

TiDB 中的小数类型可被定义为 DT[(M,D)][UNSIGNED],其中:

  • DT 为小数类型,包括 FLOATDOUBLEDECIMALNUMERIC
  • M 为该类型数据的精度,即整数位加上小数位的总长度。
  • D 为小数位的长度。

schema 文件中对小数类型的定义如下:

  1. {
  2. "ColumnName":"COL1",
  3. "ColumnType":"{DT} [UNSIGNED]",
  4. "ColumnPrecision":"{M}",
  5. "ColumnScale":"{D}"
  6. }

时间和日期类型

TiDB 中的日期类型可被定义为 DT,其中:

  • DT 为日期类型,包括 DATEYEAR

schema 文件中对日期类型的定义如下:

  1. {
  2. "ColumnName":"COL1",
  3. "ColumnType":"{DT}"
  4. }

TiDB 中的时间类型可被定义为 TT[(M)],其中:

  • TT 为时间类型,包括 TIMEDATETIMETIMESTAMP
  • M 为秒的精度,取值范围为 0~6。

schema 文件中对时间类型的定义如下:

  1. {
  2. "ColumnName":"COL1",
  3. "ColumnType":"{TT}",
  4. "ColumnScale":"{M}"
  5. }

字符串类型

TiDB 中的字符串类型可被定义为 ST[(M)],其中:

  • ST 为字符串类型,包括 CHARVARCHARTEXTBINARYBLOBJSON 等。
  • M 表示字符串的最大长度。

schema 文件中对字符串类型的定义如下:

  1. {
  2. "ColumnName":"COL1",
  3. "ColumnType":"{ST}",
  4. "ColumnLength":"{M}"
  5. }

Enum/Set 类型

schema 文件中对 Enum/Set 类型的定义如下:

  1. {
  2. "ColumnName":"COL1",
  3. "ColumnType":"{ENUM/SET}",
  4. }