AWS Big Data Blog
Use Amazon Athena with Spark SQL for your open-source transactional table formats
AWS-powered data lakes, supported by the unmatched availability of Amazon Simple Storage Service (Amazon S3), can handle the scale, agility, and flexibility required to combine different data and analytics approaches. As data lakes have grown in size and matured in usage, a significant amount of effort can be spent keeping the data consistent with business events. To ensure files are updated in a transactionally consistent manner, a growing number of customers are using open-source transactional table formats such as Apache Iceberg, Apache Hudi, and Linux Foundation Delta Lake that help you store data with high compression rates, natively interface with your applications and frameworks, and simplify incremental data processing in data lakes built on Amazon S3. These formats enable ACID (atomicity, consistency, isolation, durability) transactions, upserts, and deletes, and advanced features such as time travel and snapshots that were previously only available in data warehouses. Each storage format implements this functionality in slightly different ways; for a comparison, refer to Choosing an open table format for your transactional data lake on AWS.
In 2023, AWS announced general availability for Apache Iceberg, Apache Hudi, and Linux Foundation Delta Lake in Amazon Athena for Apache Spark, which removes the need to install a separate connector or associated dependencies and manage versions, and simplifies the configuration steps required to use these frameworks.
In this post, we show you how to use Spark SQL in Amazon Athena notebooks and work with Iceberg, Hudi, and Delta Lake table formats. We demonstrate common operations such as creating databases and tables, inserting data into the tables, querying data, and looking at snapshots of the tables in Amazon S3 using Spark SQL in Athena.
Prerequisites
Complete the following prerequisites:
- Make sure you meet all the prerequisites mentioned in Run Spark SQL on Amazon Athena Spark.
- Create a database called sparkblogdb and a table called noaa_pq in the AWS Glue Data Catalog as detailed in the post Run Spark SQL on Amazon Athena Spark.
- Grant the AWS Identity and Access Management (IAM) role used in the Athena workgroup read and write permissions to an S3 bucket and prefix. For more information, refer to Amazon S3: Allows read and write access to objects in an S3 Bucket.
- Grant the IAM role used in the Athena workgroup s3:DeleteObject permission to an S3 bucket and prefix for cleanup. For more information, refer to the Delete Object permissions section in Amazon S3 actions.
Download and import example notebooks from Amazon S3
To follow along, download the notebooks discussed in this post from the following locations:
- Iceberg tutorial notebook: s3://athena-examples-us-east-1/athenasparksqlblog/notebooks/SparkSQL_iceberg.ipynb
- Hudi tutorial notebook: s3://athena-examples-us-east-1/athenasparksqlblog/notebooks/SparkSQL_hudi.ipynb
- Delta tutorial notebook: s3://athena-examples-us-east-1/athenasparksqlblog/notebooks/SparkSQL_delta.ipynb
After you download the notebooks, import them into your Athena Spark environment by following the To import a notebook section in Managing notebook files.
Navigate to specific Open Table Format section
If you are interested in the Iceberg table format, navigate to the Working with Apache Iceberg tables section.
If you are interested in the Hudi table format, navigate to the Working with Apache Hudi tables section.
If you are interested in the Delta Lake table format, navigate to the Working with Linux Foundation Delta Lake tables section.
Working with Apache Iceberg tables
When using Spark notebooks in Athena, you can run SQL queries directly without having to use PySpark. We do this by using cell magics, which are special headers in a notebook cell that change the cell’s behavior. For SQL, we can add the %%sql magic, which interprets the entire cell contents as a SQL statement to be run on Athena.
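For example, a minimal illustrative cell that lists the databases in the Data Catalog looks like the following sketch:

```sql
%%sql
-- The %%sql magic tells Athena to run this cell as a Spark SQL statement
SHOW DATABASES
```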
In this section, we show how you can use SQL on Apache Spark for Athena to create, analyze, and manage Apache Iceberg tables.
Set up a notebook session
To use Apache Iceberg in Athena, while creating or editing a session, select the Apache Iceberg option by expanding the Apache Spark properties section. Selecting this option pre-populates the required Iceberg properties for the session.
For steps, see Editing session details or Creating your own notebook.
The code used in this section is available in the SparkSQL_iceberg.ipynb file to follow along.
Create a database and Iceberg table
First, we create a database in the AWS Glue Data Catalog. With the following SQL, we can create a database called icebergdb:
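A minimal sketch of the statement (the exact cell in the downloadable notebook may differ):

```sql
%%sql
-- Create the database in the AWS Glue Data Catalog
CREATE DATABASE icebergdb
```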
Next, in the database icebergdb, we create an Iceberg table called noaa_iceberg pointing to a location in Amazon S3 where we will load the data. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:
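A sketch of the DDL, assuming an illustrative simplified schema (station, name, date, year, tmin, tmax are placeholder columns; use the actual columns of your noaa_pq table):

```sql
%%sql
-- Iceberg table stored at the given S3 location (placeholder path)
CREATE TABLE icebergdb.noaa_iceberg (
  station string,
  name    string,
  date    string,
  year    string,
  tmin    double,
  tmax    double
)
USING iceberg
LOCATION 's3://<your-S3-bucket>/<prefix>/noaaiceberg/'
```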
Insert data into the table
To populate the noaa_iceberg Iceberg table, we insert data from the Parquet table sparkblogdb.noaa_pq that was created as part of the prerequisites. You can do this using an INSERT INTO statement in Spark:
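A sketch of the insert, assuming the illustrative columns from the CREATE TABLE sketch above line up with your source table:

```sql
%%sql
-- Copy rows from the Parquet source table into the Iceberg table
INSERT INTO icebergdb.noaa_iceberg
SELECT station, name, date, year, tmin, tmax
FROM sparkblogdb.noaa_pq
```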
Alternatively, you can use CREATE TABLE AS SELECT with the USING iceberg clause to create an Iceberg table and insert data from a source table in one step:
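A hedged sketch of the CTAS form (the target table name noaa_iceberg_ctas and the S3 path are illustrative):

```sql
%%sql
-- Create a new Iceberg table and load it from the Parquet source in one step
CREATE TABLE icebergdb.noaa_iceberg_ctas
USING iceberg
LOCATION 's3://<your-S3-bucket>/<prefix>/noaaicebergctas/'
AS SELECT * FROM sparkblogdb.noaa_pq
```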
Query the Iceberg table
Now that the data is inserted in the Iceberg table, we can start analyzing it. Let’s run a Spark SQL query to find the minimum recorded temperature by year for the 'SEATTLE TACOMA AIRPORT, WA US' location:
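A sketch of the query, using the illustrative tmin column from the CREATE TABLE sketch above as the daily minimum temperature:

```sql
%%sql
-- Minimum recorded temperature per year for the given station
SELECT year, min(tmin) AS minimum_temperature
FROM icebergdb.noaa_iceberg
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
GROUP BY year
ORDER BY year
```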
The output shows the minimum temperature recorded at the station for each year.
Update data in the Iceberg table
Let’s look at how to update data in our table. We want to update the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. Using Spark SQL, we can run an UPDATE statement against the Iceberg table:
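A sketch of the statement, assuming the station name is stored in the illustrative name column:

```sql
%%sql
-- Rename the station in place
UPDATE icebergdb.noaa_iceberg
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
```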
We can then run the previous SELECT query to find the minimum recorded temperature for the 'Sea-Tac' location:
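The same query as before, with the updated station name (again using the illustrative tmin column):

```sql
%%sql
SELECT year, min(tmin) AS minimum_temperature
FROM icebergdb.noaa_iceberg
WHERE name = 'Sea-Tac'
GROUP BY year
ORDER BY year
```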
The output shows the same minimum temperatures, now returned for the updated station name.
Compact data files
Open table formats like Iceberg work by creating delta changes in file storage and tracking the versions of rows through manifest files. More data files lead to more metadata stored in manifest files, and many small data files often cause an unnecessary amount of metadata, resulting in less efficient queries and higher Amazon S3 access costs. Running Iceberg’s rewrite_data_files procedure in Spark for Athena compacts data files, combining many small delta change files into a smaller set of read-optimized Parquet files. Compacting files speeds up read operations on the table. To run compaction on our table, run the following Spark SQL:
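A sketch of the compaction call, assuming the session’s default spark_catalog is the Iceberg catalog:

```sql
%%sql
-- Rewrite small data files into larger, read-optimized Parquet files
CALL spark_catalog.system.rewrite_data_files(table => 'icebergdb.noaa_iceberg')
```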
rewrite_data_files offers options to specify your sort strategy, which can help reorganize and compact data.
List table snapshots
Each write, update, delete, upsert, and compaction operation on an Iceberg table creates a new snapshot of a table while keeping the old data and metadata around for snapshot isolation and time travel. To list the snapshots of an Iceberg table, run the following Spark SQL statement:
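For example, using Iceberg’s snapshots metadata table:

```sql
%%sql
-- Each row describes one snapshot: committed_at, snapshot_id, operation, summary, and more
SELECT * FROM icebergdb.noaa_iceberg.snapshots
```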
Expire old snapshots
Regularly expiring snapshots is recommended to delete data files that are no longer needed and to keep the size of table metadata small. Expiration never removes files that are still required by a non-expired snapshot. In Spark for Athena, run the following SQL to expire snapshots of the table icebergdb.noaa_iceberg that are older than a specific timestamp:
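A sketch of the call, again assuming the default spark_catalog; the timestamp is only an example value:

```sql
%%sql
-- Expire snapshots committed before the given timestamp
CALL spark_catalog.system.expire_snapshots(
  table => 'icebergdb.noaa_iceberg',
  older_than => TIMESTAMP '2023-11-30 00:00:00.000'
)
```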
Note that the timestamp value is specified as a string in the format yyyy-MM-dd HH:mm:ss.fff. The output gives a count of the number of data and metadata files deleted.
Drop the table and database
You can run the following Spark SQL to clean up the Iceberg tables and associated data in Amazon S3 from this exercise:
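A sketch; PURGE requests that the underlying data files be deleted along with the table metadata:

```sql
%%sql
-- Drop the table and its data; repeat for noaa_iceberg_ctas if you created it
DROP TABLE icebergdb.noaa_iceberg PURGE
```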
Run the following Spark SQL to remove the database icebergdb:
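A minimal sketch of the statement:

```sql
%%sql
DROP DATABASE icebergdb
```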
To learn more about all the operations you can perform on Iceberg tables using Spark for Athena, refer to Spark Queries and Spark Procedures in the Iceberg documentation.
Working with Apache Hudi tables
Next, we show how you can use SQL on Spark for Athena to create, analyze, and manage Apache Hudi tables.
Set up a notebook session
In order to use Apache Hudi in Athena, while creating or editing a session, select the Apache Hudi option by expanding the Apache Spark properties section.
For steps, see Editing session details or Creating your own notebook.
The code used in this section is available in the SparkSQL_hudi.ipynb file to follow along.
Create a database and Hudi table
First, we create a database called hudidb in the AWS Glue Data Catalog, followed by the Hudi table creation:
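A minimal sketch of the database creation:

```sql
%%sql
CREATE DATABASE hudidb
```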
We then create a Hudi table pointing to a location in Amazon S3 where we will load the data. Note that the table is of copy-on-write type, defined by type = 'cow' in the table DDL. We have defined station and date as the primary keys and preCombineField as year. The table is also partitioned on year. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:
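A sketch of the DDL, again assuming the illustrative simplified schema (station, name, date, tmin, tmax, year); the table properties mirror what the text describes:

```sql
%%sql
-- Copy-on-write Hudi table with a composite primary key and year-based partitions
CREATE TABLE hudidb.noaa_hudi (
  station string,
  name    string,
  date    string,
  tmin    double,
  tmax    double,
  year    string
)
USING hudi
PARTITIONED BY (year)
TBLPROPERTIES (
  type = 'cow',
  primaryKey = 'station,date',
  preCombineField = 'year'
)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaahudi/'
```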
Insert data into the table
Like with Iceberg, we use the INSERT INTO statement to populate the table by reading data from the sparkblogdb.noaa_pq table created in the previous post:
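A sketch of the insert, assuming the illustrative columns above match your source table:

```sql
%%sql
-- Load the Hudi table from the Parquet source; the partition column (year) is listed last
INSERT INTO hudidb.noaa_hudi
SELECT station, name, date, tmin, tmax, year
FROM sparkblogdb.noaa_pq
```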
Query the Hudi table
Now that the table is created, let’s run a query to find the maximum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:
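A sketch of the query, using the illustrative tmax column as the daily maximum temperature:

```sql
%%sql
SELECT name, max(tmax) AS maximum_temperature
FROM hudidb.noaa_hudi
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
GROUP BY name
```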
Update data in the Hudi table
Let’s change the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. We can run an UPDATE statement on Spark for Athena to update the records of the noaa_hudi table:
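A sketch of the statement, assuming the station name is stored in the illustrative name column:

```sql
%%sql
UPDATE hudidb.noaa_hudi
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
```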
We run the previous SELECT query to find the maximum recorded temperature for the 'Sea-Tac' location:
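The same query as before, with the updated station name:

```sql
%%sql
SELECT name, max(tmax) AS maximum_temperature
FROM hudidb.noaa_hudi
WHERE name = 'Sea-Tac'
GROUP BY name
```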
Run time travel queries
We can use time travel queries in SQL on Athena to analyze past data snapshots. For example:
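A sketch of a time travel query; the timestamp is an example value that would predate the UPDATE above, so the original station name is used in the filter (tmax is again the illustrative temperature column):

```sql
%%sql
-- Query the table as of an earlier instant, before the station was renamed
SELECT name, max(tmax) AS maximum_temperature
FROM hudidb.noaa_hudi TIMESTAMP AS OF '2023-12-01 10:00:00.000'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
GROUP BY name
```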
This query checks the Seattle Airport temperature data as of a specific time in the past. The timestamp clause lets us travel back without altering current data. Note that the timestamp value is specified as a string in the format yyyy-MM-dd HH:mm:ss.fff.
Optimize query speed with clustering
To improve query performance, you can perform clustering on Hudi tables using SQL in Spark for Athena:
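A sketch using Hudi’s run_clustering call procedure; the sort column (name) is only an example:

```sql
%%sql
-- Schedule and run clustering, sorting data within file groups by the given column
CALL run_clustering(table => 'hudidb.noaa_hudi', order => 'name')
```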
Compact tables
Compaction is a table service employed by Hudi specifically in merge-on-read (MOR) tables to periodically merge updates from row-based log files into the corresponding columnar base file, producing a new version of the base file. Compaction doesn’t apply to copy-on-write (COW) tables such as the one we created earlier. You can run the following query in Spark for Athena to perform compaction on MOR tables:
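A sketch using Hudi’s run_compaction call procedure against a hypothetical MOR table (noaa_hudi_mor is not created in this post):

```sql
%%sql
-- Run compaction on a merge-on-read table (illustrative table name)
CALL run_compaction(op => 'run', table => 'hudidb.noaa_hudi_mor')
```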
Drop the table and database
Run the following Spark SQL to remove the Hudi table you created and associated data from the Amazon S3 location:
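A sketch; PURGE asks Hudi to also remove the table’s data files from Amazon S3:

```sql
%%sql
DROP TABLE hudidb.noaa_hudi PURGE
```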
Run the following Spark SQL to remove the database hudidb:
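A minimal sketch of the statement:

```sql
%%sql
DROP DATABASE hudidb
```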
To learn about all the operations you can perform on Hudi tables using Spark for Athena, refer to SQL DDL and Procedures in the Hudi documentation.
Working with Linux Foundation Delta Lake tables
Next, we show how you can use SQL on Spark for Athena to create, analyze, and manage Delta Lake tables.
Set up a notebook session
In order to use Delta Lake in Spark for Athena, while creating or editing a session, select Linux Foundation Delta Lake by expanding the Apache Spark properties section.
For steps, see Editing session details or Creating your own notebook.
The code used in this section is available in the SparkSQL_delta.ipynb file to follow along.
Create a database and Delta Lake table
In this section, we create a database in the AWS Glue Data Catalog. Using the following SQL, we can create a database called deltalakedb:
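A minimal sketch of the statement:

```sql
%%sql
CREATE DATABASE deltalakedb
```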
Next, in the database deltalakedb, we create a Delta Lake table called noaa_delta pointing to a location in Amazon S3 where we will load the data. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:
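A sketch of the DDL, once more assuming the illustrative simplified schema rather than the exact noaa_pq columns:

```sql
%%sql
-- Delta Lake table stored at the given S3 location (placeholder path)
CREATE TABLE deltalakedb.noaa_delta (
  station string,
  name    string,
  date    string,
  year    string,
  tmin    double,
  tmax    double
)
USING delta
LOCATION 's3://<your-S3-bucket>/<prefix>/noaadelta/'
```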
Insert data into the table
We use an INSERT INTO statement to populate the table by reading data from the sparkblogdb.noaa_pq table created in the previous post:
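A sketch of the insert, assuming the illustrative columns match your source table:

```sql
%%sql
INSERT INTO deltalakedb.noaa_delta
SELECT station, name, date, year, tmin, tmax
FROM sparkblogdb.noaa_pq
```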
You can also use CREATE TABLE AS SELECT to create a Delta Lake table and insert data from a source table in one query.
Query the Delta Lake table
Now that the data is inserted in the Delta Lake table, we can start analyzing it. Let’s run a Spark SQL query to find the minimum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:
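A sketch of the query, using the illustrative tmin column as the daily minimum temperature:

```sql
%%sql
SELECT name, min(tmin) AS minimum_temperature
FROM deltalakedb.noaa_delta
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
GROUP BY name
```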
Update data in the Delta Lake table
Let’s change the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. We can run an UPDATE statement on Spark for Athena to update the records of the noaa_delta table:
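A sketch of the statement, assuming the station name is stored in the illustrative name column:

```sql
%%sql
UPDATE deltalakedb.noaa_delta
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
```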
We can run the previous SELECT query to find the minimum recorded temperature for the 'Sea-Tac' location, and the result should be the same as earlier:
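The same query as before, with the updated station name:

```sql
%%sql
SELECT name, min(tmin) AS minimum_temperature
FROM deltalakedb.noaa_delta
WHERE name = 'Sea-Tac'
GROUP BY name
```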
Compact data files
In Spark for Athena, you can run OPTIMIZE on the Delta Lake table, which will compact the small files into larger files, so the queries are not burdened by the small file overhead. To perform the compaction operation, run the following query:
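A minimal sketch of the compaction command:

```sql
%%sql
-- Rewrite small files into larger ones
OPTIMIZE deltalakedb.noaa_delta
```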
Refer to Optimizations in the Delta Lake documentation for different options available while running OPTIMIZE.
Remove files no longer referenced by a Delta Lake table
You can remove files stored in Amazon S3 that are no longer referenced by a Delta Lake table and are older than the retention threshold by running the VACUUM command on the table using Spark for Athena:
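A minimal sketch using the default retention threshold (7 days unless configured otherwise):

```sql
%%sql
-- Remove data files no longer referenced by the table and older than the retention period
VACUUM deltalakedb.noaa_delta
```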
Refer to Remove files no longer referenced by a Delta table in the Delta Lake documentation for options available with VACUUM.
Drop the table and database
Run the following Spark SQL to remove the Delta Lake table you created:
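A minimal sketch of the statement:

```sql
%%sql
DROP TABLE deltalakedb.noaa_delta
```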
Run the following Spark SQL to remove the database deltalakedb:
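A minimal sketch of the statement:

```sql
%%sql
DROP DATABASE deltalakedb
```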
Running the DROP TABLE and DROP DATABASE DDL statements on the Delta Lake table and database deletes the metadata for these objects, but doesn’t automatically delete the data files in Amazon S3. You can run the following Python code in the notebook’s cell to delete the data from the S3 location:
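A hedged Python sketch using boto3 (available in Athena notebook sessions); the bucket, prefix, and the noaadelta/ folder name follow the placeholders used in the CREATE TABLE sketch above, so adjust them to your own location:

```python
import boto3

# Delete every object under the prefix that holds the Delta Lake table data.
# Replace the placeholders with the bucket and prefix from your CREATE TABLE statement.
s3 = boto3.resource("s3")
bucket = s3.Bucket("<your-S3-bucket>")
bucket.objects.filter(Prefix="<prefix>/noaadelta/").delete()
```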
To learn more about the SQL statements that you can run on a Delta Lake table using Spark for Athena, refer to the quickstart in the Delta Lake documentation.
Conclusion
This post demonstrated how to use Spark SQL in Athena notebooks to create databases and tables, insert and query data, and perform common operations like updates, compactions, and time travel on Hudi, Delta Lake, and Iceberg tables. Open table formats add ACID transactions, upserts, and deletes to data lakes, overcoming limitations of raw object storage. By removing the need to install separate connectors, Spark on Athena’s built-in integration reduces configuration steps and management overhead when using these popular frameworks for building reliable data lakes on Amazon S3. To learn more about selecting an open table format for your data lake workloads, refer to Choosing an open table format for your transactional data lake on AWS.
About the Authors
Pathik Shah is a Sr. Analytics Architect on Amazon Athena. He joined AWS in 2015 and has been focusing on the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.
Raj Devnath is a Product Manager at AWS on Amazon Athena. He is passionate about building products customers love and helping customers extract value from their data. His background is in delivering solutions for multiple end markets, such as finance, retail, smart buildings, home automation, and data communication systems.