AWS Big Data Blog

Stream, transform, and analyze XML data in real time with Amazon Kinesis, AWS Lambda, and Amazon Redshift

August 30, 2023: Amazon Kinesis Data Analytics has been renamed to Amazon Managed Service for Apache Flink. Read the announcement in the AWS News Blog and learn more.

February 9, 2024: Amazon Kinesis Data Firehose has been renamed to Amazon Data Firehose. Read the AWS What’s New post to learn more.

When we look at enterprise data warehousing systems, we receive data in various formats, such as XML, JSON, or CSV. Most third-party system integrations happen through SOAP or REST web services, where the input and output data format is either XML or JSON. When applications deal with CSV or JSON, it becomes fairly simple to parse because most programming languages and APIs have direct support for CSV or JSON. But for XML files, we need to consider a custom parser, because the format is custom and can be very complex.

When systems interact with each other and process data through different pipelines, they expect real-time processing or availability of data, so that business decisions can be instant and quick. In this post, we discuss a use case where XMLs are streamed through a real-time processing system and can go through a custom XML parser to flatten data for easier business analysis.

To demonstrate the implementation approach, we use AWS cloud services like Amazon Kinesis Data Streams as the message bus, Amazon Kinesis Data Firehose as the delivery stream with Amazon Redshift data warehouse as the target storage solution, and AWS Lambda as record transformer of Kinesis Data Firehose, which flattens the nested XML structure with custom parser script in Python.

AWS services overview

This solution uses AWS services for the following purposes:

  • Kinesis Data Streams is a massively scalable and durable real-time data streaming service. It can continuously capture gigabytes of data per second from hundreds of thousands of sources, such as website click-streams, database event streams, financial transactions, social media feeds, IT logs, and location-tracking events. The data collected is available in milliseconds to enable real-time analytics use cases such as real-time dashboards, real-time anomaly detection, dynamic pricing, and more. We use Kinesis Data Streams because it’s a serverless solution that can scale based on usage.
  • Kinesis Data Firehose is the easiest way to reliably load streaming data into data lakes, data stores, and analytics tools. It can capture, transform, and load streaming data into Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon OpenSearch Service, and Splunk, enabling near-real-time analytics with existing business intelligence (BI) tools and dashboards you’re already using today. It’s a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration. It can also batch, compress, transform, and encrypt the data before loading it, minimizing the amount of storage used at the destination and increasing security. In our use case, our target storage layer is Amazon Redshift, so Kinesis Data Firehose fits great to simplify the solution.
  • Lambda is an event-driven, serverless computing platform provided by AWS. It’s a computing service that runs code in response to events and automatically manages the computing resources required by that code. Lambda supports multiple programming languages, and for our use case, we use Python 3.8. Other options include Amazon Kinesis Data Analytics with Flink, Amazon EMR with Spark streaming, Kinesis Data Firehose, or a custom application based on Kinesis consumer library. We use Kinesis Data Firehose as the consumer in this use case, with AWS Lambda as the record transformer, because our target storage is Amazon Redshift, which is supported by Kinesis Data Firehose.
  • Amazon S3 is an object storage service that offers industry-leading scalability, data availability, security, and performance. This means customers of all sizes and industries can use it to store and protect any amount of data for a range of use cases, such as websites, mobile applications, backup and restore, archive, enterprise applications, IoT devices, and big data analytics. For our use case, we use Amazon S3 as an intermediate storage before loading to the data warehousing system, so that it’s fault tolerant and provides better performance while loading to Amazon Redshift. By default, Kinesis Data Firehose requests an intermediate S3 bucket path when Amazon Redshift is the target.
  • Amazon Redshift is a fast, fully managed data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing BI tools. In our use case, we use Amazon Redshift so that BI tools like Amazon QuickSight can easily connect to Amazon Redshift to build real-time dashboards.

Architecture overview

The following diagram illustrates the simple architecture that you can use to implement the solution.

