AWS Big Data Blog

Top 10 Performance Tuning Tips for Amazon Athena

February 2024: This post was reviewed and updated to reflect changes in Amazon Athena engine version 3, including cost-based optimization and query result reuse.

Amazon Athena is an interactive analytics service built on open source frameworks that make it straightforward to analyze data stored using open table and file formats in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Athena is easy to use, simply point to your data in Amazon S3, define the schema, and start querying using standard SQL.

In this post, we review the top 10 tips that can improve query performance. We focus on aspects related to storing data in Amazon S3 and tuning specific to queries.

Storage

This section discusses how to structure your data so that you can get the most out of Athena. You can apply the same practices to Amazon EMR data processing applications such as Spark, Trino, Presto, and Hive when your data is stored in Amazon S3. We discuss the following best practices:

  1. Partition your data
  2. Bucket your data
  3. Use compression
  4. Optimize file size
  5. Use columnar file formats

1. Partition your data

Partitioning divides your table into parts and keeps the related data together based on column values such as date, country, and region. Partitions act as virtual columns. You define them at table creation, and they can help reduce the amount of data scanned per query, thereby improving performance. You can restrict the amount of data scanned by a query by specifying filters based on the partition. For more details, see Partitioning data in Athena.

The following example shows a dataset partitioned by year, stored in an S3 bucket:

$ aws s3 ls s3://athena-examples/flight/parquet/
PRE year=1987/
PRE year=1988/
PRE year=1989/
PRE year=1990/
PRE year=1991/
PRE year=1992/
PRE year=1993/

A table for this dataset would have a PARTITIONED BY (year STRING) clause to tell Athena it is partitioned by year. After the table was created, each partition would have to be added, for example with ALTER TABLE ADD PARTITION, using an AWS Glue crawler, or by running MSCK REPAIR TABLE. The table could also be configured to use partition projection (see the bonus tip section for how to set that up).

When querying a partitioned table, you can restrict the partitions that are scanned by using the partition key in the WHERE clause:

SELECT dest, origin FROM flights WHERE year = '1991'

When this query runs, Athena will see that there is a predicate (filter) on the year partition key, and only load data from matching partitions. For this query, that means that only data in s3://athena-examples/flight/parquet/year=1991/ will be read.

Datasets can have multiple partitions keys. The following is an example from the NOAA Global Historical Climatology Network dataset from the Registry of Open Data on AWS. The dataset is partitioned by station and element, and the listing is for one particular station:

$ aws s3 ls --no-sign-request s3://noaa-ghcn-pds/parquet/by_station/STATION=ASN00023351/
PRE ELEMENT=DAPR/
PRE ELEMENT=DWPR/
PRE ELEMENT=MDPR/
PRE ELEMENT=PRCP/

A table for this dataset would have a PARTITIONED BY (station STRING, element STRING) clause to tell Athena it is partitioned this way.

When deciding the columns on which to partition, consider the following:

  • Pick partition keys that support your queries. Work backward from your queries and find fields that are often used to filter the dataset.
  • Partition keys should have a relatively low cardinality. As the number of partitions in your table increases, the higher the overhead of retrieving and processing the partition metadata, and the smaller your files. Partition keys with too many values can negate the benefit of partitioning.
  • If your data is heavily skewed to one partition value, and most queries use that value, the overhead may negate the benefit of partitioning.

Datasets that grow over time should generally be partitioned by date. A common pattern is queries that look at specific windows of time, for example the last week or last month. Partitioniong by date ensures that the amount of data read by these queries remains constant even as the size of the full dataset grows over time.

The following table compares query runtimes between a partitioned and non-partitioned table. The table is from the industry standard benchmark dataset TPC-H. Both versions of the table contain 74 GB data, uncompressed and stored in text format. The partitioned table is partitioned by the l_shipdate column and has 2,526 partitions.

Query Non- Partitioned Table Cost Partitioned Table Cost Savings
. Runtime Data scanned . Runtime Data scanned . .
SELECT COUNT(*)
FROM lineitem
WHERE l_shipdate = '1996-09-01'
4.8 seconds 74.1 GB $0.36 0.7 seconds 29.96 MB $0.0001

99% cheaper

85% faster

