AWS Big Data Blog

Get a quick start with Apache Hudi, Apache Iceberg, and Delta Lake with Amazon EMR on EKS

A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. You can keep your data as is in your object store or file-based storage without having to first structure the data. Additionally, you can run different types of analytics against your loosely formatted data lake—from dashboards and visualizations to big data processing, real-time analytics, and machine learning (ML) to guide better decisions. Due to the flexibility and cost effectiveness that a data lake offers, it’s very popular with customers who are looking to implement data analytics and AI/ML use cases.

Due to the immutable nature of the underlying storage in the cloud, one of the challenges in data processing is updating or deleting a subset of identified records from a data lake. Another challenge is making concurrent changes to the data lake. Implementing these tasks is time consuming and costly.

In this post, we explore three open-source transactional file formats: Apache Hudi, Apache Iceberg, and Delta Lake to help us to overcome these data lake challenges. We focus on how to get started with these data storage frameworks via real-world use case. As an example, we demonstrate how to handle incremental data change in a data lake by implementing a Slowly Changing Dimension Type 2 solution (SCD2) with Hudi, Iceberg, and Delta Lake, then deploy the applications with Amazon EMR on EKS.

ACID challenge in data lakes

In analytics, the data lake plays an important role as an immutable and agile data storage layer. Unlike traditional data warehouses or data mart implementations, we make no assumptions on the data schema in a data lake and can define whatever schemas required by our use cases. It’s up to the downstream consumption layer to make sense of that data for their own purposes.

One of the most common challenges is supporting ACID (Atomicity, Consistency, Isolation, Durability) transactions in a data lake. For example, how do we run queries that return consistent and up-to-date results while new data is continuously being ingested or existing data is being modified?

Let’s try to understand the data problem with a real-world scenario. Assume we centralize customer contact datasets from multiple sources to an Amazon Simple Storage Service (Amazon S3)-backed data lake, and we want to keep all the historical records for analysis and reporting. We face the following challenges:

  • We keep creating append-only files in Amazon S3 to track the contact data changes (insert, update, delete) in near-real time.
  • Consistency and atomicity aren’t guaranteed because we just dump data files from multiple sources without knowing whether the entire operation is successful or not.
  • We don’t have an isolation guarantee whenever multiple workloads are simultaneously reading and writing to the same target contact table.
  • We track every single activity at source, including duplicates caused by the retry mechanism and accidental data changes that are then reverted. This leads to the creation of a large volume of append-only files. The performance of extract, transform, and load (ETL) jobs decreases as all the data files are read each time.
  • We have to shorten the file retention period to reduce the data scan and read performance.

In this post, we walk through a simple SCD2 ETL example designed for solving the ACID transaction problem with the help of Hudi, Iceberg, and Delta Lake. We also show how to deploy the ACID solution with EMR on EKS and query the results by Amazon Athena.

Custom library dependencies with EMR on EKS

By default, Hudi and Iceberg are supported by Amazon EMR as out-of-the-box features. For this demonstration, we use EMR on EKS release 6.8.0, which contains Apache Iceberg 0.14.0-amzn-0 and Apache Hudi 0.11.1-amzn-0. To find out the latest and past versions that Amazon EMR supports, check out the Hudi release history and the Iceberg release history tables. The runtime binary files of these frameworks can be found in the Spark’s class path location within each EMR on EKS image. See Amazon EMR on EKS release versions for the list of supported versions and applications.

As of this writing, Amazon EMR does not include Delta Lake by default. There are two ways to make it available in EMR on EKS:

  • At the application level – You install Delta libraries by setting a Spark configuration spark.jars or --jars command-line argument in your submission script. The JAR files will be downloaded and distributed to each Spark Executor and Driver pod when starting a job.
  • At the Docker container level – You can customize an EMR on EKS image by packaging Delta dependencies into a single Docker container that promotes portability and simplifies dependency management for each workload

Other custom library dependencies can be managed the same way as for Delta Lake—passing a comma-separated list of JAR files in the Spark configuration at job submission, or packaging all the required libraries into a Docker image.

Solution overview

The solution provides two sample CSV files as the data source: initial_contact.csv and update_contacts.csv. They were generated by a Python script with the Faker package. For more details, check out the tutorial on GitHub.

The following diagram describes a high-level architecture of the solution and different services being used.

