How to ETL at Petabyte-Scale with Trino

Vincent Poon on 2021-09-27

Trino (formerly known as PrestoSQL) is widely appreciated as a fast distributed SQL query engine, but there is precious little information online about using it for batch extract, transform, and load (ETL) ingestion (outside of the original Facebook paper), particularly at petabyte+ scale. After deciding to use Trino as a key piece of Salesforce’s Big Data platform, we developed Huron, a new internal system that uses Trino for running online analytical processing (OLAP) queries against our app log data. In this blog post, we share some of the key insights we discovered on our journey to using Trino to ETL petabytes of data with relatively low data latency.

Why Trino for ETL?

While Trino certainly has some shortcomings when it comes to ETL, such as lack of mid-query fault tolerance and limited expressive power, there are also some highly underrated advantages to using Trino for ETL:

Tips and tricks for running ETL workloads on Trino

CREATE VIEW user_view
AS
 SELECT *
 FROM fine_grained_partitions
 WHERE event_ts > last_compact_ts
 UNION ALL
 SELECT *
 FROM daily_partitions
 WHERE event_ts <= last_compact_ts
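In this pattern, `last_compact_ts` is a placeholder: a view definition cannot reference a mutable variable, so a working setup would typically inline the boundary as a literal and redefine the view after each compaction run. A hypothetical sketch (timestamp value and names are illustrative, following the simplified example above):

```sql
-- After a compaction run completes, redefine the view with the new
-- boundary timestamp inlined as a literal. Trino supports
-- CREATE OR REPLACE VIEW, so readers see the swap atomically.
CREATE OR REPLACE VIEW user_view AS
SELECT *
FROM fine_grained_partitions
WHERE event_ts > TIMESTAMP '2021-09-26 00:00:00'   -- new last_compact_ts
UNION ALL
SELECT *
FROM daily_partitions
WHERE event_ts <= TIMESTAMP '2021-09-26 00:00:00'
```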

Compaction itself is a SQL statement, shown here in a simplified example:

INSERT INTO daily_partitions
SELECT *
FROM fine_grained_partitions
WHERE event_ts > last_compact_ts
 AND event_ts <= current_compact_ts
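For concreteness, here is a hypothetical sketch of what the two tiers of tables might look like with the Hive connector, using its `partitioned_by` table property (table names follow the example above; catalog, schema, columns, and the ORC format are assumptions; note that Trino's Hive connector requires partition columns to be listed last):

```sql
-- Fine-grained tier: hourly partitions for fast, low-latency ingest.
CREATE TABLE hive.logs.fine_grained_partitions (
    event_ts timestamp,
    payload varchar,
    event_hour varchar
)
WITH (format = 'ORC', partitioned_by = ARRAY['event_hour']);

-- Compacted tier: daily partitions with fewer, larger files.
CREATE TABLE hive.logs.daily_partitions (
    event_ts timestamp,
    payload varchar,
    event_date varchar
)
WITH (format = 'ORC', partitioned_by = ARRAY['event_date']);
```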

Scaling writes to object store

Our ETL pipelines write data to S3 using the Hive connector, and managing the writes here is perhaps the trickiest part of doing ETL at large scale with Trino. There is a delicate Goldilocks balance to be managed along multiple related dimensions:

There are several different configs that control this overall balance, and it can be initially difficult to grok how they all interact with each other. Here’s a diagram that makes the process a little easier to understand:

Every INSERT in Trino is executed by a number of writer nodes (i.e. Trino workers), and each of these nodes can have multiple threads (labeled here as “task_writer”) doing the actual writes. Each thread can potentially write to every partition (in this example, data for each partition is spread across all writers).

There are a few session properties (as of Trino 356) associated with these:

Because each writer thread can write to every partition, the number of files that ends up getting created can be as high as: num_writers * task_writer_count * num_partitions.
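To make the arithmetic concrete, a worked example with hypothetical numbers:

```sql
-- 20 writer nodes, 4 writer threads each, inserting into 10 partitions
-- can produce up to 20 * 4 * 10 = 800 files for a single INSERT.
SELECT 20 * 4 * 10 AS max_files_created;
```

Small files in this quantity quickly become a burden on both the metastore and read-side performance, which is why the compaction step above exists.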

Getting good overall performance on both the write and read side is a matter of tuning all these variables to get reasonable file counts and sizes. In the end, the right settings to use will depend on a multitude of variables, like the size of the data and skew. For example, we use scale_writers=true when we know a given INSERT will have a relatively small amount of data. Some final pointers to keep in mind as you go about your tuning process:
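As a sketch of what such tuning looks like in practice, the session properties can be set per-INSERT. The property names below are Trino session properties (as of Trino 356); the values are illustrative, not recommendations:

```sql
-- Hypothetical tuning for an INSERT known to carry a small amount of data.
SET SESSION scale_writers = true;       -- start with few writers, scale out with volume
SET SESSION writer_min_size = '256MB';  -- target data per writer before adding more
SET SESSION task_writer_count = 2;      -- writer threads per worker node
-- An INSERT issued in the same session picks up these settings.
```

Setting these per-session rather than cluster-wide lets each pipeline stage choose the file-count/file-size trade-off appropriate to its data volume.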

Should I use Trino to ETL?

While it might not be suitable for every ETL scenario out there, we’ve seen that Trino does quite well for a certain class of ETL use cases. Trino does best where the ETL can be designed around some of Trino’s shortcomings (like keeping ETL queries short-running for easy failure recovery), and where retries and state management are handled by a robust external system like Apache Airflow. It also takes some careful partition planning and config tuning, but the end result can be a simple, fast, and effective system.

Despite our legacy log serialization formats being compute intensive to parse, we’ve managed to ingest at peak rates of >2.5 GB/sec, with data available for querying in under 15 minutes on average. Going forward, we hope to improve on our design by adopting Iceberg (which provides snapshot/serializable isolation and removes the metastore bottleneck), by optimizing Trino further for our workloads, and by extending our framework to other dataset types.

Thanks to Chinmay Kulkarni, Conor McAvoy, and Laura Lindeman for their review and feedback.
