Oracle-CDC

概述

Oracle Extract 节点允许从 Oracle 数据库中读取快照数据和增量数据。本文档介绍如何设置 Oracle Extract 节点以对 Oracle 数据库运行 SQL 查询。

支持的版本

Extract 节点版本Driver
Oracle-CDCOracle: 11, 12, 19Oracle Driver: 19.3.0.0

依赖

为了设置 Oracle Extract 节点,下表提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connectors JAR 包的 SQL 客户端的两个项目的依赖关系信息。

Maven 依赖

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-oracle-cdc</artifactId>
  4. <version>1.3.0-SNAPSHOT</version>
  5. </dependency>

连接 Oracle 数据库还需要 Oracle 驱动程序依赖项。请下载ojdbc8-19.3.0.0.jar 并将其放入 FLINK_HOME/lib/

设置 Oracle

你必须为 Oracle 数据库启用日志归档,并定义一个对 Debezium Oracle 连接器监控的所有数据库具有适当权限的 Oracle 用户。

对于非 CDB 数据库

  • 启用日志归档

    (1.1). 以 DBA 身份连接到数据库

    1. ORACLE_SID=SID
    2. export ORACLE_SID
    3. sqlplus /nolog
    4. CONNECT sys/password AS SYSDBA

    (1.2). 启用日志归档

    1. alter system set db_recovery_file_dest_size = 10G;
    2. alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
    3. shutdown immediate;
    4. startup mount;
    5. alter database archivelog;
    6. alter database open;

    Note:

    • Enable log archiving requires database restart, pay attention when try to do it
    • The archived logs will occupy a large amount of disk space, so consider clean the expired logs the periodically

    (1.3). 检查是否启用了日志归档

    1. -- Should now "Database log mode: Archive Mode"
    2. archive log list;

    注意:

    必须为捕获的表或数据库启用补充日志记录,以便数据更改能够捕获已更改数据库行的之前状态。 下面说明了如何在表/数据库级别进行配置。

    1. -- 为特定表启用补充日志记录:
    2. ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    1. -- 为数据库启用补充日志记录:
    2. ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
  • 创建具有权限的 Oracle 用户

    (2.1). 创建表空间

    1. sqlplus sys/password@host:port/SID AS SYSDBA;
    2. CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    3. exit;

    (2.2). 创建用户并授予权限

    1. sqlplus sys/password@host:port/SID AS SYSDBA;
    2. CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
    3. GRANT CREATE SESSION TO flinkuser;
    4. GRANT SET CONTAINER TO flinkuser;
    5. GRANT SELECT ON V_$DATABASE to flinkuser;
    6. GRANT FLASHBACK ANY TABLE TO flinkuser;
    7. GRANT SELECT ANY TABLE TO flinkuser;
    8. GRANT SELECT_CATALOG_ROLE TO flinkuser;
    9. GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
    10. GRANT SELECT ANY TRANSACTION TO flinkuser;
    11. GRANT LOGMINING TO flinkuser;
    12. GRANT CREATE TABLE TO flinkuser;
    13. GRANT LOCK ANY TABLE TO flinkuser;
    14. GRANT ALTER ANY TABLE TO flinkuser;
    15. GRANT CREATE SEQUENCE TO flinkuser;
    16. GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
    17. GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
    18. GRANT SELECT ON V_$LOG TO flinkuser;
    19. GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
    20. GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
    21. GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
    22. GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
    23. GRANT SELECT ON V_$LOGFILE TO flinkuser;
    24. GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
    25. GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
    26. exit;

对于 CDB 数据库