The workflow steps are as follows:

  1. Ingest the first CSV file from a source S3 bucket. The data is being processed by running a Spark ETL job with EMR on EKS. The application contains either the Hudi, Iceberg, or Delta framework.
  2. Store the initial table in Hudi, Iceberg, or Delta file format in a target S3 bucket (curated). We use the AWS Glue Data Catalog as the hive metastore. Optionally, you can configure Amazon DynamoDB as a lock manager for the concurrency controls.
  3. Ingest a second CSV file that contains new records and some changes to the existing ones.
  4. Perform SCD2 via Hudi, Iceberg, or Delta in the Spark ETL job.
  5. Query the Hudi, Iceberg, or Delta table stored on the target S3 bucket in Athena

To simplify the demo, we have accommodated steps 1–4 into a single Spark application.


Install the following tools:

curl -fsSL -o \

chmod 700
export DESIRED_VERSION=v3.8.2
helm version

For a quick start, you can use AWS CloudShell which includes the AWS CLI and kubectl already.

Clone the project

Download the sample project either to your computer or the CloudShell console:

git clone
cd emr-on-eks-hudi-iceberg-delta

Set up the environment

Run the following script to set up a test environment. The infrastructure deployment includes the following resources:

  • A new S3 bucket to store sample data and job code.
  • An Amazon Elastic Kubernetes Service (Amazon EKS) cluster (version 1.21) in a new VPC across two Availability Zones.
  • An EMR virtual cluster in the same VPC, registered to the emr namespace in Amazon EKS.
  • An AWS Identity and Access Management (IAM) job execution role contains DynamoDB access, because we use DynamoDB to provide concurrency controls that ensure atomic transaction with the Hudi and Iceberg tables.
export AWS_REGION=us-east-1
export EKSCLUSTER_NAME=eks-quickstart
# Upload sample contact data to S3
export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
aws s3 sync data s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/data

Job execution role

The provisioning includes an IAM job execution role called emr-on-eks-quickstart-execution-role that allows your EMR on EKS jobs access to the required AWS services. It contains AWS Glue permissions because we use the Data Catalog as our metastore.