The architecture includes the following components:

  • The Amazon Kinesis Producer Library (KPL) represents the system that pushes data to Kinesis Data Streams. It can be a simple Amazon Elastic Compute Cloud (Amazon EC2) machine or your local windows command line that executes the Kinesis Data Streams command line interface (CLI) to push messages. Alternatively, it can be a dynamic application that uses Kinesis Data Streams APIs or KPL to push messages dynamically. For our use case, we spin up an EC2 instance through AWS Cloud9 and use Kinesis Data Streams CLI commands to publish messages.
  • Kinesis Data Streams receives messages against a partition key from the publisher and waits for consumers to consume it. By default, the retention period of the messages in Kinesis Data Streams is 24 hours, but you can extend it to 7 days.
  • Kinesis Data Firehose takes a few actions:
    • Consumes data from Kinesis Data Streams and writes the same XML message into a backup S3 bucket.
    • Invokes a Lambda function that acts as a record transformer. Lambda receives input as XML, applies transformations to flatten it to be pipe-delimited content, and returns it to Kinesis Data Firehose.
    • Writes the pipe-delimited content to another S3 bucket, which acts as an intermediate storage bucket before writing into Amazon Redshift.
    • Invokes the Amazon Redshift COPY command, which takes pipe-delimited data from the intermediate S3 bucket and writes it into Amazon Redshift.
  • Data is inserted into the Amazon Redshift table, which you can query for data analysis and reporting.

Solution overview

To implement this solution, you complete the following steps:

  1. Set up the Kinesis data stream as the message bus.
  2. Set up KPL, which publishes sample XML message data to Kinesis Data Streams.
  3. Create an Amazon Redshift cluster, which acts as target storage for the Firehose delivery stream.
  4. Set up the delivery stream, which uses Lambda for record transformation and Amazon Redshift as target storage.
  5. Customize a Lambda function script that converts the nested XML string to a flat pipe-delimited stream.

Prerequisites

Before beginning this tutorial, make sure you have permissions to create Kinesis data streams and publish messages to the streams.

Setting up your Kinesis data stream

You can use the AWS Management Console to create a data stream as a one-time activity. You can configure the cluster capacity as per your requirement, but start with the minimum and apply auto scaling as the data volume increases. Auto scaling is based on Amazon CloudWatch metrics. For more information, see Scale Amazon Kinesis Data Streams with AWS Application Auto Scaling.

Setting up KPL

For this use case, we use the AWS Cloud9 environment IDE, where through the Linux command line, we can execute Kinesis Data Streams CLI commands to publish sample XML messages. The following code shows an example XML of an employee record that has one-level nesting for the all_addresses attribute:

aws kinesis put-record --stream-name <Stream-Name> --data "<employees><employee><first_name>FName 1</first_name><last_name>LName 1</last_name><all_address><address><type>primary</type><street_address>Street Address 1</street_address><state>State 1</state><zip>11111</zip></address><address><type>secondary</type><street_address>Street Address 2</street_address><state>State 2</state><zip>11112</zip></address></all_address><phone>111-111-1111</phone></employee><employee><first_name>FName 2</first_name><last_name>LName 2</last_name><all_address><address><type>primary</type><street_address>Street Address 3</street_address><state>State 3</state><zip>11113</zip></address><address><type>secondary</type><street_address>Street Address 4</street_address><state>State 4</state><zip>11114</zip></address></all_address><phone>111-111-1112</phone></employee></employees>" —partition-key <partition-key-name>

You need to change the stream name, XML data, and partition key in the preceding code as per your use case. Also, instead of an AWS Cloud9 environment, you have additional ways to submit messages to the data stream:

  • Use an EC2 instance to execute the Kinesis Data Streams CLI command
  • Use KPL or Kinesis Data Streams APIs in any programming language to submit messages dynamically through your custom application

Creating an Amazon Redshift cluster

In this step, you create an Amazon Redshift cluster that has required permissions and ports open for Kinesis Data Firehose to write to it. For instructions, see Controlling Access with Amazon Kinesis Data Firehose.

Make sure the cluster has the required port and permissions so that Kinesis Firehose can push data into it. Also make sure the table schema you create matches your pipe-delimited format that Lambda creates as output and Kinesis Data Firehose uses it to write to Amazon Redshift.

Setting up the delivery stream

When you create your Kinesis Data Firehose delivery stream on the console, define the source as Kinesis Data Streams, the target as the Amazon Redshift cluster, and enable record transformation with Lambda.

To complete this step, you need to create an AWS Identity and Access Management (IAM) role with the following permissions for the delivery stream:

  • Read permissions from the data stream
  • Write permissions to the intermediate S3 bucket
  • Write permissions to the defined Amazon Redshift cluster

Define the following configurations for the delivery stream:

  • Enable the source record transformation, where you selected your Lambda function.

  • As an optional step, you can enable source record backup, which saves the source XML to the S3 bucket path you define.

  • Define the intermediate S3 bucket, which you use to store transformed pipe-delimited records and later use for the Amazon Redshift copy.

  • In your Amazon Redshift configurations, for COPY options, make sure to specify DELIMITER ‘|’, because the Lambda function output is pipe delimited and Kinesis Data Firehose uses that in the Amazon Redshift copy operation.