SELECT COUNT(*)
FROM lineitem
WHERE l_shipdate >= '1996-09-01' 
AND l_shipdate < '1996-10-01'
4.4 seconds 74.1 GB $0.36 2.0 seconds 898.58 MB $0.004

98% cheaper

54% faster

You can use the EXPLAIN command to see what partitions will be read by a query:

EXPLAIN
SELECT COUNT(*)
FROM lineitem
WHERE l_shipdate = '1996-09-01'

In the output, look for the SOURCE fragment, and within it the label PARTITION_KEY. This line will tell you which partitions will be read by the query:

…
Fragment 1 [SOURCE]
    Output layout: [count_0]
    Output partitioning: SINGLE []
    Aggregate[type = PARTIAL]
    │   Layout: [count_0:bigint]
    │   Estimates: {rows: 1 (9B), cpu: 0, memory: 9B, network: 0B}
    │   count_0 := count(*)
    └─ TableScan[table = awsdatacatalog:tpc_h:lineitem]
           Layout: []
           Estimates: {rows: ? (0B), cpu: 0, memory: 0B, network: 0B}
           l_shipdate:string:PARTITION_KEY 
		   :: [[1996-09-01]]

If a table has multiple partition keys, there will be a line for each. If the query matches multiple values for a partition key, each value will be included in the output. For example, if the query was changed to select a range of values for l_shipdate, the last two lines could instead look like the following:

l_shipdate:string:PARTITION_KEY
     :: [[1996-09-01], [1996-09-02], [1996-09-03], [1996-09-04], [1996-09-05]]

Partitioning also has a penalty if a partition filter isn’t used in the query, as shown in the following table. Make sure not to over-partition the data. Over-partitioning leads to greater quantity of smaller files, which hurts performance. We cover this in more detail later in this post.

Query Non- Partitioned Table Partitioned Table Savings
. Runtime Data scanned Runtime Data scanned .
SELECT COUNT(*)
FROM lineitem
3.4 seconds 74.1 GB 8.9 seconds 74.1 GB 62% slower

Another penalty from partitioning is the time it takes to find the partitions that match the query, called partition pruning. A way to mitigate this is to enable partition indexes on the table. This can lead to better performance when a table has tens of thousands of partitions (or more). With partition indexes, only the metadata for the partition value in the query’s filter is retrieved from the catalog, instead of retrieving all the partitions’ metadata. The result is faster queries for such highly partitioned tables. The following table compares query runtimes between a partitioned table with no partition indexing and with partition indexing. The table contains approximately 100,000 partitions and uncompressed text data. The orders table is partitioned by the o_custkey column.

Query Partition indexing = disabled Partition indexing = enabled Speed up
. Runtime Runtime .
SELECT COUNT(*)
FROM orders
WHERE o_custkey BETWEEN 1 AND 100
19.5 seconds 1.2 seconds 16x

To learn more about the benefits of the AWS Glue Data Catalog’s partition indexing in Athena, refer to Improve Amazon Athena query performance using AWS Glue Data Catalog partition indexes.

See the section on creating optimized datasets later in this post for examples on how to partition your data.

2. Bucket your data

Another way to reduce the amount of data a query has to read is to bucket the data within each partition. Bucketing is a technique for distributing records into separate files based on the value of one of the columns. This ensures that all records with the same value will be in the same file. Bucketing is useful when you have a column with high cardinality (many distinct values) and many of your queries look up specific values of the column. Good candidates for bucketing are columns such as IDs for users or devices.

In Athena, if you have a dataset that is already bucketed, you can specify the bucketed column inside your CREATE TABLE statement by specifying CLUSTERED BY (<bucketed columns>) INTO <number of buckets> BUCKETS. Athena supports datasets bucketed with Hive or Spark, and you can create bucketed datasets with CREATE TABLE AS (CTAS) in Athena.

The following table shows the difference in a customer table where the c_custkey column is used to create 32 buckets. The customer table is 2.29 GB in size.

Query Non- Bucketed Table Cost Bucketed Table Using c_custkey as Clustered Column Cost Savings
. Runtime Data scanned . Runtime Data scanned . .
SELECT COUNT(*)
FROM customer
WHERE c_custkey = 12677856
1.3 sec 2.29 GB $0.01145 0.82 sec 72.94 MB $0.0003645 97% cheaper
37% faster