See the following code:

    "Effect": "Allow",
    "Action": ["glue:Get*","glue:BatchCreatePartition","glue:UpdateTable","glue:CreateTable"],
    "Resource": [

Additionally, the role contains DynamoDB permissions, because we use the service as the lock manager. It provides concurrency controls that ensure atomic transaction with our Hudi and Iceberg tables. If a DynamoDB table with the given name doesn’t exist, a new table is created with the billing mode set as pay-per-request. More details can be found in the following framework examples.

    "Sid": "DDBLockManager",
    "Effect": "Allow",
    "Action": [
    "Resource": [

Example 1: Run Apache Hudi with EMR on EKS

The following steps provide a quick start for you to implement SCD Type 2 data processing with the Hudi framework. To learn more, refer to Build Slowly Changing Dimensions Type 2 (SCD2) with Apache Spark and Apache Hudi on Amazon EMR.

The following code snippet demonstrates the SCD type2 implementation logic. It creates Hudi tables in a default database in the Glue Data Catalog. The full version is in the script

# Read incremental contact CSV file with extra SCD columns
delta_csv_df ="csv")\
.withColumn("ts", lit(current_timestamp()).cast(TimestampType()))\
.withColumn("valid_from", lit(current_timestamp()).cast(TimestampType()))\
.withColumn("valid_to", lit("").cast(TimestampType()))\
.withColumn('iscurrent', lit(1).cast("int"))

## Find existing records to be expired
join_cond = [initial_hudi_df.checksum != delta_csv_df.checksum,
             initial_hudi_df.iscurrent == 1]
contact_to_update_df = (initial_hudi_df.join(delta_csv_df, join_cond)
                      .withColumn('iscurrent', lit(0).cast("int"))
merged_contact_df = delta_csv_df.unionByName(contact_to_update_df)

# Upsert
                    .option('hoodie.datasource.write.operation', 'upsert')\
                    .options(**hudiOptions) \

In the job script, the hudiOptions were set to use the AWS Glue Data Catalog and enable the DynamoDB-based Optimistic Concurrency Control (OCC). For more information about concurrency control and alternatives for lock providers, refer to Concurrency Control.

hudiOptions = {
    # sync to Glue catalog
    # DynamoDB based locking mechanisms
    "hoodie.write.concurrency.mode":"optimistic_concurrency_control", #default is SINGLE_WRITER
    "hoodie.cleaner.policy.failed.writes":"LAZY", #Hudi will delete any files written by failed writes to re-claim space
    "hoodie.write.lock.dynamodb.region": REGION,
    "hoodie.write.lock.dynamodb.endpoint_url": f"dynamodb.{REGION}"
  1. Upload the job scripts to Amazon S3:
    export AWS_REGION=us-east-1
    export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
    aws s3 sync hudi/ s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/
  2. Submit Hudi jobs with EMR on EKS to create SCD2 tables:
    export EMRCLUSTER_NAME=emr-on-eks-quickstart
    export AWS_REGION=us-east-1

    Hudi supports two tables types: Copy on Write (CoW) and Merge on Read (MoR). The following is the code snippet to create a CoW table. For the complete job scripts for each table type, refer to and

    aws emr-containers start-job-run \
      --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
      --name em68-hudi-cow \
      --execution-role-arn $EMR_ROLE_ARN \
      --release-label emr-6.8.0-latest \
      --job-driver '{
      "sparkSubmitJobDriver": {
          "entryPoint": "s3://'$S3BUCKET'/blog/",
          "sparkSubmitParameters": "--jars local:///usr/lib/hudi/hudi-spark-bundle.jar,local:///usr/lib/spark/external/lib/spark-avro.jar --conf spark.executor.memory=2G --conf spark.executor.cores=2"}}' \
      --configuration-overrides '{
        "applicationConfiguration": [
            "classification": "spark-defaults", 
            "properties": {
              "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
              "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
  3. Check the job status on the EMR virtual cluster console.
  4. Query the output in Athena:

    select * from hudi_contact_cow where id=103

    select * from hudi_contact_mor_rt where id=103

Example 2: Run Apache Iceberg with EMR on EKS

Starting with Amazon EMR version 6.6.0, you can use Apache Spark 3 on EMR on EKS with the Iceberg table format. For more information on how Iceberg works in an immutable data lake, see Build a high-performance, ACID compliant, evolving data lake using Apache Iceberg on Amazon EMR.

The sample job creates an Iceberg table iceberg_contact in the default database of AWS Glue. The full version is in the script. The following code snippet shows the SCD2 type of MERGE operation:

# Read incremental CSV file with extra SCD2 columns\

# Update existing records which are changed in the update file
contact_update_qry = """
    WITH contact_to_update AS (
          SELECT target.*
          FROM glue_catalog.default.iceberg_contact AS target
          JOIN iceberg_contact_update AS source 
          ON =
          WHERE target.checksum != source.checksum
            AND target.iscurrent = 1
          SELECT * FROM iceberg_contact_update
    ),contact_updated AS (
        SELECT *, LEAD(valid_from) OVER (PARTITION BY id ORDER BY valid_from) AS eff_from
        FROM contact_to_update
    SELECT id,name,email,state,ts,valid_from,
        CAST(COALESCE(eff_from, null) AS Timestamp) AS valid_to,
        CASE WHEN eff_from IS NULL THEN 1 ELSE 0 END AS iscurrent,
    FROM contact_updated
# Upsert
    MERGE INTO glue_catalog.default.iceberg_contact tgt
    USING ({contact_update_qry}) src
    ON =
    AND tgt.checksum = src.checksum

As demonstrated earlier when discussing the job execution role, the role emr-on-eks-quickstart-execution-role granted access to the required DynamoDB table myIcebergLockTable, because the table is used to obtain locks on Iceberg tables, in case of multiple concurrent write operations against a single table. For more information on Iceberg’s lock manager, refer to DynamoDB Lock Manager.

  1. Upload the application scripts to the example S3 bucket:
    export AWS_REGION=us-east-1
    export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
    aws s3 sync iceberg/ s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/
  2. Submit the job with EMR on EKS to create an SCD2 Iceberg table:
    export EMRCLUSTER_NAME=emr-on-eks-quickstart
    export AWS_REGION=us-east-1

    The full version code is in the script. The code snippet is as follows:

    aws emr-containers start-job-run \
    --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
    --name em68-iceberg \
    --execution-role-arn $EMR_ROLE_ARN \
    --release-label emr-6.8.0-latest \
    --job-driver '{
        "sparkSubmitJobDriver": {
        "entryPoint": "s3://'$S3BUCKET'/blog/",
        "sparkSubmitParameters": "--jars local:///usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar --conf spark.executor.memory=2G --conf spark.executor.cores=2"}}' \
    --configuration-overrides '{
        "applicationConfiguration": [
        "classification": "spark-defaults",
        "properties": {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.glue_catalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.glue_catalog.catalog-impl": "",
        "spark.sql.catalog.glue_catalog.warehouse": "s3://'$S3BUCKET'/iceberg/",
        "": "",
        "spark.sql.catalog.glue_catalog.lock-impl": "",
        "spark.sql.catalog.glue_catalog.lock.table": "myIcebergLockTable"
  3. Check the job status on the EMR on EKS console.
  4. When the job is complete, query the table in Athena:
    select * from iceberg_contact where id=103

Example 3: Run open-source Delta Lake with EMR on EKS

Delta Lake 2.1.x is compatible with Apache Spark 3.3.x. Check out the compatibility list for other versions of Delta Lake and Spark. In this post, we use Amazon EMR release 6.8 (Spark 3.3.0) to demonstrate the SCD2 implementation in a data lake.

The following is the Delta code snippet to load initial dataset; the incremental load MERGE logic is highly similar to the Iceberg example. As a one-off task, there should be two tables set up on the same data:

  • The Delta table delta_table_contact – Defined on the TABLE_LOCATION at ‘s3://{S3_BUCKET_NAME}/delta/delta_contact’. The MERGE/UPSERT operation must be implemented on the Delta destination table. Athena can’t query this table directly, instead it reads from a manifest file stored in the same location, which is a text file containing a list of data files to read for querying a table. It is described as an Athena table below.
  • The Athena table delta_contact – Defined on the manifest location s3://{S3_BUCKET_NAME}/delta/delta_contact/_symlink_format_manifest/. All read operations from Athena must use this table.

The full version code is in the  script. The code snippet is as follows:

# Read initial contact CSV file and create a Delta table with extra SCD2 columns
df_intial_csv =\

spark.sql("GENERATE symlink_format_manifest FOR TABLE delta_table_contact")
spark.sql("ALTER TABLE delta_table_contact SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)")

# Create a queriable table in Athena
    CREATE EXTERNAL TABLE IF NOT EXISTS default.delta_contact (
LOCATION '{TABLE_LOCATION}/_symlink_format_manifest/'""")

The SQL statement GENERATE symlink_format_manifest FOR TABLE ... is a required step to set up the Athena for Delta Lake. Whenever the data in a Delta table is updated, you must regenerate the manifests. Therefore, we use ALTER TABLE .... SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true) to automate the manifest refresh as a one-off setup.

  1. Upload the Delta sample scripts to the S3 bucket:
    export AWS_REGION=us-east-1
    export ACCOUNTID=$(aws sts get-caller-identity --query Account --output text)
    aws s3 sync delta/ s3://emr-on-eks-quickstart-${ACCOUNTID}-${AWS_REGION}/blog/
  2. Submit the job with EMR on EKS:
    export EMRCLUSTER_NAME=emr-on-eks-quickstart
    export AWS_REGION=us-east-1

    The full version code is in the script. The open-source Delta JAR files must be included in the spark.jars. Alternatively, follow the instructions in How to customize Docker images and build a custom EMR on EKS image to accommodate the Delta dependencies.

    "spark.jars": ","

    The code snippet is as follows:

    aws emr-containers start-job-run \
    --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
    --name em68-delta \
    --execution-role-arn $EMR_ROLE_ARN \
    --release-label emr-6.8.0-latest \
    --job-driver '{
       "sparkSubmitJobDriver": {
       "entryPoint": "s3://'$S3BUCKET'/blog/",
       "sparkSubmitParameters": "--conf spark.executor.memory=2G --conf spark.executor.cores=2"}}' \
    --configuration-overrides '{
       "applicationConfiguration": [
        "classification": "spark-defaults",
        "properties": {
        "spark.sql.extensions": "",
    "spark.jars": ","
  3. Check the job status from the EMR on EKS console.
  4. When the job is complete, query the table in Athena:
    select * from delta_contact where id=103

Clean up

To avoid incurring future charges, delete the resources generated if you don’t need the solution anymore. Run the following cleanup script (change the Region if necessary):

export EMRCLUSTER_NAME=emr-on-eks-quickstart
export AWS_REGION=us-east-1


Implementing an ACID-compliant data lake with EMR on EKS enables you focus more on delivering business value, instead of worrying about managing complexities and reliabilities at the data storage layer.

This post presented three different transactional storage frameworks that can meet your ACID needs. They ensure you never read partial data (Atomicity). The read/write isolation allows you to see consistent snapshots of the data, even if an update occurs at the same time (Consistency and Isolation). All the transactions are stored directly to the underlying Amazon S3-backed data lake, which is designed for 11 9’s of durability (Durability).

For more information, check out the sample GitHub repository used in this post and the EMR on EKS Workshop. They will get you started with running your familiar transactional framework with EMR on EKS. If you want dive deep into each storage format, check out the following posts:

About the authors

Amir Shenavandeh is a Sr Analytics Specialist Solutions Architect and Amazon EMR subject matter expert at Amazon Web Services. He helps customers with architectural guidance and optimisation. He leverages his experience to help people bring their ideas to life, focusing on distributed processing and big data architectures.

Melody Yang is a Senior Big Data Solutions Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interests are open-source frameworks and automation, data engineering, and DataOps.

Amit Maindola is a Data Architect focused on big data and analytics at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.