AWS Big Data Blog

Using Amazon EMR DeltaStreamer to stream data to multiple Apache Hudi tables

In this post, we show you how to implement real-time data ingestion from multiple Kafka topics to Apache Hudi tables using Amazon EMR. This solution streamlines data ingestion by processing multiple Amazon Managed Streaming for Apache Kafka (Amazon MSK) topics in parallel while providing data quality and scalability through change data capture (CDC) and Apache Hudi.

Organizations processing real-time data changes across multiple sources often struggle with maintaining data consistency and managing resource costs. Traditional batch processing requires reprocessing entire datasets, leading to high resource usage and delayed analytics. By implementing CDC with Apache Hudi’s MultiTable DeltaStreamer, you can achieve real-time updates; efficient incremental processing with atomicity, consistency, isolation, durability (ACID) guarantees; and seamless schema evolution while minimizing storage and compute costs.

Using Amazon Simple Storage Service (Amazon S3), Amazon CloudWatch, Amazon EMR, Amazon MSK and AWS Glue Data Catalog, you’ll build a production-ready data pipeline that processes changes from multiple data sources simultaneously. Through this tutorial, you’ll learn to configure CDC pipelines, manage table-specific configurations, implement 15-minute sync intervals, and maintain your streaming pipeline. The result is a robust system that maintains data consistency while enabling real-time analytics and efficient resource utilization.

What is CDC?

Imagine a constantly evolving data stream, a river of information where updates flow continuously. CDC acts like a sophisticated net, capturing only the modifications—the inserts, updates, and deletes—happening within that data stream. Through this targeted approach, you can focus on the new and changed data, significantly improving the efficiency of your data pipelines.There are numerous advantages to embracing CDC:

  • Reduced processing time – Why reprocess the entire dataset when you can focus only on the updates? CDC minimizes processing overhead, saving valuable time and resources.
  • Real-time insights – With CDC, your data pipelines become more responsive. You can react to changes almost instantaneously, enabling real-time analytics and decision-making.
  • Simplified data pipelines – Traditional batch processing can lead to complex pipelines. CDC streamlines the process, making data pipelines more manageable and easier to maintain.

Why Apache Hudi?

Hudi simplifies incremental data processing and data pipeline development. This framework efficiently manages business requirements such as data lifecycle and improves data quality. You can use Hudi to manage data at the record-level in Amazon S3 data lakes to simplify CDC and streaming data ingestion and handle data privacy use cases requiring record-level updates and deletes. Datasets managed by Hudi are stored in Amazon S3 using open storage formats, while integrations with Presto, Apache Hive, Apache Spark, and Data Catalog give you near real time access to updated data. Apache Hudi facilitates incremental data processing for Amazon S3 by:

  • Managing record-level changes – Ideal for update and delete use cases
  • Open formats – Integrates with Presto, Hive, Spark, and Data Catalog
  • Schema evolution – Supports dynamic schema changes
  • HoodieMultiTableDeltaStreamer – Simplifies ingestion into multiple tables using centralized configurations

Hudi MultiTable Delta Streamer

The HoodieMultiTableStreamer offers a streamlined approach to data ingestion from multiple sources into Hudi tables. By processing multiple sources simultaneously through a single DeltaStreamer job, it eliminates the need for separate pipelines while reducing operational complexity. The framework provides flexible configuration options, and you can tailor settings for diverse formats and schemas across different data sources.

One of its key strengths lies in unified data delivery, organizing information in respective Hudi tables for seamless access. The system’s intelligent upsert capabilities efficiently handle both inserts and updates, maintaining data consistency across your pipeline. Additionally, its robust schema evolution support enables your data pipeline to adapt to changing business requirements without disruption, making it an ideal solution for dynamic data environments.

Solution overview

In this section, we show how to stream data to Apache Hudi Table using Amazon MSK. For this example scenario, there are data streams from three distinct sources residing in separate Kafka topics. We aim to implement a streaming pipeline that uses the Hudi DeltaStreamer with multitable support to ingest and process this data at 15-minute intervals.

Mechanism