Running EXPLAIN ANALYZE on the preceding query shows how bucketing helped in reading less data from Amazon S3 for the customer table. The following snippets of the EXPLAIN ANALYZE output on the non-bucketed and bucketed tables’ query highlights input rows and size of data to understand the difference.

The following is the output for the non-bucketed table:

…
─ ScanFilterProject[table = awsdatacatalog:tpc_h:customer, filterPredicate = ("c_custkey" = 12677856), projectLocality = LOCAL, protectedBarrier = NONE]
           Layout: []
           Estimates: {rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: 0, memory: 0B, network: 0B}
           CPU: 19.48s (99.94%), Scheduled: 37.43s (99.97%), Blocked: 0.00ns (0.00%), Output: 1 row (0B)
           Input avg.: 202702.70 rows, Input std.dev.: 4.83%
           c_custkey := c_custkey:int:REGULAR
           Input: 15000000 rows (2.29GB), Filtered: 100.00%, Physical input: 2.29GB, Physical input time: 0.00ms

The following is the output for the bucketed table:

…
─ ScanFilterProject[table = awsdatacatalog:tpc_h:customer buckets=32, filterPredicate = ("c_custkey" = 12677856), projectLocality = LOCAL, protectedBarrier = NONE]
           Layout: []
           Estimates: {rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: 0, memory: 0B, network: 0B}
           CPU: 654.00ms (100.00%), Scheduled: 1.13s (100.00%), Blocked: 0.00ns (0.00%), Output: 1 row (0B)
           Input avg.: 156250.00 rows, Input std.dev.: 22.35%
           c_custkey := c_custkey:int:REGULAR
           Input: 468750 rows (72.94MB), Filtered: 100.00%, Physical input: 72.94MB, Physical input time: 0.00ns

You can find out more about how to work with bucketed data in Athena by reading the following resources:

See the section on creating optimized datasets later in this post for examples on how to bucket your data.

3. Use compression

Compressing your data can speed up your queries significantly. The smaller data sizes reduce the data scanned from Amazon S3, resulting in lower costs of running queries. It also reduces the network traffic from Amazon S3 to Athena.

Athena supports a variety of compression formats, including common formats like gzip, Snappy, and zstd. For the whole list of supported formats, see Athena compression support.

Querying compressed text data, such as JSON and CSV, requires special consideration. When Athena reads data, it assigns different ranges of files to different nodes, to maximize parallel processing of the data. Each range is known as a split and files that can be read in parallel are called splittable. Most of the common compression formats are not splittable—they require the reader to start from the beginning of the file. This means that if a dataset is a single compressed CSV file, for example, only one node can be used for query processing.

When you create datasets consisting of compressed text files, aim for a balance between the number of files and the file size. We discuss this more in the next section on optimizing file sizes.

Parquet and ORC files are always splittable because these formats compress sections of the files separately, and have metadata that contains the locations within the files for the different sections.

The gzip format provides good compression ratios and has a wide range support across other tools and services. The zstd (Zstandard) format is a newer compression format with a good balance between performance and compression ratio. The bzip2 and LZO compression formats are splittable, but are not recommended if you want performance and compatibility.

See the section on creating optimized datasets later in this post for examples on how to compress your data.

4. Optimize file sizes

Queries run more efficiently when data can be read in parallel, and as much data as possible can be read in a single read request. There is an overhead in reading each file, for example getting metadata, making the request to Amazon S3, and setting up compression dictionaries. This is usually not noticeable, but as the number of files grows, it can add up. To maximize the performance of queries, you should aim for a balance between the number of files and their size.

A general guideline is to aim for splits that are around 128 MB. A split is a part of a file, for example a byte range of an uncompressed text file, or a page of a Parquet file. As discussed in the section on compression, most compressed text files are not splittable, and are therefore processed as a single split. Analytics-optimized formats like Parquet and ORC are always splittable.

One reason why you may end up with many small files is over-partitioning, as discussed in the previous section on partitioning. An indication that your query performance suffers from too many small files is if the planning phase in the query stats is more than a few percent of the total running time. In the worst case, your queries may fail with an Amazon S3 error saying “Please reduce your request rate.” This happens when the number of files is so great that Athena exceeds Amazon S3 service quotas. For more information, see Best practices design patterns: optimizing Amazon S3 performance.

