SQLServer-CDC

概述

SQLServer 提取节点从 SQLServer 数据库中读取数据和增量数据。下面将介绍如何配置 SQLServer 抽取节点。

支持的版本

Extract NodeVersion
SQLServer-cdcSQLServer: 2014、2016、2017、2019、2022

依赖配置

通过 Maven 引入 sort-connector-sqlserver-cdc 构建自己的项目。 当然,你也可以直接使用 INLONG 提供的 jar 包。(sort-connector-sqlserver-cdc)

Maven依赖配置

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-sqlserver-cdc</artifactId>
  4. <version>1.2.0-incubating</version>
  5. </dependency>

配置 SQLServer 加载节点

SQLServer 加载节点需要开启库和表的 CDC 功能,配置步骤如下:

  1. 开启数据库 CDC 能力。
  1. if exists(select 1 from sys.databases where name='dbName' and is_cdc_enabled=0)
  2. begin
  3. exec sys.sp_cdc_enable_db
  4. end
  1. 检查数据库 CDC 是否开启。
  1. select is_cdc_enabled from sys.databases where name='dbName'

备注: “1”表示数据库 CDC 开启

  1. 开启表的 CDC 能力。
  1. IF EXISTS(SELECT 1 FROM sys.tables WHERE name='tableName' AND is_tracked_by_cdc = 0)
  2. BEGIN
  3. EXEC sys.sp_cdc_enable_table
  4. @source_schema = 'dbo', -- source_schema
  5. @source_name = 'tableName', -- table_name
  6. @capture_instance = NULL, -- capture_instance
  7. @supports_net_changes = 1, -- supports_net_changes
  8. @role_name = NULL, -- role_name
  9. @index_name = NULL, -- index_name
  10. @captured_column_list = NULL, -- captured_column_list
  11. @filegroup_name = 'PRIMARY' -- filegroup_name
  12. END

备注: 表必须有主键或者唯一索引。

  1. 检查表 CDC 是否开启。
  1. SELECT is_tracked_by_cdc FROM sys.tables WHERE name='tableName'

备注: “1”表示表 CDC 开启

如何创建一个 SQLServer 抽取节点

SQL API 的使用

使用 Flink SQL Cli :

  1. -- Set checkpoint every 3000 milliseconds
  2. Flink SQL> SET 'execution.checkpointing.interval' = '3s';
  3. -- Create a SqlServer table 'sqlserver_extract_node' in Flink SQL Cli
  4. Flink SQL> CREATE TABLE sqlserver_extract_node (
  5. order_id INT,
  6. order_date TIMESTAMP(0),
  7. customer_name STRING,
  8. price DECIMAL(10, 5),
  9. product_id INT,
  10. order_status BOOLEAN,
  11. PRIMARY KEY(order_id) NOT ENFORCED
  12. ) WITH (
  13. 'connector' = 'sqlserver-cdc',
  14. 'hostname' = 'YourHostname',
  15. 'port' = 'port', --default:1433
  16. 'username' = 'YourUsername',
  17. 'password' = 'YourPassword',
  18. 'database-name' = 'YourDatabaseName',
  19. 'schema-name' = 'YourSchemaName' -- default:dbo
  20. 'table-name' = 'YourTableName');
  21. -- Read snapshot and binlog from sqlserver_extract_node
  22. Flink SQL> SELECT * FROM sqlserver_extract_node;

InLong Dashboard 方式

TODO

InLong Manager Client 方式

TODO

SQLServer 抽取节点参数信息

参数是否必须默认值数据类型描述
connector必须(none)String指定使用什么连接器,这里应该是 ‘sqlserver-cdc’。
hostname必须(none)StringSQLServer 数据库 IP 地址或者 hostname。
username必须(none)StringSQLServer 数据库用户名。
password必须(none)StringSQLServer 数据库用户密码。
database-name必须(none)StringSQLServer 数据库监控的数据库名称。
schema-name必须dboStringSQLServer 数据库监控的 schema 名称。
table-name必须(none)StringSQLServer 数据库监控的表名称。
port可选1433IntegerSQLServer 数据库端口。
server-time-zone可选UTCStringSQLServer 数据库连接配置时区。 例如: “Asia/Shanghai”。

可用的元数据字段

以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。

字段名称数据类型描述
meta.table_nameSTRING NOT NULL包含该行的表的名称。
meta.schema_nameSTRING NOT NULL包含该行 schema 的名称。
meta.database_nameSTRING NOT NULL包含该行数据库的名称。
meta.op_tsTIMESTAMP_LTZ(3) NOT NULL它表示在数据库中进行更改的时间。如果记录是从表的快照而不是 binlog 中读取的,则该值始终为 0。

使用元数据字段的例子:

  1. CREATE TABLE sqlserver_extract_node (
  2. table_name STRING METADATA FROM 'table_name' VIRTUAL,
  3. schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
  4. db_name STRING METADATA FROM 'database_name' VIRTUAL,
  5. operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  6. id INT NOT NULL
  7. ) WITH (
  8. 'connector' = 'sqlserver-cdc',
  9. 'hostname' = 'localhost',
  10. 'port' = '1433',
  11. 'username' = 'sa',
  12. 'password' = 'password',
  13. 'database-name' = 'test',
  14. 'schema-name' = 'dbo',
  15. 'table-name' = 'worker'
  16. );

数据类型映射

SQLServer typeFlink SQL type
char(n)CHAR(n)
varchar(n)
nvarchar(n)
nchar(n)
VARCHAR(n)
text
ntext
xml
STRING
decimal(p, s)
money
smallmoney
DECIMAL(p, s)
numericNUMERIC
REAL
FLOAT
FLOAT
bitBOOLEAN
intINT
tinyintTINYINT
smallintSMALLINT
time (n)TIME (n)
bigintBIGINT
dateDATE
datetime2
datetime
smalldatetime
TIMESTAMP(n)
datetimeoffsetTIMESTAMP_LTZ(3)