AWS Big Data Blog
Top 10 Performance Tuning Tips for Amazon Athena
May 2022: This post was reviewed and updated with more details like using EXPLAIN ANALYZE, updated compression, ORDER BY and JOIN tips, using partition indexing, updated stats (with performance improvements), added bonus tips.
Amazon Athena is an interactive query service that makes it easy to analyze data stored in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Athena is easy to use. Simply point to your data in Amazon S3, define the schema, and start querying using standard SQL.
In this post, we review the top 10 tips that can improve query performance. We focus on aspects related to storing data in Amazon S3 and tuning specific to queries.
This post assumes that you have knowledge of different file formats, such as Parquet, ORC, TEXTFILE, AVRO, CSV, TSV, and JSON.
This section discusses how to structure your data so that you can get the most out of Athena. You can apply the same practices to Amazon EMR data processing applications such as Spark, Presto, and Hive when your data is stored in Amazon S3. We discuss the following best practices:
- Partition your data
- Bucket your data
- Use compression
- Optimize file size
- Optimize columnar data store generation
1. Partition your data
Partitioning divides your table into parts and keeps the related data together based on column values such as date, country, and region. Partitions act as virtual columns. You define them at table creation, and they can help reduce the amount of data scanned per query, thereby improving performance. You can restrict the amount of data scanned by a query by specifying filters based on the partition. For more details, see Partitioning data in Athena.
Athena supports Hive partitioning, which follows one of two naming conventions. The first is to name each partition folder with the partition column name, followed by an equal sign (=), and then the value, for example a folder named year=2021.
If your dataset is partitioned in this format, you can run the MSCK REPAIR TABLE command to add partitions to your table automatically.
If the path of your data doesn’t follow the preceding format, you can add the partitions manually by running the ALTER TABLE ADD PARTITION command for each partition.
With this method, you can map any location to the partition values that you want to refer to it by.
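As a rough illustration of the two conventions, the following Python sketch builds a Hive-style partition prefix and an ALTER TABLE ADD PARTITION statement for a location that doesn’t follow that layout. The bucket, table, and column names are hypothetical placeholders, and the two helper functions are written for this example only; they are not part of any AWS SDK.

```python
def hive_partition_prefix(table_location: str, partition: dict) -> str:
    """Build a Hive-style prefix: <location>/<column>=<value>/.../"""
    parts = "/".join(f"{col}={val}" for col, val in partition.items())
    return f"{table_location.rstrip('/')}/{parts}/"


def add_partition_statement(table: str, partition: dict, location: str) -> str:
    """Build an ALTER TABLE ADD PARTITION statement that maps values to any location.

    Values are quoted as strings here; adjust the quoting to your column types.
    """
    spec = ", ".join(f"{col} = '{val}'" for col, val in partition.items())
    return f"ALTER TABLE {table} ADD PARTITION ({spec}) LOCATION '{location}'"


# Hypothetical names, for illustration only.
prefix = hive_partition_prefix("s3://example-bucket/flights", {"year": 2021, "month": 1})
statement = add_partition_statement(
    "flights", {"year": 2021}, "s3://example-bucket/legacy/2021-data/"
)
print(prefix)
print(statement)
```

Data laid out under prefixes like the first one can be picked up by MSCK REPAIR TABLE, whereas a location like the second needs one ALTER TABLE statement per partition.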
For example, a flight table stored in an S3 bucket can be partitioned on the year column, so that each year’s data sits under its own prefix.
You can restrict the partitions that are scanned in a query by filtering on the partition column in the WHERE clause.
You can also use multiple columns as partition keys and filter on specific values of each.
When deciding the columns on which to partition, consider the following:
- Columns that are used as filters are good candidates for partitioning.
- Partitioning has a cost. As the number of partitions in your table increases, so does the overhead of retrieving and processing the partition metadata, and your files get smaller. Partitioning too finely can wipe out the initial benefit.
- If your data is heavily skewed to one partition value, and most queries use that value, then the overhead may wipe out the initial benefit.
For example, the following table compares query runtimes between a partitioned and non-partitioned table. Both tables contain 74 GB data, uncompressed and stored in text format. The partitioned table is partitioned by the l_shipdate column and has 2,526 partitions.
| Query | Non-partitioned table: runtime | Non-partitioned table: data scanned | Non-partitioned table: cost | Partitioned table: runtime | Partitioned table: data scanned | Partitioned table: cost | Savings |
|---|---|---|---|---|---|---|---|
| SELECT count(*) FROM lineitem WHERE l_shipdate = '1996-09-01' | 4.8 seconds | 74.1 GB | $0.36 | 0.7 seconds | 29.96 MB | $0.0001 | |
| SELECT count(*) FROM lineitem WHERE l_shipdate >= '1996-09-01' AND l_shipdate < '1996-10-01' | 4.4 seconds | 74.1 GB | $0.36 | 2.0 seconds | 898.58 MB | $0.004 | |
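The cost figures in the preceding table follow from the amount of data scanned. The following sketch reproduces that arithmetic under two assumptions that this post doesn’t state: a price of $5 per TB scanned and a 10 MB minimum charge per query. Check the current Athena pricing for your Region before relying on either number.

```python
BYTES_PER_MB = 1024 ** 2
BYTES_PER_GB = 1024 ** 3
BYTES_PER_TB = 1024 ** 4

ASSUMED_PRICE_PER_TB_USD = 5.00          # assumption: not stated in this post
ASSUMED_MIN_BYTES = 10 * BYTES_PER_MB    # assumption: minimum billed per query


def scan_cost_usd(bytes_scanned: float,
                  price_per_tb: float = ASSUMED_PRICE_PER_TB_USD,
                  min_bytes: float = ASSUMED_MIN_BYTES) -> float:
    """Estimate the cost of a query from the number of bytes it scans."""
    billable = max(bytes_scanned, min_bytes)
    return billable / BYTES_PER_TB * price_per_tb


non_partitioned = scan_cost_usd(74.1 * BYTES_PER_GB)   # full table scan
single_day = scan_cost_usd(29.96 * BYTES_PER_MB)       # one partition
one_month = scan_cost_usd(898.58 * BYTES_PER_MB)       # about a month of partitions

print(f"${non_partitioned:.2f}  ${single_day:.4f}  ${one_month:.3f}")
```

Under these assumptions, the estimate scales linearly with bytes scanned, which is why pruning partitions reduces cost as well as runtime.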
Running EXPLAIN ANALYZE on a query helps determine whether a table is partitioned and, if so, whether a partition column is used as a filter. When a query on a partitioned table filters on the partition key, the output for the TableScan operator shows that a partition key filter was used, which reduces the data scanned.
Partitioning also has a penalty if a partition filter isn’t used in the query, as shown in the following table. Make sure not to over-partition the data. Over-partitioning leads to a greater number of smaller files, which hurts performance, as shown later in this post.
| Query | Non-partitioned table: runtime | Non-partitioned table: data scanned | Partitioned table: runtime | Partitioned table: data scanned | Savings |
|---|---|---|---|---|---|
| SELECT count(*) FROM lineitem; | 3.4 seconds | 74.1 GB | 8.9 seconds | 74.1 GB | 62% slower |
If your table stored in an AWS Glue Data Catalog has tens of thousands, hundreds of thousands, or millions of partitions, you can enable partition indexes on the table. With partition indexes, only the metadata for the partition value in the query’s filter is retrieved from the catalog instead of retrieving all the partitions’ metadata. The result is faster queries for such highly partitioned tables. The following table compares query runtimes on a partitioned table with and without partition indexing. The orders table used here contains approximately 100,000 partitions and uncompressed text data.
| Query | Partition indexing = disabled | Partition indexing = enabled | Speed up |
|---|---|---|---|
| SELECT count(*) FROM orders where o_custkey BETWEEN 1 AND 100 | 19.5 seconds | 1.2 seconds | 16x |
To learn more about the benefits of the AWS Glue Data Catalog’s partition indexing in Athena, refer to Improve Amazon Athena query performance using AWS Glue Data Catalog partition indexes.
2. Bucket your data
Another way to partition your data is to bucket the data within a single partition. With bucketing, you specify one or more columns containing rows that you want to group together, and put those rows into multiple buckets. When a query specifies a value for the bucketed column, Athena reads only the bucket that can contain that value. This can dramatically reduce the number of rows read, which in turn reduces the cost of running the query.
When you’re selecting a column to be used for bucketing, we recommend that you select one that has high cardinality (that is, it has a large number of unique values), and that is frequently used to filter the data read during query time. An example of a good column to use for bucketing would be a primary key, such as a user ID for systems.
Within Athena, you can specify the bucketed column inside your CREATE TABLE statement by specifying CLUSTERED BY (<bucketed columns>) INTO <number of buckets> BUCKETS. Choose the number of buckets so that the resulting files are of optimal size. For more details, see the Optimize file sizes section.
To use bucketed tables within Athena, you must use Apache Hive to create the data files because Athena doesn’t support the Apache Spark bucketing format. For information about how to create bucketed tables, see LanguageManual DDL BucketedTables in the Apache Hive documentation.
Also note that Athena doesn’t support tables and partitions in which the number of files doesn’t match the number of buckets, such as when multiple INSERT INTO statements are run.
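To make the idea concrete, the following sketch simulates bucketing in plain Python: rows are assigned to one of 32 buckets by hashing the key, and an equality lookup reads only the bucket that the key hashes to. The CRC32 hash is a stand-in chosen for this illustration; Hive uses its own hash function, and the key values are made up.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 32


def bucket_for(key: int, num_buckets: int = NUM_BUCKETS) -> int:
    """Map a key to a bucket. Illustrative hash only, not Hive's."""
    return zlib.crc32(str(key).encode("utf-8")) % num_buckets


def write_buckets(keys, num_buckets: int = NUM_BUCKETS):
    """Group keys into buckets, as a bucketed write would group rows into files."""
    buckets = defaultdict(list)
    for key in keys:
        buckets[bucket_for(key, num_buckets)].append(key)
    return buckets


def rows_read_for_lookup(buckets, key: int) -> int:
    """An equality filter on the bucketing column only needs one bucket."""
    return len(buckets[bucket_for(key)])


keys = list(range(100_000))          # hypothetical customer keys
buckets = write_buckets(keys)
total_rows = sum(len(rows) for rows in buckets.values())
rows_read = rows_read_for_lookup(buckets, 12_677)
print(f"read {rows_read} of {total_rows} rows")
```

A high-cardinality key spreads rows across buckets fairly evenly, which is why such columns make good bucketing candidates.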
The following table shows the difference in a customer table where the c_custkey column is used to create 32 buckets. The customer table is 2.29 GB in size.
| Query | Non-bucketed table: runtime | Non-bucketed table: data scanned | Non-bucketed table: cost | Bucketed table (c_custkey as clustered column): runtime | Bucketed table: data scanned | Bucketed table: cost | Savings |
|---|---|---|---|---|---|---|---|
| SELECT count(*) FROM customer where c_custkey = 12677856; | 1.3 sec | 2.29 GB | $0.01145 | 0.82 sec | 72.94 MB | $0.0003645 | 97% cheaper |
Running EXPLAIN ANALYZE on the preceding query shows how bucketing helped in reading less data from Amazon S3 for the customer table. The following snippets of the EXPLAIN ANALYZE output for the query on the non-bucketed and bucketed tables highlight the input rows and size of data to show the difference.
The following is the output for the non-bucketed table:
The following is the output for the bucketed table:
3. Use compression
Compressing your data can speed up your queries significantly, as long as the files are either of an optimal size (see the next section), or the files are splittable. The smaller data sizes reduce the data scanned from Amazon S3, resulting in lower costs of running queries. It also reduces the network traffic from Amazon S3 to Athena.
The following table summarizes the compression format support in Athena for each storage file format. The TEXTFILE format includes TSV, CSV, JSON, and custom SerDes for text.
| Codec | AVRO | ORC | Parquet | TEXTFILE |
|---|---|---|---|---|
| BZIP2 | Read support only. Write not supported. | No | No | Yes |
| LZ4 | No | Yes (raw/unframed) | No | Hadoop-compatible read support. No write support. |
| LZO | No | No | Yes | Hadoop-compatible read support. No write support. |
| SNAPPY | Raw/unframed read support. Write not supported. | Yes (raw/unframed) | Yes (raw/unframed) | Yes (Hadoop-compatible framing) |
A splittable file can be read in parallel by the execution engine in Athena, whereas an unsplittable file can’t. This means that reading a splittable file takes less time than reading an unsplittable file. AVRO, Parquet, and ORC are splittable irrespective of the compression codec used. For text files, only files compressed with the BZIP2 and LZO codecs are splittable. If other codecs are used on text files, avoid having one single large compressed file. Instead, split it into multiple compressed files of optimal sizes, as discussed in the following section.
For Athena, we recommend using either Apache Parquet or Apache ORC, which compress data by default and are splittable.
You can compress your existing dataset using AWS Glue ETL jobs, Spark or Hive on Amazon EMR, or CTAS or INSERT INTO and UNLOAD statements in Athena. The following is an example script for compressing using AWS Glue:
4. Optimize file sizes
Queries run more efficiently when data scanning can be parallelized and when blocks of data can be read sequentially. Ensuring that your file formats are splittable helps with parallelism regardless of how large your files may be.
However, if your files are too small (generally less than 128 MB), the execution engine might spend additional time on the overhead of opening S3 files, listing directories, getting object metadata, setting up data transfer, reading file headers, reading compression dictionaries, and so on. On the other hand, if your files are not splittable and are too large, query processing waits until a single reader has finished reading each entire file. That can reduce parallelism.
One remedy for the small-file problem is to use the S3DistCp utility on Amazon EMR. You can use it to combine smaller files into larger objects. You can also use S3DistCp to move large amounts of data in an optimized fashion from HDFS to Amazon S3, Amazon S3 to Amazon S3, and Amazon S3 to HDFS.
Some benefits of having larger files include faster listing, fewer Amazon S3 requests, and less metadata to manage.
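The following sketch shows the kind of planning a compaction job performs: it greedily packs small files, in order, into groups of at most about 128 MB each. It’s a simplified model for illustration, not how S3DistCp is implemented, and the file sizes are hypothetical.

```python
TARGET_BYTES = 128 * 1024 ** 2  # the 128 MB guideline mentioned above


def group_small_files(file_sizes, target_bytes: int = TARGET_BYTES):
    """Greedily pack files, in order, into groups of at most target_bytes each."""
    groups, current, current_size = [], [], 0
    for size in file_sizes:
        # Start a new group when adding this file would exceed the target.
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups


small_files = [80 * 1024] * 100_000   # hypothetical: 100,000 files of 80 KB each
groups = group_small_files(small_files)
print(len(small_files), "files ->", len(groups), "merged objects")
```

Each group would become one larger object, so the engine opens dozens of files instead of 100,000.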
For example, the following table compares query runtimes between two tables, one backed by a single large file and one by 100,000 small files. Both tables contain approximately 8 GB of data, stored in text format.
| Query | Number of files | Runtime |
|---|---|---|
| SELECT count(*) FROM lineitem | 100,000 files | 13 seconds |
| SELECT count(*) FROM lineitem | 1 file | 1.3 seconds |
You can also use AWS Glue to split your data, as shown in the following example script.
5. Optimize columnar data store generation
Apache Parquet and Apache ORC are popular columnar data stores. They provide features that store data efficiently by employing column-wise compression, different encoding, compression based on data type, and predicate pushdown. They are also splittable. Generally, better compression ratios or skipping blocks of data means reading fewer bytes from Amazon S3, leading to better query performance and reduced costs of running queries. Parquet and ORC have a couple of parameters that you can tune for optimal performance.
One parameter that you can tune is the block size (or stripe size). The block size in Parquet (or stripe size in ORC) represents the maximum number of rows that can fit into one block in terms of size in bytes. The larger the block or stripe size, the more rows you can store in each block. By default, the Parquet block size is 128 MB and the ORC stripe size is 64 MB. We recommend a larger block size if you have tables with many columns, to ensure that each column block remains at a size that allows for efficient sequential I/O.
Another parameter that could be tuned is the compression algorithm on data blocks. Parquet supports GZIP, Snappy (default), ZSTD, and LZO-based compression techniques. ORC supports ZLIB (default), LZ4, ZSTD, and Snappy compression techniques. We recommend starting with the default compression algorithm and testing with other compression algorithms if you have more than 10 GB of data.
Parquet and ORC file formats both support predicate pushdown (also called predicate filtering). Both formats have blocks of data that represent column values. Each block holds statistics for the block, such as max/min values. When a query is being run, these statistics determine whether the block should be read or skipped depending on the filter value used in the query. This helps reduce data scanned and improves the query runtime. To use this capability, add more filters in the query (for example, using a WHERE clause).
One way to increase the number of blocks that can be skipped is to identify a commonly filtered column and sort by it before writing your ORC or Parquet files. Sorting keeps the range between the min and max values within each block as small as possible. This gives each block a better chance of being pruned and further reduces the data scanned.
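The effect of sorting on min/max pruning can be seen in a small simulation. The sketch below splits 1,000 values into blocks of 100, keeps only each block’s min and max as its statistics, and counts how many blocks an equality filter would have to read. It models the idea only; the real Parquet and ORC readers are more involved, and the data here is synthetic.

```python
def split_into_blocks(values, block_size):
    """Split a column into fixed-size blocks, like row groups or stripes."""
    return [values[i:i + block_size] for i in range(0, len(values), block_size)]


def blocks_to_read(blocks, target):
    """Count blocks whose [min, max] range could contain the target value."""
    return sum(1 for block in blocks if min(block) <= target <= max(block))


N, BLOCK_SIZE = 1000, 100
unsorted_values = [(i * 37) % N for i in range(N)]  # deterministic shuffle of 0..999
sorted_values = sorted(unsorted_values)

unsorted_reads = blocks_to_read(split_into_blocks(unsorted_values, BLOCK_SIZE), 500)
sorted_reads = blocks_to_read(split_into_blocks(sorted_values, BLOCK_SIZE), 500)
print(unsorted_reads, sorted_reads)  # 10 blocks versus 1 block
```

With unsorted data, every block’s range spans nearly the whole domain, so none can be skipped. After sorting, only the one block whose range covers the value has to be read.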
The following table compares runtimes and data scanned for the same dataset in text GZIP, Parquet GZIP with no sorting, and Parquet GZIP with sorting.
Query: select l_orderkey from lineitem where l_partkey = 17766770
| Dataset | Runtime | Data scanned | Savings compared to text format |
|---|---|---|---|
| Text GZIP data | 33.06 seconds | 22 GB | |
| Parquet GZIP data with no sorting | 1.72 seconds | 2 GB | ~94% faster |
| Parquet GZIP data, sorted | 1.0 second | 34.34 MB | ~96% faster |
You can convert your existing data to Parquet or ORC using Spark or Hive on Amazon EMR, AWS Glue ETL jobs, or CTAS or INSERT INTO and UNLOAD in Athena. See also the following resources:
- Extract, Transform and Load data into S3 data lake using CTAS and INSERT INTO statements in Amazon Athena
- Simplify your ETL and ML pipelines using the Amazon Athena UNLOAD feature
- Build a Data Lake Foundation with AWS Glue and Amazon S3
- Converting a large dataset to Parquet
- Converting to columnar formats
The Athena engine is built upon Presto. Understanding how it works provides insight into how you can optimize queries when running them. This section details the following best practices:
- Optimize ORDER BY
- Optimize joins
- Optimize GROUP BY
- Use approximate functions
- Only include the columns that you need
6. Optimize ORDER BY
The ORDER BY clause returns the results of a query in sort order. Athena uses distributed sort to run the sort operation in parallel on multiple nodes. If you’re using the ORDER BY clause to look at the top or bottom N values, use a LIMIT clause to reduce the cost of the sort, which results in a faster query runtime.
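The reason LIMIT helps can be sketched outside of Athena: a top-N selection only has to keep N candidates at a time, whereas a full sort has to order every row. The following example uses Python’s heapq module as an analogy, with randomly generated integers standing in for sort keys. It isn’t a description of Athena’s internal implementation.

```python
import heapq
import random

random.seed(7)
# Hypothetical sort keys standing in for a column such as l_shipdate.
sort_keys = [random.randint(19920101, 19981231) for _ in range(100_000)]

# ORDER BY without LIMIT: every row has to be fully sorted.
full_sort_top = sorted(sort_keys)[:10]

# ORDER BY ... LIMIT 10: keep only the 10 best candidates while scanning.
bounded_top = heapq.nsmallest(10, sort_keys)

print(full_sort_top == bounded_top)
```

Both approaches return the same ten rows, but the second never holds more than a small working set per worker.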
For example, the following table summarizes the runtime for a dataset with a 7.25 GB table, uncompressed in text format, with approximately 60 million rows.
| Query | Runtime |
|---|---|
| SELECT * FROM lineitem ORDER BY l_shipdate | 274 seconds |
| SELECT * FROM lineitem ORDER BY l_shipdate LIMIT 10000 | 4.6 seconds |
7. Optimize joins
Choosing the right join order is critical for better query performance. When you join two tables, specify the larger table on the left side of the join and the smaller table on the right side. Athena distributes the table on the right to worker nodes, and then streams the table on the left to do the join. If the table on the right is smaller, less memory is used and the query runs faster.
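The memory argument can be illustrated with a minimal hash join in Python: the right-hand (build) table is loaded into an in-memory hash table, and the left-hand (probe) table is streamed past it. This is a single-process sketch with made-up rows, not Athena’s distributed implementation.

```python
from collections import defaultdict


def hash_join_count(probe_rows, build_rows, probe_key, build_key):
    """Count join matches. Returns (match_count, rows_held_in_memory)."""
    hash_table = defaultdict(int)
    for row in build_rows:                 # right side: held in memory
        hash_table[row[build_key]] += 1
    matches = 0
    for row in probe_rows:                 # left side: streamed row by row
        matches += hash_table.get(row[probe_key], 0)
    return matches, len(build_rows)


# Hypothetical data: a small part table and a much larger lineitem table.
part = [{"p_partkey": k} for k in range(1_000)]
lineitem = [{"l_partkey": i % 1_000} for i in range(50_000)]

large_left = hash_join_count(lineitem, part, "l_partkey", "p_partkey")
small_left = hash_join_count(part, lineitem, "p_partkey", "l_partkey")
print(large_left, small_left)
```

Both orders produce the same number of matches, but putting the larger table on the left keeps only 1,000 rows in memory instead of 50,000.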
The following table shows the runtimes on a dataset with 74 GB total data, uncompressed in text format, with approximately 602 million rows.
| Query | Runtime |
|---|---|
| SELECT count(*) FROM lineitem, part WHERE lineitem.l_partkey = part.p_partkey | 6.4 seconds |
| SELECT count(*) FROM part, lineitem WHERE part.p_partkey = lineitem.l_partkey | 8.1 seconds |
| Savings / Speedup | ~20% speed up |
When you join three or more tables, consider joining the large table with the smallest table first to reduce the intermediate result, and then join with the other tables.
Athena also supports dynamic filtering and dynamic partition pruning, which improve the query runtime and reduce the data scanned for queries with joins and a very selective filter clause on the table on the right side of the join. Consider a query in which Table_B is a small table with a very selective filter (column_A = "value"). After the selective filter is applied to Table_B, a value list for the joined column Table_B.date is extracted first, and it’s pushed down to the joined table Table_A as a filter. It’s used to filter out unnecessary rows and partitions of Table_A. This results in reading fewer rows and partitions from the source for Table_A and helps reduce query runtime and data scan size, which in turn helps reduce the costs of running the query in Athena.
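The three steps described above can be sketched in a few lines of Python. The example assumes, for illustration only, that Table_A is partitioned by date and that both tables are small in-memory structures. The row contents are hypothetical, and the function is a model of the idea rather than Athena’s implementation.

```python
def dynamic_partition_pruning(table_a_partitions, table_b_rows, column_a_value):
    """Return only the Table_A partitions that can match the filtered Table_B."""
    # Step 1: apply the selective filter to the small table (Table_B).
    selected = [row for row in table_b_rows if row["column_A"] == column_a_value]
    # Step 2: extract the list of values for the join column.
    date_values = {row["date"] for row in selected}
    # Step 3: push that list down as a filter on Table_A's partitions.
    return {d: rows for d, rows in table_a_partitions.items() if d in date_values}


# Hypothetical data: Table_A has one partition per day in January 2022.
table_a = {f"2022-01-{day:02d}": [f"row-{day}"] for day in range(1, 32)}
table_b = [
    {"date": "2022-01-05", "column_A": "value"},
    {"date": "2022-01-06", "column_A": "other"},
    {"date": "2022-01-20", "column_A": "value"},
]

partitions_read = dynamic_partition_pruning(table_a, table_b, "value")
print(sorted(partitions_read))
```

Only two of the 31 partitions survive the pushed-down filter, so the remaining 29 are never read.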
8. Optimize GROUP BY
The GROUP BY operator distributes rows based on the GROUP BY columns to worker nodes, which hold the GROUP BY values in memory. As rows are being ingested, the GROUP BY columns are looked up in memory and the values are compared. If the GROUP BY columns match, the values are aggregated together.
When using GROUP BY in your query, order the columns by the highest cardinality (that is, the most unique values, distributed evenly) to the lowest.
Another optimization is to limit the number of columns within the SELECT and GROUP BY clauses. Because rows are held in memory and aggregated for the GROUP BY clause, fewer columns mean less memory is required. The following example shows the speedup gained by reducing the columns in the SELECT clause.
| Query | Runtime |
|---|---|
| select "l_shipdate", "l_receiptdate", "l_commitdate", "l_linestatus", count(*) from "lineitem" group by 1,2,3,4 | 33.6 seconds |
| select "l_shipdate", "l_linestatus", count(*) from "lineitem" group by 1,2 | 6.2 seconds |
| Speedup | ~81% speed up |
9. Use approximate functions
For exploring large datasets, a common use case is to find the count of distinct values for a certain column using COUNT(DISTINCT column). An example is looking at the number of unique users hitting a webpage.
When an exact number may not be required (for instance, if you’re looking for which webpages to deep dive into), consider using approx_distinct(). This function tries to minimize the memory usage by counting unique hashes of values instead of entire strings. The drawback is that there is a standard error of 2.3%.
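To give a feel for how counting hashes can stand in for counting values, the following sketch implements a K-minimum-values estimator. This is a deliberately simple technique chosen for illustration and is not the algorithm behind approx_distinct(). It keeps only the k smallest distinct hashes and estimates the total from how tightly they are packed. The function name, the choice of SHA-1, and k = 256 are all assumptions of this example.

```python
import hashlib
import heapq

HASH_SPACE = 2 ** 64


def _hash64(value: str) -> int:
    """Hash a string to a 64-bit integer."""
    return int.from_bytes(hashlib.sha1(value.encode("utf-8")).digest()[:8], "big")


def approx_distinct_kmv(values, k: int = 256) -> int:
    """Estimate the number of distinct values while storing at most k hashes."""
    heap = []         # max-heap (negated) of the k smallest distinct hashes
    members = set()   # the same hashes, for duplicate checks
    for value in values:
        h = _hash64(value)
        if h in members:
            continue
        if len(heap) < k:
            heapq.heappush(heap, -h)
            members.add(h)
        elif h < -heap[0]:
            evicted = -heapq.heappushpop(heap, -h)  # drop the largest kept hash
            members.discard(evicted)
            members.add(h)
    if len(heap) < k:
        return len(heap)              # fewer than k distinct values: exact
    kth_smallest = -heap[0]
    return int((k - 1) * HASH_SPACE / kth_smallest)


# Hypothetical page-view log: 20,000 hits from 5,000 distinct users.
hits = [f"user-{i % 5000}" for i in range(20_000)]
estimate = approx_distinct_kmv(hits)
print(estimate)
```

Memory use stays bounded by k no matter how many rows are scanned, at the price of an estimate that is close to, but usually not exactly, the true count.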
The following table summarizes the speedup on a dataset with a 74 GB table, uncompressed in text format, with approximately 600 million rows.
| Query | Runtime |
|---|---|
| SELECT count(distinct l_comment) FROM lineitem; | 7.7 seconds |
| SELECT approx_distinct(l_comment) FROM lineitem; | 4.6 seconds |
For more information, see Aggregate Functions in the Presto documentation.
10. Only include the columns that you need
When running your queries, limit the final SELECT statement to only the columns that you need instead of selecting all columns. Trimming the number of columns reduces the amount of data that needs to be processed through the entire query run pipeline. This especially helps when you’re querying tables that have large numbers of columns that are string-based, and when you perform multiple joins or aggregations. For columnar formats, it reduces the data scanned from Amazon S3 because only specific columns’ data is read.
The following table summarizes the speedup on a dataset with a 7.25 GB table, uncompressed in text format, with approximately 60 million rows.
| Query | Runtime |
|---|---|
| SELECT * FROM lineitem, orders, customer WHERE lineitem.l_orderkey = orders.o_orderkey AND customer.c_custkey = orders.o_custkey; | 19.7 seconds |
| SELECT customer.c_name, lineitem.l_quantity, orders.o_totalprice FROM lineitem, orders, customer WHERE lineitem.l_orderkey = orders.o_orderkey AND customer.c_custkey = orders.o_custkey; | 5.2 seconds |
| Savings / Speedup | 73% faster |
In this section, we provide additional performance tuning tips.
Optimize partition processing using partition projection
Processing partition information can be a bottleneck for Athena queries when you have a very large number of partitions and aren’t using AWS Glue partition indexing. You can use partition projection in Athena to speed up query processing of highly partitioned tables and automate partition management. Partition projection helps minimize this overhead by allowing you to query partitions by calculating partition information rather than retrieving it from a metastore. It eliminates the need to add partitions’ metadata to the AWS Glue table.
In partition projection, partition values and locations are calculated from configuration rather than read from a repository like the AWS Glue Data Catalog. Because in-memory operations are usually faster than remote operations, partition projection can reduce the runtime of queries against highly partitioned tables. Depending on the specific characteristics of the query and underlying data, partition projection can significantly reduce query runtime for queries that are constrained on partition metadata retrieval.
Using partition projection is ideal when your partitions’ schemas are the same or if the table’s schema always accurately describes the partitions’ schemas. It can be used to partition very high cardinality columns like IDs, or date ranges at very fine granularity.
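The idea of calculating partition locations instead of looking them up can be sketched as follows. The function expands a location template over a date range entirely in memory. The bucket name, the dt column, and the helper itself are hypothetical; the ${dt} placeholder is modeled on the style of location templates used in partition projection configuration.

```python
from datetime import date, timedelta


def projected_locations(template: str, start: date, end: date):
    """Compute one partition location per day in [start, end], with no catalog lookup."""
    days = (end - start).days
    return [
        template.replace("${dt}", (start + timedelta(days=i)).isoformat())
        for i in range(days + 1)
    ]


# Hypothetical template and date range, for illustration only.
TEMPLATE = "s3://example-bucket/orders/dt=${dt}/"
locations = projected_locations(TEMPLATE, date(2022, 1, 30), date(2022, 2, 2))
for location in locations:
    print(location)
```

Because every location is derived from the template and the range, no partition metadata has to be registered or retrieved before the query runs.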
See Partition projection with Amazon Athena for more details.
Speed up queries producing large result sets using UNLOAD
Running a SELECT query in Athena produces a single result file in Amazon S3 in uncompressed CSV format. If your query is expected to output a large result set (such as 13 GB, as shown in the following example), then significant time is spent in writing results as one single file to Amazon S3. With UNLOAD, you can split the results into multiple files in Amazon S3, which reduces the time spent in the writing phase. You can also specify the result format (ORC, Parquet, AVRO, JSON, or TEXTFILE) and compression type (defaults to GZIP for Parquet, JSON, and TEXTFILE; and ZLIB for ORC) for the result set.
The following table shows a comparison between SELECT and UNLOAD statements. The query is expected to output approximately 13 GB of uncompressed data.
| Query | SELECT * FROM lineitem LIMIT 85700000 | UNLOAD (SELECT * FROM lineitem LIMIT 85700000) to <s3-output-location> with (format='TEXTFILE') | Savings |
|---|---|---|---|
| Runtime | 362 seconds | 33.2 seconds | ~90% faster |
| Result set | 13 GB (CSV, uncompressed) | 3.2 GB (CSV, gzip compressed) | ~75% reduced storage |
This post covered our top 10 tips for optimizing your interactive analysis on Athena with the Presto engine. You can apply these same practices when using Presto on Amazon EMR.
About the Authors
Mert Hocanin is a Principal Big Data Architect with AWS Lake Formation.
Pathik Shah is a Sr. Big Data Architect on Amazon Athena. He joined AWS in 2015 and has been focusing in the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.