One remedy to solve your small file problem is to use the S3DistCP utility on Amazon EMR. You can use it to combine smaller files into larger objects. You can also use S3DistCP to move large amounts of data in an optimized fashion from HDFS to Amazon S3, Amazon S3 to Amazon S3, and Amazon S3 to HDFS. We discuss another alternative to reprocessing data using Athena Spark at the end of this section.

Some benefits of having fewer, larger files include faster listing, fewer Amazon S3 requests, and less metadata to manage.

For example, the following table compares the difference between a query that has to read 100,000 files with a query for the same dataset stored as a single file. Both sets of files contain the same rows, stored as uncompressed text files. The total amount of data is 74 GB.

Query Number of Files Runtime
SELECT COUNT(*)
FROM lineitem
100,000 files 11.5 seconds
SELECT COUNT(*)
FROM lineitem
1 file 4.3 seconds
Speedup . ~62%

Similarly, the following table compares the difference that the number of files makes when the data has been compressed. The data is the same as in the previous example, but compressed with gzip into 10 and 100 files, respectively.

Query Number of Files Average file size Runtime
SELECT COUNT(*)
FROM lineitem
10 files 2.4 GB 60 seconds
SELECT COUNT(*)
FROM lineitem
100 files 243 MB 6.8 seconds
Speedup . . ~88%

File size, the number of files, and whether the files are compressed can make a big difference to query performance. When the data is not compressed, Athena can process files in parallel, in optimal sizes. This makes processing a single uncompressed text file more efficient than 100,000 files. When the data is compressed, the number and size of the files makes an even bigger difference, but in this case, it’s important to have enough files to allow Athena to process the dataset in parallel.

See the section on creating optimized datasets later in this post for examples on how to rewrite your datasets to combine small files.

5. Use columnar file formats

Apache Parquet and Apache ORC are popular file formats for analytics workloads. They are often described as columnar file formats because they store data not by row, but by column. They also have features that allow query engines to reduce the amount of data that needs to be loaded in different ways. For example, by storing and compressing columns separately, you can achieve higher compression ratios, and only the columns referenced in a query need to be read.

Columnar file formats use multiple compression strategies for data. For example, a column with many repeated values can be encoded using run length encoding, where the value is stored once along with a repetition count, or dictionary encoded, where each value is replaced by a pointer to a lookup table. Textual data can be compressed with standard compression formats like gzip, Snappy, and zstd. For more details, see Athena compression support.

Parquet and ORC can be tuned for different datasets. For example, it can be beneficial to increase the block (Parquet) or stripe (ORC) size in some situations. When a dataset has many columns, we recommend increasing the size from the default 128 MB in Parquet and 64 MB in ORC. This ensures that enough values for each column are stored together and fewer reads are required.

Another way to tune a dataset using a columnar file format is to keep the data sorted by a column that is often included in queries. Parquet and ORC store metadata such as the minimum and maximum value of a column each block of data. This means that the query engine can skip reading a block of data if it sees that the values it contains don’t match the query. This is called predicate pushdown. For example, data often has some kind of timestamp, and by keeping it sorted within the files by this property, a query that looks for a specific range of time can skip reading data from blocks that are before or after the timestamp.

You can combine sorting by timestamp with partitioning to yield even better performance gains and cost savings. Let’s say that you’re often doing aggregations over a time window of a few hours. Partitioning by hour would be possible, but could risk producing too many and too small files. Instead, you can partition by day and sort the data by timestamp. This way, coarse-grained partitioning is used to reduce the set of files that will be included in the query to only those in matching partitions, and the sorting will be used to skip blocks within the remaining files. Just remember to include filters on both the partition keys and the timestamp column.

The following table compares runtimes and data scanned for the same dataset in text gzip, Parquet gzip with no sorting, and Parquet gzip with sorting on l_partkey.

Query

SELECT l_orderkey
FROM lineitem
WHERE l_partkey = 17766770

