Iceberg Connector#
Overview#
The Iceberg connector allows querying data stored in Iceberg tables.
Metastores#
Iceberg tables store most of their metadata in metadata files, alongside the data on the filesystem, but they still require a central place to find the current location of the metadata pointer for a table. This central place is called the Iceberg Catalog. The Presto Iceberg connector supports several types of Iceberg Catalogs: Hive Metastore, Glue, Nessie, and Hadoop.
To configure the Iceberg connector, create a catalog properties file etc/catalog/iceberg.properties. To define the catalog type, set the iceberg.catalog.type property along with the following contents, replacing the property values as appropriate:
Hive Metastore catalog#
The Iceberg connector supports the same Hive Metastore (HMS) configuration as the Hive connector.
connector.name=iceberg
hive.metastore.uri=hostname:port
iceberg.catalog.type=hive
Glue catalog#
The Iceberg connector supports the same Glue configuration as the Hive connector.
connector.name=iceberg
hive.metastore=glue
iceberg.catalog.type=hive
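Because Glue is reached through the Hive metastore interface, the Hive connector's Glue-related properties can also be set in the same catalog file. The following is a minimal sketch, assuming the Hive connector's hive.metastore.glue.region property; the region value is illustrative only:
connector.name=iceberg
hive.metastore=glue
# illustrative AWS region; set to the region of your Glue catalog
hive.metastore.glue.region=us-east-1
iceberg.catalog.type=hive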
Nessie catalog#
To use a Nessie catalog, configure the catalog type as iceberg.catalog.type=nessie.
connector.name=iceberg
iceberg.catalog.type=nessie
iceberg.catalog.warehouse=/tmp
iceberg.nessie.uri=https://localhost:19120/api/v1
Additional supported properties for the Nessie catalog:
Property Name | Description
---|---
iceberg.nessie.ref | The branch/tag to use for Nessie, defaults to main.
iceberg.nessie.uri | Nessie API endpoint URI (required). Example: https://localhost:19120/api/v1
iceberg.nessie.auth.type | The authentication type to use. Available values are BASIC or BEARER. Note: the Nessie BASIC authentication type is deprecated and will be removed in an upcoming release.
iceberg.nessie.auth.basic.username | The username to use with BASIC authentication.
iceberg.nessie.auth.basic.password | The password to use with BASIC authentication.
iceberg.nessie.auth.bearer.token | The token to use with BEARER authentication.
iceberg.nessie.read-timeout-ms | The read timeout in milliseconds for requests to the Nessie server.
iceberg.nessie.connect-timeout-ms | The connection timeout in milliseconds for connection requests to the Nessie server.
iceberg.nessie.compression-enabled | Whether compression should be enabled for requests to the Nessie server, defaults to true.
iceberg.nessie.client-builder-impl | Configuration of the custom ClientBuilder implementation class to be used.
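As an example, a Nessie catalog file that pins a branch and sets a read timeout could look like the following sketch. The property names are the ones listed above; the branch name and timeout value are illustrative only:
connector.name=iceberg
iceberg.catalog.type=nessie
iceberg.catalog.warehouse=/tmp
iceberg.nessie.uri=https://localhost:19120/api/v1
# illustrative values; adjust to your Nessie deployment
iceberg.nessie.ref=main
iceberg.nessie.read-timeout-ms=5000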
Setting Up Nessie With Docker#
To set up a Nessie instance locally using the Docker image, see Setting up Nessie. Once the Docker instance is up and running, you should see logs similar to the following example:
2023-09-05 13:11:37,905 INFO [io.quarkus] (main) nessie-quarkus 0.69.0 on JVM (powered by Quarkus 3.2.4.Final) started in 1.921s. Listening on: http://0.0.0.0:19120
2023-09-05 13:11:37,906 INFO [io.quarkus] (main) Profile prod activated.
2023-09-05 13:11:37,906 INFO [io.quarkus] (main) Installed features: [agroal, amazon-dynamodb, cassandra-client, cdi, google-cloud-bigtable, hibernate-validator, jdbc-postgresql, logging-sentry, micrometer, mongodb-client, narayana-jta, oidc, opentelemetry, reactive-routes, resteasy, resteasy-jackson, security, security-properties-file, smallrye-context-propagation, smallrye-health, smallrye-openapi, swagger-ui, vertx]
If log messages related to Nessie’s OpenTelemetry collector appear similar to the following example, you can disable OpenTelemetry using the configuration option quarkus.otel.sdk.disabled=true.
2023-08-27 11:10:02,492 INFO [io.qua.htt.access-log] (executor-thread-1) 172.17.0.1 - - [27/Aug/2023:11:10:02 +0000] "GET /api/v1/config HTTP/1.1" 200 62
2023-08-27 11:10:05,007 SEVERE [io.ope.exp.int.grp.OkHttpGrpcExporter] (OkHttp http://localhost:4317/...) Failed to export spans. The request could not be executed. Full error message: Failed to connect to localhost/127.0.0.1:4317
For example, start the Docker image using the following command:
docker run -p 19120:19120 -e QUARKUS_OTEL_SDK_DISABLED=true ghcr.io/projectnessie/nessie
For more information about this configuration option and other related options, see the OpenTelemetry Configuration Reference.
For more information about troubleshooting OpenTelemetry traces, see Troubleshooting traces.
If an error similar to the following example is displayed, it is probably because you are connecting to an HTTP server rather than an HTTPS one. In that case, set iceberg.nessie.uri to http://localhost:19120/api/v1.
Caused by: javax.net.ssl.SSLException: Unsupported or unrecognized SSL message
at sun.security.ssl.SSLSocketInputRecord.handleUnknownRecord(SSLSocketInputRecord.java:448)
at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:174)
at sun.security.ssl.SSLTransport.decode(SSLTransport.java:111)
at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1320)
at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1233)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:417)
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:389)
at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:558)
at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:201)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java:167)
at org.projectnessie.client.http.impl.jdk8.UrlConnectionRequest.executeRequest(UrlConnectionRequest.java:71)
... 42 more
Hadoop catalog#
To use a Hadoop catalog, configure the catalog type as iceberg.catalog.type=hadoop:
connector.name=iceberg
iceberg.catalog.type=hadoop
iceberg.catalog.warehouse=hdfs://hostname:port
Configuration Properties#
Note
The Iceberg connector supports the same Amazon S3 configuration options as the Hive connector.
The following configuration properties are available:
Property Name | Description | Default
---|---|---
hive.metastore.uri | The URI(s) of the Hive metastore to connect to using the Thrift protocol. If multiple URIs are provided, the first URI is used by default, and the rest of the URIs are fallback metastores. |
iceberg.file-format | The storage file format for Iceberg tables. The available values are PARQUET and ORC. | PARQUET
iceberg.compression-codec | The compression codec to use when writing files. The available values are NONE, SNAPPY, GZIP, LZ4, and ZSTD. | GZIP
iceberg.catalog.type | The catalog type for Iceberg tables. The available values are hive, hadoop, and nessie. | hive
iceberg.catalog.warehouse | The catalog warehouse root path for Iceberg tables. |
iceberg.catalog.cached-catalog-num | The number of Iceberg catalogs to cache. This property is required if the iceberg.catalog.type is hadoop. | 10
iceberg.hadoop.config.resources | The path(s) for Hadoop configuration resources. |
iceberg.max-partitions-per-writer | The maximum number of partitions handled per writer. | 100
iceberg.minimum-assigned-split-weight | A decimal value in the range (0, 1] used as a minimum for weights assigned to each split. A low value may improve performance on tables with small files. A higher value may improve performance for queries with highly skewed aggregations or joins. | 0.05
iceberg.enable-merge-on-read-mode | Enable reading base tables that use merge-on-read for updates. | true
iceberg.delete-as-join-rewrite-enabled | When enabled, equality delete row filtering is applied as a join with the data of the equality delete files. | true
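As an illustration, a Hive-metastore-backed Iceberg catalog that writes ORC files with ZSTD compression might be configured as in the following sketch; the property names are the ones listed above, and the metastore host and format choices are placeholders:
connector.name=iceberg
iceberg.catalog.type=hive
hive.metastore.uri=thrift://metastore-host:9083
# illustrative file format and compression choices
iceberg.file-format=ORC
iceberg.compression-codec=ZSTD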
Table Properties#
Table properties set metadata for the underlying tables and are supplied in CREATE TABLE and CREATE TABLE AS statements. Table properties are passed to the connector using a WITH clause:
CREATE TABLE tablename
WITH (
property_name = property_value,
...
)
The following table properties are available, which are specific to the Presto Iceberg connector:
Property Name | Description
---|---
format | Optionally specifies the format of table data files, either PARQUET or ORC.
partitioning | Optionally specifies table partitioning. If a table is partitioned by columns c1 and c2, the partitioning property is partitioning = ARRAY['c1', 'c2'].
location | Optionally specifies the file system location URI for the table.
format_version | Optionally specifies the format version of the Iceberg specification to use for new tables, either 1 or 2.
The table definition below specifies format ORC, partitioning by columns c1 and c2, and a file system location of s3://test_bucket/test_schema/test_table:
CREATE TABLE test_table (
c1 bigint,
c2 varchar,
c3 double
)
WITH (
format = 'ORC',
partitioning = ARRAY['c1', 'c2'],
location = 's3://test_bucket/test_schema/test_table'
)
Session Properties#
Session properties change the behavior of queries executed within the given session.
Property Name | Description
---|---
delete_as_join_rewrite_enabled | Overrides the behavior of the connector property iceberg.delete-as-join-rewrite-enabled in the current session.
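For example, assuming a catalog named iceberg, the session property can be toggled for the current session as in this sketch:
SET SESSION iceberg.delete_as_join_rewrite_enabled = false;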
Caching Support#
Manifest File Caching#
As of Iceberg version 1.1.0, Apache Iceberg provides a mechanism to cache the contents of Iceberg manifest files in memory. This feature helps to reduce repeated reads of small Iceberg manifest files from remote storage.
Note
Currently, manifest file caching is supported for Hadoop and Nessie catalogs in the Presto Iceberg connector.
The following configuration properties are available:
Property Name | Description | Default
---|---|---
iceberg.io.manifest.cache-enabled | Enable or disable the manifest caching feature. This feature is only available when iceberg.io-impl is set. | false
iceberg.io-impl | Custom FileIO implementation to use in a catalog. It must be set to enable manifest caching. |
iceberg.io.manifest.cache.max-total-bytes | Maximum size of the manifest cache in bytes. | 104857600
iceberg.io.manifest.cache.expiration-interval-ms | Maximum time duration in milliseconds for which an entry stays in the manifest cache. | 60000
iceberg.io.manifest.cache.max-content-length | Maximum length in bytes of a manifest file to be considered for caching. Manifest files with a length exceeding this size will not be cached. | 8388608
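A minimal sketch of enabling manifest caching for a Hadoop catalog, assuming the property names above and Iceberg's HadoopFileIO implementation:
connector.name=iceberg
iceberg.catalog.type=hadoop
iceberg.catalog.warehouse=hdfs://hostname:port
# cache manifest file contents in memory
iceberg.io-impl=org.apache.iceberg.hadoop.HadoopFileIO
iceberg.io.manifest.cache-enabled=true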
Alluxio Data Cache#
A Presto worker caches remote storage data in its original form (compressed and possibly encrypted) on local SSD upon read.
The following configuration properties must be set in the Iceberg catalog file (catalog/iceberg.properties):
cache.enabled=true
cache.base-directory=file:///mnt/flash/data
cache.type=ALLUXIO
cache.alluxio.max-cache-size=1600GB
hive.node-selection-strategy=SOFT_AFFINITY
Use the following JMX queries to get the metrics and verify cache usage:
SELECT * FROM jmx.current."com.facebook.alluxio:name=client.cachehitrate,type=gauges";
SELECT * FROM jmx.current."com.facebook.alluxio:name=client.cachebytesreadcache,type=meters";
SHOW TABLES FROM jmx.current like '%alluxio%';
Metastore Versioned Cache#
The metastore cache caches only schema and table names; other metadata is fetched from the filesystem.
Note
The metastore versioned cache is applicable only to the Hive catalog in the Presto Iceberg connector.
hive.metastore-cache-ttl=2d
hive.metastore-refresh-interval=3d
hive.metastore-cache-maximum-size=10000000
SQL Support#
The Iceberg connector supports querying and manipulating Iceberg tables and schemas (databases). Here are some examples of the SQL operations supported by Presto:
CREATE SCHEMA#
Create a new Iceberg schema named web that stores tables in an S3 bucket named my-bucket:
CREATE SCHEMA iceberg.web
WITH (location = 's3://my-bucket/')
CREATE TABLE#
Create a new Iceberg table named page_views in the web schema that is stored using the ORC file format, partitioned by ds and country:
CREATE TABLE iceberg.web.page_views (
view_time timestamp,
user_id bigint,
page_url varchar,
ds date,
country varchar
)
WITH (
format = 'ORC',
partitioning = ARRAY['ds', 'country']
)
Create an Iceberg table with Iceberg format version 2:
CREATE TABLE iceberg.web.page_views_v2 (
view_time timestamp,
user_id bigint,
page_url varchar,
ds date,
country varchar
)
WITH (
format = 'ORC',
partitioning = ARRAY['ds', 'country'],
format_version = '2'
)
Partition Column Transform#
Beyond partitioning on particular columns, you can use transform functions to partition the table by the transformed value of a column.
Available transforms in the Presto Iceberg connector include:
- Bucket (partitions data into a specified number of buckets using a hash function)
- Truncate (partitions the table based on the truncated value of the field, with a configurable width for the truncated value)
- Identity (partitions data using the unmodified source value)
- Year (partitions data using an integer value obtained by extracting the year from a date or timestamp, as years from 1970)
- Month (partitions data using an integer value obtained by extracting the month from a date or timestamp, as months from 1970-01-01)
- Day (partitions data using an integer value obtained by extracting the day from a date or timestamp, as days from 1970-01-01)
- Hour (partitions data using an integer value obtained by extracting the hour from a timestamp, as hours from 1970-01-01 00:00:00)
Create an Iceberg table partitioned into 8 buckets using a hash of the team column:
CREATE TABLE players (
id int,
name varchar,
team varchar
)
WITH (
format = 'ORC',
partitioning = ARRAY['bucket(team, 8)']
);
Create an Iceberg table partitioned by the first letter of the team field:
CREATE TABLE players (
id int,
name varchar,
team varchar
)
WITH (
format = 'ORC',
partitioning = ARRAY['truncate(team, 1)']
);
Create an Iceberg table partitioned by the year of the ds column:
CREATE TABLE players (
id int,
name varchar,
team varchar,
ds date
)
WITH (
format = 'ORC',
partitioning = ARRAY['year(ds)']
);
Create an Iceberg table partitioned by the hour of the ts column:
CREATE TABLE players (
id int,
name varchar,
team varchar,
ts timestamp
)
WITH (
format = 'ORC',
partitioning = ARRAY['hour(ts)']
);
CREATE VIEW#
The Iceberg connector supports creating views in Hive and Glue metastores.
To create a view named view_page_views for the iceberg.web.page_views table created in the CREATE TABLE example:
CREATE VIEW iceberg.web.view_page_views AS SELECT user_id, country FROM iceberg.web.page_views;
INSERT INTO#
Insert data into the page_views table:
INSERT INTO iceberg.web.page_views VALUES(TIMESTAMP '2023-08-12 03:04:05.321', 1, 'https://example.com', current_date, 'country');
CREATE TABLE AS SELECT#
Create a new table page_views_new from an existing table page_views:
CREATE TABLE iceberg.web.page_views_new AS SELECT * FROM iceberg.web.page_views
SELECT#
SELECT table operations are supported for Iceberg format version 1 and version 2 in the connector:
SELECT * FROM iceberg.web.page_views;
SELECT * FROM iceberg.web.page_views_v2;
Table with delete files#
Iceberg V2 tables support row-level deletion. For more information see Row-level deletes in the Iceberg Table Spec. Presto supports reading delete files, including Position Delete Files and Equality Delete Files. When reading, Presto merges these delete files to read the latest results.
ALTER TABLE#
Alter table operations are supported in the Iceberg connector:
ALTER TABLE iceberg.web.page_views ADD COLUMN zipcode VARCHAR;
ALTER TABLE iceberg.web.page_views RENAME COLUMN zipcode TO location;
ALTER TABLE iceberg.web.page_views DROP COLUMN location;
To add a new column as a partition column, identify the transform functions for the column. The table is partitioned by the transformed value of the column:
ALTER TABLE iceberg.web.page_views ADD COLUMN zipcode VARCHAR WITH (partitioning = 'identity');
ALTER TABLE iceberg.web.page_views ADD COLUMN location VARCHAR WITH (partitioning = 'truncate(2)');
ALTER TABLE iceberg.web.page_views ADD COLUMN location VARCHAR WITH (partitioning = 'bucket(8)');
ALTER TABLE iceberg.web.page_views ADD COLUMN dt date WITH (partitioning = 'year');
ALTER TABLE iceberg.web.page_views ADD COLUMN ts timestamp WITH (partitioning = 'month');
ALTER TABLE iceberg.web.page_views ADD COLUMN dt date WITH (partitioning = 'day');
ALTER TABLE iceberg.web.page_views ADD COLUMN ts timestamp WITH (partitioning = 'hour');
TRUNCATE#
The Iceberg connector can delete all of the data from tables without dropping the table from the metadata catalog, using TRUNCATE TABLE.
TRUNCATE TABLE nation;
TRUNCATE TABLE
SELECT * FROM nation;
nationkey | name | regionkey | comment
-----------+------+-----------+---------
(0 rows)
DELETE#
The Iceberg connector can delete data in one or more entire partitions from tables by using DELETE FROM. For example, to delete from the table lineitem:
DELETE FROM lineitem;
DELETE FROM lineitem WHERE linenumber = 1;
DELETE FROM lineitem WHERE linenumber not in (1, 3, 5, 7) and linestatus in ('O', 'F');
Note
Columns in the filter must all be identity transformed partition columns of the target table.
Filtered columns only support comparison operators, such as EQUALS, LESS THAN, or LESS THAN EQUALS.
Deletes must only occur on the latest snapshot.
DROP TABLE#
Drop the table page_views:
DROP TABLE iceberg.web.page_views
Dropping an Iceberg table with the Hive Metastore and Glue catalogs only removes the metadata from the metastore.
Dropping an Iceberg table with the Hadoop and Nessie catalogs removes all the data and metadata in the table.
DROP VIEW#
Drop the view view_page_views:
DROP VIEW iceberg.web.view_page_views;
DROP SCHEMA#
Drop the schema iceberg.web:
DROP SCHEMA iceberg.web
Register table#
Iceberg tables for which table data and metadata already exist in the
file system can be registered with the catalog using the register_table
procedure on the catalog’s system
schema by supplying the target schema,
desired table name, and the location of the table metadata:
CALL iceberg.system.register_table('schema_name', 'table_name', 'hdfs://localhost:9000/path/to/iceberg/table/metadata/dir')
Note
If multiple metadata files of the same version exist at the specified location, the most recently modified one will be used.
A metadata file can optionally be included as an argument to register_table
in the case where a specific metadata file contains the targeted table state:
CALL iceberg.system.register_table('schema_name', 'table_name', 'hdfs://localhost:9000/path/to/iceberg/table/metadata/dir', '00000-35a08aed-f4b0-4010-95d2-9d73ef4be01c.metadata.json')
Note
When registering a table with the Hive metastore, the user calling the
procedure will be set as the owner of the table and will have SELECT,
INSERT, UPDATE, and DELETE privileges for that table. These privileges
can be altered using the GRANT and REVOKE commands.
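For instance, a privilege could be granted or revoked as in the following sketch; the table and user names are illustrative only:
GRANT SELECT ON iceberg.web.page_views TO USER alice;
REVOKE DELETE ON iceberg.web.page_views FROM USER alice;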
Note
When using the Hive catalog, attempts to read registered Iceberg tables using the Hive connector will fail.
Unregister table#
Iceberg tables can be unregistered from the catalog using the unregister_table
procedure on the catalog’s system
schema:
CALL iceberg.system.unregister_table('schema_name', 'table_name')
Note
Table data and metadata will remain in the filesystem after a call to
unregister_table
only when using the Hive catalog. This is similar to
the behavior listed above for the DROP TABLE
command.
Schema Evolution#
Iceberg and the Presto Iceberg connector support in-place table evolution, also known as schema evolution, including adding, dropping, and renaming columns. With schema evolution, users can evolve a table schema with SQL after enabling the Presto Iceberg connector.
Parquet Writer Version#
Presto now supports Parquet writer versions V1 and V2 for the Iceberg catalog.
The writer version can be toggled using the session property parquet_writer_version
and the config property hive.parquet.writer.version.
Valid values for these properties are PARQUET_1_0 and PARQUET_2_0. The default is PARQUET_2_0.
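For example, assuming a catalog named iceberg, the writer version could be switched for the current session as in this sketch:
SET SESSION iceberg.parquet_writer_version = 'PARQUET_1_0';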
Example Queries#
Let’s create an Iceberg table named ctas_nation from the TPCH nation table. The table has four columns: nationkey, name, regionkey, and comment.
USE iceberg.tpch;
CREATE TABLE IF NOT EXISTS ctas_nation AS (SELECT * FROM nation);
DESCRIBE ctas_nation;
Column | Type | Extra | Comment
-----------+---------+-------+---------
nationkey | bigint | |
name | varchar | |
regionkey | bigint | |
comment | varchar | |
(4 rows)
We can simply add a new column to the Iceberg table by using the ALTER TABLE statement. The following query adds a new column named zipcode to the table.
ALTER TABLE ctas_nation ADD COLUMN zipcode VARCHAR;
DESCRIBE ctas_nation;
Column | Type | Extra | Comment
-----------+---------+-------+---------
nationkey | bigint | |
name | varchar | |
regionkey | bigint | |
comment | varchar | |
zipcode | varchar | |
(5 rows)
We can also rename the new column to another name, address:
ALTER TABLE ctas_nation RENAME COLUMN zipcode TO address;
DESCRIBE ctas_nation;
Column | Type | Extra | Comment
-----------+---------+-------+---------
nationkey | bigint | |
name | varchar | |
regionkey | bigint | |
comment | varchar | |
address | varchar | |
(5 rows)
Finally, we can drop the new column. The table columns will be restored to their original state.
ALTER TABLE ctas_nation DROP COLUMN address;
DESCRIBE ctas_nation;
Column | Type | Extra | Comment
-----------+---------+-------+---------
nationkey | bigint | |
name | varchar | |
regionkey | bigint | |
comment | varchar | |
(4 rows)
Time Travel#
Iceberg and the Presto Iceberg connector support time travel via table snapshots
identified by unique snapshot IDs. The snapshot IDs are stored in the $snapshots
metadata table. You can roll back the state of a table to a previous snapshot ID.
Time travel queries are also supported using the VERSION (SYSTEM_VERSION) and TIMESTAMP (SYSTEM_TIME) options.
Example Queries#
Similar to the example queries in SCHEMA EVOLUTION, create an Iceberg table named ctas_nation from the TPCH nation table:
USE iceberg.tpch;
CREATE TABLE IF NOT EXISTS ctas_nation AS (SELECT * FROM nation);
DESCRIBE ctas_nation;
Column | Type | Extra | Comment
-----------+---------+-------+---------
nationkey | bigint | |
name | varchar | |
regionkey | bigint | |
comment | varchar | |
(4 rows)
We can find snapshot IDs for the Iceberg table from the $snapshots metadata table.
SELECT snapshot_id FROM iceberg.tpch."ctas_nation$snapshots" ORDER BY committed_at;
snapshot_id
---------------------
5837462824399906536
(1 row)
For now, as we’ve just created the table, there’s only one snapshot ID. Let’s insert one row into the table and see the change in the snapshot IDs.
INSERT INTO ctas_nation VALUES(25, 'new country', 1, 'comment');
SELECT snapshot_id FROM iceberg.tpch."ctas_nation$snapshots" ORDER BY committed_at;
snapshot_id
---------------------
5837462824399906536
5140039250977437531
(2 rows)
Now there’s a new snapshot (5140039250977437531) created because a new row was inserted into the table. The new row can be verified by running:
SELECT * FROM ctas_nation WHERE name = 'new country';
nationkey | name | regionkey | comment
-----------+-------------+-----------+---------
25 | new country | 1 | comment
(1 row)
With the time travel feature, we can roll back to the previous state without the new row by calling iceberg.system.rollback_to_snapshot:
CALL iceberg.system.rollback_to_snapshot('tpch', 'ctas_nation', 5837462824399906536);
Now if we check the table again, we’ll find that the newly inserted row no longer exists as we’ve rolled back to the previous state.
SELECT * FROM ctas_nation WHERE name = 'new country';
nationkey | name | regionkey | comment
-----------+------+-----------+---------
(0 rows)
Time Travel using VERSION (SYSTEM_VERSION) and TIMESTAMP (SYSTEM_TIME)#
Use the Iceberg connector to access the historical data of a table. You can see how the table looked at a certain point in time, even if the data has changed or been deleted since then.
-- snapshot ID 5300424205832769799
INSERT INTO ctas_nation VALUES(10, 'united states', 1, 'comment');
-- snapshot ID 6891257133877048303
INSERT INTO ctas_nation VALUES(20, 'canada', 2, 'comment');
-- snapshot ID 705548372863208787
INSERT INTO ctas_nation VALUES(30, 'mexico', 3, 'comment');
-- query the first record using its snapshot ID
SELECT * FROM ctas_nation FOR VERSION AS OF 5300424205832769799;
-- the same query using SYSTEM_VERSION as an alias for VERSION
SELECT * FROM ctas_nation FOR SYSTEM_VERSION AS OF 5300424205832769799;
nationkey | name | regionkey | comment
-----------+---------------+-----------+---------
10 | united states | 1 | comment
(1 row)
In the above example, SYSTEM_VERSION can be used as an alias for VERSION.
You can access the historical data of a table using FOR TIMESTAMP AS OF TIMESTAMP. The query returns the table’s state using the table snapshot that is closest to the specified timestamp. In this example, SYSTEM_TIME can be used as an alias for TIMESTAMP.
-- In the following query, the timestamp string matches the second inserted record.
SELECT * FROM ctas_nation FOR TIMESTAMP AS OF TIMESTAMP '2023-10-17 13:29:46.822 America/Los_Angeles';
-- The same example using SYSTEM_TIME as an alias for TIMESTAMP
SELECT * FROM ctas_nation FOR SYSTEM_TIME AS OF TIMESTAMP '2023-10-17 13:29:46.822 America/Los_Angeles';
nationkey | name | regionkey | comment
-----------+---------------+-----------+---------
10 | united states | 1 | comment
20 | canada | 2 | comment
(2 rows)
The option following FOR TIMESTAMP AS OF can accept any expression that returns a timestamp with time zone value. For example, TIMESTAMP '2023-10-17 13:29:46.822 America/Los_Angeles' is a timestamp constant. In the following query, the expression CURRENT_TIMESTAMP returns the current timestamp with time zone value.
SELECT * FROM ctas_nation FOR TIMESTAMP AS OF CURRENT_TIMESTAMP;
nationkey | name | regionkey | comment
-----------+---------------+-----------+---------
10 | united states | 1 | comment
20 | canada | 2 | comment
30 | mexico | 3 | comment
(3 rows)