Accelerate data science feature engineering on transactional data lakes using Amazon Athena with Apache Iceberg
Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) and data sources residing in AWS, on-premises, or other cloud systems using SQL or Python. Athena is built on open-source Trino and Presto engines, and Apache Spark frameworks, with no provisioning or configuration effort required. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.
Apache Iceberg is an open table format for very large analytic datasets. It manages large collections of files as tables, and it supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. Athena supports read, time travel, write, and DDL queries for Apache Iceberg tables that use the Apache Parquet format for data and the AWS Glue Data Catalog for their metastore.
Feature engineering is a process of identifying and transforming raw data (images, text files, videos, and so on), backfilling missing data, and adding one or more meaningful data elements to provide context so a machine learning (ML) model can learn from it. Data labeling is required for various use cases, including forecasting, computer vision, natural language processing, and speech recognition.
Combined with the capabilities of Athena, Apache Iceberg delivers a simplified workflow for data scientists to create new data features without needing to copy or recreate the entire dataset. You can create features using standard SQL on Athena without using any other service for feature engineering. Data scientists can reduce the time spent preparing and copying datasets, and instead focus on data feature engineering, experimentation, and analyzing data at scale.
In this post, we review the benefits of using Athena with the Apache Iceberg open table format and how it simplifies common feature engineering tasks for data scientists. We demonstrate how Athena can convert an existing table in Apache Iceberg format, then add columns, delete columns, and modify the data in the table without recreating or copying the dataset, and use these capabilities to create new features on Apache Iceberg tables.
Data scientists are generally accustomed to working with large datasets. Datasets are usually stored in JSON, CSV, ORC, or Apache Parquet format, or a similar read-optimized format for fast read performance. Data scientists often create new data features and backfill them with aggregated and ancillary data. Historically, this task was accomplished either by creating a view on top of the table with the underlying data in Apache Parquet format, where the additional columns and data were generated at runtime, or by creating a new table with the additional columns. Although this workflow is well suited for many use cases, it's inefficient for large datasets, because data would need to be generated at runtime or datasets would need to be copied and transformed.
Athena has introduced ACID (Atomicity, Consistency, Isolation, Durability) transaction capabilities that add INSERT, UPDATE, DELETE, MERGE, and time travel operations built on Apache Iceberg tables. These capabilities enable data scientists to create new data features and drop existing data features on existing datasets without worrying about copying or transforming the dataset or abstracting it with a view. Data scientists can focus on feature engineering work and avoid copying and transforming the datasets.
The Athena Iceberg UPDATE operation writes Apache Iceberg position delete files and newly updated rows as data files in the same transaction. You can make record corrections via a single UPDATE statement.
With the release of Athena engine version 3, the capabilities for Apache Iceberg tables are enhanced with support for operations such as CREATE TABLE AS SELECT (CTAS) and MERGE commands that streamline the lifecycle management of your Iceberg data. CTAS makes it fast and efficient to create tables from other formats such as Apache Parquet, and MERGE INTO conditionally updates, deletes, or inserts rows into an Iceberg table. A single statement can combine update, delete, and insert actions.
Set up an Athena workgroup with Athena engine version 3 to use CTAS and MERGE commands with an Apache Iceberg table. To upgrade your existing Athena engine to version 3 in your Athena workgroup, follow the instructions in Upgrade to Athena engine version 3 to increase query performance and access more analytics features or refer to Changing the engine version in the Athena console.
For demonstration, we use an Apache Parquet table that contains several million records of randomly distributed fictitious sales data from the last several years, stored in an S3 bucket. Download the dataset, unzip it to your local computer, and upload it to your S3 bucket. In this post, we uploaded our dataset to s3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/orders_and_customers/.
The following table shows the layout of the customer_orders table.
| Column Name | Data Type | Description |
|---|---|---|
| orderkey | string | Order number for the order |
| custkey | string | Customer identification number |
| orderstatus | string | Status of the order |
| totalprice | string | Total price of the order |
| orderdate | string | Date of the order |
| orderpriority | string | Priority of the order |
| clerk | string | Name of the clerk who processed the order |
| shippriority | string | Priority on the shipping |
| nationkey | string | Customer nation key |
| phone | string | Customer phone number |
| acctbal | string | Customer account balance |
| mktsegment | string | Customer market segment |
Perform feature engineering
As data scientists, we want to perform feature engineering on the customer orders data by adding a calculated one-year total purchase amount and one-year average purchase amount for each customer to the existing dataset. For demonstration purposes, we created the customer_orders table in the sampledb database using Athena, as shown in the following DDL command. (You can use any of your existing datasets and follow the steps in this post.) The customer_orders dataset was generated and stored in the S3 bucket location s3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/orders_and_customers/ in Parquet format. This table is not an Apache Iceberg table.
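A sketch of that DDL follows. The column list and S3 location come from the table layout and bucket path given earlier; treat this as an illustration rather than the exact statement, since the original post's DDL is not reproduced here.

```sql
-- Sketch of the customer_orders DDL; columns follow the layout table above.
CREATE EXTERNAL TABLE sampledb.customer_orders (
  orderkey      string,
  custkey       string,
  orderstatus   string,
  totalprice    string,
  orderdate     string,
  orderpriority string,
  clerk         string,
  shippriority  string,
  nationkey     string,
  phone         string,
  acctbal       string,
  mktsegment    string
)
STORED AS PARQUET
LOCATION 's3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/orders_and_customers/';
```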
Validate the data in the table by running a query:
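A minimal validation query might look like the following; any projection over the table works here.

```sql
-- Spot-check a few rows of the source table
SELECT * FROM sampledb.customer_orders LIMIT 10;
```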
We want to add new features to this table to get a deeper understanding of customer sales, which can result in faster model training and more valuable insights. To add new features to the dataset, convert the customer_orders Athena table to an Apache Iceberg table on Athena. Issue a CTAS query statement to create a new table in Apache Iceberg format from the customer_orders table. While doing so, a new feature is added to capture the total purchase amount in the past year (the max year of the dataset) for each customer.
In the following CTAS query, a new column named one_year_sales_aggregate of data type double is added with a default value of 0.0, and table_type is set to ICEBERG.
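A sketch of such a CTAS statement follows. The target table name customer_orders_iceberg and its S3 location are assumptions for illustration; the table_type and is_external properties are what Athena engine version 3 requires for an Iceberg CTAS.

```sql
-- Create an Iceberg copy of the table with the new feature column
CREATE TABLE sampledb.customer_orders_iceberg
WITH (
  table_type  = 'ICEBERG',
  location    = 's3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/customer_orders_iceberg/',
  is_external = false
)
AS
SELECT orderkey, custkey, orderstatus, totalprice, orderdate,
       orderpriority, clerk, shippriority, nationkey, phone,
       acctbal, mktsegment,
       -- new feature, initialized to 0.0 as a double
       CAST(0.0 AS double) AS one_year_sales_aggregate
FROM sampledb.customer_orders;
```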
Issue the following query to verify that the new column one_year_sales_aggregate in the Apache Iceberg table has its values set to 0.0:
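For example (assuming the Iceberg table is named customer_orders_iceberg):

```sql
-- Confirm the new column defaults to 0.0
SELECT custkey, totalprice, one_year_sales_aggregate
FROM sampledb.customer_orders_iceberg
LIMIT 10;
```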
We want to populate the values for the new feature one_year_sales_aggregate in the dataset to get the total purchase amount for each customer based on their purchases in the past year (the max year of the dataset). Issue a MERGE query statement to the Apache Iceberg table using Athena to populate values for the one_year_sales_aggregate feature:
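A sketch of that MERGE statement follows, assuming the Iceberg table is named customer_orders_iceberg and that orderdate is stored in an ISO yyyy-mm-dd format that casts cleanly to a date:

```sql
-- Aggregate last-year purchases per customer, then merge back into the table
MERGE INTO sampledb.customer_orders_iceberg t
USING (
    SELECT custkey,
           SUM(CAST(totalprice AS double)) AS one_year_sales_aggregate
    FROM sampledb.customer_orders_iceberg
    WHERE year(CAST(orderdate AS date)) =
          (SELECT max(year(CAST(orderdate AS date)))
           FROM sampledb.customer_orders_iceberg)
    GROUP BY custkey
) s
ON t.custkey = s.custkey
WHEN MATCHED THEN
  UPDATE SET one_year_sales_aggregate = s.one_year_sales_aggregate;
```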
Issue the following query to validate the updated value for total spend by each customer in the past year:
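For example:

```sql
-- One row per customer with their total past-year spend
SELECT DISTINCT custkey, one_year_sales_aggregate
FROM sampledb.customer_orders_iceberg
LIMIT 10;
```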
We decide to add another feature to the existing Apache Iceberg table to compute and store the average purchase amount in the past year by each customer. Issue an ALTER query statement to add a new column to the existing table for the feature one_year_sales_average:
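A sketch of that ALTER statement, again assuming the table is named customer_orders_iceberg:

```sql
-- Add the new feature column to the Iceberg table in place
ALTER TABLE sampledb.customer_orders_iceberg
ADD COLUMNS (one_year_sales_average double);
```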
Before populating the values for this new feature, you can set its default value to 0.0. Using the same Apache Iceberg table on Athena, issue an UPDATE query statement to set the value of the new feature one_year_sales_average to 0.0:
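For example:

```sql
-- Initialize the new feature across all rows
UPDATE sampledb.customer_orders_iceberg
SET one_year_sales_average = 0.0;
```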
Issue the following query to verify that the updated value for average spend by each customer in the past year is set to 0.0:
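For example:

```sql
-- Confirm the column was initialized to 0.0
SELECT custkey, one_year_sales_average
FROM sampledb.customer_orders_iceberg
LIMIT 10;
```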
Now we want to populate the values for the new feature one_year_sales_average in the dataset to get the average purchase amount for each customer based on their purchases in the past year (the max year of the dataset). Issue a MERGE query statement to the existing Apache Iceberg table on Athena to populate values for the feature one_year_sales_average:
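This mirrors the earlier MERGE, swapping SUM for AVG. A sketch, under the same assumptions about the table name and orderdate format:

```sql
-- Average last-year purchases per customer, merged into the feature column
MERGE INTO sampledb.customer_orders_iceberg t
USING (
    SELECT custkey,
           AVG(CAST(totalprice AS double)) AS one_year_sales_average
    FROM sampledb.customer_orders_iceberg
    WHERE year(CAST(orderdate AS date)) =
          (SELECT max(year(CAST(orderdate AS date)))
           FROM sampledb.customer_orders_iceberg)
    GROUP BY custkey
) s
ON t.custkey = s.custkey
WHEN MATCHED THEN
  UPDATE SET one_year_sales_average = s.one_year_sales_average;
```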
Issue the following query to verify the updated values for average spend by each customer:
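For example:

```sql
-- One row per customer with both engineered features
SELECT DISTINCT custkey, one_year_sales_aggregate, one_year_sales_average
FROM sampledb.customer_orders_iceberg
LIMIT 10;
```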
Once additional data features have been added to the dataset, data scientists generally proceed to train ML models and make inferences using Amazon SageMaker or an equivalent toolset.
In this post, we demonstrated how to perform feature engineering using Athena with Apache Iceberg. We also demonstrated using the CTAS query to create an Apache Iceberg table on Athena from an existing dataset in Apache Parquet format, adding new features in an existing Apache Iceberg table on Athena using the ALTER query, and using UPDATE and MERGE query statements to update the feature values of existing columns.
We encourage you to use CTAS queries to create tables quickly and efficiently, and use the MERGE query statement to synchronize tables in one step to simplify data preparations and update tasks when transforming the features using Athena with Apache Iceberg. If you have comments or feedback, please leave them in the comments section.
About the Authors
Vivek Gautam is a Data Architect with specialization in data lakes at AWS Professional Services. He works with enterprise customers building data products, analytics platforms, and solutions on AWS. When not building and designing modern data platforms, Vivek is a food enthusiast who also likes to explore new travel destinations and go on hikes.
Mikhail Vaynshteyn is a Solutions Architect with Amazon Web Services. Mikhail works with healthcare and life sciences customers to build solutions that help improve patients’ outcomes. Mikhail specializes in data analytics services.
Naresh Gautam is a Data Analytics and AI/ML leader at AWS with 20 years of experience, who enjoys helping customers architect highly available, high-performance, and cost-effective data analytics and AI/ML solutions to empower customers with data-driven decision-making. In his free time, he enjoys meditation and cooking.
Harsha Tadiparthi is a specialist Principal Solutions Architect, Analytics at AWS. He enjoys solving complex customer problems in databases and analytics and delivering successful outcomes. Outside of work, he loves to spend time with his family, watch movies, and travel whenever possible.