Savings Compared to Text Format
Text gzip data Runtime 11.9 seconds .
Data Scanned 23.7 GB .
Cost $0.1 .
Parquet gzip data with no sorting Runtime 2.1 seconds ~82% faster
Data Scanned 2.0 GB .
Cost $0.009 ~91% cheaper
Parquet gzip data sorted on l_partkey Runtime 1.1 second ~90% faster
Data Scanned 38.8 MB .
Cost $0.0001 ~99.9% cheaper

Creating optimized datasets

In this section, we show you how to use Athena Spark to transform a dataset and apply the optimizations that we discussed in the previous sections. You can also use this code in most other Spark runtimes, for example Amazon EMR Serverless or AWS Glue ETL. You can also use Athena SQL to transform data and apply many of the optimizations described in this post, but Athena Spark provides more configuration options and control over the process.

The following code first makes the tpc_h database the default. The location of this database will be used to determine where data is written. It then creates a new table called customer_optimized by completing the following actions:

  • Read all rows in the table customer
  • Reduce the number of files that will be written per bucket per partition to four, using coalesce
  • Sort records by c_name with sortWithinPartitions
  • Write the records partitioned by c_mktsegment and c_nationkey with partitionBy and bucketed by c_custkey into 32 buckets with bucketBy into Parquet files compressed with zstd

See the following code:

spark.sql("use tpc_h")
spark\
    .read.table("customer")\
    .coalesce(4)\
    .sortWithinPartitions("c_name")\
    .write\
    .partitionBy("c_mktsegment", "c_nationkey")\
    .bucketBy(32, "c_custkey")
    .saveAsTable("customer_optimized", format="parquet", compression="gzip")

This example shows all the optimizations at the same time. Depending on your use case, you may only need one or a few of them. Refer back to the earlier sections in this post for when each is of most use.

For more information about how to process data using Amazon EMR, EMR Serverless, AWS Glue ETL, or Athena SQL, see the following resources:

Query tuning

The Athena SQL engine is built on the open source distributed query engines Trino and Presto. Understanding how it works provides insight into how you can optimize queries when running them. This section details the following best practices:

  1. Optimize ORDER BY
  2. Optimize joins
  3. Optimize GROUP BY
  4. Use approximate functions
  5. Only include the columns that you need

6. Optimize ORDER BY

The ORDER BY clause returns the results of a query in sort order. Athena uses distributed sort to run the sort operation in parallel on multiple nodes. If you’re using the ORDER BY clause to look at the top or bottom N values, use a LIMIT clause to reduce the cost of the sort, which results in a faster query runtime.

For example, the following table summarizes the runtime for a dataset with a 7.25 GB table, uncompressed in text format, with approximately 60 million rows.

Query Runtime
SELECT *
FROM lineitem
ORDER BY l_shipdate
274 seconds
SELECT *
FROM lineitem
ORDER BY l_shipdate
LIMIT 10000
4.6 seconds
Speedup 98% faster

7. Optimize joins

Choosing the right join order is critical for better query performance. When you join two tables, specify the larger table on the left side of the join and the smaller table on the right side. For the most common type of joins that use equality conditions, Athena builds a lookup table from the table on the right and distributes it to the worker nodes. It then streams the table on the left, joining rows by looking up matching values in the lookup table. This is called a distributed hash join. Because the lookup table built from the table on the right side is kept in memory, the smaller that table is, the less memory will be used, and the faster the join will run.

Because the hash table is distributed across the worker nodes, data skew can affect performance. If many rows have the same values for the columns used in the join condition, one node can end up having to process a large part of the join itself, while other nodes are idle. For best performance, make sure that the columns in your join conditions have as uniform a distribution of values as possible.

The following table shows the runtimes on a dataset with 74 GB total data, uncompressed in text format. The lineitem table has around 600 million rows, and the part table around 20 million.

Query Runtime
SELECT COUNT(*)
FROM lineitem, part
WHERE l_partkey = p_partkey
6.4 seconds
SELECT COUNT(*)
FROM part, lineitem
WHERE p_partkey = l_partkey
8.1 seconds
Speedup ~20% faster

You can use the execution details visualizer on the Athena console to inspect the order that joins are executed. The visualizer also shows the number of rows that are joined from each table. See Viewing statistics and execution details for completed queries for more information on how to use the visualizer.

