Iceberg Connector

Overview

The Iceberg connector allows querying data stored in Iceberg tables.

Metastores

Iceberg tables store most of their metadata in metadata files, alongside the data on the filesystem, but they still require a central place to find the current location of the metadata pointer for a table. This central place is called the Iceberg catalog. The Presto Iceberg connector supports several types of Iceberg catalogs: Hive Metastore, Glue, Nessie, and Hadoop.

To configure the Iceberg connector, create a catalog properties file etc/catalog/iceberg.properties. The iceberg.catalog.type property defines the catalog type. Set it along with the other properties shown below, replacing the property values as appropriate:

Hive Metastore catalog

The Iceberg connector supports the same Hive Metastore (HMS) configuration properties as the Hive connector.

  connector.name=iceberg
  hive.metastore.uri=hostname:port
  iceberg.catalog.type=hive

Glue catalog

The Iceberg connector supports the same Glue configuration properties as the Hive connector.

  connector.name=iceberg
  hive.metastore=glue
  iceberg.catalog.type=hive

Nessie catalog

To use a Nessie catalog, configure the catalog type as iceberg.catalog.type=nessie:

  connector.name=iceberg
  iceberg.catalog.type=nessie
  iceberg.catalog.warehouse=/tmp
  iceberg.nessie.uri=https://localhost:19120/api/v1

Additional supported properties for the Nessie catalog:

  • iceberg.nessie.ref: The branch/tag to use for Nessie. Defaults to main.

  • iceberg.nessie.uri: Nessie API endpoint URI (required). Example: https://localhost:19120/api/v1

  • iceberg.nessie.auth.type: The authentication type to use. Available values are BASIC or BEARER. Example: BEARER

  • iceberg.nessie.auth.basic.username: The username to use with BASIC authentication. Example: test_user

  • iceberg.nessie.auth.basic.password: The password to use with BASIC authentication. Example: my$ecretPass

  • iceberg.nessie.auth.bearer.token: The token to use with BEARER authentication. Example: SXVLUXUhIExFQ0tFUiEK

  • iceberg.nessie.read-timeout-ms: The read timeout in milliseconds for requests to the Nessie server. Example: 5000

  • iceberg.nessie.connect-timeout-ms: The connection timeout in milliseconds for connection requests to the Nessie server. Example: 10000

  • iceberg.nessie.compression-enabled: Whether compression should be enabled for requests to the Nessie server. Defaults to true.

  • iceberg.nessie.client-builder-impl: The custom ClientBuilder implementation class to be used.
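
As an illustration, a catalog file combining these properties for a Nessie catalog secured with BEARER authentication might look like the following sketch; the warehouse path, endpoint, and token are placeholder values reused from the examples above:

  # Sketch: Nessie catalog with BEARER authentication (placeholder values)
  connector.name=iceberg
  iceberg.catalog.type=nessie
  iceberg.catalog.warehouse=/tmp
  iceberg.nessie.uri=https://localhost:19120/api/v1
  iceberg.nessie.ref=main
  iceberg.nessie.auth.type=BEARER
  iceberg.nessie.auth.bearer.token=SXVLUXUhIExFQ0tFUiEK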

Setting Up Nessie With Docker

To set up a Nessie instance locally using the Docker image, see Setting up Nessie. Once the Docker instance is up and running, you should see logs similar to the following example:

  2023-09-05 13:11:37,905 INFO [io.quarkus] (main) nessie-quarkus 0.69.0 on JVM (powered by Quarkus 3.2.4.Final) started in 1.921s. Listening on: http://0.0.0.0:19120
  2023-09-05 13:11:37,906 INFO [io.quarkus] (main) Profile prod activated.
  2023-09-05 13:11:37,906 INFO [io.quarkus] (main) Installed features: [agroal, amazon-dynamodb, cassandra-client, cdi, google-cloud-bigtable, hibernate-validator, jdbc-postgresql, logging-sentry, micrometer, mongodb-client, narayana-jta, oidc, opentelemetry, reactive-routes, resteasy, resteasy-jackson, security, security-properties-file, smallrye-context-propagation, smallrye-health, smallrye-openapi, swagger-ui, vertx]

If log messages related to Nessie’s OpenTelemetry collector appear similar to the following example, you can disable OpenTelemetry using the configuration option quarkus.otel.sdk.disabled=true.

  2023-08-27 11:10:02,492 INFO [io.qua.htt.access-log] (executor-thread-1) 172.17.0.1 - - [27/Aug/2023:11:10:02 +0000] "GET /api/v1/config HTTP/1.1" 200 62
  2023-08-27 11:10:05,007 SEVERE [io.ope.exp.int.grp.OkHttpGrpcExporter] (OkHttp http://localhost:4317/...) Failed to export spans. The request could not be executed. Full error message: Failed to connect to localhost/127.0.0.1:4317

For example, start the Docker image using the following command:

  docker run -p 19120:19120 -e QUARKUS_OTEL_SDK_DISABLED=true ghcr.io/projectnessie/nessie

For more information about this configuration option and other related options, see the OpenTelemetry Configuration Reference.

For more information about troubleshooting OpenTelemetry traces, see Troubleshooting traces.

If an error similar to the following example is displayed, it is probably because you are interacting with an HTTP server rather than an HTTPS server. You need to set iceberg.nessie.uri to http://localhost:19120/api/v1.

  Caused by: javax.net.ssl.SSLException: Unsupported or unrecognized SSL message
      at sun.security.ssl.SSLSocketInputRecord.handleUnknownRecord(SSLSocketInputRecord.java:448)
      at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:174)
      at sun.security.ssl.SSLTransport.decode(SSLTransport.java:111)
      at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1320)
      at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1233)
      at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:417)
      at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:389)
      at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:558)
      at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:201)
      at sun.net.www.protocol.https.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java:167)
      at org.projectnessie.client.http.impl.jdk8.UrlConnectionRequest.executeRequest(UrlConnectionRequest.java:71)
      ... 42 more
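
For instance, reusing the values from the earlier Nessie catalog example, the catalog file for the local Docker instance would point at the plain HTTP endpoint:

  # Sketch: local Nessie Docker instance reached over plain HTTP
  connector.name=iceberg
  iceberg.catalog.type=nessie
  iceberg.catalog.warehouse=/tmp
  iceberg.nessie.uri=http://localhost:19120/api/v1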

Hadoop catalog

The Iceberg connector supports the Hadoop catalog. To use it, configure the catalog type as iceberg.catalog.type=hadoop:

  connector.name=iceberg
  iceberg.catalog.type=hadoop
  iceberg.catalog.warehouse=hdfs://hostname:port

Configuration Properties

Note

The Iceberg connector supports the same Amazon S3 configuration options as the Hive connector.

The following configuration properties are available:

  • hive.metastore.uri: The URI(s) of the Hive metastore to connect to using the Thrift protocol. If multiple URIs are provided, the first URI is used by default and the rest of the URIs are fallback metastores. Example: thrift://192.0.2.3:9083 or thrift://192.0.2.3:9083,thrift://192.0.2.4:9083. This property is required if iceberg.catalog.type is hive; otherwise, it is ignored.

  • iceberg.file-format: The storage file format for Iceberg tables. The available values are PARQUET and ORC. Default: ORC.

  • iceberg.compression-codec: The compression codec to use when writing files. The available values are NONE, SNAPPY, GZIP, LZ4, and ZSTD. Default: GZIP.

  • iceberg.catalog.type: The catalog type for Iceberg tables. The available values are hive, hadoop, and nessie, corresponding to the catalogs in Iceberg. Default: hive.

  • iceberg.catalog.warehouse: The catalog warehouse root path for Iceberg tables. Example: hdfs://nn:8020/warehouse/path. This property is required if iceberg.catalog.type is hadoop; otherwise, it is ignored.

  • iceberg.catalog.cached-catalog-num: The number of Iceberg catalogs to cache. This property is required if iceberg.catalog.type is hadoop; otherwise, it is ignored. Default: 10.

  • iceberg.hadoop.config.resources: The path(s) for Hadoop configuration resources. Example: /etc/hadoop/conf/core-site.xml. This property is required if iceberg.catalog.type is hadoop; otherwise, it is ignored.

  • iceberg.max-partitions-per-writer: The maximum number of partitions handled per writer. Default: 100.

  • iceberg.minimum-assigned-split-weight: A decimal value in the range (0, 1] used as a minimum for weights assigned to each split. A low value may improve performance on tables with small files. A higher value may improve performance for queries with highly skewed aggregations or joins. Default: 0.05.

  • iceberg.enable-merge-on-read-mode: Enable reading base tables that use merge-on-read for updates. The Iceberg connector currently does not read delete lists, which means any updates will not be reflected in the table. Default: false.
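
Putting several of these properties together, a minimal sketch of etc/catalog/iceberg.properties for a Hive-backed Iceberg catalog that writes Parquet files with ZSTD compression might look like the following; the metastore address is the example value from the table above:

  # Sketch: Hive-backed Iceberg catalog writing ZSTD-compressed Parquet
  connector.name=iceberg
  iceberg.catalog.type=hive
  hive.metastore.uri=thrift://192.0.2.3:9083
  iceberg.file-format=PARQUET
  iceberg.compression-codec=ZSTD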

Table Properties

Table properties set metadata for the underlying tables. This is key for CREATE TABLE/CREATE TABLE AS statements. Table properties are passed to the connector using a WITH clause:

  CREATE TABLE tablename
  WITH (
      property_name = property_value,
      ...
  )

The following table properties are available, which are specific to the Presto Iceberg connector:

  • format: Optionally specifies the format of table data files, either PARQUET or ORC. Defaults to PARQUET.

  • partitioning: Optionally specifies table partitioning. If a table is partitioned by columns c1 and c2, the partitioning property is partitioning = ARRAY['c1', 'c2'].

  • location: Optionally specifies the file system location URI for the table.

  • format_version: Optionally specifies the format version of the Iceberg specification to use for new tables, either 1 or 2. Defaults to 1.

The table definition below specifies format ORC, partitioning by columns c1 and c2, and a file system location of s3://test_bucket/test_schema/test_table:

  CREATE TABLE test_table (
      c1 bigint,
      c2 varchar,
      c3 double
  )
  WITH (
      format = 'ORC',
      partitioning = ARRAY['c1', 'c2'],
      location = 's3://test_bucket/test_schema/test_table'
  )

Extra Hidden Metadata Tables

The Iceberg connector exposes extra hidden metadata tables. You can query these as part of a SQL query by appending the metadata table name to the table name.

$properties Table

  • $properties : General properties of the given table
  SELECT * FROM "ctas_nation$properties";

   key                  | value
  ----------------------+---------
   write.format.default | PARQUET

$history Table

  • $history : History of table state changes
  SELECT * FROM "ctas_nation$history";

  made_current_at | snapshot_id | parent_id | is_current_ancestor
  --------------------------------------+---------------------+-----------+---------------------
  2022-11-25 20:56:31.784 Asia/Kolkata | 7606232158543069775 | NULL | true

$snapshots Table

  • $snapshots : Details about the table snapshots, see the details here.
  SELECT * FROM "ctas_nation$snapshots";

  committed_at | snapshot_id | parent_id | operation | manifest_list | summary
  --------------------------------------+---------------------+-----------+-----------+----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  2022-11-25 20:56:31.784 Asia/Kolkata | 7606232158543069775 | NULL | append | s3://my-bucket/ctas_nation/metadata/snap-7606232158543069775-1-395a2cad-b244-409b-b030-cc44949e5a4e.avro | {changed-partition-count=1, added-data-files=1, total-equality-deletes=0, added-records=25, total-position-deletes=0, added-files-size=1648, total-delete-files=0, total-files-size=1648, total-records=25, total-data-files=1}

$manifests Table

  • $manifests : Details about the manifests of different table snapshots, see the details here.
  SELECT * FROM "ctas_nation$manifests";

  path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partitions
  ---------------------------------------------------------------------------------+--------+-------------------+---------------------+------------------------+---------------------------+--------------------------+-----------
  s3://my-bucket/ctas_nation/metadata/395a2cad-b244-409b-b030-cc44949e5a4e-m0.avro | 5957 | 0 | 7606232158543069775 | 1 | 0 | 0 | []

$partitions Table

  • $partitions : Detailed partition information for the table
  SELECT * FROM "ctas_nation$partitions";

  row_count | file_count | total_size | nationkey | name | regionkey | comment
  -----------+------------+------------+-------------------------------+------------------------------------------+------------------------------+------------------------------------------------------------
  25 | 1 | 1648 | {min=0, max=24, null_count=0} | {min=ALGERIA, max=VIETNAM, null_count=0} | {min=0, max=4, null_count=0} | {min= haggle. careful, max=y final packaget, null_count=0}

$files Table

  • $files : Overview of data files in the current snapshot of the table
  SELECT * FROM "ctas_nation$files";

  content | file_path | file_format | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids
  ---------+------------------------------------------------------------------------------+-------------+--------------+--------------------+-----------------------------+--------------------------+----------------------+------------------+-------------------------------------------+--------------------------------------------+--------------+---------------+-------------
  0 | s3://my-bucket/ctas_nation/data/9f889274-6f74-4d28-8164-275eef99f660.parquet | PARQUET | 25 | 1648 | {1=52, 2=222, 3=105, 4=757} | {1=25, 2=25, 3=25, 4=25} | {1=0, 2=0, 3=0, 4=0} | NULL | {1=0, 2=ALGERIA, 3=0, 4= haggle. careful} | {1=24, 2=VIETNAM, 3=4, 4=y final packaget} | NULL | NULL | NULL

SQL Support

The Iceberg connector supports querying and manipulating Iceberg tables and schemas (databases). Here are some examples of the SQL operations supported by Presto:

CREATE SCHEMA

Create a new Iceberg schema named web that will store tables in an S3 bucket named my-bucket:

  CREATE SCHEMA iceberg.web
  WITH (location = 's3://my-bucket/')

CREATE TABLE

Create a new Iceberg table named page_views in the web schema that is stored using the ORC file format, partitioned by ds and country:

  CREATE TABLE iceberg.web.page_views (
      view_time timestamp,
      user_id bigint,
      page_url varchar,
      ds date,
      country varchar
  )
  WITH (
      format = 'ORC',
      partitioning = ARRAY['ds', 'country']
  )

Create an Iceberg table with Iceberg format version 2:

  CREATE TABLE iceberg.web.page_views_v2 (
      view_time timestamp,
      user_id bigint,
      page_url varchar,
      ds date,
      country varchar
  )
  WITH (
      format = 'ORC',
      partitioning = ARRAY['ds', 'country'],
      format_version = '2'
  )

Partition Column Transform

Beyond partitioning on specific column values, you can use transform functions and partition the table by the transformed value of a column.

Available transforms in the Presto Iceberg connector include:

  • Bucket (partitions data into a specified number of buckets using a hash function)

  • Truncate (partitions the table based on the truncated value of the field and can specify the width of the truncated value)

Create an Iceberg table partitioned into 8 buckets by a hash of the team column:

  CREATE TABLE players (
      id int,
      name varchar,
      team varchar
  )
  WITH (
      format = 'ORC',
      partitioning = ARRAY['bucket(team, 8)']
  );

Create an Iceberg table partitioned by the first letter of the team field:

  CREATE TABLE players (
      id int,
      name varchar,
      team varchar
  )
  WITH (
      format = 'ORC',
      partitioning = ARRAY['truncate(team, 1)']
  );

Note

Day, Month, Year, Hour partition column transform functions are not supported in Presto Iceberg connector yet (#20570).

CREATE VIEW

The Iceberg connector supports creating views in Hive and Glue metastores. To create a view named view_page_views for the iceberg.web.page_views table created in the CREATE TABLE example:

  CREATE VIEW iceberg.web.view_page_views AS SELECT user_id, country FROM iceberg.web.page_views;

INSERT INTO

Insert data into the page_views table:

  INSERT INTO iceberg.web.page_views VALUES(TIMESTAMP '2023-08-12 03:04:05.321', 1, 'https://example.com', current_date, 'country');

CREATE TABLE AS SELECT

Create a new table page_views_new from an existing table page_views:

  CREATE TABLE iceberg.web.page_views_new AS SELECT * FROM iceberg.web.page_views

SELECT

SELECT table operations are supported for Iceberg format version 1 and version 2 in the connector:

  SELECT * FROM iceberg.web.page_views;
  SELECT * FROM iceberg.web.page_views_v2;

Note

The SELECT operations on Iceberg Tables with format version 2 do not read the delete files and remove the deleted rows as of now (#20492).

ALTER TABLE

Alter table operations are supported in the connector:

  ALTER TABLE iceberg.web.page_views ADD COLUMN zipcode VARCHAR;
  ALTER TABLE iceberg.web.page_views RENAME COLUMN zipcode TO location;
  ALTER TABLE iceberg.web.page_views DROP COLUMN location;

To add a new column as a partition column, identify the transform functions for the column. The table is partitioned by the transformed value of the column:

  ALTER TABLE iceberg.web.page_views ADD COLUMN zipcode VARCHAR WITH (partitioning = 'identity');
  ALTER TABLE iceberg.web.page_views ADD COLUMN location VARCHAR WITH (partitioning = 'truncate(2)');
  ALTER TABLE iceberg.web.page_views ADD COLUMN location VARCHAR WITH (partitioning = 'bucket(8)');

Note

Day, Month, Year, Hour partition column transform functions are not supported in the Presto Iceberg connector.

TRUNCATE

The Iceberg connector can delete all of the data from tables without dropping the table from the metadata catalog using TRUNCATE TABLE.

  TRUNCATE TABLE nation;

  TRUNCATE TABLE

  SELECT * FROM nation;

   nationkey | name | regionkey | comment
  -----------+------+-----------+---------
  (0 rows)

DELETE

The Iceberg connector can delete data in one or more entire partitions from tables by using DELETE FROM. For example, to delete from the table lineitem:

  DELETE FROM lineitem;
  DELETE FROM lineitem WHERE linenumber = 1;
  DELETE FROM lineitem WHERE linenumber not in (1, 3, 5, 7) and linestatus in ('O', 'F');

Note

Columns in the filter must all be identity transformed partition columns of the target table.

Filtered columns only support comparison operators, such as EQUALS, LESS THAN, or LESS THAN EQUALS.

Deletes must only occur on the latest snapshot.
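
As a sketch of these rules, consider a hypothetical table orders_by_region whose only partition column is the identity-transformed region column; a whole partition can then be removed with an equality filter on that column:

  CREATE TABLE orders_by_region (
      orderkey bigint,
      totalprice double,
      region varchar
  )
  WITH (partitioning = ARRAY['region']);

  -- 'region' is an identity-transformed partition column, so this DELETE
  -- removes the entire 'EUROPE' partition and is allowed by the connector.
  DELETE FROM orders_by_region WHERE region = 'EUROPE';

A filter on a non-partition column such as totalprice would not satisfy the rules above and would be rejected.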

DROP TABLE

Drop the table page_views:

  DROP TABLE iceberg.web.page_views

  • Dropping an Iceberg table with Hive Metastore and Glue catalogs only removes metadata from the metastore.

  • Dropping an Iceberg table with Hadoop and Nessie catalogs removes all the data and metadata in the table.

DROP VIEW

Drop the view view_page_views:

  DROP VIEW iceberg.web.view_page_views;

DROP SCHEMA

Drop a schema:

  DROP SCHEMA iceberg.web

Schema Evolution

Iceberg and the Presto Iceberg connector support in-place table evolution, also known as schema evolution, such as adding, dropping, and renaming columns. With schema evolution, users can evolve a table schema with SQL after enabling the Presto Iceberg connector.

Example Queries

Let’s create an Iceberg table named ctas_nation from the TPCH nation table. The table has four columns: nationkey, name, regionkey, and comment.

  USE iceberg.tpch;
  CREATE TABLE IF NOT EXISTS ctas_nation AS (SELECT * FROM nation);
  DESCRIBE ctas_nation;

    Column   |  Type   | Extra | Comment
  -----------+---------+-------+---------
   nationkey | bigint  |       |
   name      | varchar |       |
   regionkey | bigint  |       |
   comment   | varchar |       |
  (4 rows)

We can add a new column to the Iceberg table by using the ALTER TABLE statement. The following query adds a new column named zipcode to the table.

  ALTER TABLE ctas_nation ADD COLUMN zipcode VARCHAR;
  DESCRIBE ctas_nation;

    Column   |  Type   | Extra | Comment
  -----------+---------+-------+---------
   nationkey | bigint  |       |
   name      | varchar |       |
   regionkey | bigint  |       |
   comment   | varchar |       |
   zipcode   | varchar |       |
  (5 rows)

We can also rename the new column to another name, address:

  ALTER TABLE ctas_nation RENAME COLUMN zipcode TO address;
  DESCRIBE ctas_nation;

    Column   |  Type   | Extra | Comment
  -----------+---------+-------+---------
   nationkey | bigint  |       |
   name      | varchar |       |
   regionkey | bigint  |       |
   comment   | varchar |       |
   address   | varchar |       |
  (5 rows)

Finally, we can drop the new column, restoring the table columns to their original state.

  ALTER TABLE ctas_nation DROP COLUMN address;
  DESCRIBE ctas_nation;

    Column   |  Type   | Extra | Comment
  -----------+---------+-------+---------
   nationkey | bigint  |       |
   name      | varchar |       |
   regionkey | bigint  |       |
   comment   | varchar |       |
  (4 rows)

Time Travel

Iceberg and the Presto Iceberg connector support time travel via table snapshots identified by unique snapshot IDs. The snapshot IDs are stored in the $snapshots metadata table. We can roll back the state of a table to a previous snapshot ID.

Example Queries

Similar to the example queries in Schema Evolution, let’s create an Iceberg table named ctas_nation from the TPCH nation table.

  USE iceberg.tpch;
  CREATE TABLE IF NOT EXISTS ctas_nation AS (SELECT * FROM nation);
  DESCRIBE ctas_nation;

    Column   |  Type   | Extra | Comment
  -----------+---------+-------+---------
   nationkey | bigint  |       |
   name      | varchar |       |
   regionkey | bigint  |       |
   comment   | varchar |       |
  (4 rows)

We can find snapshot IDs for the Iceberg table from the $snapshots metadata table.

  SELECT snapshot_id FROM iceberg.tpch."ctas_nation$snapshots" ORDER BY committed_at;

       snapshot_id
  ---------------------
   5837462824399906536
  (1 row)

For now, as we’ve just created the table, there’s only one snapshot ID. Let’s insert one row into the table and see the change in the snapshot IDs.

  INSERT INTO ctas_nation VALUES(25, 'new country', 1, 'comment');
  SELECT snapshot_id FROM iceberg.tpch."ctas_nation$snapshots" ORDER BY committed_at;

       snapshot_id
  ---------------------
   5837462824399906536
   5140039250977437531
  (2 rows)

Now there’s a new snapshot (5140039250977437531), created when the new row was inserted into the table. The new row can be verified by running:

  SELECT * FROM ctas_nation WHERE name = 'new country';

   nationkey | name        | regionkey | comment
  -----------+-------------+-----------+---------
          25 | new country |         1 | comment
  (1 row)

With the time travel feature, we can roll back to the previous state, without the new row, by calling iceberg.system.rollback_to_snapshot:

  CALL iceberg.system.rollback_to_snapshot('tpch', 'ctas_nation', 5837462824399906536);

Now if we check the table again, we’ll find that the newly inserted row no longer exists as we’ve rolled back to the previous state.

  SELECT * FROM ctas_nation WHERE name = 'new country';

   nationkey | name | regionkey | comment
  -----------+------+-----------+---------
  (0 rows)

Iceberg Connector Limitations

  • The SELECT operations on Iceberg Tables with format version 2 do not read the delete files and remove the deleted rows as of now (#20492).