Iceberg

Overview

Apache Iceberg is a high-performance format for huge analytic tables.

Version

| Load Node | Version |
|-----------|---------|
| Iceberg | Iceberg: 0.12.x, 0.13.x |

Dependencies

    <dependency>
        <groupId>org.apache.inlong</groupId>
        <artifactId>sort-connector-iceberg</artifactId>
        <version>1.2.0-incubating</version>
    </dependency>

Usage

Usage for SQL API

To create an Iceberg table in Flink, we recommend using the Flink SQL Client, because it makes the concepts easier to understand.

Step.1 Start a standalone Flink cluster in a Hadoop environment.

    # HADOOP_HOME is the Hadoop root directory after unpacking the binary package.
    export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
    # Start the Flink standalone cluster
    ./bin/start-cluster.sh

Step.2 Start the Flink SQL client.

The Iceberg project provides a separate flink-runtime module that generates a bundled jar, which the Flink SQL client can load directly.

To build the bundled jar manually, build the InLong project; the jar is generated under <inlong-root-dir>/inlong-sort/sort-connectors/iceberg/target.

By default, Iceberg includes the Hadoop jars needed for the Hadoop catalog. To use the Hive catalog, the Hive jars must be loaded when opening the Flink SQL client. InLong automatically packages a bundled Hive jar into the Iceberg connector, so the SQL client can be opened as follows:

    # HADOOP_HOME is the Hadoop root directory after unpacking the binary package.
    export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
    ./bin/sql-client.sh embedded -j <flink-runtime-directory>/sort-connector-iceberg-{inlong-version}.jar

Step.3 Create a table in the current Flink catalog

By default, you do not need to create a catalog; the in-memory catalog is used. If catalog-database.catalog-table does not exist in the Iceberg catalog, it is created automatically. Here we only load data into it.

Table managed in Hive catalog

The following SQL creates a Flink table in the current Flink catalog, which maps to the Iceberg table default_database.flink_table managed in the Hive catalog. Because the default catalog type is hive, catalog-type does not need to be set here.

    CREATE TABLE flink_table (
        id BIGINT,
        data STRING
    ) WITH (
        'connector'='iceberg',
        'catalog-name'='hive_prod',
        'uri'='thrift://localhost:9083',
        'warehouse'='hdfs://nn:8020/path/to/warehouse'
    );

To create a Flink table that maps to a different Iceberg table managed in the Hive catalog (such as hive_db.hive_iceberg_table in Hive), create the Flink table as follows:

    CREATE TABLE flink_table (
        id BIGINT,
        data STRING
    ) WITH (
        'connector'='iceberg',
        'catalog-name'='hive_prod',
        'catalog-database'='hive_db',
        'catalog-table'='hive_iceberg_table',
        'uri'='thrift://localhost:9083',
        'warehouse'='hdfs://nn:8020/path/to/warehouse'
    );

The underlying catalog database (hive_db in the above example) will be created automatically if it does not exist when writing records into the Flink table.

Table managed in Hadoop catalog

The following SQL creates a Flink table in the current Flink catalog, which maps to the Iceberg table default_database.flink_table managed in the Hadoop catalog.

    CREATE TABLE flink_table (
        id BIGINT,
        data STRING
    ) WITH (
        'connector'='iceberg',
        'catalog-name'='hadoop_prod',
        'catalog-type'='hadoop',
        'warehouse'='hdfs://nn:8020/path/to/warehouse'
    );
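The Hive example earlier uses catalog-database and catalog-table to map a Flink table to a differently named Iceberg table. A minimal sketch of the same idea for the Hadoop catalog follows, assuming these two options behave the same way when catalog-type is hadoop. The names flink_orders, hadoop_db, and hadoop_iceberg_table are hypothetical placeholders.

```sql
-- Sketch only: flink_orders, hadoop_db and hadoop_iceberg_table are made-up names.
CREATE TABLE flink_orders (
    id BIGINT,
    data STRING
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'hadoop_prod',
    'catalog-type' = 'hadoop',
    -- Assumed to work as in the Hive example: target database and table in the Iceberg catalog
    'catalog-database' = 'hadoop_db',
    'catalog-table' = 'hadoop_iceberg_table',
    'warehouse' = 'hdfs://nn:8020/path/to/warehouse'
);
```

With this definition, the Flink table name (flink_orders) and the Iceberg table name (hadoop_iceberg_table) can differ, which is convenient when several Flink jobs write to tables that follow a separate naming scheme in the warehouse.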

Table managed in custom catalog

The following SQL creates a Flink table in the current Flink catalog, which maps to the Iceberg table default_database.flink_table managed in a custom catalog.

    CREATE TABLE flink_table (
        id BIGINT,
        data STRING
    ) WITH (
        'connector'='iceberg',
        'catalog-name'='custom_prod',
        'catalog-type'='custom',
        'catalog-impl'='com.my.custom.CatalogImpl',
        -- More table properties for the customized catalog
        'my-additional-catalog-config'='my-value',
        ...
    );

See the sections under the Integrations tab of the Apache Iceberg documentation for all custom catalogs.

Step.4 Insert data into the Iceberg table

    INSERT INTO `flink_table`
    SELECT
        `id` AS `id`,
        `d` AS `data`
    FROM `source_table`;
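The source_table referenced in the statement above is not defined on this page. A minimal sketch of one way to try the insert end to end follows, using Flink's built-in datagen connector as a stand-in source. The column layout (id, d) is taken from the SELECT above; the generation settings and the checkpoint interval are illustrative assumptions. The checkpoint setting is included on the assumption that this connector, like the upstream Iceberg Flink sink, commits written files when a checkpoint completes.

```sql
-- Assumption: Flink 1.13+ SQL client syntax; older clients use SET key=value without quotes.
SET 'execution.checkpointing.interval' = '10s';

-- Hypothetical stand-in source that generates random rows.
CREATE TABLE source_table (
    id BIGINT,
    d STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',   -- slow rate, enough for a smoke test
    'fields.d.length' = '8'    -- length of the generated random strings
);

-- Columns are matched to the sink by position: id -> id, d -> data.
INSERT INTO flink_table
SELECT id, d
FROM source_table;
```

Because the datagen source is unbounded with these settings, the insert runs as a continuous streaming job until it is cancelled.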

Usage for InLong Dashboard

TODO

Usage for InLong Manager Client

TODO

Iceberg Load Node Options

| Option | Required | Default | Type | Description |
|--------|----------|---------|------|-------------|
| connector | required | (none) | String | Specifies which connector to use; here it should be 'iceberg'. |
| catalog-type | required | hive | String | hive or hadoop for built-in catalogs, or left unset for custom catalog implementations using catalog-impl. |
| catalog-name | required | (none) | String | Catalog name. |
| catalog-database | required | (none) | String | Database name managed in the Iceberg catalog. |
| catalog-table | required | (none) | String | Table name managed in the underlying Iceberg catalog and database. |
| catalog-impl | optional for custom catalog | (none) | String | The fully-qualified class name of the custom catalog implementation; must be set if catalog-type is unset. |
| cache-enabled | optional | true | Boolean | Whether to enable the catalog cache. |
| uri | required for hive catalog | (none) | String | The Hive metastore's thrift URI. |
| clients | optional for hive catalog | 2 | Integer | The Hive metastore client pool size. |
| warehouse | optional for hadoop catalog or hive catalog | (none) | String | For the Hive catalog, the Hive warehouse location; specify this path if you neither set hive-conf-dir to a directory containing a hive-site.xml configuration file nor add a correct hive-site.xml to the classpath. For the Hadoop catalog, the HDFS directory in which metadata files and data files are stored. |
| hive-conf-dir | optional for hive catalog | (none) | String | Path to a directory containing a hive-site.xml configuration file, which is used to provide custom Hive configuration values. If both hive-conf-dir and warehouse are set when creating the Iceberg catalog, the value of hive.metastore.warehouse.dir from <hive-conf-dir>/hive-site.xml (or from the Hive configuration file on the classpath) is overwritten with the warehouse value. |
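As an illustration of how the optional Hive settings in the table combine, here is a minimal sketch of a table definition that reads the Hive configuration from a directory instead of setting warehouse, enlarges the metastore client pool, and turns the catalog cache off. The table name flink_table_tuned, the path /etc/hive/conf, and the chosen values are hypothetical.

```sql
-- Sketch only: the table name, path and values below are illustrative assumptions.
CREATE TABLE flink_table_tuned (
    id BIGINT,
    data STRING
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'hive_prod',
    'catalog-database' = 'hive_db',
    'catalog-table' = 'hive_iceberg_table',
    'uri' = 'thrift://localhost:9083',
    -- Directory expected to contain hive-site.xml; used here instead of 'warehouse'
    'hive-conf-dir' = '/etc/hive/conf',
    -- Hive metastore client pool size (default 2)
    'clients' = '5',
    -- Disable the catalog cache (default true)
    'cache-enabled' = 'false'
);
```

If warehouse were also set here, the table above indicates that its value would take precedence over hive.metastore.warehouse.dir from hive-site.xml.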

Data Type Mapping

The following table shows how Flink SQL types are converted to Iceberg types when loading data.

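| Flink SQL Type | Iceberg Type |
|----------------|--------------|
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| BOOLEAN | BOOLEAN |
| BINARY | FIXED(L) |
| VARBINARY | BINARY |
| DECIMAL | DECIMAL(P,S) |
| TINYINT | INT |
| SMALLINT | INT |
| INTEGER | INT |
| BIGINT | LONG |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| TIMESTAMP_LTZ | TIMESTAMPTZ |
| INTERVAL | - |
| ARRAY | LIST |
| MULTISET | MAP |
| MAP | MAP |
| ROW | STRUCT |
| RAW | - |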
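
To make the mapping concrete, here is a minimal sketch of a Flink table that uses several of the types above, with the Iceberg type expected from the table noted beside each column. The table name, column names, and catalog settings are hypothetical; the comments restate the mapping table and have not been checked against a running cluster.

```sql
-- Sketch only: typed_flink_table and all column names are made up for illustration.
CREATE TABLE typed_flink_table (
    id          BIGINT,                               -- Iceberg LONG
    is_active   BOOLEAN,                              -- Iceberg BOOLEAN
    retries     SMALLINT,                             -- Iceberg INT
    amount      DECIMAL(10, 2),                       -- Iceberg DECIMAL(10,2)
    user_name   VARCHAR(32),                          -- Iceberg STRING
    checksum    BINARY(16),                           -- Iceberg FIXED(16)
    payload     VARBINARY(1024),                      -- Iceberg BINARY
    created_at  TIMESTAMP(6),                         -- Iceberg TIMESTAMP
    updated_at  TIMESTAMP_LTZ(6),                     -- Iceberg TIMESTAMPTZ
    tags        ARRAY<STRING>,                        -- Iceberg LIST
    attrs       MAP<STRING, STRING>,                  -- Iceberg MAP
    address     ROW<city STRING, postcode STRING>     -- Iceberg STRUCT
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'hadoop_prod',
    'catalog-type' = 'hadoop',
    'warehouse' = 'hdfs://nn:8020/path/to/warehouse'
);
```

Note that several Flink types collapse into one Iceberg type (TINYINT, SMALLINT, and INTEGER all become INT; CHAR, VARCHAR, and STRING all become STRING), so declared lengths such as VARCHAR(32) are not preserved on the Iceberg side. INTERVAL and RAW have no Iceberg counterpart in the table and are best avoided in sink schemas.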