The Trino engine provides APIs to support row-level SQL MERGE. To implement MERGE, a connector must provide an implementation of ConnectorMergeSink, which is typically layered on top of a ConnectorPageSink, and define methods to get a “rowId” column handle, to get the row change paradigm, and to start and complete the MERGE operation.
The Trino engine machinery used to implement SQL MERGE is also used to support SQL DELETE and UPDATE, replacing the previous implementations. This means that all a connector needs to do is implement support for SQL MERGE, and the connector gets all the DML operations.
Different query engines support varying definitions of SQL MERGE. Trino supports the strict SQL specification ISO/IEC 9075, published in 2016. As a simple example, given the target table accounts and the source table monthly_accounts_update defined as:
CREATE TABLE accounts (
    customer VARCHAR,
    purchases DECIMAL,
    address VARCHAR);
INSERT INTO accounts (customer, purchases, address) VALUES ...;
CREATE TABLE monthly_accounts_update (
    customer VARCHAR,
    purchases DECIMAL,
    address VARCHAR);
INSERT INTO monthly_accounts_update (customer, purchases, address) VALUES ...;
Here is a possible MERGE operation, merging monthly_accounts_update into accounts:
MERGE INTO accounts t USING monthly_accounts_update s
    ON (t.customer = s.customer)
    WHEN MATCHED AND s.address = 'Berkeley' THEN
        DELETE
    WHEN MATCHED AND s.customer = 'Joe Shmoe' THEN
        UPDATE SET purchases = purchases + 100.0
    WHEN MATCHED THEN
        UPDATE SET purchases = s.purchases + t.purchases, address = s.address
    WHEN NOT MATCHED THEN
        INSERT (customer, purchases, address) VALUES (s.customer, s.purchases, s.address);
MERGE tries to match each WHEN clause in source order. When a match is found, the corresponding operation is executed and subsequent WHEN clauses are ignored.
When a row from the source table or query matches a row in the target table, SQL MERGE supports two operations on the target table row:

UPDATE, in which the columns in the target row are updated.

DELETE, in which the target row is deleted.
In the NOT MATCHED case, SQL MERGE supports only INSERT operations. The values inserted are arbitrary but usually come from the unmatched row of the source table or query.
Different connectors have different ways of representing row updates, imposed by the underlying storage systems. The Trino engine classifies these different paradigms as elements of the RowChangeParadigm enumeration, returned by the method ConnectorMetadata.getRowChangeParadigm(...).

The RowChangeParadigm enumeration values are:
CHANGE_ONLY_UPDATED_COLUMNS, intended for connectors that can update individual columns of rows identified by a rowId. The corresponding merge processor class is ChangeOnlyUpdatedColumnsMergeProcessor.

DELETE_ROW_AND_INSERT_ROW, intended for connectors that represent a row change as a row deletion paired with a row insertion. The corresponding merge processor class is DeleteAndInsertMergeProcessor.
A MERGE statement is processed by creating a RIGHT JOIN between the target table and the source, on the MERGE criteria. The source may be a table or an arbitrary query. For each row in the source table or query, MERGE produces a ROW object containing:

the data column values from the UPDATE and INSERT cases. For the DELETE cases, only the partition columns, which determine partitioning and bucketing, are non-NULL.

a boolean column containing true for source rows that matched some target row, and false otherwise.

an integer that identifies whether the merge case operation is UPDATE, DELETE, or INSERT, or a source row for which no case matched. If a source row does not match any merge case, all data column values except those that determine distribution are null, and the operation number is -1.
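The per-row evaluation described above can be sketched in Python. This is an illustrative simulation, not Trino code: the function name and row representation are invented, but the operation numbers follow the convention used in this document (INSERT=1, DELETE=2, UPDATE=3, and -1 when no case matches).

```python
# Illustrative simulation of the ROW object MERGE produces for each source
# row of the example MERGE statement. Not Trino code; names are invented.
INSERT, DELETE, UPDATE, NO_CASE_MATCHED = 1, 2, 3, -1

def merge_row(target, source):
    """Return (customer, purchases, address, matched, operation, case_number).

    target is None when the source row matched no target row. In this example
    the NOT MATCHED case is unconditional, so operation -1 never occurs.
    """
    matched = target is not None
    if matched and source["address"] == "Berkeley":
        # WHEN MATCHED AND s.address = 'Berkeley' THEN DELETE
        return (None, None, None, True, DELETE, 0)
    if matched and source["customer"] == "Joe Shmoe":
        # WHEN MATCHED AND s.customer = 'Joe Shmoe' THEN UPDATE
        return (target["customer"], target["purchases"] + 100.0,
                target["address"], True, UPDATE, 1)
    if matched:
        # WHEN MATCHED THEN UPDATE
        return (target["customer"], source["purchases"] + target["purchases"],
                source["address"], True, UPDATE, 2)
    # WHEN NOT MATCHED THEN INSERT
    return (source["customer"], source["purchases"], source["address"],
            False, INSERT, 3)
```

Note how a DELETE case produces null data columns, while the UPDATE and INSERT cases carry the merged column values.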
A SearchedCaseExpression is constructed from the RIGHT JOIN result to represent the WHEN clauses of the MERGE. In the example above, the MERGE is executed as if the SearchedCaseExpression were written as:
SELECT
    CASE
        WHEN present AND s.address = 'Berkeley' THEN
            -- Null values for delete; present=true; operation DELETE=2, case_number=0
            row(null, null, null, true, 2, 0)
        WHEN present AND s.customer = 'Joe Shmoe' THEN
            -- Update column values; present=true; operation UPDATE=3, case_number=1
            row(t.customer, t.purchases + 100.0, t.address, true, 3, 1)
        WHEN present THEN
            -- Update column values; present=true; operation UPDATE=3, case_number=2
            row(t.customer, s.purchases + t.purchases, s.address, true, 3, 2)
        WHEN (present IS NULL) THEN
            -- Insert column values; present=false; operation INSERT=1, case_number=3
            row(s.customer, s.purchases, s.address, false, 1, 3)
        ELSE
            -- Null values for no case matched; present=false; operation=-1, case_number=-1
            row(null, null, null, false, -1, -1)
    END
FROM (SELECT *, true AS present FROM target_table) t
RIGHT JOIN source_table s ON s.customer = t.customer;
The Trino engine executes the RIGHT JOIN and the SearchedCaseExpression, ensures that no target table row matches more than one source expression row, and ultimately creates a sequence of pages to be routed to the nodes that write the rows.

MERGE target table rows are identified by a connector-specific rowId column handle, returned by ConnectorMetadata.getMergeRowIdColumnHandle(...).
The Trino MERGE implementation allows UPDATE to change the values of columns that determine partitioning and/or bucketing, and so it must “redistribute” rows from the MERGE operation to the worker nodes responsible for writing rows with the merged partitioning and/or bucketing column values. Because the MERGE process in general requires redistribution of merged rows among Trino nodes, the order of rows in pages to be stored is indeterminate. Connectors like Hive that depend on an ascending rowId order for deleted rows must sort the deleted rows before storing them.
To ensure that all inserted rows for a given partition end up on a
single node, the redistribution hash on the partition key/bucket column(s)
is applied to the page partition key(s). As a result of the hash, all
rows for a specific partition/bucket hash together, whether they are MATCHED rows or NOT MATCHED rows.
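The routing property described above can be sketched as a small simulation. This is not Trino's actual hash function; the point is only that the worker is chosen from the partition/bucket column values alone, so MATCHED and NOT MATCHED rows for the same partition land on the same node.

```python
import zlib

# Illustrative sketch of redistribution hashing (not Trino's real hash):
# the target worker depends only on the partition/bucket column values.
def route(partition_key_values, worker_count):
    key = "|".join(map(str, partition_key_values)).encode("utf-8")
    return zlib.crc32(key) % worker_count

# A matched (UPDATE) row and an unmatched (INSERT) row for the same
# partition hash to the same worker:
assert route(("2023-01", 7), 4) == route(("2023-01", 7), 4)
```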
Inserted rows are distributed using the layout supplied by ConnectorMetadata.getInsertLayout(). For some connectors, the same layout is used for updated rows. Other connectors require a special layout for updated rows, supplied by ConnectorMetadata.getUpdateLayout().
Connector support for MERGE

To start MERGE processing, the Trino engine calls:

ConnectorMetadata.getMergeRowIdColumnHandle(...) to get the rowId column handle.

ConnectorMetadata.getRowChangeParadigm(...) to get the paradigm supported by the connector for changing existing table rows.

ConnectorMetadata.beginMerge(...) to get a ConnectorMergeTableHandle for the merge operation. That ConnectorMergeTableHandle object contains whatever information the connector needs to specify the MERGE operation.

ConnectorMetadata.getInsertLayout(...), from which the engine extracts the list of partition or table columns that impact write redistribution.

ConnectorMetadata.getUpdateLayout(...). If that layout is non-empty, it is used to distribute updated rows resulting from the MERGE operation.
On nodes that are targets of the hash, the Trino engine calls ConnectorPageSinkProvider.createMergeSink(...) to create a ConnectorMergeSink.

To write out each page of merged rows, the Trino engine calls ConnectorMergeSink.storeMergedRows(Page). The storeMergedRows() method iterates over the rows in the page, performing updates and deletes in the MATCHED cases, and inserts in the NOT MATCHED cases.

For some RowChangeParadigms, UPDATE operations are translated into DELETE and INSERT operations before storeMergedRows(Page) is called.
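The UPDATE translation can be sketched as follows. This is an illustrative simulation of the DELETE_ROW_AND_INSERT_ROW behavior, with invented row tuples, not the engine's page-based implementation.

```python
# Sketch of the DELETE_ROW_AND_INSERT_ROW translation: each UPDATE row is
# rewritten as a DELETE of the old row (by rowId) plus an INSERT of the new
# column values. Illustrative only; not Trino code.
INSERT, DELETE, UPDATE = 1, 2, 3

def translate_updates(rows):
    """rows: iterable of (operation, row_id, column_values) tuples."""
    out = []
    for op, row_id, values in rows:
        if op == UPDATE:
            out.append((DELETE, row_id, None))  # delete the old row by rowId
            out.append((INSERT, None, values))  # insert the new column values
        else:
            out.append((op, row_id, values))
    return out
```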
To complete the MERGE operation, the Trino engine calls ConnectorMetadata.finishMerge(...), passing the table handle and a collection of JSON objects encoded as Slice instances. These objects contain connector-specific information specifying what was changed by the MERGE operation. Typically this JSON object contains the files written and the table and partition statistics generated by the MERGE operation. The connector takes appropriate actions, if any.
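The kind of fragment a connector might build can be sketched in Python. The field names here are invented for illustration; each connector defines its own JSON schema, and Trino wraps the serialized bytes in a Slice.

```python
import json

# Sketch of a connector-specific "fragment" eventually passed to
# finishMerge(): a JSON object serialized to bytes. Field names are
# hypothetical; each connector chooses its own schema.
def make_fragment(files_written, row_count):
    return json.dumps({
        "filesWritten": files_written,  # e.g. data files produced by the sink
        "rowCount": row_count,          # rows changed by this sink
    }).encode("utf-8")

fragment = make_fragment(["part-0000.orc"], 42)
```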
RowChangeProcessor implementation for MERGE

In the MERGE implementation, each RowChangeParadigm corresponds to an internal Trino engine class that implements the interface RowChangeProcessor. RowChangeProcessor has one interesting method: Page transformPage(Page). The format of the output page depends on the RowChangeParadigm.

The connector has no access to the RowChangeProcessor instance; it is used inside the Trino engine to transform the merge page rows into rows to be stored, based on the connector’s choice of RowChangeParadigm.
The page supplied to transformPage() consists of:

The write redistribution columns, if any.

For partitioned or bucketed tables, a long hash value column.

The rowId column for the row from the target table if matched, or null if not matched.

The merge case RowBlock.

The integer case number block.

The byte is_distinct block, with value 0 if the row is not distinct.
The merge case RowBlock has the following layout:

Blocks for each column in the table, including partition columns, in table column order.

A block containing the boolean “present” value, which is true if the source row matched a target row, and false otherwise.

A block containing the MERGE case operation number, encoded as INSERT = 1, DELETE = 2, UPDATE = 3, or -1 if no MERGE case matched.

A block containing the number, starting with 0, of the WHEN clause that matched for the row, or -1 if no clause matched.
The page returned from transformPage() consists of:

All table columns, in table column order.

The merge case operation block.

The rowId block.

A byte block containing 1 if the row is an insert derived from an UPDATE operation, and 0 otherwise. This block is used to correctly calculate the count of changed rows for connectors that represent updates as deletes plus inserts.

transformPage() must ensure that no rows whose operation number is -1 appear in the page it returns.
Detecting duplicate matching target rows
The SQL MERGE specification requires that in each MERGE case, a single target table row must match at most one source row, after applying the MERGE case condition expression. The first step toward finding these errors is labeling each row in the target table with a unique id, using an AssignUniqueId node above the target table scan. The projected results from the RIGHT JOIN have these unique ids for matched target table rows, as well as the number of the WHEN clause that matched. A MarkDistinct node adds an “is_distinct” column, which is true if no other row has the same unique id and WHEN clause number, and false otherwise. If any row has “is_distinct” = false, a MERGE_TARGET_ROW_MULTIPLE_MATCHES exception is raised and the MERGE operation fails.
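The distinctness check can be sketched as a simulation. The engine uses AssignUniqueId and MarkDistinct plan nodes for this; the function below is an invented stand-in that applies the same rule to plain tuples.

```python
# Sketch of duplicate-match detection: fail if the same (target unique id,
# WHEN clause number) pair occurs more than once in the join result.
# Illustrative only; not the engine's plan-node implementation.
def check_distinct(join_rows):
    """join_rows: iterable of (target_unique_id, when_clause_number).

    target_unique_id is None for NOT MATCHED source rows, which never conflict.
    """
    seen = set()
    for key in join_rows:
        if key[0] is not None and key in seen:
            raise ValueError("MERGE_TARGET_ROW_MULTIPLE_MATCHES")
        seen.add(key)
```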
ConnectorMergeTableHandle defines one method, getTableHandle(), used to retrieve the ConnectorTableHandle originally passed to ConnectorMetadata.beginMerge().
To support SQL MERGE, ConnectorPageSinkProvider must implement the method that creates the ConnectorMergeSink:

ConnectorMergeSink createMergeSink(
    ConnectorTransactionHandle transactionHandle,
    ConnectorSession session,
    ConnectorMergeTableHandle mergeHandle)
As mentioned above, to support MERGE the connector must define an implementation of ConnectorMergeSink, usually layered over the connector’s ConnectorPageSink. The ConnectorMergeSink is created by a call to ConnectorPageSinkProvider.createMergeSink(). The only interesting methods are:
void storeMergedRows(Page page)
The Trino engine calls the storeMergedRows(Page) method of the ConnectorMergeSink instance returned by ConnectorPageSinkProvider.createMergeSink(), passing the page generated by the RowChangeProcessor.transformPage() method. That page consists of all table columns, in table column order, followed by the rowId column, followed by the operation column from the merge case RowBlock.

The job of storeMergedRows() is to iterate over the rows in the page and process them based on the value of the operation column, performing the INSERT, DELETE or UPDATE, or ignoring the row. By choosing an appropriate paradigm, the connector can request that UPDATE operations be transformed into DELETE and INSERT operations.
CompletableFuture<Collection<Slice>> finish()

The Trino engine calls finish() when all the data has been processed by a specific ConnectorMergeSink instance. The connector returns a future containing a collection of Slice, representing connector-specific information about the rows processed. Usually this includes the row count, and might include information like the files or partitions created or changed.
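The sink's responsibilities can be sketched as a simulation. This is an invented Python stand-in for a ConnectorMergeSink, not Trino code: storeMergedRows() dispatches on the operation column, and finish() returns the fragments the engine later hands to finishMerge().

```python
import json

# Invented stand-in for a ConnectorMergeSink; illustrative only.
INSERT, DELETE, UPDATE = 1, 2, 3

class SketchMergeSink:
    def __init__(self):
        self.inserted, self.deleted, self.updated = [], [], []

    def store_merged_rows(self, page):
        """page: iterable of (columns, row_id, operation) tuples."""
        for columns, row_id, operation in page:
            if operation == INSERT:
                self.inserted.append(columns)
            elif operation == DELETE:
                self.deleted.append(row_id)
            elif operation == UPDATE:
                self.updated.append((row_id, columns))

    def finish(self):
        # Return connector-specific fragments; here just a row count in JSON.
        row_count = len(self.inserted) + len(self.deleted) + len(self.updated)
        return [json.dumps({"rowCount": row_count}).encode("utf-8")]
```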
A connector implementing MERGE must implement these ConnectorMetadata methods:
RowChangeParadigm getRowChangeParadigm(
    ConnectorSession session,
    ConnectorTableHandle tableHandle)
This method is called as the engine starts processing a MERGE statement. The connector must return a RowChangeParadigm enum instance. If the connector does not support MERGE, it should throw a NOT_SUPPORTED exception to indicate that SQL MERGE is not supported by the connector.
ColumnHandle getMergeRowIdColumnHandle(
    ConnectorSession session,
    ConnectorTableHandle tableHandle)
This method is called in the early stages of query planning for MERGE statements. The ColumnHandle returned provides the rowId used by the connector to identify rows to be merged, as well as any other fields of the row that the connector needs to complete the MERGE operation.
Optional<ConnectorTableLayout> getInsertLayout(
    ConnectorSession session,
    ConnectorTableHandle tableHandle)

This method is called during query planning to get the table layout to be used for rows inserted by the MERGE operation. For some connectors, this layout is used for deleted rows as well.
Optional<ConnectorTableLayout> getUpdateLayout(
    ConnectorSession session,
    ConnectorTableHandle tableHandle)

This method is called during query planning to get the table layout to be used for rows updated by the MERGE operation. If the optional return value is present, the Trino engine uses that layout for updated rows. Otherwise, it uses the result of ConnectorMetadata.getInsertLayout to distribute updated rows.
ConnectorMergeTableHandle beginMerge(
    ConnectorSession session,
    ConnectorTableHandle tableHandle,
    MergeDetails mergeDetails)

As the last step in creating the MERGE execution plan, the connector’s beginMerge() method is called, passing the session, the table handle, and the merge details. beginMerge() performs any orchestration needed in the connector to start processing the MERGE. This orchestration varies from connector to connector. In the case of the Hive connector operating on ACID tables, for example, beginMerge() checks that the table is transactional and that all updated columns are writable, and starts a Hive Metastore transaction.

beginMerge() returns a ConnectorMergeTableHandle with any added information the connector needs when the handle is passed back to finishMerge() and the split generation machinery. For most connectors, the returned table handle contains at least a flag identifying it as a table handle for a MERGE operation.
void finishMerge(
    ConnectorSession session,
    ConnectorMergeTableHandle tableHandle,
    Collection<Slice> fragments)

During MERGE processing, the Trino engine accumulates the Slice collections returned by ConnectorMergeSink.finish(). The engine calls finishMerge(), passing the table handle and that collection of Slice fragments. In response, the connector takes appropriate actions to complete the MERGE operation. Those actions might include committing an underlying transaction, if any, or freeing any other resources.