Iceberg

概览

Apache Iceberg是一种用于大型分析表的高性能格式。

版本

提取节点版本
IcebergIceberg:0.12.x,0.13.x

依赖项

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-iceberg</artifactId>
  4. <version>1.4.0-SNAPSHOT</version>
  5. </dependency>

用法

SQL API 用法

在 flink 中创建Iceberg表,我们推荐使用Flink SQL Client,因为它更便于用户理解概念。

Step.1 在hadoop环境下启动一个独立的flink集群。

  1. # HADOOP_HOME is your hadoop root directory after unpack the binary package.
  2. export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
  3. # Start the flink standalone cluster
  4. ./bin/start-cluster.sh

Step.2 启动flink SQL客户端。

flink-runtime在 iceberg 项目中创建了一个单独的模块来生成一个捆绑的 jar,可以直接由 flink SQL 客户端加载。

如果想要flink-runtime手动构建捆绑的 jar,只需构建inlong项目,它将在<inlong-root-dir>/inlong-sort/sort-connectors/iceberg/target

默认情况下,iceberg 包含用于 hadoop 目录的 hadoop jars。如果我们要使用 hive 目录,我们需要在打开 flink sql 客户端时加载 hive jars。幸运的是,apache inlong将 一个捆绑的hive jar打包进入Iceberg。所以我们可以如下打开sql客户端:

  1. # HADOOP_HOME is your hadoop root directory after unpack the binary package.
  2. export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
  3. ./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-xxx.jar shell

Step.3 在当前 Flink 目录中创建表

默认情况下,我们不需要创建目录,只需使用内存目录即可。在目录中如果catalog-database.catalog-table不存在,会自动创建。这里我们只是加载数据。

在 Hive 目录中管理的表

下面的 SQL 会在当前 Flink 目录中创建一个 Flink 表,映射到 iceberg 目录中default_database.iceberg_table管理的 iceberg 表。由于目录类型默认是 hive,所以这里不需要放catalog-type.

  1. CREATE TABLE flink_table (
  2. id BIGINT,
  3. data STRING
  4. ) WITH (
  5. 'connector'='iceberg',
  6. 'catalog-name'='hive_prod',
  7. 'uri'='thrift://localhost:9083',
  8. 'warehouse'='hdfs://nn:8020/path/to/warehouse'
  9. );

如果要创建 Flink 表映射到 Hive 目录中管理的不同Iceberg表(例如hive_db.hive_iceberg_table在 Hive 中),则可以创建 Flink 表,如下所示:

  1. CREATE TABLE flink_table (
  2. id BIGINT,
  3. data STRING
  4. ) WITH (
  5. 'connector'='iceberg',
  6. 'catalog-name'='hive_prod',
  7. 'catalog-database'='hive_db',
  8. 'catalog-table'='hive_iceberg_table',
  9. 'uri'='thrift://localhost:9083',
  10. 'warehouse'='hdfs://nn:8020/path/to/warehouse'
  11. );

将记录写入 Flink 表时,如果底层目录数据库(hive_db上例中)不存在,则会自动创建它。

在 hadoop 目录中管理的表

以下 SQL 将在当前 Flink 目录中创建一个 Flink 表,该表映射到default_database.flink_tablehadoop 目录中管理Iceberg表。

  1. CREATE TABLE flink_table (
  2. id BIGINT,
  3. data STRING
  4. ) WITH (
  5. 'connector'='iceberg',
  6. 'catalog-name'='hadoop_prod',
  7. 'catalog-type'='hadoop',
  8. 'warehouse'='hdfs://nn:8020/path/to/warehouse'
  9. );

Step.6 向Iceberg表中插入数据

  1. INSERT INTO `flink_table`
  2. SELECT
  3. `id` AS `id`,
  4. `d` AS `name`
  5. FROM `source_table`

在自定义Catalog中管理的表

以下 SQL 将在当前 Flink 目录中创建一个 Flink 表,该表映射到default_database.flink_table自定义目录中管理的Iceberg表。

  1. CREATE TABLE flink_table (
  2. id BIGINT,
  3. data STRING
  4. ) WITH (
  5. 'connector'='iceberg',
  6. 'catalog-name'='custom_prod',
  7. 'catalog-type'='custom',
  8. 'catalog-impl'='com.my.custom.CatalogImpl',
  9. -- More table properties for the customized catalog
  10. 'my-additional-catalog-config'='my-value',
  11. ...
  12. );

请检查“集成”选项卡下的部分以获取所有自定义目录。

InLong Dashboard 用法

TODO

InLong Manager Client 用法

TODO

特征

多表写入

目前 Iceberg 支持多表同时写入,需要在 FLINK SQL 的建表参数上添加 'sink.multiple.enable' = 'true' 并且目标表的schema 只能定义成 BYTES 或者 STRING ,以下是一个建表语句举例:

  1. CREATE TABLE `table_2`(
  2. `data` STRING)
  3. WITH (
  4. 'connector'='iceberg-inlong',
  5. 'catalog-name'='hive_prod',
  6. 'uri'='thrift://localhost:9083',
  7. 'warehouse'='hdfs://localhost:8020/hive/warehouse',
  8. 'sink.multiple.enable' = 'true',
  9. 'sink.multiple.format' = 'canal-json',
  10. 'sink.multiple.add-column.policy' = 'TRY_IT_BEST',
  11. 'sink.multiple.database-pattern' = '${database}',
  12. 'sink.multiple.table-pattern' = 'test_${table}'
  13. );

要支持多表写入同时需要设置上游数据的序列化格式(通过选项 ‘sink.multiple.format’ 来设置, 目前仅支持 [canal-json|debezium-json])。

动态表名映射

