
Trino Community Broadcast

40: Trino's cold as Iceberg!

Sep 8, 2022

Looks like Commander Bun Bun is safe on this Iceberg
https://joshdata.me/iceberger.html

Register for Trino Summit 2022!

Trino Summit 2022 is coming around the corner! This free event on November 10th takes place in person at the Commonwealth Club in San Francisco, CA, and can also be attended remotely! If you want to present, the call for speakers is open until September 15th.

You can register for the conference at any time. We must limit in-person registrations to 250 attendees, so register soon if you plan on attending in person!

Releases 394 to 395

Official highlights from Martin Traverso:

Trino 394

  • JSON output format for EXPLAIN.
  • Improved performance for LIKE expressions.
  • query table function in BigQuery connector.
  • INSERT support in BigQuery connector.
  • TLS support in Pinot connector.

Trino 395

  • Faster INSERT queries.
  • Better performance for large clusters.
  • Improved memory efficiency for aggregations and fault tolerant execution.
  • Faster aggregations over decimal columns.
  • Support for dynamic function resolution.

Additional highlights worth a mention according to Cole:

  • The improved performance of inserts on Delta Lake, Hive, and Iceberg is a huge one. We’re not entirely sure how much it’ll matter in production use cases, but some of the benchmarks suggested it could be massive - one test showed a 75% reduction in query duration.
  • Dynamic function resolution in the SPI is going to unlock some very neat possibilities down the line.

More detailed information is available in the release notes for Trino 394 and Trino 395.

Concept of the week: Latest features in Apache Iceberg and the Iceberg connector

It has been over a year since we had Ryan on the Trino Community Broadcast as a guest to discuss what Apache Iceberg is and how it can be used in Trino. Since then, the adoption of Iceberg in our community has skyrocketed. Iceberg is proving to be a much better alternative to the Hive table format.

The initial phase of the Iceberg connector in Trino aimed to provide fast and interoperable read support. A typical setup ran Trino alongside other query engines, like Apache Spark, that supported many of the data manipulation language (DML) SQL features on Iceberg. One of the biggest requests we got as adoption increased was the ability to do everything through Trino. This episode dives into some of the latest features that were missing from the early iterations of the Iceberg connector, and into what has changed in Iceberg itself!

What is Apache Iceberg?

Iceberg is a next-generation table format that defines a standard around the metadata used to map data to a SQL query engine. It addresses a lot of the maintainability and reliability issues many engineers experienced with the way Hive modeled SQL tables over big data files.

One common point of confusion is that a table format is not the same thing as a file format like ORC or Parquet. The table format is the layer that maintains the metadata mapping these files to the concept of a table and other common database abstractions.
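As a quick illustration, Trino's Iceberg connector exposes this metadata layer through hidden metadata tables that can be queried like any other table. The sketch below assumes a table named iceberg.logging.logs, the same table created in the demo later in these notes:

SELECT * FROM iceberg.logging."logs$snapshots"; -- snapshots of the table over time
SELECT * FROM iceberg.logging."logs$manifests"; -- manifest files tracking the data files
SELECT * FROM iceberg.logging."logs$files";     -- the underlying ORC or Parquet data files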

This episode assumes you have some basic knowledge of Trino and Iceberg already. If you are new to Iceberg or need a refresher, we recommend the two older episodes about Iceberg and Trino basics:

Why Iceberg over other formats?

There have been some great advancements to big data technologies that brought back SQL and data warehouse capabilities. However, Hive and Hive-like table formats are still missing some capabilities due to limitations of Hive tables, such as unintentionally dropping and reintroducing stale data. On top of that, Hive tables require a lot of knowledge of Hive internals. Some recent formats aim to remain backwards compatible with Hive, but inadvertently reintroduce these limitations.

This is not the case with Iceberg. Iceberg has the broadest query engine support and puts a heavy emphasis on being an interoperable format. This gives users more flexibility to address a wider array of use cases, whether that involves querying a system like Snowflake or a data lakehouse running on Iceberg. All of this is made possible by the Iceberg specification that all of these query engines must follow.

Finally, a great video presented by Ryan Blue that dives into Iceberg is, “Why you shouldn’t care about Iceberg.”

Metadata catalogs

Catalogs, in the context of Iceberg, refer to the central storage of metadata. Catalogs are also used to provide the atomic compare-and-swap needed to support serializable isolation in Iceberg. We’ll refer to them as metadata catalogs to avoid confusion with Trino catalogs.

The two existing catalogs supported in Trino's Iceberg connector are the Hive Metastore Service and its AWS counterpart, Glue. While this provides a nice migration path from the Hive model, many are looking to replace these rather cumbersome catalogs with something lightweight. It turns out that the Iceberg connector only uses the Hive Metastore Service to point to the top-level metadata file of each Iceberg table, while the majority of the metadata lives in metadata files in storage. This makes it even more compelling to get rid of the complex Hive service in favor of simpler services. Two popular catalogs outside of these are the JDBC catalog and the REST catalog.
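For reference, a minimal sketch of a Trino catalog properties file for the Iceberg connector looks like the following; the metastore URI and catalog type are placeholders you would adjust for your environment:

connector.name=iceberg
iceberg.catalog.type=hive_metastore
hive.metastore.uri=thrift://example-metastore:9083

# or point at Glue instead of the Hive Metastore Service
# iceberg.catalog.type=glue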

There are two PRs in progress to support these metadata catalogs in Trino:

Branching, tagging, and auditing, oh my!

Another feature set that is coming to Iceberg is the ability to use refs to alias your snapshots. This enables branching and tagging behavior similar to git, treating each snapshot like a commit. It is yet another feature that simplifies moving between known states of the data in Iceberg.

On a related note, branching and tagging will eventually be used in the audit integration in Iceberg. Auditing allows you to push a soft commit by making a snapshot available without initially publishing it to the primary table. This is achieved using Spark and setting the spark.wap.id configuration property. It enables interesting patterns like the Write-Audit-Publish (WAP) pattern, where you first write the data, audit it using a data quality tool like Great Expectations, and lastly publish the data to make it visible from the main table, as sketched below. Currently, auditing has to use the cherry-pick operation to publish, which becomes more streamlined with branching and tagging.
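To make the flow a bit more concrete, here is a rough sketch of the WAP pattern using Iceberg's Spark SQL integration; the catalog, table, and snapshot ID are placeholders, and the exact property and procedure names may vary across Iceberg versions:

-- stage writes instead of publishing them directly to the main table
ALTER TABLE my_catalog.db.logs SET TBLPROPERTIES ('write.wap.enabled' = 'true');
SET spark.wap.id = nightly_audit_20220908;

-- write: a snapshot is created, but it is not yet visible from the main table
INSERT INTO my_catalog.db.logs SELECT * FROM db.staged_logs;

-- audit the staged snapshot with your data quality tool, then publish it
CALL my_catalog.system.cherrypick_snapshot('db.logs', 1234567890123456789);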

The Puffin file format

The Puffin file format complements data file formats like Parquet and ORC rather than replacing them. It stores information such as indexes and statistics about data managed in an Iceberg table that cannot be stored directly within the Iceberg manifest files. A Puffin file contains arbitrary pieces of information called "blobs", along with the metadata necessary to interpret them.

This format was proposed by long-time Trino maintainer, Piotr Findeisen @findepi, to address a performance issue noted when using Trino on Iceberg. The Puffin format is a great extension for those using Iceberg tables, as it enables better query plans in Trino at the file level.

pyIceberg

The pyIceberg library is an exciting development that enables users to read their data directly from Iceberg into their own Python code easily.

Trino Iceberg connector updates

Almost every release has some sort of Iceberg improvement around planning or pushdown. If you want all the latest features and performance improvements described here, it’s important to keep up with the latest Trino version.

PR 13111: Scale table writers per task based on throughput

The PR of the episode was contributed by Gaurav Sehgal (@gaurav8297) and enables Trino to automatically scale writers. It scales the number of table writers per task on each worker based on write throughput.

You can enable this feature by setting scale_task_writers to true in your configuration. Initial test results show a sixfold speed increase.
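If you want to try it on a single query, a minimal sketch using Trino's session property syntax looks like the following; the property name here is the one mentioned above and may differ depending on your Trino version:

SET SESSION scale_task_writers = true;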

Thank you so much to Gaurav and all the reviewers that got this PR through!

Demo: DML operations on Iceberg using Trino

For this demo of the episode, we use the same schema as the demo we ran in episode 15, and revise the syntax to include new features.

Let’s start up a local Trino coordinator and Hive metastore. Clone the repository and navigate to the iceberg/trino-iceberg-minio directory. Then start up the containers using Docker Compose.

git clone git@github.com:bitsondatadev/trino-getting-started.git
cd trino-getting-started/iceberg/trino-iceberg-minio
docker-compose up -d

Now open up your favorite Trino client and connect it to localhost:8080 to run the following commands:

/**
 * Make sure to first create a bucket named "logging" in MinIO before running
 */
CREATE SCHEMA iceberg.logging
WITH (location = 's3a://logging/');

/**
 * Create table 
 */
CREATE TABLE iceberg.logging.logs (
   level varchar NOT NULL,
   event_time timestamp(6) with time zone NOT NULL,
   message varchar NOT NULL,
   call_stack array(varchar)
)
WITH (
   format_version = 2, -- New property to specify Iceberg spec format. Default 2
   format = 'ORC',
   partitioning = ARRAY['day(event_time)','level']
);
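
/**
 * (Added check, not part of the original demo) Verify the table definition,
 * including the partitioning and format properties we just set.
 */
SHOW CREATE TABLE iceberg.logging.logs;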

/**
 * Inserting two records. Notice event_time is on the same day but different hours.
 */

INSERT INTO iceberg.logging.logs VALUES 
(
  'ERROR', 
  timestamp '2021-04-01 12:23:53.383345' AT TIME ZONE 'America/Los_Angeles', 
  '1 message',
  ARRAY ['Exception in thread "main" java.lang.NullPointerException']
),
(
  'ERROR', 
  timestamp '2021-04-01 13:36:23' AT TIME ZONE 'America/Los_Angeles', 
  '2 message', 
  ARRAY ['Exception in thread "main" java.lang.NullPointerException']
);

SELECT * FROM iceberg.logging.logs;
SELECT * FROM iceberg.logging."logs$partitions";

/**
 * Notice one partition was created for both records at the day granularity.
 */

/**
 * Update the partitioning from daily to hourly 🎉
 */
ALTER TABLE iceberg.logging.logs 
SET PROPERTIES partitioning = ARRAY['hour(event_time)'];

/**
 * Inserting three records. Notice event_time is on the same day but different hours.
 */
INSERT INTO iceberg.logging.logs VALUES 
(
  'ERROR', 
  timestamp '2021-04-01 15:55:23' AT TIME ZONE 'America/Los_Angeles', 
  '3 message', 
  ARRAY ['Exception in thread "main" java.lang.NullPointerException']
), 
(
  'WARN', 
  timestamp '2021-04-01 15:55:23' AT TIME ZONE 'America/Los_Angeles', 
  '4 message', 
  ARRAY ['bad things could be happening']
), 
(
  'WARN', 
  timestamp '2021-04-01 16:55:23' AT TIME ZONE 'America/Los_Angeles', 
  '5 message', 
  ARRAY ['bad things could be happening']
);

SELECT * FROM iceberg.logging.logs;
SELECT * FROM iceberg.logging."logs$partitions";

/**
 * Now there are three partitions:
 * 1) One partition at the day granularity containing our original records.
 * 2) One at the hour granularity for hour 15 containing two new records.
 * 3) One at the hour granularity for hour 16 containing the last new record.
 */

SELECT * FROM iceberg.logging.logs 
WHERE event_time < timestamp '2021-04-01 16:55:23' AT TIME ZONE 'America/Los_Angeles';

/**
 * This query correctly returns 4 records with only the first two partitions
 * being touched. Now let's check the snapshots.
 */

 
SELECT * FROM iceberg.logging.logs;

SELECT * FROM iceberg.logging."logs$snapshots";

/**
 * Update
 */
UPDATE
  iceberg.logging.logs
SET
  call_stack = call_stack || 'WHALE HELLO THERE!'
WHERE
  lower(level) = 'warn';

SELECT * FROM iceberg.logging.logs;

SELECT * FROM iceberg.logging."logs$snapshots";

/**
 * Read data from an old snapshot (Time travel)
 *
 * Old way: SELECT * FROM iceberg.logging."logs@2806470637437034115";
 */

SELECT * FROM iceberg.logging.logs FOR VERSION AS OF 2806470637437034115;
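
/**
 * (Added sketch, not part of the original demo) The connector also supports
 * time travel by timestamp; the timestamp below is just an example and must
 * be later than at least one committed snapshot.
 */
SELECT * FROM iceberg.logging.logs FOR TIMESTAMP AS OF TIMESTAMP '2022-09-08 00:00:00 UTC';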

/**
 * Merge
 */
CREATE TABLE iceberg.logging.src (
   level varchar NOT NULL,
   message varchar NOT NULL,
   call_stack array(varchar)
)
WITH (
   format = 'ORC'
);

INSERT INTO iceberg.logging.src VALUES 
 (
   'ERROR',
   '3 message', 
   ARRAY ['This one will not show up because it is an ERROR']
 ), 
 (
   'WARN', 
   '4 message', 
   ARRAY ['This should show up']
 ), 
 (
   'WARN',
   '5 message', 
   ARRAY ['This should show up as well']
 );

MERGE INTO iceberg.logging.logs AS t
USING iceberg.logging.src AS s
ON s.message = t.message
WHEN MATCHED AND s.level = 'ERROR'
        THEN DELETE
WHEN MATCHED
    THEN UPDATE
        SET message = s.message || '-updated', 
            call_stack = s.call_stack || t.call_stack;
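
/**
 * (Added check, not part of the original demo) Inspect the result of the MERGE:
 * the matched ERROR row is deleted, and the matched WARN rows now have updated
 * messages and combined call stacks.
 */
SELECT * FROM iceberg.logging.logs;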

DROP TABLE iceberg.logging.logs;

DROP SCHEMA iceberg.logging;

This is just the tip of the iceberg, showing off the powerful MERGE statement and the other features we have added to the Iceberg connector!


Check out the in-person and virtual Trino Meetup groups.

If you want to learn more about Trino, check out the definitive guide from O’Reilly. You can download the free PDF or buy the book online.

Music for the show is from the Megaman 6 Game Play album by Krzysztof Slowikowski.