Looks like Commander Bun Bun is safe on this Iceberg
https://joshdata.me/iceberger.html
Trino Summit 2022 is coming around the corner! This free event on November 10th will take place in person at the Commonwealth Club in San Francisco, CA, and can also be attended remotely! If you want to present, the call for speakers is open until September 15th.
You can register for the conference at any time. We must limit in-person registrations to 250 attendees, so register soon if you plan on attending in person!
Official highlights from Martin Traverso:
- EXPLAIN.
- LIKE expressions.
- query table function in BigQuery connector.
- INSERT support in BigQuery connector.
- INSERT queries.

Additional highlights worth a mention according to Cole:
More detailed information is available in the release notes for Trino 394 and Trino 395.
It has been over a year since we had Ryan on the Trino Community Broadcast as a guest to discuss what Apache Iceberg is and how it can be used in Trino. Since then, the adoption of Iceberg in our community has skyrocketed, and Iceberg is proving to be a much better alternative to the Hive table format.
The initial phase of the Iceberg connector in Trino aimed to provide fast and interoperable read support. A typical setup ran Trino alongside other query engines like Apache Spark, which supported many of the data manipulation language (DML) SQL features on Iceberg. One of the biggest requests we got as adoption increased was the ability to do everything through Trino. This episode dives into some of the latest features that were missing from the early iterations of the Iceberg connector, and what has changed in Iceberg as well!
Iceberg is a next-generation table format that defines a standard around the metadata used to map data to a SQL query engine. It addresses a lot of the maintainability and reliability issues many engineers experienced with the way Hive modeled SQL tables over big data files.
One common point of confusion is that a table format is not the same thing as a file format like ORC or Parquet. The table format is the layer that maintains the metadata mapping these files to the concept of a table and other common database abstractions.
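For example, Trino surfaces this metadata directly through hidden metadata tables. As a small illustration (using the iceberg.logging.logs table created in the demo later in this post), you can list the data files that make up a table:

/**
 * The "$files" metadata table exposes the file-to-table mapping that the
 * table format maintains on top of the ORC or Parquet data files.
 */
SELECT file_path, record_count, file_size_in_bytes
FROM iceberg.logging."logs$files";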
This episode assumes you have some basic knowledge of Trino and Iceberg already. If you are new to Iceberg or need a refresher, we recommend the two older episodes about Iceberg and Trino basics:
There have been some great advancements in big data technologies that brought back SQL and data warehouse capabilities. However, Hive and Hive-like table formats still lack some capabilities due to limitations inherent to Hive tables, such as unintentionally dropping and reintroducing stale data. On top of that, Hive tables require a lot of knowledge of Hive internals. Some recent formats aim to remain backwards compatible with Hive, but inadvertently reintroduce these limitations.
This is not the case with Iceberg. Iceberg has the most support for query engines and puts a heavy emphasis on being a format that is interoperable. This improves the level of flexibility users have to address a wider array of use cases that may involve querying over a system like Snowflake or a data lakehouse running with Iceberg. All of this is made possible by the Iceberg specification that all these query engines must follow.
Finally, a great video by Ryan Blue that dives into Iceberg is "Why you shouldn't care about Iceberg."
Catalogs, in the context of Iceberg, refer to the central storage of metadata. Catalogs are also used to provide the atomic compare-and-swap needed to support serializable isolation in Iceberg. We’ll refer to them as metadata catalogs to avoid confusion with Trino catalogs.
The two existing catalogs supported in Trino's Iceberg connector are the Hive Metastore Service and its AWS counterpart, Glue. While this provides a nice migration path from the Hive model, many are looking to replace these rather cumbersome catalogs with something lightweight. It turns out that the Iceberg connector only uses the Hive Metastore Service to point to the top-level metadata files in Iceberg, while the majority of the metadata lives in metadata files in storage. This makes it even more compelling to get rid of the complex Hive service in favor of simpler services. Two popular catalogs outside of these are the JDBC catalog and the REST catalog.
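As a rough sketch of how a metadata catalog is wired up today, a Trino catalog properties file for the Iceberg connector backed by a Hive Metastore might look like the following (the file name and metastore URI are placeholders, not values from this episode):

# etc/catalog/iceberg.properties -- minimal sketch, values are placeholders
connector.name=iceberg
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://example-metastore:9083

Swapping in Glue today, or the JDBC and REST catalogs once their support lands, would mostly be a matter of changing iceberg.catalog.type and its related properties.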
There are two PRs in progress to support these metadata catalogs in Trino:
Another feature set that is coming in Iceberg is the ability to use refs to alias your snapshots. This enables branching and tagging behavior similar to Git, treating each snapshot like a commit. This is yet another feature that simplifies moving between known states of the data in Iceberg.
On a related note, branching and tagging will eventually be used in the
audit integration in Iceberg.
Auditing allows you to push a soft commit by making a snapshot available, but
it is not initially published to the primary table. This is achieved using Spark
and setting the spark.wap.id
configuration property. This enables interesting patterns like the Write-Audit-Publish (WAP) pattern,
where you first write the data, audit it using a data quality tool like
Great Expectations, and lastly publish the data
to be visible from the main table. Currently, auditing has to use the
cherry-pick operation to publish. This becomes more streamlined with branching
and tagging.
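As a rough sketch of that flow in Spark SQL (the table name, WAP id, staging source, and snapshot id below are all placeholders, not values from this episode):

-- 1. Allow staged (WAP) writes on the table.
ALTER TABLE catalog.db.logs SET TBLPROPERTIES ('write.wap.enabled' = 'true');
-- 2. Tag the Spark session so its writes create staged, unpublished snapshots.
SET spark.wap.id = wap_audit_1;
-- 3. Write: the new snapshot is created but not yet visible from the main table.
INSERT INTO catalog.db.logs SELECT * FROM staging.new_logs;
-- 4. After the audit passes, publish the staged snapshot by cherry-picking it.
CALL catalog.system.cherrypick_snapshot('db.logs', 1234567890);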
The Puffin file format is a companion to data file formats like Parquet and ORC rather than a replacement for them. It stores information such as indexes and statistics about data managed in an Iceberg table that cannot be stored directly within the Iceberg manifest. A Puffin file contains arbitrary pieces of information called “blobs”, along with the metadata necessary to interpret them.
This format was proposed by long-time Trino maintainer, Piotr Findeisen @findepi, to address a performance issue noted when using Trino on Iceberg. The Puffin format is a great extension for those using Iceberg tables, as it enables better query plans in Trino at the file level.
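The statistics that land in these files feed into Trino's planning. A quick, illustrative way to see which table and column statistics Trino currently has available (shown here against the demo table) is SHOW STATS:

/**
 * Inspect the statistics Trino has for the table; richer statistics
 * allow the optimizer to produce better query plans.
 */
SHOW STATS FOR iceberg.logging.logs;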
The PyIceberg library is an exciting development that lets users easily read data from Iceberg tables directly into their own Python code.
Recent additions to the Iceberg connector in Trino include:

- MERGE (PR)
- UPDATE (PR)
- DELETE (PR)
- The @ syntax for snapshots/time travel was deprecated in version 387, and there were two bug fixes for this feature in versions 386 and 388.
- optimize (PR), which is the equivalent of the Spark SQL rewrite_data_files procedure.
- expire_snapshots (PR), which uses the equivalent name in Spark.
- remove_orphan_files (PR), which uses the equivalent name in Spark.

A short sketch of running these maintenance commands from Trino follows below.

Almost every release has some sort of Iceberg improvement around planning or pushdown. If you want all the latest features and performance improvements described here, it's important to keep up with the latest Trino version.
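As a rough sketch (the file size and retention thresholds are illustrative values, not recommendations from the episode), these maintenance commands run in Trino through ALTER TABLE ... EXECUTE:

/**
 * Table maintenance from Trino; thresholds below are illustrative.
 */
-- Compact small files into larger ones (the counterpart of Spark's rewrite_data_files).
ALTER TABLE iceberg.logging.logs EXECUTE optimize(file_size_threshold => '128MB');
-- Remove snapshots older than the retention threshold.
ALTER TABLE iceberg.logging.logs EXECUTE expire_snapshots(retention_threshold => '7d');
-- Delete files that are no longer referenced by any snapshot.
ALTER TABLE iceberg.logging.logs EXECUTE remove_orphan_files(retention_threshold => '7d');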
The PR of the episode was contributed by Gaurav Sehgal (@gaurav8297) to enable Trino to automatically scale writers. Specifically, this PR scales the number of task writers per worker. You can enable this feature by setting scale_task_writers to true in your configuration. Initial test results show a sixfold speed increase.
Thank you so much to Gaurav and all the reviewers that got this PR through!
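For reference, turning this on for a single session might look like the following. This is only a sketch: the property name is taken from the description above, and the exact name can differ between Trino versions, so check the release notes for the version you run.

-- Enable writer scaling for the current session (property name as referenced above).
SET SESSION scale_task_writers = true;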
For this demo of the episode, we use the same schema as the demo we ran in episode 15, and revise the syntax to include new features.
Let’s start up a local Trino coordinator and Hive metastore. Clone the
trino-getting-started repository and navigate to the iceberg/trino-iceberg-minio
directory. Then
start up the containers using Docker Compose.
git clone git@github.com:bitsondatadev/trino-getting-started.git
cd iceberg/trino-iceberg-minio
docker-compose up -d
Now open up your favorite Trino client and connect it to
localhost:8080
to run the following commands:
/**
 * Make sure to first create a bucket named "logging" in MinIO before running
*/
CREATE SCHEMA iceberg.logging
WITH (location = 's3a://logging/');
/**
* Create table
*/
CREATE TABLE iceberg.logging.logs (
level varchar NOT NULL,
event_time timestamp(6) with time zone NOT NULL,
message varchar NOT NULL,
call_stack array(varchar)
)
WITH (
format_version = 2, -- New property to specify Iceberg spec format. Default 2
format = 'ORC',
partitioning = ARRAY['day(event_time)','level']
);
/**
* Inserting two records. Notice event_time is on the same day but different hours.
*/
INSERT INTO iceberg.logging.logs VALUES
(
'ERROR',
timestamp '2021-04-01 12:23:53.383345' AT TIME ZONE 'America/Los_Angeles',
'1 message',
ARRAY ['Exception in thread "main" java.lang.NullPointerException']
),
(
'ERROR',
timestamp '2021-04-01 13:36:23' AT TIME ZONE 'America/Los_Angeles',
'2 message',
ARRAY ['Exception in thread "main" java.lang.NullPointerException']
);
SELECT * FROM iceberg.logging.logs;
SELECT * FROM iceberg.logging."logs$partitions";
/**
* Notice one partition was created for both records at the day granularity.
*/
/**
* Update the partitioning from daily to hourly 🎉
*/
ALTER TABLE iceberg.logging.logs
SET PROPERTIES partitioning = ARRAY['hour(event_time)'];
/**
* Inserting three records. Notice event_time is on the same day but different hours.
*/
INSERT INTO iceberg.logging.logs VALUES
(
'ERROR',
timestamp '2021-04-01 15:55:23' AT TIME ZONE 'America/Los_Angeles',
'3 message',
ARRAY ['Exception in thread "main" java.lang.NullPointerException']
),
(
'WARN',
timestamp '2021-04-01 15:55:23' AT TIME ZONE 'America/Los_Angeles',
'4 message',
ARRAY ['bad things could be happening']
),
(
'WARN',
timestamp '2021-04-01 16:55:23' AT TIME ZONE 'America/Los_Angeles',
'5 message',
ARRAY ['bad things could be happening']
);
SELECT * FROM iceberg.logging.logs;
SELECT * FROM iceberg.logging."logs$partitions";
/**
* Now there are three partitions:
* 1) One partition at the day granularity containing our original records.
* 2) One at the hour granularity for hour 15 containing two new records.
* 3) One at the hour granularity for hour 16 containing the last new record.
*/
SELECT * FROM iceberg.logging.logs
WHERE event_time < timestamp '2021-04-01 16:55:23' AT TIME ZONE 'America/Los_Angeles';
/**
* This query correctly returns 4 records with only the first two partitions
* being touched. Now let's check the snapshots.
*/
SELECT * FROM iceberg.logging.logs;
SELECT * FROM iceberg.logging."logs$snapshots";
/**
* Update
*/
UPDATE
iceberg.logging.logs
SET
call_stack = call_stack || 'WHALE HELLO THERE!'
WHERE
lower(level) = 'warn';
SELECT * FROM iceberg.logging.logs;
SELECT * FROM iceberg.logging."logs$snapshots";
/**
* Read data from an old snapshot (Time travel)
*
* Old way: SELECT * FROM iceberg.logging."logs@2806470637437034115";
*/
SELECT * FROM iceberg.logging.logs FOR VERSION AS OF 2806470637437034115;
/**
* Merge
*/
CREATE TABLE iceberg.logging.src (
level varchar NOT NULL,
message varchar NOT NULL,
call_stack array(varchar)
)
WITH (
format = 'ORC'
);
INSERT INTO iceberg.logging.src VALUES
(
'ERROR',
'3 message',
ARRAY ['This one will not show up because it is an ERROR']
),
(
'WARN',
'4 message',
ARRAY ['This should show up']
),
(
'WARN',
'5 message',
ARRAY ['This should show up as well']
);
MERGE INTO iceberg.logging.logs AS t
USING iceberg.logging.src AS s
ON s.message = t.message
WHEN MATCHED AND s.level = 'ERROR'
THEN DELETE
WHEN MATCHED
THEN UPDATE
SET message = s.message || '-updated',
call_stack = s.call_stack || t.call_stack;
DROP TABLE iceberg.logging.logs;
DROP SCHEMA iceberg.logging;
This is just the tip of the iceberg, showing the powerful MERGE statement and the other features that have been added to the Iceberg connector!
Blog posts
Check out the in-person and virtual Trino Meetup groups.
If you want to learn more about Trino, check out the definitive guide from O’Reilly. You can download the free PDF or buy the book online.
Music for the show is from the Megaman 6 Game Play album by Krzysztof Slowikowski.