Customizing the Lambda function

This function is invoked through Kinesis Data Firehose when the record arrives in Kinesis Data Streams.

Make sure you increase the Lambda execution timeout to more than 1 minute. See the following code:

from __future__ import print_function

import base64
import json
import boto3
import os
import time
import csv 
import sys

from xml.etree.ElementTree import XML, fromstring
import xml.etree.ElementTree as ET

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = base64.b64decode(record['data'])
        parsedRecords = parseXML(payload)
        
        # Do custom processing on the payload here
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(parsedRecords)
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}
    
    
def parseXML(inputXML):
    xmlstring =  str(inputXML.decode('utf-8'))
    
    # create element tree object
    root = ET.fromstring(str(xmlstring))
    #print("Root Tag"+root.tag)
    
    # create empty list for items 
    xmlItems = ""
  
    # iterate over employee records
    for item in root.findall('employee'):
       #print("child tag name:"+item.tag+" - Child attribute")
       
       # Form pipe delimited string, by concatenating XML values
       record = item.find('first_name').text + "|" + item.find('last_name').text + "|" + item.find('phone').text
       
       primaryaddress = ""
       secondaryaddress = ""
       
       # Get primary address and secondary address separately to be concatenated to the original record in sequence
       for addressitem in item.find('all_address').findall('address'):
           if(addressitem.find('type').text == "primary"):
               primaryaddress = addressitem.find('street_address').text + "|" + addressitem.find('state').text + "|" + addressitem.find('zip').text
           elif(addressitem.find('type').text == "secondary"):
               secondaryaddress = addressitem.find('street_address').text + "|" + addressitem.find('state').text + "|" + addressitem.find('zip').text
               
       #print("Primary Address:"+primaryaddress)
       #print("Secondary Address:"+secondaryaddress)
       
       record += "|" + primaryaddress + "|" + secondaryaddress + "\n"
       xmlItems += record
       #print("Record"+record)
    
    #print("Final Transformed Output:"+xmlItems)
    return xmlItems.encode('utf-8')

You can customize this example code to embed your own XML parser logic. Keep in mind that, while using the function, the request and response (synchronous calls) body payload size can be up to 6 MB, so it’s important to make sure the return value isn’t increased over that limit.

Your Amazon Redshift table (employees) has respective fields to capture the flattened pipe-delimited data. Your query might look like the following code to fetch and read the data:

SELECT first_name, last_name, phone, primary_address_street, primary_address_state, primary_address_zip, secondary_address_street, secondary_address_state, secondary_address_zip
FROM employees

The following screenshot shows the result of the query in the Amazon Redshift query editor.

Debugging

While setting up this framework in your development environment, you can debug individual components of the architecture with the following guidelines:

  • Use the Kinesis Data Streams Monitoring tab to validate that it receives messages and read operations are happening through the consumer (Kinesis Data Firehose). You can also use Kinesis Data Streams CLI commands to read from the stream.
  • Use the Kinesis Data Firehose Monitoring tab to check if it receives messages from Kinesis Data Streams and can push them to Amazon Redshift. You can also check for errors on the Error logs tab or directly on the Amazon CloudWatch console.
  • Validate Lambda with a test execution to check that it can transform records to pipe-delimited formats and return to Amazon Data Firehose with the expected format (base64 encoded format).
  • Confirm that the S3 intermediate storage bucket has the transformed record and doesn’t write into failed processing or error record paths. Also, check if the transformed records are pipe delimited and match the schema of the target Amazon Redshift table.
  • Validate if the backup S3 bucket has the original XML format records. If Lambda or the delivery stream fails, you have an approach to manually reprocess it.
  • Make sure Amazon Redshift has the new data records reflecting through SQL SELECT queries and check the cluster’s health on the Monitoring

Conclusion

This post showed you how to integrate real-time streaming of XML messages and flatten them to store in a data warehousing system for real-time dashboards.

Although you followed individual steps for each service in your development environment, for a production setup, consider the following automation methods:

  • AWS CloudFormation allows you to embed infrastructure as code that can spin up all required resources for the project, and you can easily migrate or set up your application in production or other AWS accounts.
  • A custom monitoring dashboard can take input from each AWS service you use through its APIs and show the health of each service with the number of records being processed.

Let us know in the comments any thoughts of questions you have about applying this solution to your use cases.


About the Author

Sakti Mishra is a Data Lab Solutions Architect at AWS. He helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives. Outside of work, Sakti enjoys learning new technologies, watching movies, and travel.