Using MSK Connect, data from multiple sources flows into MSK topics. These topics are then ingested into Hudi tables using the Hudi MultiTable DeltaStreamer. In this sample implementation, we create three Amazon MSK topics and configure the pipeline to process data in JSON format using JsonKafkaSource, with the flexibility to handle Avro format when needed through the appropriate deserializer configuration

The following diagram illustrates how our solution processes data from multiple source databases through Amazon MSK and Apache Hudi to enable analytics in Amazon Athena. Source databases send their data changes—including inserts, updates, and deletes—to dedicated topics in Amazon MSK, where each data source maintains its own Kafka topic for change events. An Amazon EMR cluster runs the Apache Hudi MultiTable DeltaStreamer, which processes these multiple Kafka topics in parallel, transforming the data and writing it to Apache Hudi tables stored in Amazon S3. Data Catalog maintains the metadata for these tables, enabling seamless integration with analytics tools. Finally, Amazon Athena provides SQL query capabilities on the Hudi tables, allowing analysts to run both snapshot and incremental queries on the latest data. This architecture scales horizontally as new data sources are added, with each source getting its dedicated Kafka topic and Hudi table configuration, while maintaining data consistency and ACID guarantees across the entire pipeline.

To set up the solution, you need to complete the following high-level steps:

  1. Set up Amazon MSK and create Kafka topics
  2. Create the Kafka topics
  3. Create table-specific configurations
  4. Launch Amazon EMR cluster
  5. Invoke the Hudi MultiTable DeltaStreamer
  6. Verify and query data

Prerequisites

To perform the solution, you need to have the following prerequisites. For AWS services and permissions, you need:

  • AWS account:
  • IAM roles:
    • Amazon EMR service role (EMR_DefaultRole) with permissions for Amazon S3, AWS Glue and CloudWatch.
    • Amazon EC2 instance profile (EMR_EC2_DefaultRole) with S3 read/write access.
    • Amazon MSK access role with appropriate permissions.
  • S3 buckets:
    • Configuration bucket for storing properties files and schemas.
    • Output bucket for Hudi tables.
    • Logging bucket (optional but recommended).
  • Network configuration:
  • Development tools:

Set up Amazon MSK and create Kafka topics

In this step, you’ll create an MSK cluster and configure the required Kafka topics for your data streams.

  1. To create an MSK cluster:
aws kafka create-cluster \
    --cluster-name hudi-msk-cluster \
    --broker-node-group-info file://broker-nodes.json \
    --kafka-version "2.8.1" \
    --number-of-broker-nodes 3 \
    --encryption-info file://encryption-info.json \
    --client-authentication file://client-authentication.json
  1. Verify the cluster status:

aws kafka describe-cluster --cluster-arn $CLUSTER_ARN | jq '.ClusterInfo.State'

The command should return ACTIVE when the cluster is ready.

Schema setup

To set up the schema, complete the following steps:

  1. Create your schema files.
    1. input_schema.avsc:
      {
          "type": "record",
          "name": "CustomerSales",
          "fields": [
              {"name": "Id", "type": "string"},
              {"name": "ts", "type": "long"},
              {"name": "amount", "type": "double"},
              {"name": "customer_id", "type": "string"},
              {"name": "transaction_date", "type": "string"}
          ]
      }
    2. output_schema.avsc:
      {
          "type": "record",
          "name": "CustomerSalesProcessed",
          "fields": [
              {"name": "Id", "type": "string"},
              {"name": "ts", "type": "long"},
              {"name": "amount", "type": "double"},
              {"name": "customer_id", "type": "string"},
              {"name": "transaction_date", "type": "string"},
              {"name": "processing_timestamp", "type": "string"}
          ]
      }
  2. Create and upload schemas to your S3 bucket:
    # Create the schema directory
    aws s3 mb s3://hudi-config-bucket-$AWS_ACCOUNT_ID
    aws s3api put-object --bucket hudi-config-bucket-$AWS_ACCOUNT_ID --key HudiProperties/
    # Upload schema files
    aws s3 cp input_schema.avsc s3://hudi-config-bucket-$AWS_ACCOUNT_ID/HudiProperties/
    aws s3 cp output_schema.avsc s3://hudi-config-bucket-$AWS_ACCOUNT_ID/HudiProperties/

