Oracle

概览

Oracle Load 节点支持将数据写入 Oracle 数据库。 本文档介绍如何设置 Oracle Load 节点以对 Oracle 数据库运行 SQL 查询。

支持的版本

Load 节点DriverGroup IdArtifact IdJAR
OracleOraclecom.oracle.database.jdbcojdbc8下载

依赖

为了设置 Oracle Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。

Maven 依赖

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-jdbc</artifactId>
  4. <version>2.1.0-SNAPSHOT</version>
  5. </dependency>

Oracle license 和 Inlong license 是冲突的。 所以我们移除了pom中的 Oracle 驱动依赖。 如果我们想使用这个连接器,我们可以修改pom文件。

如何创建 Oracle Load 节点

SQL API 用法

  1. -- MySQL Extract 节点
  2. CREATE TABLE `mysql_extract_table`(
  3. PRIMARY KEY (`id`) NOT ENFORCED,
  4. `id` BIGINT,
  5. `name` STRING,
  6. `age` INT
  7. ) WITH (
  8. 'connector' = 'mysql-cdc-inlong',
  9. 'url' = 'jdbc:mysql://localhost:3306/read',
  10. 'username' = 'inlong',
  11. 'password' = 'inlong',
  12. 'table-name' = 'user'
  13. )
  14. -- Oracle Load 节点
  15. CREATE TABLE `oracle_load_table`(
  16. PRIMARY KEY (`id`) NOT ENFORCED,
  17. `id` BIGINT,
  18. `name` STRING,
  19. `age` INT
  20. ) WITH (
  21. 'connector' = 'jdbc-inlong',
  22. 'url' = 'jdbc:oracle:thin://host:port/database/',
  23. 'username' = 'inlong',
  24. 'password' = 'inlong',
  25. 'table-name' = 'public.user'
  26. )
  27. -- 写数据到 Oracle
  28. INSERT INTO oracle_load_table
  29. SELECT id, name , age FROM mysql_extract_table;

InLong Dashboard 用法

TODO: 将在未来支持此功能。

InLong Manager Client 用法

TODO: 将在未来支持此功能。

Oracle Load 节点参数

参数是否必选默认值数据类型描述
connector必选(none)String指定使用什么类型的连接器,这里应该是 ‘jdbc-inlong’。
url必选(none)StringJDBC 数据库 url。
table-name必选(none)String连接到 JDBC 表的名称。
driver可选(none)String用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。
username可选(none)StringJDBC 用户名。如果指定了 ‘username’ 和 ‘password’ 中的任一参数,则两者必须都被指定。
password可选(none)StringJDBC 密码。
connection.max-retry-timeout可选60sDuration最大重试超时时间,以秒为单位且不应该小于 1 秒。
sink.buffer-flush.max-rows可选100Integerflush 前缓存记录的最大值,可以设置为 ‘0’ 来禁用它。
sink.buffer-flush.interval可选1sDurationflush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 ‘0’ 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 ‘sink.buffer-flush.max-rows’ 设置为 ‘0’ 并配置适当的 flush 时间间隔。
sink.max-retries可选3Integer写入记录到数据库失败后的最大重试次数。
sink.parallelism可选(none)Integer用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。
sink.ignore.changelog可选falseBoolean忽略所有 RowKind 类型的变更日志,将它们当作 INSERT 的数据来采集。
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId={groupId}&streamId={streamId}&nodeId={nodeId}

数据类型映射

Oracle 类型Flink SQL 类型
BINARY_FLOATFLOAT
BINARY_DOUBLEDOUBLE
SMALLINT
FLOAT(s)
DOUBLE PRECISION
REAL
NUMBER(p, s)
DECIMAL(p, s)
DATEDATE
DECIMAL(20, 0)
REAL
FLOAT4
FLOAT
FLOAT8
DOUBLE PRECISION
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEANBOOLEAN
DATEDATE
TIMESTAMP [(p)]WITHOUT TIMEZONETIMESTAMP [(p)][WITHOUT TIMEZONE]
CHAR(n)
VARCHAR(n)
CLOB(n)
STRING
RAW(s)
BLOB
BYTES
ARRAY