When you join three or more tables, consider joining the large table with the smallest table first to reduce the intermediate result and then join with the other tables.

Cost-based join optimizations

If a table has table statistics in the AWS Glue Data Catalog, Athena will use these to perform join reordering and aggregation pushdown using cost-based optimization (“cost” here refers to computational cost). When a table has statistics, the query optimizer is able to see which order of the tables is the most efficient and can perform the optimization automatically. This means you don’t have to manually ensure that the larger table is on the left side of the join.

For more information about using the cost-based optimizer, see Speed up queries with the cost-based optimizer in Amazon Athena and Using the cost-based optimizer.

Joining partitioned tables

When querying partitioned tables, it’s best to include filters on all partition keys, for all tables. This ensures that the query planner can skip as much as possible of listing and reading files.

In the following example, the orders and lineitem tables are partitioned by the order date: o_orderdate in orders, and l_orderdate in lineitem. In the first query, there is no predicate on l_orderdate, and the engine must scan all partitions in the lineitem table. When the order date is added to the join condition, the query planner can see that it only needs to load a single partition for two tables, and the amount of data scanned is reduced significantly.

Query Data scanned Runtime
SELECT AVG(l_extendedprice)
FROM lineitem
JOIN orders ON (l_orderkey = o_orderkey)
AND o_orderdate = '1993-07-08'
68.1 GB 106 seconds
SELECT AVG(l_extendedprice)
FROM lineitem
JOIN orders ON (l_orderkey = o_orderkey 
AND l_orderdate = o_orderdate)
AND o_orderdate = '1993-07-08'
35.4 MB 2 seconds

As described earlier, you can use EXPLAIN to inspect which partitions will be read by the query. This can be particularly important to do when joining multiple partitioned tables.

Sometimes, the partitions of one or more of the tables involved in a query depends on information discovered when running the query. In the worst case, this means that all partitions have to be read, because the query planner can’t determine the partitions from analyzing the query. However, in these cases, Athena can often skip reading partitions while the query runs using a mechanism called dynamic partitioning pruning. For example, when the engine sees that the join condition involves a partition key, and the number of values on the right side is low, it can broadcast this information between the worker nodes. The worker nodes then use this information to skip reading files in partitions that would otherwise be filtered out later by the join condition.

In the following example, the orders and lineitem table are partitioned by the order date (o_orderdate in orders, l_orderdate in lineitem). The lineitem table is about 75 GB of CSV in total, and orders is about 16 GB. The query calculates the average price of line items from orders with a specific set of criteria that appear in about 10% of the partitions. Because these partitions are not known in advance, in the worst case, this would mean that 90 GB of data would have to be scanned, but in practice it scans only 26.5 GB:

SELECT AVG(l_extendedprice)
FROM lineitem
JOIN orders ON (l_orderkey = o_orderkey AND l_orderdate = o_orderdate)
WHERE o_clerk = 'Clerk#000094772'
AND o_orderpriority = '1-URGENT'
AND o_orderstatus = 'F'

When the query runs, Athena collects the values of o_orderdate for rows that match the predicates. It broadcasts these across the cluster so that the nodes can skip reading partitions of the lineitem table that don’t match.

You can use the EXPLAIN command to confirm that Athena will perform dynamic partition pruning. Look for dynamicFilterAssignment in the output. For the query in this example, the explain plan looks like the following code:

…
Fragment 1 [HASH]
    Output layout: [avg_4]
    Output partitioning: SINGLE []
    Aggregate[type = PARTIAL]
    │   Layout: [avg_4:row(double, bigint)]
    │   Estimates: {rows: 1 (55B), cpu: ?, memory: 55B, network: 0B}
    │   avg_4 := avg("l_extendedprice")
    └─ InnerJoin[criteria = ("l_orderkey" = "o_orderkey") AND ("l_orderdate" = "o_orderdate"), hash = [$hashvalue, $hashvalue_6], distribution = PARTITIONED]
       │   Layout: [l_extendedprice:double]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
       │   Distribution: PARTITIONED
       │   dynamicFilterAssignments = {o_orderkey -> #df_562, o_orderdate -> #df_563}
       ├─ RemoteSource[sourceFragmentIds = [2]]
       │      Layout: [l_orderkey:integer, l_extendedprice:double, l_orderdate:varchar, $hashvalue:bigint]
       └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue_6], arguments = ["o_orderkey", "o_orderdate"]]
          │   Layout: [o_orderkey:integer, o_orderdate:varchar, $hashvalue_6:bigint]
          │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          └─ RemoteSource[sourceFragmentIds = [3]]
                 Layout: [o_orderkey:integer, o_orderdate:varchar, $hashvalue_7:bigint]
       …