Iceberg 在多表写入的时可以自定义映射的数据库名和表名的规则,可以填充占位符然后添加前后缀来修改映射的目标表名称。 Iceberg Load Node 会解析 'sink.multiple.database-pattern' 作为目的端的 数据库名, 解析 'sink.multiple.table-pattern' 作为目的端的表名,占位符是从数据中解析出来的,变量是严格通过 ‘${VARIABLE_NAME}’ 来表示, 变量的取值来自于数据本身, 即可以是通过 'sink.multiple.format' 指定的某种 Format 的元数据字段, 也可以是数据中的物理字段。 关于 ‘topic-parttern’ 的例子如下:

  • ‘sink.multiple.format’ 为 ‘canal-json’:

上游数据为:

  1. {
  2. "data": [
  3. {
  4. "id": "111",
  5. "name": "scooter",
  6. "description": "Big 2-wheel scooter",
  7. "weight": "5.18"
  8. }
  9. ],
  10. "database": "inventory",
  11. "es": 1589373560000,
  12. "id": 9,
  13. "isDdl": false,
  14. "mysqlType": {
  15. "id": "INTEGER",
  16. "name": "VARCHAR(255)",
  17. "description": "VARCHAR(512)",
  18. "weight": "FLOAT"
  19. },
  20. "old": [
  21. {
  22. "weight": "5.15"
  23. }
  24. ],
  25. "pkNames": [
  26. "id"
  27. ],
  28. "sql": "",
  29. "sqlType": {
  30. "id": 4,
  31. "name": 12,
  32. "description": 12,
  33. "weight": 7
  34. },
  35. "table": "products",
  36. "ts": 1589373560798,
  37. "type": "UPDATE"
  38. }

‘topic-pattern’ 为 ‘{database}_${table}’, 提取后的 Topic 为 ‘inventory_products’ (‘database’, ‘table’ 为元数据字段, ‘id’ 为物理字段)

‘topic-pattern’ 为 ‘{database}${table}${id}’, 提取后的 Topic 为 ‘inventory_products_111’ (‘database’, ‘table’ 为元数据字段, ‘id’ 为物理字段)

动态建库、建表

Iceberg在多表写入时遇到不存在的表和不存在的库时会自动创建数据库和数据表,并且支持在运行过程中新增捕获额外的表入库。 默认的Iceberg表参数为:'format-version' = '2''write.upsert.enabled' = 'true'''engine.hive.enabled' = 'true'

动态schema变更

Iceberg在多表写入时支持同步源表结构变更到目标表(DDL同步),支持的schema变更如下:

schema变更类型是否支持
列增加
列减少
列位置变更
列重命名
列类型变更

Iceberg Load 节点参数

选项是否必须默认值类型描述
connector必需(none)String指定要使用的连接器,这里应该是‘iceberg’
catalog-type必需hiveStringhivehadoop用于内置目录,或为使用 catalog-impl 的自定义目录实现未设置
catalog-name必需(none)String目录名称
catalog-database必需(none)String在Iceberg目录中管理的数据库名称
catalog-table必需(none)String在底层Iceberg目录和数据库中管理的表名
catalog-impl自定义custom 可选(none)String如果未设置,则必须设置完全限定的类名自定义目录实现catalog-type
cache-enabled可选trueBoolean是否启用目录缓存,默认值为true
urihive catalog可选(none)StringHive 元存储的 thrift URI
clientshive catalog可选2IntegerHive Metastore 客户端池大小,默认值为 2
warehousehive catalog或hadoop catalog可选(none)String对于 Hive 目录,是 Hive 仓库位置,如果既不设置hive-conf-dir指定包含hive-site.xml配置文件的位置也不添加正确hive-site.xml的类路径,用户应指定此路径。对于hadoop目录,HDFS目录存放元数据文件和数据文件
hive-conf-dirhive catalog可选(none)Stringhive-site.xml包含将用于提供自定义 Hive 配置值的配置文件的目录的路径。如果同时设置和创建Iceberg目录时,hive.metastore.warehouse.dirfrom <hive-conf-dir>/hive-site.xml(或来自类路径的 hive 配置文件)的值将被该值覆盖。warehousehive-conf-dirwarehouse
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId={groupId}&streamId={streamId}&nodeId={nodeId}
sink.multiple.enable可选falseBoolean是否开启多路写入
sink.multiple.schema-update.policy可选TRY_IT_BESTEnum遇到数据中schema和目标表不一致时的处理策略
TRY_IT_BEST:尽力而为,尽可能处理,处理不了的则忽略
IGNORE_WITH_LOG:忽略并且记录日志,后续该表数据不再处理
THROW_WITH_STOP:抛异常并且停止任务,直到用户手动处理schema不一致的情况
sink.multiple.pk-auto-generated可选falseBoolean是否自动生成主键,对于多路写入自动建表时当源表无主键时是否将所有字段当作主键
sink.multiple.typemap-compatible-with-spark可选falseBoolean是否适配spark的类型系统,对于多路写入自动建表时是否需要适配spark的类型系统

数据类型映射

Iceberg数据类型详细信息。这里介绍了加载数据如何将 Iceberg 类型转换为 Flink 类型。

Flink SQL 类型Iceberg 类型
CHARSTRING
VARCHARSTRING
STRINGSTRING
BOOLEANBOOLEAN
BINARYFIXED(L)
VARBINARYBINARY
DECIMALDECIMAL(P,S)
TINYINTINT
SMALLINTINT
INTEGERINT
BIGINTLONG
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
TIMETIME
TIMESTAMPTIMESTAMP
TIMESTAMP_LTZTIMESTAMPTZ
INTERVAL-
ARRAYLIST
MULTISETMAP
MAPMAP
ROWSTRUCT
RAW-