AWS Big Data Blog

Run Apache XTable on Amazon MWAA to translate open table formats

Open table formats (OTFs) like Apache Iceberg are being increasingly adopted, for example, to improve transactional consistency of a data lake or to consolidate batch and streaming data pipelines on a single file format and reduce complexity. In practice, architects need to integrate the chosen format with the various layers of a modern data platform. However, the level of support for the different OTFs varies across common analytical services.

Commercial vendors and the open source community have recognized this situation and are working on interoperability between table formats. One approach is to make a single physical dataset readable in different formats by translating its metadata and avoiding reprocessing of actual data files. Apache XTable is an open source solution that follows this approach and provides abstractions and tools for the translation of open table format metadata.

In this post, we show you how to get started with Apache XTable on AWS and how you can use it in a batch pipeline orchestrated with Amazon Managed Workflows for Apache Airflow (Amazon MWAA). To understand how XTable and similar solutions work, we start with a high-level background on metadata management in an OTF and then dive deeper into XTable and its usage.

Open table formats

Open table formats overcome the gaps of traditional data lake storage formats such as Apache Hive tables. They provide abstractions and capabilities known from relational databases, like transactional consistency and the ability to create, update, or delete single records. In addition, they help manage schema evolution.

To understand how the XTable metadata translation approach works, you first need to know how the metadata of an OTF is represented on the storage layer.

An OTF comprises a data layer and a metadata layer, which are both represented as files on storage. The data layer contains the data files. The metadata layer contains metadata files that keep track of the data files and the transactionally consistent sequence of changes to them. The following figure illustrates this configuration.

Inspecting the files of an Iceberg table on storage, we identify the metadata layer through the folder metadata. Adjacent to it are the data files—in this example, as snappy-compressed Parquet:

<table base folder>
├── metadata # contains metadata files
│ ├── 00000-6c64b16d-affa-4f0e-8635-a996ec13a7fa.metadata.json
│ ├── 23ba9e94-7819-4661-b698-f512f5b51054-m0.avro
│ └── snap-5514083086862319381-1-23ba9e94-7819-4661-b698-f512f5b51054.avro
└── part-00011-587322f1-1007-4500-a5cf-8022f6e7fa3c-c000.snappy.parquet # data files

Comparable to Iceberg, in Delta Lake, the metadata layer is represented through the folder _delta_log:

<table base folder>
├── _delta_log # contains metadata files
│ └── 00000000000000000000.json
└── part-00011-587322f1-1007-4500-a5cf-8022f6e7fa3c-c000.snappy.parquet # data files

Although the metadata layer varies in structure and capabilities between OTFs, it’s eventually just files on storage. Typically, it resides in the table’s base folder adjacent to the data files.

Now, the question emerges: what if metadata files of multiple different formats are stored in parallel for the same table?

Current approaches to interoperability do exactly that, as we will see in the next section.

Apache XTable

XTable is currently provided as a standalone Java binary. It translates the metadata layer between Apache Hudi, Apache Iceberg, and Delta Lake without rewriting data files, and it integrates with Iceberg-compatible catalogs like the AWS Glue Data Catalog.

In practice, XTable reads the latest snapshot of an input table and creates additional metadata for configurable target formats. It adds this metadata to the table on the storage layer, alongside the existing metadata.

Through this, you can choose either the source or the target format, read the respective metadata, and get the same consistent view of the table’s data.

The following diagram illustrates the metadata translation process.

Let’s assume you have an existing Delta Lake table that you want to make readable as an Iceberg table. To run XTable, you invoke its Java binary and provide a dataset config file that specifies source and target format, as well as source table paths:

java -jar utilities-0.1.0-SNAPSHOT-bundled.jar \
	--datasetConfig datasetConfig.yaml

A minimal datasetConfig.yaml looks as follows, assuming the table is stored on Amazon Simple Storage Service (Amazon S3):

---
sourceFormat: DELTA
targetFormats:
  - ICEBERG
datasets:
  - tableBasePath: s3://<URI to base folder of table>
    tableName: <table name>
...

As shown in the following listing, XTable adds the Iceberg-specific metadata folder to the table’s base path in addition to the existing _delta_log folder. Now, clients can read the table in either Delta Lake or Iceberg format.

<table base folder>
├── _delta_log # Previously existing Delta Lake metadata
│   └── ...
├── metadata   # Added by XTable: Apache Iceberg metadata
│   └── ...
└── part-00011-587322f1-1007-4500-a5cf-8022f6e7fa3c-c000.snappy.parquet # data files
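
To verify the result, you can read the same base path through both sets of metadata. The following is a minimal PySpark sketch (not part of the sample code); it assumes a Spark session with both the Delta Lake and Iceberg runtime dependencies on its classpath and S3 access configured:

from pyspark.sql import SparkSession

# Assumes the Delta Lake and Iceberg Spark runtime JARs are on the classpath
spark = SparkSession.builder.appName("xtable-verification").getOrCreate()

table_path = "s3://<URI to base folder of table>"

# Read through the Delta Lake metadata (_delta_log folder)
delta_df = spark.read.format("delta").load(table_path)

# Read through the Iceberg metadata (metadata folder) as a path-based table
iceberg_df = spark.read.format("iceberg").load(table_path)

# Both DataFrames are backed by the same Parquet data files
assert delta_df.count() == iceberg_df.count()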

To register the Iceberg table in Data Catalog, pass a further config file to XTable that is responsible for Iceberg catalogs:

java -jar utilities-0.1.0-SNAPSHOT-bundled.jar \
	--datasetConfig datasetConfig.yaml \
	--icebergCatalogConfig glueDataCatalog.yaml

The minimal contents of glueDataCatalog.yaml are as follows. It configures XTable to use the Data Catalog-specific IcebergCatalog implementation provided by the iceberg-aws module, which is part of the Apache Iceberg core project:

---
catalogImpl: org.apache.iceberg.aws.glue.GlueCatalog
catalogName: glue
catalogOptions:
  warehouse: s3://<URI to base folder of Iceberg tables>
  catalog-impl: org.apache.iceberg.aws.glue.GlueCatalog
  io-impl: org.apache.iceberg.aws.s3.S3FileIO
... 
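
After the table is registered, any Iceberg-compatible client that integrates with the Data Catalog can discover it. As an illustrative sketch (not part of the sample code), the following loads the registered table with PyIceberg; it assumes the pyiceberg package with its glue and pandas extras is installed, AWS credentials and Region are configured in the environment, and the database and table names are placeholders:

from pyiceberg.catalog import load_catalog

# Connect PyIceberg to the AWS Glue Data Catalog
catalog = load_catalog("glue", **{"type": "glue"})

# Placeholder identifiers: replace with your Glue database and table name
table = catalog.load_table("<database name>.<table name>")

# Scan the Iceberg metadata created by XTable and materialize the rows locally
df = table.scan().to_pandas()
print(df.head())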

Run Apache XTable as an Airflow Operator

You can use XTable in batch data pipelines that write tables to the data lake, to make sure those tables are also readable in other formats. For instance, operating in the Delta Lake ecosystem, a data pipeline might create Delta tables, which need to be accessible as Iceberg tables as well.

One tool to orchestrate data pipelines on AWS is Amazon MWAA, which is a managed service for Apache Airflow. In the following sections, we explore how XTable can run within a custom Airflow Operator on Amazon MWAA. We elaborate on the initial design of such an Operator and demonstrate its deployment on Amazon MWAA.

Why a custom Operator? Although XTable could also be invoked directly from a BashOperator, we choose to wrap this step in a custom operator so that it can be configured through Airflow’s native programming language (Python) and operator parameters only. For background on how to write custom operators, see Creating a custom operator.

The following diagram illustrates the dependency between the operator and XTable’s binary.

Input parameters of the Operator

XTable’s primary inputs are YAML-based configuration files:

  • Dataset config – Contains source format, target formats, and source tables
  • Iceberg catalog config (optional) – Contains the reference to an external Iceberg catalog into which to register the table in the target format

We choose to reflect the data structures of the YAML files in the Operator’s input parameters, as listed in the following table.

Parameter                 Type    Values
dataset_config            dict    Contents of dataset config as dict literal
iceberg_catalog_config    dict    Contents of Iceberg catalog config as dict literal

As the Operator runs, the YAML files are generated from the input parameters.
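
The following is a simplified, illustrative sketch of what such an operator could look like. It is not the implementation from the sample repository (which invokes the JAR in-process through JPype); instead, it writes the two parameter dicts to temporary YAML files and calls the documented command line via a subprocess. The JAR path is a placeholder.

import subprocess
import tempfile
from typing import Optional

import yaml
from airflow.models.baseoperator import BaseOperator


class XtableOperator(BaseOperator):
    """Translates table metadata by invoking the Apache XTable JAR."""

    def __init__(self, dataset_config: dict, iceberg_catalog_config: Optional[dict] = None, **kwargs):
        super().__init__(**kwargs)
        self.dataset_config = dataset_config
        self.iceberg_catalog_config = iceberg_catalog_config

    def execute(self, context):
        with tempfile.TemporaryDirectory() as tmp:
            # Generate the YAML files XTable expects from the operator parameters
            dataset_config_path = f"{tmp}/datasetConfig.yaml"
            with open(dataset_config_path, "w") as f:
                yaml.safe_dump(self.dataset_config, f)

            cmd = [
                "java", "-jar", "<path to utilities-0.1.0-SNAPSHOT-bundled.jar>",
                "--datasetConfig", dataset_config_path,
            ]

            if self.iceberg_catalog_config:
                catalog_config_path = f"{tmp}/icebergCatalogConfig.yaml"
                with open(catalog_config_path, "w") as f:
                    yaml.safe_dump(self.iceberg_catalog_config, f)
                cmd += ["--icebergCatalogConfig", catalog_config_path]

            # Fail the Airflow task if the metadata translation does not succeed
            subprocess.run(cmd, check=True)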

Refer to XTable’s GitHub repository for a full reference of all possible dict keys.

Example parameterization

The following example shows the configuration to translate a table from Delta Lake to both Iceberg and Hudi. The attribute dataset_config reflects the structure of the dataset config file through a Python dict literal:

from mwaa_plugin.plugin import XtableOperator
operator = XtableOperator(
    task_id="xtableTask",
    dataset_config={
        "sourceFormat": "DELTA",
        "targetFormats": ["ICEBERG", "HUDI"],
        "datasets": [
            {
                "tableBasePath": "s3://datalake/sales",
                "tableName": "sales",
                "namespace": "table",
            }
        ],
    }
)

Sample code: The full source code of the sample XtableOperator and all other code used in this post is provided through this GitHub repository.

Solution overview

To deploy the custom operator to Amazon MWAA, we upload it together with DAGs into the configured DAG folder.

Besides the operator itself, we also need to upload XTable’s executable JAR. As of writing this post, the JAR needs to be compiled by the user from source code. To simplify this, we provide a container-based build script.

Prerequisites

We assume you have at least an environment consisting of Amazon MWAA itself, an S3 bucket, and an AWS Identity and Access Management (IAM) role for Amazon MWAA that has read access to the bucket and optionally write access to the AWS Glue Data Catalog.

In addition, you need one of the following container runtimes to run the provided build script for XTable:

  • Finch
  • Docker

Build and deploy the XTableOperator

To compile XTable, you can use the provided build script and complete the following steps:

  1. Clone the sample code from GitHub:
    git clone https://github.com/aws-samples/apache-xtable-on-aws-samples.git
    cd apache-xtable-on-aws-samples
  2. Run the build script:
    ./build-airflow-operator.sh

  3. Because the Airflow operator uses the library JPype to invoke XTable’s JAR, add a dependency in the Amazon MWAA requirements.txt file:
    JPype1==1.5.0

    For background on installing additional Python libraries on Amazon MWAA, see Installing Python dependencies.
    Because XTable is Java-based, a Java 11 runtime environment (JRE) is required on Amazon MWAA. You can use the Amazon MWAA startup script to install a JRE.

  4. Add the following lines to an existing startup script or create a new one as provided in the sample code base of this post:
    if [[ "${MWAA_AIRFLOW_COMPONENT}" != "webserver" ]]
    then
        sudo yum install -y java-11-amazon-corretto-headless
    fi

    For more information about this mechanism, see Using a startup script with Amazon MWAA.

  5. Upload xtable_operator/, requirements.txt, startup.sh, and .airflowignore to the S3 bucket and the respective paths from which Amazon MWAA will read files.
    Make sure the IAM role for Amazon MWAA has appropriate read permissions.
    With regard to the custom operator, make sure to upload the local folder xtable_operator/ and .airflowignore into the configured DAG folder (a minimal upload sketch follows after this list).

  6. Update the configuration of your Amazon MWAA environment as follows and start the update process:
    1. Add or update the S3 URI to the requirements.txt file through the Requirements file configuration option.
    2. Add or update the S3 URI to the startup.sh script through the Startup script configuration option.
  7. Optionally, you can use the AWS Glue Data Catalog as an Iceberg catalog. If you create Iceberg metadata and want to register it in the AWS Glue Data Catalog, the Amazon MWAA role needs permissions to create or modify tables in AWS Glue. The following listing shows a minimal policy for this; it constrains permissions to a defined database in AWS Glue:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:GetDatabase",
                    "glue:CreateTable",
                    "glue:GetTables",
                    "glue:UpdateTable",
                    "glue:GetDatabases",
                    "glue:GetTable"
                ],
                "Resource": [
                    "arn:aws:glue:<AWS Region>:<AWS Account ID>:catalog",
                    "arn:aws:glue:<AWS Region>:<AWS Account ID>:database/<Database name>",
                    "arn:aws:glue:<AWS Region>:<AWS Account ID>:table/<Database name>/*"
                ]
            }
        ]
    }
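
The following is a minimal, illustrative Boto3 sketch of the upload in step 5. The bucket name and key prefixes are placeholders; adjust them to match the DAG folder, requirements file, and startup script locations configured for your environment:

import os

import boto3

s3 = boto3.client("s3")
bucket = "<your Amazon MWAA bucket>"

# Upload the custom operator package and .airflowignore into the DAG folder
for root, _, files in os.walk("xtable_operator"):
    for name in files:
        local_path = os.path.join(root, name)
        s3.upload_file(local_path, bucket, f"dags/{local_path}")
s3.upload_file(".airflowignore", bucket, "dags/.airflowignore")

# Upload the requirements file and startup script referenced by the environment
s3.upload_file("requirements.txt", bucket, "requirements.txt")
s3.upload_file("startup.sh", bucket, "startup.sh")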

Using the XTableOperator in practice: Delta Lake to Apache Iceberg

Let’s look into a practical example that uses the XTableOperator. We continue the scenario of a data pipeline in the Delta Lake ecosystem and assume it is implemented as a DAG on Amazon MWAA. The following figure shows our example batch pipeline.

The pipeline uses an Apache Spark job that is run by AWS Glue to write a Delta table into an S3 bucket. Additionally, the table is made accessible as an Iceberg table without data duplication. Finally, we want to load the Iceberg table into Amazon Redshift, which is a fully managed, petabyte-scale data warehouse service in the cloud.

As shown in the following screenshot of the graph visualization of the example DAG, we run the XTableOperator after creating the Delta table through a Spark job. Then we use the RedshiftDataOperator to refresh a materialized view, which is used in downstream transformations as a source table. Materialized views are a common construct to precompute complex queries on large tables. In this example, we use them to simplify data loading into Amazon Redshift because of the incremental update capabilities in combination with Iceberg.

The input parameters of the XTableOperator are as follows:

operator = XtableOperator(
    task_id="xtableTask",
    dataset_config={
        "sourceFormat": "DELTA",
        "targetFormats": ["ICEBERG"],
        "datasets": [
            {
                "tableBasePath": "s3://<datalake>/<sales>",
                "tableName": "sales",
                "namespace": "table",
            }
        ],
    },
    iceberg_catalog_config={
        "catalogImpl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "catalogName": "glue",
        "catalogOptions": {
            "warehouse": "s3://datalake/sales",
            "catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
            "io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
    }
)
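
To put the operator in context, the following is an illustrative sketch of how the tasks described above could be wired together in a DAG. The Glue job name, Redshift cluster identifier, database, and materialized view name are placeholders and not part of the sample repository:

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator

from mwaa_plugin.plugin import XtableOperator

with DAG(
    dag_id="delta_to_iceberg_to_redshift",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    # AWS Glue Spark job that writes the Delta table (job name is a placeholder)
    write_delta = GlueJobOperator(
        task_id="writeDeltaTable",
        job_name="<Glue job that writes the sales Delta table>",
    )

    # Translate the Delta metadata to Iceberg and register it in the Data Catalog,
    # using the configuration shown in the previous listing
    xtable_task = XtableOperator(
        task_id="xtableTask",
        dataset_config={
            "sourceFormat": "DELTA",
            "targetFormats": ["ICEBERG"],
            "datasets": [
                {
                    "tableBasePath": "s3://<datalake>/<sales>",
                    "tableName": "sales",
                    "namespace": "table",
                }
            ],
        },
        iceberg_catalog_config={
            "catalogImpl": "org.apache.iceberg.aws.glue.GlueCatalog",
            "catalogName": "glue",
            "catalogOptions": {
                "warehouse": "s3://datalake/sales",
                "catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
                "io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
            },
        },
    )

    # Refresh the materialized view that reads the Iceberg table via Redshift Spectrum
    refresh_mv = RedshiftDataOperator(
        task_id="refreshMaterializedView",
        cluster_identifier="<Redshift cluster identifier>",
        database="<database>",
        sql="REFRESH MATERIALIZED VIEW <materialized view name>;",
    )

    write_delta >> xtable_task >> refresh_mv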

The XTableOperator creates Apache Iceberg metadata on Amazon S3 and registers a table accordingly in the Data Catalog. The following screenshot shows the created Iceberg table. AWS Glue stores a pointer to Iceberg’s most recent metadata file. As updates are applied to the table and new metadata files are created, XTable updates the pointer after each job.

Amazon Redshift is able to discover the Iceberg table through the Data Catalog and read it using Amazon Redshift Spectrum.

Summary

In this post, we showed how Apache XTable translates the metadata layer of open table formats without data duplication. This provides advantages from both a cost and data integrity perspective, especially in large-scale environments, and allows for the migration of an existing historical estate of datasets. We also discussed how you can implement a custom Airflow Operator that embeds Apache XTable into data pipelines on Amazon MWAA.

For further reading, visit What’s new with Amazon MWAA and Apache XTable’s website. For more examples of custom operators, refer to the following GitHub repository.


About the Authors

Matthias Rudolph is an Associate Solutions Architect, digitalizing the German manufacturing industry.

Stephen Said is a Senior Solutions Architect and works with Retail/CPG customers. His areas of interest are data platforms and cloud-native software engineering.