Create the Kafka topics

To create the Kafka topics, complete the following steps:

  1. Get the bootstrap broker string:
    # Get bootstrap brokers
    BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $CLUSTER_ARN --query 'BootstrapBrokerString' --output text)
  2. Create the required topics:
    kafka-topics.sh --create \
        --bootstrap-server $BOOTSTRAP_BROKERS \
        --replication-factor 3 \
        --partitions 3 \
        --topic cust_sales_details
    kafka-topics.sh --create \
        --bootstrap-server $BOOTSTRAP_BROKERS \
        --replication-factor 3 \
        --partitions 3 \
        --topic cust_sales_appointment
    kafka-topics.sh --create \
        --bootstrap-server $BOOTSTRAP_BROKERS \
        --replication-factor 3 \
        --partitions 3 \
        --topic cust_info

Configure Apache Hudi

The Hudi MultiTable DeltaStreamer configuration is divided into two major components to streamline and standardize data ingestion:

  • Common configurations – These settings apply across all tables and define the shared properties for ingestion. They include details such as shuffle parallelism, Kafka brokers, and common ingestion configurations for all topics.
  • Table-specific configurations – Each table has unique requirements, such as the record key, schema file paths, and topic names. These configurations tailor each table’s ingestion process to its schema and data structure.

Create common configuration file

Common Config: kafka-hudi config file where we specify kafka broker and common configuration for all topics as below

Create the kafka-hudi-deltastreamer.properties file with the following properties:

# Common parallelism settings
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.delete.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
# Table ingestion configuration
hoodie.deltastreamer.ingestion.tablesToBeIngested=hudi_sales_tables.cust_sales_details,hudi_sales_tables.cust_sales_appointment,hudi_sales_tables.cust_info
# Table-specific config files
hoodie.deltastreamer.ingestion.hudi_sales_tables.cust_sales_details.configFile=s3://hudi-config-bucket-$AWS_ACCOUNT_ID/HudiProperties/tableProperties/cust_sales_details.properties
hoodie.deltastreamer.ingestion.hudi_sales_tables.cust_sales_appointment.configFile=s3://hudi-config-bucket-$AWS_ACCOUNT_ID/HudiProperties/tableProperties/cust_sales_appointment.properties
hoodie.deltastreamer.ingestion.hudi_sales_tables.cust_info.configFile=s3://hudi-config-bucket-$AWS_ACCOUNT_ID/HudiProperties/tableProperties/cust_info.properties
# Source configuration
hoodie.deltastreamer.source.dfs.root=s3://hudi-config-bucket-$AWS_ACCOUNT_ID/HudiProperties/
# MSK configuration
bootstrap.servers=BOOTSTRAP_BROKERS_PLACEHOLDER
auto.offset.reset=earliest
group.id=hudi_delta_streamer
# Security configuration
hoodie.sensitive.config.keys=ssl,tls,sasl,auth,credentials
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=
# Deserializer
hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer

Create table-specific configurations

For each topic, create its own configuration with a topic name and primary key details. Complete the following steps:

  1. cust_sales_details.properties:
    # Table: cust sales
    hoodie.datasource.write.recordkey.field=Id
    hoodie.deltastreamer.source.kafka.topic=cust_sales_details
    hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
    hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
    hoodie.streamer.schemaprovider.registry.schemaconverter=
    hoodie.datasource.write.precombine.field=ts
  2. cust_sales_appointment.properties:
    # Table: cust sales appointment
    hoodie.datasource.write.recordkey.field=Id
    hoodie.deltastreamer.source.kafka.topic=cust_sales_appointment
    hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
    hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S hoodie.streamer.schemaprovider.registry.schemaconverter=
    hoodie.datasource.write.precombine.field=ts
  3. cust_info.properties:
    # Table: cust info
    hoodie.datasource.write.recordkey.field=Id
    hoodie.deltastreamer.source.kafka.topic=cust_info
    hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
    hoodie.deltastreamer.keygen.timebased.input.dateformat= yyyy-MM-dd HH:mm:ss.S
    hoodie.streamer.schemaprovider.registry.schemaconverter=
    hoodie.datasource.write.precombine.field=ts
    hoodie.deltastreamer.schemaprovider.source.schema.file=-$AWS_ACCOUNT_ID/HudiProperties/input_schema.avsc
    hoodie.deltastreamer.schemaprovider.target.schema.file=-$AWS_ACCOUNT_ID/HudiProperties/output_schema.avsc

