Iceberg

Overview

Apache Iceberg is a high-performance format for huge analytic tables.

Version

| Extract Node | Version |
|--------------|---------|
| Iceberg | Iceberg: 0.12.x, 0.13.x |

Dependencies

    <dependency>
        <groupId>org.apache.inlong</groupId>
        <artifactId>sort-connector-iceberg</artifactId>
        <version>1.13.0-SNAPSHOT</version>
    </dependency>

Usage

Before creating the Iceberg task, we need a Flink environment integrated with Hadoop.

  • Download Apache Hadoop.
  • Modify jobmanager.sh and taskmanager.sh to add the Hadoop environment variable. For the commands, refer to the Apache Flink documentation.

      export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

  • Modify docker-compose.yml in the docker/docker-compose directory to mount Hadoop and the modified Flink startup scripts into the containers:

      jobmanager:
        image: apache/flink:1.13-scala_2.11
        container_name: jobmanager
        user: root
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager
        volumes:
          # Mount Hadoop
          - HADOOP_HOME:HADOOP_HOME
          # Mount the modified jobmanager.sh which adds the HADOOP_HOME env correctly
          - /jobmanager.sh:/opt/flink/bin/jobmanager.sh
        ports:
          - "8081:8081"
        command: jobmanager
      taskmanager:
        image: apache/flink:1.13-scala_2.11
        container_name: taskmanager
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager
            taskmanager.numberOfTaskSlots: 2
        volumes:
          # Mount Hadoop
          - HADOOP_HOME:HADOOP_HOME
          # Mount the modified taskmanager.sh which adds the HADOOP_HOME env correctly
          - /taskmanager.sh:/opt/flink/bin/taskmanager.sh
        command: taskmanager

Before using the Flink SQL client, sql-client.sh also needs the Hadoop environment variable added, and the modified script must be mounted into the container. For the commands, refer to the Apache Flink documentation.

    export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
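
The same export line goes into jobmanager.sh, taskmanager.sh and sql-client.sh. As a minimal sketch, it could be wrapped in a guard that reports a missing Hadoop installation instead of silently exporting an empty classpath. The function name `export_hadoop_classpath` is hypothetical and not part of Flink or InLong; only the `$HADOOP_HOME/bin/hadoop classpath` command comes from the steps above.

```shell
# Hypothetical helper (not part of Flink or InLong): export HADOOP_CLASSPATH
# only when HADOOP_HOME points at an installation with an executable bin/hadoop.
export_hadoop_classpath() {
    hadoop_bin="${HADOOP_HOME:-}/bin/hadoop"
    if [ -z "${HADOOP_HOME:-}" ] || [ ! -x "$hadoop_bin" ]; then
        echo "HADOOP_HOME is unset or has no executable bin/hadoop" >&2
        return 1
    fi
    # Same command as in the steps above, written with $(...) instead of backticks.
    HADOOP_CLASSPATH="$("$hadoop_bin" classpath)" || return 1
    export HADOOP_CLASSPATH
}

# Warn and continue if Hadoop cannot be found, so the startup script still runs.
export_hadoop_classpath || echo "continuing without HADOOP_CLASSPATH" >&2
```

Placed near the top of each modified script, this keeps a mis-mounted Hadoop volume visible in the container logs.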

Using the Flink SQL CLI

    CREATE TABLE `iceberg_table_source` (
        `_id` STRING,
        `id` INT,
        `name` STRING,
        `age` INT,
        PRIMARY KEY (`_id`) NOT ENFORCED
    ) WITH (
        'connector' = 'iceberg-inlong',
        'catalog-database' = 'DATABASES',
        'catalog-table' = 'TABLE',
        'catalog-type' = 'HIVE',
        'catalog-name' = 'HIVE',
        'streaming' = 'true',
        'uri' = 'thrift://127.0.0.1:9083'
    );

Dashboard

Source → Create → Iceberg


Manager Client

TODO

Options

| Option | Required | Type | Description |
|--------|----------|------|-------------|
| connector | required | String | Specifies which connector to use; here it should be 'iceberg-inlong'. |
| catalog-database | required | String | Database name managed in the Iceberg catalog. |
| catalog-table | required | String | Table name managed in the Iceberg catalog and database. |
| catalog-type | required | String | hive or hadoop for built-in catalogs. |
| catalog-name | required | String | Catalog name. |
| uri | required | String | The thrift URI of the Hive metastore, such as thrift://127.0.0.1:9083. |
| warehouse | optional | String | For a Hive catalog, the Hive warehouse location. For a Hadoop catalog, the HDFS directory that stores metadata files and data files. |
| inlong.metric.labels | optional | String | InLong metric label. The value format is groupId=xxgroup&streamId=xxstream&nodeId=xxnode. |
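
To illustrate the inlong.metric.labels value format, the following sketch assembles the string from its three parts. The function name `build_inlong_metric_labels` is hypothetical and not part of InLong; the key names and the `&` separator are taken from the option description above.

```shell
# Hypothetical helper (not part of InLong): build the value for the
# 'inlong.metric.labels' option from a group ID, stream ID and node ID.
build_inlong_metric_labels() {
    if [ "$#" -ne 3 ]; then
        echo "usage: build_inlong_metric_labels GROUP_ID STREAM_ID NODE_ID" >&2
        return 1
    fi
    printf 'groupId=%s&streamId=%s&nodeId=%s\n' "$1" "$2" "$3"
}

# Prints: groupId=xxgroup&streamId=xxstream&nodeId=xxnode
build_inlong_metric_labels xxgroup xxstream xxnode
```

The resulting string is what would be placed in the WITH clause as the value of 'inlong.metric.labels'.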

Data Type Mapping

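| Flink SQL Type | Iceberg Type |
|----------------|--------------|
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| BOOLEAN | BOOLEAN |
| BINARY | FIXED(L) |
| VARBINARY | BINARY |
| DECIMAL | DECIMAL(P,S) |
| TINYINT | INT |
| SMALLINT | INT |
| INTEGER | INT |
| BIGINT | LONG |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| TIMESTAMP_LTZ | TIMESTAMPTZ |
| INTERVAL | - |
| ARRAY | LIST |
| MULTISET | MAP |
| MAP | MAP |
| ROW | STRUCT |
| RAW | - |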