总的来说,配置 CDB 数据库的步骤与非 CDB 数据库非常相似,但命令可能会有所不同。

  • 启用日志归档

    1. ORACLE_SID=ORCLCDB
    2. export ORACLE_SID
    3. sqlplus /nolog
    4. CONNECT sys/password AS SYSDBA
    5. alter system set db_recovery_file_dest_size = 10G;
    6. -- should exist
    7. alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
    8. shutdown immediate
    9. startup mount
    10. alter database archivelog;
    11. alter database open;
    12. -- Should show "Database log mode: Archive Mode"
    13. archive log list
    14. exit;

    注意: 您还可以使用以下命令启用补充日志记录:

    1. -- Enable supplemental logging for a specific table:
    2. ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    3. -- Enable supplemental logging for database
    4. ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
  • 创建具有权限的 Oracle 用户

    1. sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
    2. CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    3. exit
    1. sqlplus sys/password@//localhost:1521/ORCLPDB1 as sysdba
    2. CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    3. exit
    1. sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
    2. CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
    3. GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
    4. GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
    5. GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
    6. GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
    7. GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
    8. GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
    9. GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
    10. GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
    11. GRANT LOGMINING TO flinkuser CONTAINER=ALL;
    12. GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
    13. GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
    14. GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
    15. GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
    16. GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
    17. GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
    18. GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
    19. GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
    20. GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
    21. GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
    22. GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
    23. GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
    24. GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
    25. exit

查看更多关于 设置 Oracle

如何创建 Oracle Extract 节点

SQL API 用法

Oracle Extract 节点可以定义如下:

  1. -- 创建 an Oracle Extract 节点 'products' in Flink SQL
  2. Flink SQL> CREATE TABLE products (
  3. ID INT NOT NULL,
  4. NAME STRING,
  5. DESCRIPTION STRING,
  6. WEIGHT DECIMAL(10, 3),
  7. PRIMARY KEY(id) NOT ENFORCED
  8. ) WITH (
  9. 'connector' = 'oracle-cdc-inlong',
  10. 'hostname' = 'localhost',
  11. 'port' = '1521',
  12. 'username' = 'flinkuser',
  13. 'password' = 'flinkpw',
  14. 'database-name' = 'XE',
  15. 'schema-name' = 'inlong',
  16. 'table-name' = 'user');
  17. Flink SQL> SELECT * FROM products;

注意:

当使用 CDB + PDB 模型时,您需要在 Flink DDL 中添加一个额外的选项 'debezium.database.pdb.name' = 'xxx' 来指定要连接的 PDB 的名称。

InLong Dashboard 用法

TODO: 将在未来支持此功能。

InLong Manager Client 用法

TODO: 将在未来支持此功能。

Oracle Extact 节点参数

参数是否必须默认值数据类型描述
connectorrequired(none)String指定要使用的连接器,这里应该是 ‘oracle-cdc-inlong’
hostnamerequired(none)StringOracle 数据库服务器的 IP 地址或主机名。
usernamerequired(none)String连接到 Oracle 数据库服务器时要使用的 Oracle 数据库的名称。
passwordrequired(none)String连接到 Oracle 数据库服务器时使用的密码。
database-namerequired(none)String要监视的 Oracle 服务器的数据库名称。
schema-namerequired(none)String要监视的 Oracle 数据库的 Schema 名称。
table-namerequired(none)String要监视的 Oracle 数据库的表名。
portoptional1521IntegerOracle 数据库服务器的整数端口号。
scan.startup.modeoptionalinitialStringOracle CDC 消费者的可选启动模式,有效枚举为”initial” 和”latest-offset”。 请参阅启动阅读位置部分了解更多详细信息。
debezium.*optional(none)String将 Debezium 的属性整合到用于从 Oracle 服务器捕获数据更改的 Debezium Embedded Engine。 例如:‘debezium.snapshot.mode’ = ‘never’。 详细了解 Debezium 的 Oracle 连接器属性
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId=xxgroup&streamId=xxstream&nodeId=xxnode。

局限性

在扫描表的快照期间无法执行 checkpoint

在扫描数据库表的快照时,由于没有可恢复的位置,我们无法执行检查点。为了不执行检查点,Oracle CDC 源将保持检查点等待超时。超时检查点将被识别为失败的检查点,默认情况下,这将触发 Flink 作业的故障转移。所以如果数据库表很大,建议添加以下 Flink 配置,避免因为超时检查点而导致故障转移:

  1. execution.checkpointing.interval: 10min
  2. execution.checkpointing.tolerable-failed-checkpoints: 100
  3. restart-strategy: fixed-delay
  4. restart-strategy.fixed-delay.attempts: 2147483647