These configurations form the backbone of Hudi’s ingestion pipeline, enabling efficient data handling and maintaining real-time consistency. Schema configurations define the structure of both source and target data, maintaining seamless data transformation and ingestion. Operational settings control how data is uniquely identified, updated, and processed incrementally.

The following are critical details for setting up Hudi ingestion pipelines:

  • hoodie.deltastreamer.schemaprovider.source.schema.file – The schema of the source record
  • hoodie.deltastreamer.schemaprovider.target.schema.file – The schema for the target record
  • hoodie.deltastreamer.source.kafka.topic – The source MSK topic name
  • bootstap.servers – The Amazon MSK bootstrap server’s private endpoint
  • auto.offset.reset – The consumer’s behavior when there is no committed position or when an offset is out of range

Key operational fields to achieve in-place updates for the generated schema include:

  • hoodie.datasource.write.recordkey.field – The record key field. This is the unique identifier of a record in Hudi.
  • hoodie.datasource.write.precombine.field – When two records have the same record key value, Apache Hudi picks the one with the largest value for the pre-combined field.
  • hoodie.datasource.write.operation – The operation on the Hudi dataset. Possible values include UPSERT, INSERT, and BULK_INSERT.

Launch Amazon EMR cluster

This step creates an EMR cluster with Apache Hudi installed. The cluster will run the MultiTable DeltaStreamer to process data from your Kafka topics. To create the EMR cluster, enter the following:

# Create EMR cluster with Hudi installed
aws emr create-cluster \
    --name "Hudi-CDC-Cluster" \
    --release-label emr-6.15.0 \
    --applications Name=Hadoop Name=Spark Name=Hive Name=Livy \
    --ec2-attributes KeyName=myKey,SubnetId=$SUBNET_ID,InstanceProfile=EMR_EC2_InstanceProfile \
    --service-role EMR_ServiceRole \
    --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m5.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=m5.xlarge \
    --configurations file://emr-configurations.json \
    --bootstrap-actions Name="Install Hudi",Path="s3://hudi-config-bucket-$AWS_ACCOUNT_ID/bootstrap-hudi.sh"

Invoke the Hudi MultiTable DeltaStreamer

This step configures and starts the DeltaStreamer job that will continuously process data from your Kafka topics into Hudi tables. Complete the following steps:

  1. Connect to the Amazon EMR master node:
    # Get master node public DNS
    MASTER_DNS=$(aws emr describe-cluster --cluster-id $CLUSTER_ID --query 'Cluster.MasterPublicDnsName' --output text)
    
    # SSH to master node
    ssh -i myKey.pem hadoop@$MASTER_DNS
  2. Execute the DeltaStreamer job:
    # 
    spark-submit --deploy-mode client \
      --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
      --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
      --jars "/usr/lib/hudi/hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar,/usr/lib/hudi/hudi-spark-bundle.jar" \
      --class "org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer" \
      /usr/lib/hudi/hudi-utilities-bundle_2.12-0.14.0-amzn-0.jar \
      --props s3://hudi-config-bucket-$AWS_ACCOUNT_ID/HudiProperties/kafka-hudi-deltastreamer.properties \
      --config-folder s3://hudi-config-bucket-$AWS_ACCOUNT_ID/HudiProperties/tableProperties/ \
      --table-type MERGE_ON_READ \
      --base-path-prefix s3://hudi-data-bucket-$AWS_ACCOUNT_ID/hudi/ \
      --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
      --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
      --op UPSERT

    For continuous mode, you need to add the following property:

    
    --continuous \
    --min-sync-interval-seconds 900
    

With the job configured and running on Amazon EMR, the Hudi MultiTable DeltaStreamer efficiently manages real-time data ingestion into your Amazon S3 data lake.

Verify and query data

