Commander Bun Bun loves sippin' on Pinot after a hard day of data exploration!
Release notes discussed: https://trino.io/docs/current/release/release-353.html
Martin's list:
Manfred’s notes:
Before diving into Pinot, I think it's worthwhile to discuss some theoretical background to motivate the use cases Pinot addresses. We cover the concept of data cubes and how they are used in traditional data warehousing to speed up queries and minimize unnecessary work on your OLAP system.
In data analytics, many access patterns tend to repeat themselves over and over again. It is very common to need to split and merge data based on date and time values. Perhaps you ask a lot of questions about a specific customer, or even a specific product. Answering these questions typically involves aggregations like sums, averages, and counts. Wouldn't it make sense to cache some of these intermediate results?
A common way to visualize columns that are frequently bucketed into values or ranges of values is to show them as a cube that is sliced up along those dimensions. This visualization derives from the traditional form of OLAP, multidimensional OLAP (MOLAP).
This cube represents a cache of data aggregations grouped by commonly used dimensions. For example, the displayed cube would be the pre-aggregation of the following query:
SELECT part, store, customer, COUNT(*)
FROM cube_table
GROUP BY part, store, customer
If we want to get the data for a particular customer, we can take a “slice” of that cube by filtering on that customer. The following query returns the green square above from our cube.
SELECT part, store, COUNT(*)
FROM cube_table
WHERE customer = 'Bob'
GROUP BY part, store
Now what if we want to flatten one of the dimensions? While this can be managed with a GROUP BY as before, depending on the system it may ignore any cached data and scan over all the rows. For this, SQL reserves a special set of keywords around cubes. We won't dive into that in depth now, but for our current goal of flattening a dimension, we can use ROLLUP. Using the ROLLUP keyword indicates to the underlying system that you intend to aggregate over the pre-materialized data rather than scan over all rows to compute the result again. This gives you the counts per part and store, along with rolled-up subtotals and a grand total, using the counts of the data cube.
SELECT part, store, COUNT(*)
FROM cube_table
GROUP BY ROLLUP (part, store)
Now, although we used simple counts, you can precompute a lot of other aggregate data like sums, minimums, maximums, percentiles, and so on. These can serve commonly asked queries without requiring a new computation every time. That is the goal of MOLAP and data cubes.
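For instance, a single rollup query can precompute several aggregates at once. Here is a minimal sketch, where quantity and price are hypothetical columns added to cube_table, and approx_percentile stands in for whatever approximate percentile function your engine provides:
SELECT part, store,
  COUNT(*) AS order_count,
  SUM(quantity) AS total_quantity,
  MIN(price) AS min_price,
  MAX(price) AS max_price,
  approx_percentile(price, 0.95) AS p95_price
FROM cube_table
GROUP BY ROLLUP (part, store)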
Now let's move on to Apache Pinot. It is a realtime distributed OLAP datastore, designed to answer OLAP queries with low latency. Although a lot of those words overlap with the Trino description, the key differentiators are realtime and low latency. Trino performs batch processing and is not a realtime system, whereas Pinot is great for ingesting data in batch or from a stream. The other key phrase, low latency, could technically apply to both Pinot and Trino, but in the context of realtime, subsecond latency, Trino is slow compared to Pinot. This is due to the specialized indexes that Pinot uses to store the data, which we cover shortly. Another big distinction is that Trino does not store any data itself; it is purely a query engine. Xiang has a really great summary slide that easily shows the strengths of each system and why they work so well together.
While Trino is not as fast as Pinot, it is able to handle a broader set of use cases, like performing large joins over open data formats in data lakes. This is what motivated work on the Trino Pinot connector: you can have the speed of Pinot while keeping the flexibility of Trino.
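As a rough sketch of that flexibility, a single Trino query could join realtime data served by Pinot with a table sitting in a data lake. The pinot and hive catalogs and both tables below are hypothetical names, not part of the demo:
SELECT o.region, SUM(v.views) AS total_views
FROM pinot.default.page_views v   -- hypothetical table exposed through the Pinot connector
JOIN hive.sales.orders o          -- hypothetical table stored in a data lake
  ON v.customer_id = o.customer_id
GROUP BY o.region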
Now that you understand the common use cases for Pinot, it's important to know its main goals.
Another goal of Pinot is to revive the value of historical data. Data reaches a point in its lifecycle where it becomes less valuable as it ages. While all data can add some value regardless of its age, there is a cost to scanning many rows to glean information from old data. Pinot aims to remove this tradeoff: most questions about historical data are asked in aggregate, and those aggregates can be summarized and queried at a low cost.
We just covered Pinot's theory and goals, so let's take a quick look at the architecture.
A Pinot cluster consists of a controller, broker, and server, plus optionally a minion that handles background tasks such as purging data.
Our guest on the show today, Elon Azoulay, is the author of this PR, so we can ask him all about it now.
To put this PR to the test, we set up a Pinot cluster using Docker Compose.
To load the data, we're going to use a simple batch import, but you can also ingest the data from a stream using Kafka.
Let's start up the Pinot cluster along with the required ZooKeeper and Kafka broker. Clone this repository and navigate to the community_tutorials/pinot/trino-pinot directory.
git clone git@github.com:bitsondatadev/trino-getting-started.git
cd community_tutorials/pinot/trino-pinot
docker-compose up -d
To do a batch insert, we stage a CSV file for Pinot to read the data from. Create a directory under a local temp folder and write the data there; we then submit this to Pinot.
mkdir -p /tmp/pinot-quick-start/rawdata
echo "studentID,firstName,lastName,gender,subject,score,timestampInEpoch
200,Lucy,Smith,Female,Maths,3.8,1570863600000
200,Lucy,Smith,Female,English,3.5,1571036400000
201,Bob,King,Male,Maths,3.2,1571900400000
202,Nick,Young,Male,Physics,3.6,1572418800000" > /tmp/pinot-quick-start/rawdata/transcript.csv
In order for Pinot to understand the CSV data, we must provide it with a schema.
echo "{
\"schemaName\": \"transcript\",
\"dimensionFieldSpecs\": [
{
\"name\": \"studentID\",
\"dataType\": \"INT\"
},
{
\"name\": \"firstName\",
\"dataType\": \"STRING\"
},
{
\"name\": \"lastName\",
\"dataType\": \"STRING\"
},
{
\"name\": \"gender\",
\"dataType\": \"STRING\"
},
{
\"name\": \"subject\",
\"dataType\": \"STRING\"
}
],
\"metricFieldSpecs\": [
{
\"name\": \"score\",
\"dataType\": \"FLOAT\"
}
],
\"dateTimeFieldSpecs\": [{
\"name\": \"timestampInEpoch\",
\"dataType\": \"LONG\",
\"format\" : \"1:MILLISECONDS:EPOCH\",
\"granularity\": \"1:MILLISECONDS\"
}]
}" > /tmp/pinot-quick-start/transcript-schema.json
Now we are almost ready to create the table. Instead of adding table configurations as part of a SQL command, Pinot enables you to store the table configuration as a file. This is a nice option that decouples the table definition from the query language, which makes for simpler scripting in batch setups.
echo "{
\"tableName\": \"transcript\",
\"segmentsConfig\" : {
\"timeColumnName\": \"timestampInEpoch\",
\"timeType\": \"MILLISECONDS\",
\"replication\" : \"1\",
\"schemaName\" : \"transcript\"
},
\"tableIndexConfig\" : {
\"invertedIndexColumns\" : [],
\"loadMode\" : \"MMAP\"
},
\"tenants\" : {
\"broker\":\"DefaultTenant\",
\"server\":\"DefaultTenant\"
},
\"tableType\":\"OFFLINE\",
\"metadata\": {}
}" > /tmp/pinot-quick-start/transcript-table-offline.json
Once you create these three files and verify that the Docker containers are running, you can run the AddTable command:
docker run --rm -ti \
--network=trino-pinot_trino-network \
-v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
--name pinot-batch-table-creation \
apachepinot/pinot:latest AddTable \
-schemaFile /tmp/pinot-quick-start/transcript-schema.json \
-tableConfigFile /tmp/pinot-quick-start/transcript-table-offline.json \
-controllerHost pinot-controller \
-controllerPort 9000 -exec
Now that the table exists, we can see it in the Pinot web UI. Let’s insert some data using a batch job specification:
echo "executionFrameworkSpec:
name: 'standalone'
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/tmp/pinot-quick-start/rawdata/'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/tmp/pinot-quick-start/segments/'
overwriteOutput: true
pinotFSSpecs:
- scheme: file
className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
dataFormat: 'csv'
className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
tableName: 'transcript'
schemaURI: 'http://pinot-controller:9000/tables/transcript/schema'
tableConfigURI: 'http://pinot-controller:9000/tables/transcript'
pinotClusterSpecs:
- controllerURI: 'http://pinot-controller:9000'" > /tmp/pinot-quick-start/docker-job-spec.yml
Now run this batch job using the LaunchDataIngestionJob command:
docker run --rm -ti \
--network=trino-pinot_trino-network \
-v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
--name pinot-data-ingestion-job \
apachepinot/pinot:latest LaunchDataIngestionJob \
-jobSpecFile /tmp/pinot-quick-start/docker-job-spec.yml
We modified this demo from the tutorials available on the Pinot website:
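Once the segment is loaded, you can query the transcript table from Trino. This is a minimal sketch assuming a Trino catalog named pinot is configured to point at this cluster, as in the demo repository; the connector exposes Pinot tables under the default schema:
SELECT studentid, AVG(score) AS avg_score
FROM pinot.default.transcript
GROUP BY studentid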
The passthrough queries may be failing due to uppercase constants, which need to be wrapped in UPPER(). For example, 'FOO' in this query would be rendered as all lowercase once it is passed to Pinot:
SELECT *
FROM "SELECT col1, col2, COUNT(*) FROM pinot_table WHERE col2 = 'FOO' GROUP BY col1, col2"
The fix is to pass 'FOO' to UPPER() in the passthrough query:
SELECT *
FROM "SELECT col1, col2, COUNT(*) FROM pinot_table WHERE col2 = UPPER('FOO') GROUP BY col1, col2"
It could also be due to parsing of functions in filters. A workaround is to move the filter outside of the double quotes, which can work in some cases. For example, column and table names can be mixed case because the connector will automatically resolve them, but mixed-case constants would not work with UPPER():
SELECT *
FROM "SELECT col1, col2, COUNT(*) FROM pinot_table WHERE col2 = 'Foo' GROUP BY col1, col2"
The filter can be hoisted into the outer query:
SELECT *
FROM "SELECT col1, col2, COUNT(*) FROM pinot_table GROUP BY col1, col2" WHERE col2 = 'Foo';
There is ongoing work to improve this parsing: Pinot filter clause parsing (PR 7161).
Blogs
Trino Meetup Groups
Latest training from David, Dain, and Martin (now with timestamps!):
If you want to learn more about Trino, check out the definitive guide from O'Reilly. You can download the free PDF or buy the book online.
Music for the show is from the Megaman 6 Game Play album by Krzysztof Słowikowski.