可用的元数据字段

以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。

字段名称数据类型描述
table_nameSTRING NOT NULLName of the table that contain the row.
schema_nameSTRING NOT NULLName of the schema that contain the row.
database_nameSTRING NOT NULLName of the database that contain the row.
op_tsTIMESTAMP_LTZ(3) NOT NULLIt indicates the time that the change was made in the database.
If the record is read from snapshot of the table instead of the change stream, the value is always 0.

扩展的 CREATE TABLE 示例演示了公开这些元数据字段的语法:

  1. CREATE TABLE products (
  2. db_name STRING METADATA FROM 'database_name' VIRTUAL,
  3. schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
  4. table_name STRING METADATA FROM 'table_name' VIRTUAL,
  5. operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  6. ID INT NOT NULL,
  7. NAME STRING,
  8. DESCRIPTION STRING,
  9. WEIGHT DECIMAL(10, 3),
  10. PRIMARY KEY(id) NOT ENFORCED
  11. ) WITH (
  12. 'connector' = 'oracle-cdc-inlong',
  13. 'hostname' = 'localhost',
  14. 'port' = '1521',
  15. 'username' = 'flinkuser',
  16. 'password' = 'flinkpw',
  17. 'database-name' = 'XE',
  18. 'schema-name' = 'inventory',
  19. 'table-name' = 'products'
  20. );

注意:Oracle 方言是区分大小写的,如果字段名没有被引用,它会将字段名转换为大写,Flink SQL 不会转换字段名。因此对于 oracle 数据库中的物理列,我们在 Flink SQL 中定义 oracle-cdc 表时应该使用其在 Oracle 中转换后的字段名称。

特征

Exactly-Once 处理

Oracle Extract 节点是一个 Flink Source 连接器,它将首先读取数据库快照,然后通过exactly-once 处理继续读取更改事件,即使发生故障。请阅读 连接器的工作原理

启动读取位置

配置选项 scan.startup.mode 指定 Oracle Extract 节点消费者的启动模式。有效的枚举是:

  • initial (默认): 首次启动时对被监控的数据库表进行初始快照,并继续读取最新的 Binlog。
  • latest-offset: 永远不要在第一次启动时对受监控的数据库表执行快照,只需从自连接器启动以来的更改。

注意: scan.startup.mode 选项的机制依赖于 Debezium 的snapshot.mode 配置。所以请不要一起使用它们。如果您在 DDL 表中同时指定了 scan.startup.modedebezium.snapshot.mode 选项,可能会导致 scan.startup.mode 不起作用。

单线程读取

Oracle Extract 节点不能并行读取,因为只有一个任务可以接收更改事件。

数据类型映射

Oracle typeFlink SQL type
NUMBER(p, s <= 0), p - s < 3TINYINT
NUMBER(p, s <= 0), p - s < 5SMALLINT
NUMBER(p, s <= 0), p - s < 10INT
NUMBER(p, s <= 0), p - s < 19BIGINT
NUMBER(p, s <= 0), 19 <= p - s <= 38
DECIMAL(p - s, 0)
NUMBER(p, s > 0)DECIMAL(p, s)
NUMBER(p, s <= 0), p - s > 38STRING
FLOAT
BINARY_FLOAT
FLOAT
DOUBLE PRECISION
BINARY_DOUBLE
DOUBLE
NUMBER(1)BOOLEAN
DATE
TIMESTAMP [(p)]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] WITH TIME ZONETIMESTAMP [(p)] WITH TIME ZONE
TIMESTAMP [(p)] WITH LOCAL TIME ZONETIMESTAMP_LTZ [(p)]
CHAR(n)
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
NCLOB
XMLType
STRING
BLOB
ROWID
BYTES
INTERVAL DAY TO SECOND
INTERVAL YEAR TO MONTH
BIGINT