AWS Big Data Blog

Run Apache XTable in AWS Lambda for background conversion of open table formats

This post was co-written with Dipankar Mazumdar, Staff Data Engineering Advocate with AWS Partner Onehouse.

Data architecture has evolved significantly to handle growing data volumes and diverse workloads. Initially, data warehouses were the go-to solution for structured data and analytical workloads but were limited by proprietary storage formats and their inability to handle unstructured data. This led to the rise of data lakes based on columnar formats like Apache Parquet, which came with different challenges like the lack of ACID capabilities.

Eventually, transactional data lakes emerged to bring the transactional consistency and performance of a data warehouse to the data lake. Central to a transactional data lake are open table formats (OTFs) such as Apache Hudi, Apache Iceberg, and Delta Lake, which act as a metadata layer over columnar formats. These formats provide essential features like schema evolution, partitioning, ACID transactions, and time-travel capabilities that address traditional problems in data lakes.

In practice, OTFs are used in a broad range of analytical workloads, from business intelligence to machine learning. Moreover, they can be combined to benefit from their individual strengths. For instance, a streaming data pipeline can write tables using Hudi because of its strength in low-latency, write-heavy workloads. In later pipeline stages, data is converted to Iceberg to benefit from its read performance. Traditionally, this conversion required time-consuming rewrites of data files, resulting in data duplication and higher storage and compute costs. In response, the industry is shifting toward interoperability between OTFs, with tools that allow conversions without data duplication. Apache XTable (incubating), an emerging open source project, facilitates conversions between OTFs and removes many of the challenges associated with table format conversion.

In this post, we explore how Apache XTable, combined with the AWS Glue Data Catalog, enables background conversions between OTFs residing on Amazon Simple Storage Service (Amazon S3) based data lakes, with minimal to no changes to existing pipelines in a scalable and cost-effective way, as shown in the following diagram.

This post is one of multiple posts about XTable on AWS. For more examples and references to other posts, refer to the following GitHub repository.

Apache XTable

Apache XTable (incubating) is an open source project designed to enable interoperability among various data lake table formats, allowing omnidirectional conversions between formats without the need to copy or rewrite data. It was originally open sourced in November 2023 under the name OneTable and licensed under Apache 2.0, with contributions from Onehouse, among others. In March 2024, the project was donated to the Apache Software Foundation (ASF) and rebranded as Apache XTable, where it is now incubating. XTable isn’t a new table format but provides abstractions and tools to translate the metadata associated with existing formats. The primary objective of XTable is to allow users to start with any table format and have the flexibility to switch to another as needed.

Inner workings and features

At a fundamental level, Hudi, Iceberg, and Delta Lake share similarities in their structure. When data is written to a distributed file system, these formats consist of a data layer, typically Parquet files, and a metadata layer that provides the necessary abstraction (see the following diagram). XTable uses these commonalities to enable interoperability between formats.

The synchronization process in XTable works by translating table metadata using the existing APIs of these table formats. It reads the current metadata from the source table and generates the corresponding metadata for one or more target formats. This metadata is then stored in a designated directory within the base path of your table, such as _delta_log for Delta Lake, metadata for Iceberg, and .hoodie for Hudi. This allows the existing data to be interpreted as if it were originally written in any of these formats.
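As a small illustration of this layout, the following sketch checks which formats a table can already be read as by looking for the metadata directories named above. The function name, the sample object keys, and the format identifiers used as dictionary keys are assumptions for illustration and are not part of XTable.

```python
from pathlib import PurePosixPath

# Metadata directory per table format, as named in the text above.
METADATA_DIRS = {
    "HUDI": ".hoodie",
    "ICEBERG": "metadata",
    "DELTA": "_delta_log",
}


def detect_formats(base_path: str, object_keys: list[str]) -> set[str]:
    """Return the formats whose metadata directory sits directly under base_path."""
    base = PurePosixPath(base_path)
    found = set()
    for key in object_keys:
        try:
            relative = PurePosixPath(key).relative_to(base)
        except ValueError:
            continue  # key lies outside the table's base path
        if not relative.parts:
            continue
        for table_format, directory in METADATA_DIRS.items():
            if relative.parts[0] == directory:
                found.add(table_format)
    return found


# Hypothetical object keys of a Hudi table that was also synced to Iceberg
keys = [
    "lake/orders/.hoodie/hoodie.properties",
    "lake/orders/metadata/v1.metadata.json",
    "lake/orders/part-0001.parquet",
]
print(detect_formats("lake/orders", keys))
```

Note that the Parquet data file is shared: only the metadata directories differ between formats.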

XTable provides two metadata translation methods: Full Sync, which translates all commits, and Incremental Sync, which only translates new, unsynced commits for greater efficiency with large tables. If issues arise with Incremental Sync, XTable automatically falls back to Full Sync to provide uninterrupted translation.
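The difference between the two modes can be pictured with a toy model. The following sketch is purely conceptual and is not XTable's implementation: it assumes commits are identified by simple strings and that a missing last-synced commit is the condition that triggers the fallback.

```python
def commits_to_translate(source_commits, last_synced_commit):
    """Pick the sync mode and the commits to translate (conceptual model only).

    source_commits: ordered list of commit IDs on the source table.
    last_synced_commit: ID of the last commit already translated, or None.
    """
    if last_synced_commit in source_commits:
        # Incremental Sync: translate only the commits after the last synced one
        position = source_commits.index(last_synced_commit)
        return "incremental", source_commits[position + 1:]
    # First run, or the last synced commit can no longer be found: Full Sync
    return "full", list(source_commits)


print(commits_to_translate(["c1", "c2", "c3"], "c1"))
print(commits_to_translate(["c2", "c3"], "c1"))
```

The first call translates only the two new commits. The second call cannot locate the previously synced commit and therefore translates everything.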

Community and future

In terms of future plans, XTable is focused on achieving feature parity with OTFs’ built-in features, including adding critical capabilities like support for Merge-on-Read (MoR) tables. The project also plans to facilitate synchronization of table formats across multiple catalogs, such as AWS Glue, Hive, and Unity Catalog.

Run XTable as a continuous background conversion mechanism

In this post, we describe a background conversion mechanism for OTFs that doesn’t require changes to data pipelines. The mechanism periodically scans a data catalog like the AWS Glue Data Catalog for tables to convert with XTable.

On a data platform, a data catalog stores table metadata and typically contains the data model and physical storage location of the datasets. It serves as the central integration with analytical services. To maximize ease of use, compatibility, and scalability on AWS, the conversion mechanism described in this post is built around the AWS Glue Data Catalog.

The following diagram illustrates the solution at a glance. We design this conversion mechanism based on Lambda, AWS Glue, and XTable.

For the Lambda function to detect the tables to convert in the Data Catalog, the following information needs to be associated with a table: source format and target formats. For each detected table, the Lambda function invokes the XTable application, which is packaged into the function’s environment. XTable then translates between source and target formats and writes the new metadata to the same data store.

Solution overview

We implement the solution with the AWS Cloud Development Kit (AWS CDK), an open source software development framework for defining cloud infrastructure in code, and provide it on GitHub. The AWS CDK solution deploys the following components:

  • A converter Lambda function that contains the XTable application and starts the conversion job for the detected tables
  • A detector Lambda function that scans the Data Catalog for tables that are to be converted and invokes the converter Lambda function
  • An Amazon EventBridge schedule that invokes the detector Lambda function on an hourly basis

Currently, the XTable application needs to be built from source. We therefore provide a Dockerfile that implements the required build steps and use the resulting Docker image as the Lambda function runtime environment.

In case you don’t have sample data available for testing, we provide scripts for generating sample datasets on GitHub. Data and metadata are shown in blue in the following detail diagram.

Converter Lambda function: Run XTable

The converter Lambda function invokes the XTable JAR, wrapped with the third-party library jpype, and converts the metadata layer of the respective data lake tables.

The function is defined in the AWS CDK through the DockerImageFunction, which uses a Dockerfile and builds a Docker container as part of the deploy step. With this mechanism, we can bundle the XTable application inside our Lambda function.

First, we clone the XTable GitHub repository and build the JAR with the Maven CLI. This is done as part of the Docker container build process:

# Dockerfile
# clone sources
RUN git clone --depth 1 --branch <xtable_branch> https://github.com/apache/incubator-xtable.git

# build xtable jar
WORKDIR /incubator-xtable
RUN /apache-maven-<maven_version>/bin/mvn package -DskipTests=true
WORKDIR /

To automatically build and upload the Docker image, we create a DockerImageFunction in the AWS CDK and reference the Dockerfile in its definition. To run Spark, and therefore XTable, in a Lambda function, we need to set Spark’s SPARK_LOCAL_IP environment variable to the localhost address 127.0.0.1:

# cdk_stack.py
converter = _lambda.DockerImageFunction(
    scope=self,
    id="Converter",
    # Dockerfile in ./src directory
    code=_lambda.DockerImageCode.from_image_asset(
        directory="src", cmd=["converter.handler"]
    ),
    # Spark must bind to the loopback address inside Lambda
    environment={"SPARK_LOCAL_IP": "127.0.0.1"},
    ...
)

To call the XTable JAR, we use a third-party Python library called jpype, which handles the communication with the Java virtual machine. In our Python code, the XTable call is as follows:

# call java class with configuration files
run_sync = jpype.JPackage("org").apache.xtable.utilities.RunSync.main
run_sync(
    [
        "--datasetConfig",
        "<path_to_dataset_config>",
        "--icebergCatalogConfig",
        "<path_to_catalog_config>",
    ]
)

For more information on XTable application parameters, see Creating your first interoperable table.
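The file passed with --datasetConfig is a YAML document that names the source format, the target formats, and the tables to sync. The following sketch renders such a file from plain Python values. The helper name and the example bucket are hypothetical, and the field names (sourceFormat, targetFormats, datasets, tableBasePath, tableName) follow the XTable documentation as we understand it, so verify them against the XTable version you build.

```python
def render_dataset_config(source_format, target_formats, base_path, table_name):
    """Render a minimal XTable dataset config as YAML text (hypothetical helper)."""
    lines = [f"sourceFormat: {source_format}", "targetFormats:"]
    lines += [f"  - {fmt}" for fmt in target_formats]
    lines += [
        "datasets:",
        f"  - tableBasePath: {base_path}",
        f"    tableName: {table_name}",
    ]
    return "\n".join(lines) + "\n"


# Example values are assumptions for illustration
config = render_dataset_config(
    source_format="HUDI",
    target_formats=["ICEBERG"],
    base_path="s3://my-bucket/lake/orders",
    table_name="orders",
)
print(config)
```

In a Lambda function, the rendered text would have to be written to a file under /tmp, the only writable path, before its location is passed to RunSync.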

Detector Lambda function: Identify tables to convert in the Data Catalog

The detector Lambda function scans the tables in the Data Catalog. For a table that will be converted, it invokes the converter Lambda function through an event. This decouples the scanning and conversion parts and makes our solution more resilient to potential failures.

The detection mechanism searches in the table parameters for the parameters xtable_table_type and xtable_target_formats. If they exist, the conversion is invoked. See the following code:

# detector.py
# create paginator to loop through AWS Glue tables
tables = glue_client.get_paginator("get_tables").paginate(
    DatabaseName=database["Name"]
)
for table_list in tables:
    table_list = table_list["TableList"]
…
# loop through all tables and check for required custom glue parameters
for table in table_list:
    required_parameters={"xtable_table_type", "xtable_target_formats"}
    # if required table parameters exist pass on table for conversion
    if required_parameters <= table["Parameters"].keys():
        yield table
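Once a table has passed this check, the detector hands it to the converter through an asynchronous invocation. The following is a minimal sketch of that hand-off, assuming a boto3 Lambda client is passed in. The payload field names, the helper names, and the example table are hypothetical and not taken from the repository.

```python
import json


def parse_target_formats(raw: str) -> list[str]:
    """Split a value such as "ICEBERG, DELTA" into ["ICEBERG", "DELTA"]."""
    return [part.strip().upper() for part in raw.split(",") if part.strip()]


def build_conversion_event(table: dict) -> dict:
    """Collect what a converter would need from one AWS Glue table entry."""
    parameters = table["Parameters"]
    return {
        "table_name": table["Name"],
        "database_name": table["DatabaseName"],
        "base_path": table["StorageDescriptor"]["Location"],
        "source_format": parameters["xtable_table_type"].strip().upper(),
        "target_formats": parse_target_formats(parameters["xtable_target_formats"]),
    }


def invoke_converter(lambda_client, function_name: str, table: dict) -> None:
    """Invoke the converter without waiting for its result (InvocationType="Event")."""
    lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="Event",
        Payload=json.dumps(build_conversion_event(table)).encode("utf-8"),
    )


# Hypothetical table entry, shaped like an item of the get_tables TableList
example_table = {
    "Name": "orders",
    "DatabaseName": "lake",
    "StorageDescriptor": {"Location": "s3://my-bucket/lake/orders"},
    "Parameters": {"xtable_table_type": "hudi", "xtable_target_formats": "iceberg, delta"},
}
print(build_conversion_event(example_table))
```

Because the invocation is asynchronous, a failing conversion of one table does not block the scan of the remaining tables.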

EventBridge Scheduler rule

In the AWS CDK, you define an EventBridge Scheduler rule as follows. Based on the rule, EventBridge will then call the Lambda detector function every hour:

# cdk_stack.py
event = events.Rule(
    scope=self,
    id="DetectorSchedule",
    schedule=events.Schedule.rate(Duration.hours(1)),
)
event.add_target(targets.LambdaFunction(detector))

Prerequisites

Let’s dive deeper into how to deploy the provided AWS CDK stack. You need one of the following container runtimes:

  • Finch (an open source client for container development)
  • Docker

You also need the AWS CDK configured. For more details, see Getting started with the AWS CDK.

Build and deploy the solution

Complete the following steps:

  1. To deploy the stack, clone the GitHub repo, change into the folder for this post (xtable_lambda), and deploy the AWS CDK stack:
    git clone https://github.com/aws-samples/apache-xtable-on-aws-samples.git
    cd apache-xtable-on-aws-samples/xtable_lambda
    cdk deploy

This deploys the described Lambda functions and the EventBridge Scheduler rule.

  2. If you use Finch, set the CDK_DOCKER environment variable before you run cdk deploy:
    export CDK_DOCKER=finch

After successful deployment, the conversion mechanism starts to run every hour.

  3. The following parameters need to exist on the AWS Glue table that will be converted:
    1. "xtable_table_type": "<source_format>"
    2. "xtable_target_formats": "<target_format>, <target_format>"

On the AWS Glue console, the parameters look like the following screenshot and can be set under Table properties when editing an AWS Glue table.

  4. Optionally, if you don’t have sample data, the following scripts can help you set up a test environment either with your local machine or in an AWS Glue for Spark job:
    # local: create hudi dataset on S3
    cd scripts
    pip install -r requirements.txt
    python ./create_hudi_s3.py
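The two table parameters from the steps above can also be set programmatically instead of on the console. The following is a minimal sketch, assuming a boto3 AWS Glue client is passed in. The helper names are hypothetical, and the set of keys accepted by TableInput reflects our reading of the AWS Glue API, so check it against the current boto3 documentation.

```python
# Keys of a get_table response that update_table accepts in TableInput (assumed subset)
TABLE_INPUT_KEYS = {
    "Name", "Description", "Owner", "LastAccessTime", "LastAnalyzedTime",
    "Retention", "StorageDescriptor", "PartitionKeys", "ViewOriginalText",
    "ViewExpandedText", "TableType", "Parameters", "TargetTable",
}


def with_xtable_parameters(table: dict, source_format: str, target_formats: list[str]) -> dict:
    """Build a TableInput from an existing table and add the XTable parameters."""
    # Drop read-only fields such as CreateTime or DatabaseName
    table_input = {key: value for key, value in table.items() if key in TABLE_INPUT_KEYS}
    parameters = dict(table_input.get("Parameters", {}))
    parameters["xtable_table_type"] = source_format
    parameters["xtable_target_formats"] = ", ".join(target_formats)
    table_input["Parameters"] = parameters
    return table_input


def set_xtable_parameters(glue_client, database, name, source_format, target_formats):
    """Read the table, add the parameters, and write it back."""
    table = glue_client.get_table(DatabaseName=database, Name=name)["Table"]
    glue_client.update_table(
        DatabaseName=database,
        TableInput=with_xtable_parameters(table, source_format, target_formats),
    )
```

A call such as set_xtable_parameters(boto3.client("glue"), "<database>", "<table>", "HUDI", ["ICEBERG"]) would mark the table for conversion on the next detector run.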

Convert a streaming table (Hudi to Iceberg)

Let’s assume we have a Hudi table on Amazon S3, which is registered in the Data Catalog, and want to periodically translate it to Iceberg format. Data is streaming in continuously. We have deployed the provided AWS CDK stack and set the required AWS Glue table properties to translate the dataset to the Iceberg format. In the following steps, we run the background job, see the results in AWS Glue and Amazon S3, and query it with Amazon Athena, a serverless and interactive analytics service that provides a simplified and flexible way to analyze petabytes of data.

In Amazon S3 and AWS Glue, we can see our Hudi dataset and table along with the metadata folder .hoodie. On the AWS Glue console, we set the following table properties:

  • "xtable_table_type": "HUDI"
  • "xtable_target_formats": "ICEBERG"

Our Lambda function is invoked every hour. After the run, we can find the Iceberg-specific metadata folder, which was generated by XTable, in our S3 bucket.

If we look at the Data Catalog, we can see the new table <table_name>_converted was registered as an Iceberg table.


With the Iceberg format, we can now take advantage of the time travel feature by querying the dataset with a downstream analytical service like Athena. In the following screenshot, you can see at Name: that the table is in Iceberg format.

Querying all snapshots, we can see that we created three snapshots with overwrites after the initial one.

We then take the current time and query the dataset representation of 180 minutes ago, resulting in the data from the first snapshot committed.
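The two queries described above can be sketched as follows. The helpers only build the SQL text. The database and table names are hypothetical, and the syntax follows the Athena documentation for Iceberg tables as we understand it (the $snapshots metadata table and FOR TIMESTAMP AS OF).

```python
def snapshots_query(database: str, table: str) -> str:
    """List the snapshots of an Iceberg table, oldest first."""
    return f'SELECT * FROM "{database}"."{table}$snapshots" ORDER BY committed_at'


def time_travel_query(database: str, table: str, minutes_ago: int) -> str:
    """Read the table as it was the given number of minutes ago."""
    return (
        f'SELECT * FROM "{database}"."{table}" '
        f"FOR TIMESTAMP AS OF (current_timestamp - interval '{int(minutes_ago)}' minute)"
    )


# Hypothetical names; the converted table carries the _converted suffix
print(snapshots_query("my_database", "my_table_converted"))
print(time_travel_query("my_database", "my_table_converted", 180))
```

The resulting strings can be pasted into the Athena query editor or submitted through the Athena API.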

Summary

In this post, we demonstrated how to build a background conversion job for OTFs, using XTable and the Data Catalog, which is independent of data pipelines and transformation jobs. XTable translates between OTFs efficiently because data files are reused and only the metadata layer is processed. The integration with the Data Catalog provides wide compatibility with AWS analytical services.

You can reuse the Lambda-based XTable deployment in other solutions. For instance, you could use it in a reactive mechanism for near real-time conversion of OTFs, which is invoked by Amazon S3 object events resulting from changes to OTF metadata.
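As a rough sketch of such a reactive trigger, the following function extracts the base paths of tables whose source metadata changed from an S3 event notification. It is not part of the provided solution. The function name and the example event are hypothetical, and matching only the source format's metadata directory is an assumption made so that metadata written by XTable itself does not retrigger the conversion.

```python
from urllib.parse import unquote_plus

# Metadata directory per table format
METADATA_DIRS = {"HUDI": ".hoodie", "DELTA": "_delta_log", "ICEBERG": "metadata"}


def changed_table_paths(event: dict, source_format: str) -> set[str]:
    """Return base paths of tables whose source metadata changed in this S3 event."""
    marker = f"/{METADATA_DIRS[source_format]}/"
    paths = set()
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 event notifications
        key = "/" + unquote_plus(record["s3"]["object"]["key"])
        index = key.find(marker)
        if index != -1:
            paths.add(f"s3://{bucket}{key[:index]}")
    return paths


# Hypothetical event with one Hudi commit and one data file
example_event = {
    "Records": [
        {"s3": {"bucket": {"name": "my-bucket"},
                "object": {"key": "lake/orders/.hoodie/20240101.commit"}}},
        {"s3": {"bucket": {"name": "my-bucket"},
                "object": {"key": "lake/orders/part-0001.parquet"}}},
    ]
}
print(changed_table_paths(example_event, "HUDI"))
```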

For further information about XTable, see the project’s official website. For more examples and references to other posts on using XTable on AWS, refer to the following GitHub repository.


About the authors

Matthias Rudolph is a Solutions Architect at AWS, digitalizing the German manufacturing industry, focusing on analytics and big data. Before that he was a lead developer at the German manufacturer KraussMaffei Technologies, responsible for the development of data platforms.

Dipankar Mazumdar is a Staff Data Engineer Advocate at Onehouse.ai, focusing on open-source projects like Apache Hudi and XTable to help engineering teams build and scale robust analytics platforms, with prior contributions to critical projects such as Apache Iceberg and Apache Arrow.

Stephen Said is a Senior Solutions Architect and works with Retail/CPG customers. His areas of interest are data platforms and cloud-native software engineering.