For more information about dynamic partition pruning, see Dynamic filtering in the Trino documentation.

Beware of cross joins

The join condition can also make a big difference in join performance. If the join condition is complex, for example if it uses LIKE or >, it will be much more computationally demanding. In the worst case, every record from one side of the join must be compared to every record on the other side of the join. This is called a cross join. Whenever possible, use equality conditions.

You can use the EXPLAIN command to see what kind of join Athena will perform. For example, if you run EXPLAIN on a query with a join condition like ON t1.name LIKE (t2.prefix || '%'), you will see something like the following in the output:

Fragment 1 [HASH]
…
└─ CrossJoin[]

When you see a cross join in a query plan, consider rewriting the query to instead use an equality condition. Unless one of the tables is very small, queries with cross joins run a big risk of exceeding query timeout limits.

8. Optimize GROUP BY

When performing aggregations, you should include as few columns as possible in the GROUP BY clause to reduce the amount of CPU and memory required. Additionally, make sure that you are grouping by columns that have as uniform distribution of values as possible.

One cause of performance issues in queries with aggregations is data skew. This can happen when many rows have the same values for the columns in the GROUP BY clause. During aggregation, the rows are distributed across the worker nodes based on a hash of the columns in the GROUP BY clause. If the data is skewed, one node may have to process a large part of the aggregation itself, while other nodes are idle.

Redundant columns are often added to GROUP BY clauses because the SQL language requires that an expression is either in the GROUP BY clause or uses an aggregate function. For example, if you have a table with a customer_id and customer_name column, you often end up writing GROUP BY c_custkey, c_name when you want to aggregate over customers, even though there will only be one name for any one customer_id. One way around this that can speed up queries with many redundant columns in the GROUP BY clause is the ARBITRARY function. It’s an aggregate function that, as the name suggests, returns an arbitrary value from the group.

In this example, we want to know the number of orders per customer. When we join the customer table with the orders table, there will be one row per order, and we use GROUP BY c_custkey to aggregate these per customer. We want the customer name in the results, and use ARBITRARY(c_name) to avoid having to add the c_name column to the GROUP BY clause:

SELECT c_custkey,
	ARBITRARY(c_name) AS c_name,
	COUNT(*) AS order_count
FROM customer
JOIN orders ON (customer.c_custkey = orders.o_custkey)
GROUP BY c_custkey

Whenever you can, you should remove unnecessary columns from the GROUP BY clause. The speedup will not be noticeable when there is only a single column, like in the previous example. However, it can be critical for performance on queries over large datasets where there are many columns in the GROUP BY clause.

9. Use approximate functions

For exploring large datasets, a common use case is to find the count of distinct values for a certain column using COUNT(DISTINCT column). An example is looking at the number of unique users visiting a webpage.

When an exact number may not be required (for instance, if you’re looking for which webpages to deep dive into), consider using approx_distinct(column). This function tries to minimize the memory usage by counting unique hashes of values instead of entire strings. The drawback is that there is a standard error of 2.3%.

The following table summarizes the speedup on a dataset with a 74 GB table, uncompressed in text format, with approximately 600 million rows.

Query Runtime
SELECT COUNT(DISTINCT l_comment)
FROM lineitem
7.7 seconds
SELECT approx_distinct(l_comment)
FROM lineitem
4.6 seconds
Speedup ~40% faster

For more information, see Approximate aggregate functions in the Trino documentation.

10. Only include the columns that you need

When running your queries, limit the final SELECT statement to only the columns that you need instead of selecting all columns. Trimming the number of columns reduces the amount of data that needs to be processed through the entire query pipeline, and reduces the amount of data written in the final result. This especially helps when you’re querying tables that have a large number of columns that are string-based, and when you perform multiple joins or aggregations. For columnar formats, it reduces the data scanned from Amazon S3 because only specific columns’ data is read.

