SQLServer-CDC

Overview

The SQLServer Extract Node reads snapshot data and incremental (change) data from a SQLServer database. This page describes how to set up the SQLServer Extract Node.

Supported Version

| Extract Node | Version |
|---|---|
| SQLServer-cdc | SQLServer: 2014, 2016, 2017, 2019, 2022 |

Dependencies

Add the SQLServer Extract Node dependency through Maven. Alternatively, use the jar package provided by InLong (sort-connector-sqlserver-cdc).

Maven dependency

    <dependency>
        <groupId>org.apache.inlong</groupId>
        <artifactId>sort-connector-sqlserver-cdc</artifactId>
        <version>1.4.0-SNAPSHOT</version>
    </dependency>

Setup SQLServer Extract Node

The SQLServer Extract Node requires CDC to be enabled on the databases and tables it reads. The steps are as follows:

  1. Enable CDC for the database. Run the statement in the target database, because sys.sp_cdc_enable_db acts on the current database.

         IF EXISTS(SELECT 1 FROM sys.databases WHERE name='dbName' AND is_cdc_enabled=0)
         BEGIN
             EXEC sys.sp_cdc_enable_db
         END

  2. Check whether CDC is enabled for the database.

         SELECT is_cdc_enabled FROM sys.databases WHERE name='dbName'

     Note: a value of 1 means that CDC is enabled for the database.

  3. Enable CDC for the table.

         IF EXISTS(SELECT 1 FROM sys.tables WHERE name='tableName' AND is_tracked_by_cdc = 0)
         BEGIN
             EXEC sys.sp_cdc_enable_table
                 @source_schema = 'dbo',          -- schema of the source table
                 @source_name = 'tableName',      -- name of the source table
                 @capture_instance = NULL,        -- NULL: use the default capture instance name
                 @supports_net_changes = 1,       -- enable net change queries
                 @role_name = NULL,               -- NULL: no gating role
                 @index_name = NULL,              -- NULL: use the primary key
                 @captured_column_list = NULL,    -- NULL: capture all columns
                 @filegroup_name = 'PRIMARY'      -- filegroup for the change table
         END

     Note: the table must have a primary key or a unique index, which @supports_net_changes = 1 requires.

  4. Check whether CDC is enabled for the table.

         SELECT is_tracked_by_cdc FROM sys.tables WHERE name='tableName'

     Note: a value of 1 means that CDC is enabled for the table.
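When many databases or tables need CDC, the statements above can be generated rather than edited by hand. The following is a minimal sketch, not part of InLong: the helper names `quote_literal`, `enable_db_cdc_sql` and `enable_table_cdc_sql` are hypothetical, and the generated T-SQL keeps only a subset of the `sys.sp_cdc_enable_table` parameters shown above.

```python
def quote_literal(value: str) -> str:
    """Return value as a T-SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def enable_db_cdc_sql(database: str) -> str:
    """Build T-SQL that enables CDC only if the database does not have it yet.

    The statement must be executed in the context of that database.
    """
    db = quote_literal(database)
    return (
        f"IF EXISTS (SELECT 1 FROM sys.databases WHERE name = {db} AND is_cdc_enabled = 0)\n"
        "BEGIN\n"
        "    EXEC sys.sp_cdc_enable_db\n"
        "END"
    )


def enable_table_cdc_sql(table: str, schema: str = "dbo") -> str:
    """Build T-SQL that enables CDC only if the table is not tracked yet."""
    tbl = quote_literal(table)
    sch = quote_literal(schema)
    return (
        f"IF EXISTS (SELECT 1 FROM sys.tables WHERE name = {tbl} AND is_tracked_by_cdc = 0)\n"
        "BEGIN\n"
        "    EXEC sys.sp_cdc_enable_table\n"
        f"        @source_schema = {sch},\n"
        f"        @source_name = {tbl},\n"
        "        @role_name = NULL,\n"
        "        @supports_net_changes = 1\n"
        "END"
    )


if __name__ == "__main__":
    print(enable_db_cdc_sql("dbName"))
    print(enable_table_cdc_sql("tableName"))
```

The generated text can be pasted into any SQL client connected to the target database. Quoting is limited to string literals, so this sketch is meant for trusted, administrator-supplied names only.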

How to create a SQLServer Extract Node

Usage for SQL API

The example below shows how to create a SQLServer Extract Node with the Flink SQL CLI:

    -- Set checkpoint every 3000 milliseconds
    Flink SQL> SET 'execution.checkpointing.interval' = '3s';

    -- Create a SQLServer table 'sqlserver_extract_node' in Flink SQL CLI
    Flink SQL> CREATE TABLE sqlserver_extract_node (
        order_id INT,
        order_date TIMESTAMP(0),
        customer_name STRING,
        price DECIMAL(10, 5),
        product_id INT,
        order_status BOOLEAN,
        PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'sqlserver-cdc-inlong',
        'hostname' = 'YourHostname',
        'port' = '1433', -- default: 1433
        'username' = 'YourUsername',
        'password' = 'YourPassword',
        'database-name' = 'YourDatabaseName',
        'schema-name' = 'YourSchemaName', -- default: dbo
        'table-name' = 'YourTableName');

    -- Read the snapshot and the subsequent change events from sqlserver_extract_node
    Flink SQL> SELECT * FROM sqlserver_extract_node;
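A missing comma between two options in the WITH clause is an easy mistake to make when the clause is edited by hand. As an illustration only, the sketch below renders the clause from a dictionary so that separators are always placed correctly; `render_with_clause` is a hypothetical helper and not part of InLong or Flink.

```python
def render_with_clause(options: dict) -> str:
    """Render a Flink SQL WITH clause from option name/value pairs.

    Values are emitted as SQL string literals; embedded single quotes are doubled.
    """
    lines = []
    for key, value in options.items():
        escaped = str(value).replace("'", "''")
        lines.append(f"    '{key}' = '{escaped}'")
    # Joining with ",\n" guarantees exactly one comma between adjacent options.
    return "WITH (\n" + ",\n".join(lines) + "\n)"


if __name__ == "__main__":
    print(render_with_clause({
        "connector": "sqlserver-cdc-inlong",
        "hostname": "YourHostname",
        "port": "1433",
        "username": "YourUsername",
        "password": "YourPassword",
        "database-name": "YourDatabaseName",
        "schema-name": "dbo",
        "table-name": "YourTableName",
    }))
```

The output can be appended to a column definition list to form the complete CREATE TABLE statement.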

Usage for InLong Dashboard

TODO

Usage for InLong Manager Client

TODO

SQLServer Extract Node Options

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | (none) | String | Specify what connector to use, here should be 'sqlserver-cdc-inlong'. |
| hostname | required | (none) | String | IP address or hostname of the SQLServer database. |
| username | required | (none) | String | Username to use when connecting to the SQLServer database. |
| password | required | (none) | String | Password to use when connecting to the SQLServer database. |
| database-name | required | (none) | String | Database name of the SQLServer database to monitor. |
| schema-name | required | dbo | String | Schema name of the SQLServer database to monitor. |
| table-name | required | (none) | String | Table name of the SQLServer database to monitor. |
| port | optional | 1433 | Integer | Integer port number of the SQLServer database. |
| server-time-zone | optional | UTC | String | The session time zone in database server, e.g. "Asia/Shanghai". |
| inlong.metric.labels | optional | (none) | String | InLong metric label, format of value is groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]. |
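The options table can be turned into a small pre-flight check before a job is submitted. The sketch below is an assumption-laden illustration: `resolve_options` and `build_metric_labels` are hypothetical helpers, and treating `schema-name` as satisfied by its documented default is an interpretation of the table, not confirmed connector behavior.

```python
# Option names and defaults transcribed from the options table.
REQUIRED = ("connector", "hostname", "username", "password",
            "database-name", "schema-name", "table-name")
DEFAULTS = {"schema-name": "dbo", "port": "1433", "server-time-zone": "UTC"}


def resolve_options(user_options: dict) -> dict:
    """Apply the documented defaults, then check that every required option is set."""
    resolved = {**DEFAULTS, **user_options}
    missing = [name for name in REQUIRED if not resolved.get(name)]
    if missing:
        raise ValueError("missing required options: " + ", ".join(missing))
    return resolved


def build_metric_labels(group_id: str, stream_id: str, node_id: str) -> str:
    """Format a value for 'inlong.metric.labels' in the documented layout."""
    return f"groupId={group_id}&streamId={stream_id}&nodeId={node_id}"


if __name__ == "__main__":
    options = resolve_options({
        "connector": "sqlserver-cdc-inlong",
        "hostname": "localhost",
        "username": "sa",
        "password": "password",
        "database-name": "test",
        "table-name": "worker",
        "inlong.metric.labels": build_metric_labels("g1", "s1", "n1"),
    })
    for name, value in options.items():
        print(f"{name} = {value}")
```

Failing early with the list of missing option names tends to be easier to diagnose than a validation error raised after the job has been submitted.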

Available Metadata

The following metadata can be exposed as read-only (VIRTUAL) columns in a table definition.

| Key | DataType | Description |
|---|---|---|
| meta.table_name | STRING NOT NULL | Name of the table that contains the row. |
| meta.schema_name | STRING NOT NULL | Name of the schema that contains the row. |
| meta.database_name | STRING NOT NULL | Name of the database that contains the row. |
| meta.op_ts | TIMESTAMP_LTZ(3) NOT NULL | Time at which the change was made in the database. If the record is read from the snapshot of the table instead of the change log, the value is always 0. |

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

    CREATE TABLE sqlserver_extract_node (
        table_name STRING METADATA FROM 'table_name' VIRTUAL,
        schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
        db_name STRING METADATA FROM 'database_name' VIRTUAL,
        operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
        id INT NOT NULL
    ) WITH (
        'connector' = 'sqlserver-cdc-inlong',
        'hostname' = 'localhost',
        'port' = '1433',
        'username' = 'sa',
        'password' = 'password',
        'database-name' = 'test',
        'schema-name' = 'dbo',
        'table-name' = 'worker'
    );
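Because the operation timestamp is always 0 for snapshot records, a downstream consumer can use it to tell snapshot rows from change events. The sketch below assumes that a value of 0 surfaces as the Unix epoch (1970-01-01T00:00:00Z) once the TIMESTAMP_LTZ(3) column is converted to a Python `datetime`; that conversion and the helper name `is_snapshot_row` are assumptions, not documented connector behavior.

```python
from datetime import datetime, timezone

# Assumption: an op_ts of 0 corresponds to the Unix epoch.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def is_snapshot_row(op_ts: datetime) -> bool:
    """Return True if op_ts marks a record read from the initial snapshot."""
    if op_ts.tzinfo is None:
        # Treat naive timestamps as UTC so the comparison is well defined.
        op_ts = op_ts.replace(tzinfo=timezone.utc)
    return op_ts == EPOCH


if __name__ == "__main__":
    print(is_snapshot_row(datetime.fromtimestamp(0, tz=timezone.utc)))
    print(is_snapshot_row(datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)))
```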

Data Type Mapping

| SQLServer type | Flink SQL type |
|---|---|
| char(n) | CHAR(n) |
| varchar(n), nvarchar(n), nchar(n) | VARCHAR(n) |
| text, ntext, xml | STRING |
| decimal(p, s), money, smallmoney | DECIMAL(p, s) |
| numeric | NUMERIC |
| real, float | FLOAT |
| bit | BOOLEAN |
| int | INT |
| tinyint | TINYINT |
| smallint | SMALLINT |
| time(n) | TIME(n) |
| bigint | BIGINT |
| date | DATE |
| datetime2, datetime, smalldatetime | TIMESTAMP(n) |
| datetimeoffset | TIMESTAMP_LTZ(3) |
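When Flink column definitions are derived from an existing SQLServer schema, the mapping above can be applied programmatically. The following sketch transcribes the table into a lookup; `flink_type` is a hypothetical helper, and passing length, precision and scale arguments through unchanged is a simplifying assumption (types written without arguments, such as `money`, yield the bare Flink type name).

```python
import re

# Base-type mapping transcribed from the table above.
_FLINK_BASE = {
    "char": "CHAR",
    "varchar": "VARCHAR", "nvarchar": "VARCHAR", "nchar": "VARCHAR",
    "text": "STRING", "ntext": "STRING", "xml": "STRING",
    "decimal": "DECIMAL", "money": "DECIMAL", "smallmoney": "DECIMAL",
    "numeric": "NUMERIC",
    "real": "FLOAT", "float": "FLOAT",
    "bit": "BOOLEAN",
    "int": "INT", "tinyint": "TINYINT", "smallint": "SMALLINT", "bigint": "BIGINT",
    "time": "TIME", "date": "DATE",
    "datetime2": "TIMESTAMP", "datetime": "TIMESTAMP", "smalldatetime": "TIMESTAMP",
    "datetimeoffset": "TIMESTAMP_LTZ",
}

# Flink types for which arguments from the source type are carried over.
_KEEPS_ARGS = {"CHAR", "VARCHAR", "DECIMAL", "NUMERIC", "TIME", "TIMESTAMP"}

_TYPE_RE = re.compile(r"^\s*([a-z0-9]+)\s*(?:\(([^)]*)\))?\s*$")


def flink_type(sqlserver_type: str) -> str:
    """Map a SQLServer column type such as 'varchar(50)' to a Flink SQL type."""
    match = _TYPE_RE.match(sqlserver_type.lower())
    if match is None or match.group(1) not in _FLINK_BASE:
        raise ValueError(f"unsupported SQLServer type: {sqlserver_type!r}")
    base = _FLINK_BASE[match.group(1)]
    if base == "TIMESTAMP_LTZ":
        # The table maps datetimeoffset to a fixed precision of 3.
        return "TIMESTAMP_LTZ(3)"
    args = match.group(2)
    if args and base in _KEEPS_ARGS:
        normalized = ", ".join(part.strip() for part in args.split(","))
        return f"{base}({normalized})"
    return base


if __name__ == "__main__":
    for source in ("varchar(50)", "ntext", "decimal(10, 5)", "bit", "datetimeoffset"):
        print(source, "->", flink_type(source))
```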