To verify and query the data, complete the following steps:

  1. Register tables in Data Catalog:
    # Start Spark shell
    spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
      --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
      --jars "/usr/lib/hudi/hudi-spark-bundle.jar"
    
    # In Spark shell
    spark.sql("CREATE DATABASE IF NOT EXISTS hudi_sales_tables")
    
    spark.sql("""
    CREATE TABLE hudi_sales_tables.cust_sales_details
    USING hudi
    LOCATION 's3://hudi-data-bucket-$AWS_ACCOUNT_ID/hudi/hudi_sales_tables.cust_sales_details'
    """)
    
    # Repeat for other tables
  2. Query with Athena:
    -- Sample query
    SELECT * FROM hudi_sales_tables.cust_sales_details LIMIT 10;

You can use Amazon CloudWatch alarms to alert you of issues with the EMR job or data processing. To create a CloudWatch alarm to monitor EMR job failures, enter the following:

aws cloudwatch put-metric-alarm \
    --alarm-name EMR-Hudi-Job-Failure \
    --metric-name JobsFailed \
    --namespace AWS/ElasticMapReduce \
    --statistic Sum \
    --period 300 \
    --threshold 1 \
    --comparison-operator GreaterThanOrEqualToThreshold \
    --dimensions Name=JobFlowId,Value=$CLUSTER_ID \
    --evaluation-periods 1 \
    --alarm-actions $SNS_TOPIC_ARN

Real-world impact of Hudi CDC pipelines

With the pipeline configured and running, you can achieve real-time updates to your data lake, enabling faster analytics and decision-making. For instance:

  • Analytics – Up-to-date inventory data maintains accurate dashboards for ecommerce platforms.
  • Monitoring – CloudWatch metrics confirm the pipeline’s health and efficiency.
  • Flexibility – The seamless handling of schema evolution minimizes downtime and data inconsistencies.

Cleanup

To avoid incurring future charges, follow these steps to clean up resources:

  1. Terminate the Amazon EMR cluster
  2. Delete the Amazon MSK cluster
  3. Remove Amazon S3 objects

Conclusion

In this post, we showed how you can build a scalable data ingestion pipeline using Apache Hudi’s MultiTable DeltaStreamer on Amazon EMR to process data from multiple Amazon MSK topics. You learned how to configure CDC with Apache Hudi, set up real-time data processing with 15-minute sync intervals, and maintain data consistency across multiple sources in your Amazon S3 data lake.

To learn more, explore these resources:

By combining CDC with Apache Hudi, you can build efficient, real-time data pipelines. The streamlined ingestion processes simplify management, enhance scalability, and maintain data quality, making this approach a cornerstone of modern data architectures.


About the authors

Radhakant Sahu

Radhakant Sahu

Radhakant is a Senior Data Engineer and Amazon EMR subject matter expert at Amazon Web Services (AWS) with over a decade of experience in the data space. He specializes in big data, graph databases, AI, and DevOps, building robust, scalable data and analytics solutions that help global clients derive actionable insights and drive business outcomes.

Gautam Bhaghavatula

Gautam Bhaghavatula

Gautam is an AWS Senior Partner Solutions Architect with over 10 years of experience in cloud infrastructure architecture. He specializes in designing scalable solutions, with a focus on compute systems, networking, microservices, DevOps, cloud governance, and AI operations. Gautam provides strategic guidance and technical leadership to AWS partners, driving successful cloud migrations and modernization initiatives.

Sucharitha Boinapally

Sucharitha Boinapally

Sucharitha is a Data Engineering Manager with over 15 years of industry experience. She specializes in agentic AI, data engineering, and knowledge graphs, delivering sophisticated data architecture solutions. Sucharitha excels at designing and implementing advanced knowledge mapping systems.

Veera “Bhargav” Nunna

Veera “Bhargav” Nunna

Veera is a Senior Data Engineer and Tech Lead at AWS pioneering Knowledge Graphs for Large Language Models and enterprise-scale data solutions. With over a decade of experience, he specializes in transforming enterprise AI from concept to production by delivering MVPs that demonstrate clear ROI while solving practical challenges like performance optimization and cost control.