The following table summarizes the speedup on a dataset with a 7.25 GB table, uncompressed in text format, with approximately 60 million rows.

Query Runtime
SELECT *
FROM lineitem, orders, customer
WHERE l_orderkey = o_orderkey
AND c_custkey = o_custkey
19.7 seconds
SELECT c_name, l_quantity, o_totalprice
FROM lineitem, orders, customer
WHERE l_orderkey = o_orderkey
AND c_custkey = o_custkey
5.2 seconds
Speedup 73% faster

Bonus tips

In this section, we provide additional performance tuning tips, and new performance-oriented features launched since the first version of this post.

Optimize partition processing using partition projection

Processing partition information can be a bottleneck for Athena queries when you have a very large number of partitions and aren’t using AWS Glue partition indexing. You can use partition projection in Athena to speed up query processing of highly partitioned tables and automate partition management. Partition projection helps minimize this overhead by allowing you to query partitions by calculating partition information rather than retrieving it from a metastore. It eliminates the need to add partitions’ metadata to the AWS Glue table.

In partition projection, partition values and locations are calculated from configuration rather than read from a repository like the AWS Glue Data Catalog. Because in-memory operations are usually faster than remote operations, partition projection can reduce the runtime of queries against highly partitioned tables. Depending on the specific characteristics of the query and underlying data, partition projection can significantly reduce query runtime for queries that are constrained by partition metadata retrieval.

Using partition projection is ideal when your partitions’ schemas are the same or if the tables’ schema always accurately describes the partitions schemas. It can be used to partition very high cardinality columns like IDs, or date ranges at very fine granularity.

See Partition projection with Amazon Athena for more details.

Speed up queries producing large result sets using UNLOAD

Running a SELECT query in Athena produces a single result file in Amazon S3 in uncompressed CSV format. If your query is expected to output a large result, then significant time is spent in writing results as one single file to Amazon S3. With UNLOAD, you can split the results into multiple files in Amazon S3, which reduces the time spent in the writing phase. You can also specify the result format (ORC, Parquet, Avro, JSON, or TEXTFILE) and compression type (defaults to gzip for Parquet, JSON, and TEXTFILE; and zlib for ORC) for the result set.

The following table shows a comparison between SELECT and UNLOAD statements. The query is expected to output approximately 13 GB of uncompressed data.

Query SELECT * FROM lineitem LIMIT 85700000 UNLOAD (SELECT * FROM lineitem LIMIT 85700000) to <s3-output-location> with (format=’TEXTFILE’) Savings
Runtime 362 seconds 33.2 seconds ~90% faster
Result set 13 GB (CSV, uncompressed) 3.2 GB (CSV, gzip compressed) ~75% reduced storage

Reuse query results when the data hasn’t changed

Data lake datasets are often updated only daily, or a few times per day, but are also often queried more frequently. You may have a query that runs to populate a dashboard or every time a view in your application is accessed. If the data hasn’t changed since the last time it ran, there is no need to compute the result again. In fact, it will take longer and cost more to do so. In these situations, you can use query result reuse. This is a feature where you tell Athena that if the same query was run in, for example, the last 15 minutes, it should return the result for that run instead of computing it again. If there is such a result, Athena returns this immediately, and no data will be scanned.

For more information about query result reuse, see Reduce cost and improve query performance with Amazon Athena Query Result Reuse and Reusing query results.

Conclusion

This post covered our top 10 tips for optimizing your interactive analysis on Athena SQL. You can apply these same practices when using Trino on Amazon EMR.

You can also view the Turkic translated version of this post.


About the Authors

Mert Hocanin is a Principal Big Data Architect with AWS Lake Formation.

Pathik Shah is a Sr. Big Data Architect on Amazon Athena. He joined AWS in 2015 and has been focusing in the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Theo Tolv is a Senior Analytics Architect based in Stockholm, Sweden. He’s worked with small and big data for most of his career, and has built applications running on AWS since 2008. In his spare time, he likes to tinker with electronics and read space opera.


Audit History

Last reviewed and updated in February 2024 by Theo Tolv | Sr. Analytics Architect