Building Event-Driven Batch Analytics on AWS
Karthik Sonti is a Senior Big Data Architect with AWS Professional Services
Modern businesses typically collect data from internal and external sources at various frequencies throughout the day. These data sources could be franchise stores, subsidiaries, or new systems integrated as a result of mergers and acquisitions.
For example, a retail chain might collect point-of-sale (POS) data from all franchise stores three times a day. It uses this data both to get insights into sales and to determine the right number of staff for any given store at a given time. Because each franchise operates as an independent business, the format and structure of the data might not be consistent across stores. Depending on the geographical region, each franchise might provide data at a different frequency. The analysis of these datasets should wait until all the required data has been provided by the individual franchises (event-driven). In most cases, the volume of data received from each individual franchise is small, but the velocity at which data is generated and the collective volume can be challenging to manage.
In this post, I walk you through an architectural approach and a sample implementation for collecting, processing, and analyzing data for event-driven applications on AWS.
The architecture diagram below depicts the components and the data flow needed for an event-driven batch analytics system. At a high level, this architecture uses the following services:
- Amazon S3 for storing source, intermediate, and final output data.
- AWS Lambda for intermediate file-level ETL and state management.
- Amazon RDS as the persistent state store.
- Amazon EMR for aggregated ETL, serving as the heavy-lifting engine for consolidated transformation and loading.
- Amazon Redshift as the data warehouse hosting the data needed for reporting.
In this architecture, each location on S3 stores data at a certain state of transformation. When new data is placed at a specific location, an S3 event is raised that triggers a Lambda function responsible for the next transformation in the chain. You can use this event-driven approach to create sophisticated ETL processes, and to syndicate data availability at a given point in the chain.
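To make the prefix-per-state idea concrete, here is a minimal Java sketch of the routing logic that a Lambda function could apply to the object key it receives in an S3 event. It identifies the current stage from the key prefix and derives the key under which the next stage's output would be written. The prefixes raw/, validated/, and aggregated/ and the names PipelineStage and StageRouter are hypothetical. The sample implementation may lay out its bucket differently.

```java
import java.util.Optional;

/** Hypothetical S3 prefixes, one per state of transformation. */
enum PipelineStage {
    RAW("raw/"),
    VALIDATED("validated/"),
    AGGREGATED("aggregated/");

    final String prefix;

    PipelineStage(String prefix) {
        this.prefix = prefix;
    }

    /** Finds the stage whose prefix the object key starts with, if any. */
    static Optional<PipelineStage> fromKey(String key) {
        for (PipelineStage stage : values()) {
            if (key.startsWith(stage.prefix)) {
                return Optional.of(stage);
            }
        }
        return Optional.empty();
    }

    /** The stage that consumes this stage's output, or empty at the end of the chain. */
    Optional<PipelineStage> next() {
        int i = ordinal() + 1;
        return i < values().length ? Optional.of(values()[i]) : Optional.empty();
    }
}

final class StageRouter {
    private StageRouter() {}

    /** Maps an object key to the key the next stage should write, keeping the rest of the path. */
    static Optional<String> nextKey(String key) {
        return PipelineStage.fromKey(key).flatMap(current ->
                current.next().map(next -> next.prefix + key.substring(current.prefix.length())));
    }
}
```

Each stage reads from one prefix and writes to the next. An S3 event notification filtered on a given prefix therefore invokes only the function responsible for that transition.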
Data staging layer
S3 is an object store, so it inherently supports a wide variety of data formats (structured and unstructured). It also enables schema-on-read, which means you can defer assigning a structure or model to the data until you are ready to consume it. That makes S3 a great place to land data from various source systems.
From an enterprise perspective, S3 could act as your data lake layer, storing data in its source-identical format for any future deep analytics. S3 lets you organize your data with prefixes, which can separate the validated/converted data from the source-identical data. S3 also lets you configure event notifications that invoke specific Lambda functions in the input validation/conversion layer and the input tracking layer. The idea of breaking the problem into smaller components starts right here.
Input validation/conversion layer
The Lambda function in this layer validates the files ingested from sources. In some use cases, validation could mean checking for specific attributes. In others, it could mean converting data from the source format to the format that the subsequent EMR job expects. Common examples include:
- removing unwanted characters or attributes from the input files,
- converting a legacy format such as .dbf to a modern format such as .json, or
- decompressing a zip archive into individual files.
Note that the Lambda function in this layer acts at the file level and writes its output to a different location in the same bucket. If your use case involves complex file conversions, consider breaking this layer into concise steps, each backed by its own Lambda function that stays within the Lambda service limits. You can omit this layer if no file-level conversions or validations are needed.
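As an example of a file-level conversion, the following Java sketch uses only java.util.zip to decompress a zip archive held in memory into individual files. ZipSplitter is a hypothetical name. The sketch leaves out reading the S3 object into a byte array and writing each entry back under a new key. Buffering in memory is only reasonable for archives that fit comfortably within the function's configured memory.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

final class ZipSplitter {
    private ZipSplitter() {}

    /** Expands an in-memory zip archive into entry name -> contents, skipping directory entries. */
    static Map<String, byte[]> unzip(byte[] archive) throws IOException {
        Map<String, byte[]> files = new LinkedHashMap<>();
        try (ZipInputStream zin = new ZipInputStream(new ByteArrayInputStream(archive))) {
            ZipEntry entry;
            byte[] buffer = new byte[8192];
            while ((entry = zin.getNextEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                int n;
                // read() returns -1 at the end of the current entry
                while ((n = zin.read(buffer)) != -1) {
                    out.write(buffer, 0, n);
                }
                files.put(entry.getName(), out.toByteArray());
            }
        }
        return files;
    }
}
```

Entry names come from the source system, so treat them as untrusted input when turning them into output keys.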
Input tracking layer
In many event-driven batch analytics use cases, the final step to consolidate and process data cannot be invoked until certain dependencies are resolved. These fall into two kinds:
- Inter-file dependencies, such as orders data depending on item data.
- Volume-based dependencies, such as waiting until a sizeable amount of data is received before submitting an aggregation job.
To resolve such dependencies, the first step is to audit which file is received in which "state": Validated, Converted, and so on. The number of states varies from use case to use case. As soon as the Lambda function in the input validation/conversion layer places its output at a designated location in the S3 bucket, the same S3 event mechanism triggers the Lambda function in this layer. That function records those states in a state management store. Subsequent layers can then read the store and verify that the configured dependencies are resolved before submitting an EMR job.
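The following Java sketch shows the kind of bookkeeping that the input tracking layer performs. It uses an in-memory map as a stand-in for the state management store. FileState, FileStateTracker, and the state names are hypothetical, and in the architecture described here this record would be written to the RDS store instead. S3 event notifications are not guaranteed to arrive in order, so the sketch ignores any event whose timestamp is not newer than the one already recorded.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical file states; the real set of states varies by use case. */
enum FileState { RECEIVED, VALIDATED, CONVERTED }

/** In-memory stand-in for the state management store, keyed by S3 object key. */
final class FileStateTracker {
    static final class Entry {
        final FileState state;
        final Instant updatedAt;

        Entry(FileState state, Instant updatedAt) {
            this.state = state;
            this.updatedAt = updatedAt;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Records a state change unless a newer one has already been recorded for the key. */
    void record(String objectKey, FileState state, Instant eventTime) {
        Entry current = entries.get(objectKey);
        if (current == null || eventTime.isAfter(current.updatedAt)) {
            entries.put(objectKey, new Entry(state, eventTime));
        }
    }

    /** Latest recorded entry for the key, or null if the file has not been seen. */
    Entry get(String objectKey) {
        return entries.get(objectKey);
    }
}
```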
State management store
As the name indicates, the persistent storage in this layer stores the state information of files received from data sources and the aggregation jobs configured to run on those files. The state management store is central to this architecture approach, as it helps control the criteria for submitting an EMR aggregation job, its input size and input type, along with other parameters.
The state management store provides a single view of the input files that have been received, the jobs currently running on those files, and the states (Running/Completed/Failed) of those jobs.
RDS and Amazon DynamoDB are both good choices for building a state management store. RDS, however, provides the flexibility of using SQL commands to edit the EMR aggregation job configurations and to easily track the status of a job and its impact. For the use case discussed later, a sample state management store is modelled in MySQL.
Aggregation job submission layer
The Lambda function in this layer iterates over the list of open jobs (jobs that are not in “Running” state) in the state management store and submits a job if the respective precondition criteria configured for that job are met.
The Lambda function in this layer applies a generic precondition: do not submit a job unless an updated version of its input files has been received since the job was last launched. It also verifies job-specific preconditions, such as a minimum file count or the existence of reference data.
After an EMR job is submitted, the Lambda function in this layer updates the state management store with the respective clusterId:stepId combination so that the job monitoring layer can update the job status. The implementation provided for the use case example below covers all of the preconditions mentioned above.
The Lambda function in this layer is scheduled to run at a specific interval. Base that interval on the batch frequency of the input sources and the desired frequency of the aggregation jobs. Choosing the right schedule interval helps prevent concurrent job submissions.
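The submission logic described above can be sketched in Java under several assumptions. AggrJob, JobStatus, JobSubmitter, and the StepSubmitter callback are hypothetical types. They stand in for rows of the state management store and for the call that adds a step to an EMR cluster, which is not shown. The sketch first applies the generic "new input since last launch" precondition, then a job-specific predicate, and finally records the clusterId:stepId combination for the monitoring layer.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical job states; NEW marks a job that has never been launched. */
enum JobStatus { NEW, RUNNING, COMPLETED, FAILED }

/** Hypothetical in-memory view of one job configuration row. */
final class AggrJob {
    final String jobId;
    final Instant latestInputUpdate;        // newest validated-input timestamp for this job
    final Predicate<AggrJob> preconditions; // job-specific checks, e.g. minimum file count
    JobStatus status = JobStatus.NEW;
    Instant lastLaunched;                   // null until the job is first submitted
    String clusterStepId;                   // "clusterId:stepId" of the latest submission

    AggrJob(String jobId, Instant latestInputUpdate, Predicate<AggrJob> preconditions) {
        this.jobId = jobId;
        this.latestInputUpdate = latestInputUpdate;
        this.preconditions = preconditions;
    }
}

/** Hypothetical hook that adds a step to an EMR cluster and returns the step ID. */
interface StepSubmitter {
    String submit(String clusterId, AggrJob job);
}

final class JobSubmitter {
    private JobSubmitter() {}

    /** Submits every open job whose preconditions hold; returns the IDs of submitted jobs. */
    static List<String> submitReadyJobs(List<AggrJob> jobs, String clusterId,
                                        StepSubmitter submitter, Instant now) {
        List<String> submitted = new ArrayList<>();
        for (AggrJob job : jobs) {
            if (job.status == JobStatus.RUNNING) {
                continue; // only open jobs are considered
            }
            // Generic precondition: input updated since the job was last launched.
            boolean hasNewInput = job.latestInputUpdate != null
                    && (job.lastLaunched == null || job.latestInputUpdate.isAfter(job.lastLaunched));
            if (!hasNewInput || !job.preconditions.test(job)) {
                continue;
            }
            String stepId = submitter.submit(clusterId, job);
            job.status = JobStatus.RUNNING;
            job.lastLaunched = now;
            job.clusterStepId = clusterId + ":" + stepId;
            submitted.add(job.jobId);
        }
        return submitted;
    }
}
```

The current time is passed in as a parameter rather than read from the clock inside the loop. This keeps the logic deterministic and easy to test.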
Aggregation job monitoring layer
The Lambda function in this layer iterates over the submitted jobs and updates the status of each job from “Running” to either “Completed” or “Failed”. It is scheduled to run at a specific interval, which should be based on the average run time of your aggregation jobs.
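Below is a minimal Java sketch of the status mapping this layer needs. The EmrStepState constants mirror the step states that Amazon EMR reports (PENDING, CANCEL_PENDING, RUNNING, COMPLETED, CANCELLED, FAILED, INTERRUPTED). The enum itself is a local stand-in, not the AWS SDK type. StepStatusMapper and TrackedStatus are hypothetical names. The sketch treats CANCELLED and INTERRUPTED as Failed; you might choose differently.

```java
/** Local mirror of the step states Amazon EMR reports (not the AWS SDK type). */
enum EmrStepState { PENDING, CANCEL_PENDING, RUNNING, COMPLETED, CANCELLED, FAILED, INTERRUPTED }

/** Job statuses tracked in the state management store. */
enum TrackedStatus { RUNNING, COMPLETED, FAILED }

final class StepStatusMapper {
    private StepStatusMapper() {}

    /** Maps an EMR step state to the status recorded for the job. */
    static TrackedStatus toTracked(EmrStepState state) {
        switch (state) {
            case COMPLETED:
                return TrackedStatus.COMPLETED;
            case CANCELLED:
            case FAILED:
            case INTERRUPTED:
                return TrackedStatus.FAILED;
            default:
                return TrackedStatus.RUNNING; // PENDING, CANCEL_PENDING, RUNNING
        }
    }

    /** Splits a stored "clusterId:stepId" value into its two parts. */
    static String[] splitClusterStep(String value) {
        int sep = value.indexOf(':');
        if (sep <= 0 || sep == value.length() - 1) {
            throw new IllegalArgumentException("expected clusterId:stepId, got " + value);
        }
        return new String[] {value.substring(0, sep), value.substring(sep + 1)};
    }
}
```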
Aggregation and load layer
This layer does the bulk of the heavy lifting of data transformation, aggregation, and loading into Amazon Redshift. Depending on the requirement for how frequently transformation and loading to Amazon Redshift should happen, you can either have a long-running cluster with jobs submitted as steps or have a new cluster instantiated every time a job has to be submitted.
Depending on the volume of updates received from the input sources, it might also be more efficient to apply only the changed records rather than truncating and reloading the whole dataset. The output of the aggregation job is stored in the Amazon Redshift data warehouse.
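The following Java sketch illustrates the upsert semantics of an incremental load on in-memory data. Rows whose key already exists replace the old version and new keys are added, so untouched rows never have to be rewritten. IncrementalMerge is a hypothetical name. In Amazon Redshift, this is commonly done in one transaction: load the changed records into a staging table, delete the matching rows from the target table, and insert the staged rows.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class IncrementalMerge {
    private IncrementalMerge() {}

    /**
     * Applies changed rows to the target, keyed by keyOf: existing keys are replaced,
     * new keys are added, and untouched rows stay as they are.
     * Returns {inserted, updated}.
     */
    static <K, R> int[] upsert(Map<K, R> target, List<R> changed, Function<R, K> keyOf) {
        int inserted = 0;
        int updated = 0;
        for (R row : changed) {
            if (target.put(keyOf.apply(row), row) == null) {
                inserted++;
            } else {
                updated++;
            }
        }
        return new int[] {inserted, updated};
    }
}
```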
Exception handling and tracking
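The Lambda functions across all the layers are invoked asynchronously, either by S3 events or on a schedule. Lambda therefore retries a failed invocation automatically, making up to three attempts in total by default. If a function still fails after the retries, Amazon CloudWatch provides metrics and logs to help you monitor and troubleshoot it.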
If an EMR step fails, its logs are archived in S3 (when logging is enabled for the cluster) and can be accessed from the AWS Management Console. You can also use the Hadoop ResourceManager and Application Master UIs to investigate the failure.
Sample use case: Yummy Foods
Here’s a sample use case for implementing the event-driven batch analytics framework discussed so far. Yummy Foods, a hypothetical customer, has franchise stores all over the country. These franchise stores run on heterogeneous platforms. They submit cumulative transaction files to Yummy Foods in tab-delimited (.tdf) format at different frequencies throughout the day. Due to system limitations, some of these franchise stores occasionally send additional data starting with characters such as “----”.
Yummy Foods needs to update insights on the sales made by each franchise for a given item throughout the day. It should do so as soon as the complete set of franchise files from a given province is available. The number of franchises per province is fixed and seldom changes.
The aggregation job for a given province should not be submitted until the configured number of franchise store files from that province are available and also until the product master data update is posted at the beginning of the day. A master data update is identified by the presence of at least one “Item.csv” file for that day.
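The province-level precondition could be expressed in Java roughly as follows. ReceivedFile and ProvincePrecondition are hypothetical types. The sketch assumes two things: that file records already carry the province and the day (for example, parsed from the S3 key), and that the master data file is named exactly Item.csv. It counts distinct file names, so a store that resends its cumulative file is not counted twice.

```java
import java.time.LocalDate;
import java.util.Collection;

/** Hypothetical record of one validated file; province is null for master data such as Item.csv. */
final class ReceivedFile {
    final String province;
    final LocalDate day;
    final String fileName;

    ReceivedFile(String province, LocalDate day, String fileName) {
        this.province = province;
        this.day = day;
        this.fileName = fileName;
    }
}

final class ProvincePrecondition {
    static final String ITEM_MASTER = "Item.csv";

    private ProvincePrecondition() {}

    /** True when the item master is posted for the day and enough store files have arrived. */
    static boolean isReady(String province, LocalDate day, int expectedStoreFiles,
                           Collection<ReceivedFile> files) {
        boolean itemMasterPosted = files.stream()
                .anyMatch(f -> day.equals(f.day) && ITEM_MASTER.equals(f.fileName));
        long storeFiles = files.stream()
                .filter(f -> day.equals(f.day) && province.equals(f.province))
                .filter(f -> !ITEM_MASTER.equals(f.fileName))
                .map(f -> f.fileName)
                .distinct() // a resent cumulative file counts once
                .count();
        return itemMasterPosted && storeFiles >= expectedStoreFiles;
    }
}
```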
The aggregation job should consider only transaction codes 4 (sale amount), 5 (tax amount), and 6 (discount amount); the remaining codes can be ignored. After the aggregation job is completed, only one record should exist for a given combination of franchise store, item, and transaction date.
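The filtering and aggregation rules can be sketched with plain Java collections. In the sample implementation, this work is done by a Spark job on EMR. The Txn, DailyTotals, and SalesAggregator types below are hypothetical and only illustrate the logic: transactions with codes other than 4, 5, and 6 are dropped, and the remaining amounts are summed into a single record per store, item, and transaction date.

```java
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical parsed transaction line. */
final class Txn {
    final String storeId;
    final String itemId;
    final LocalDate date;
    final int code;
    final BigDecimal amount;

    Txn(String storeId, String itemId, LocalDate date, int code, BigDecimal amount) {
        this.storeId = storeId;
        this.itemId = itemId;
        this.date = date;
        this.code = code;
        this.amount = amount;
    }
}

/** One output record per store, item, and transaction date. */
final class DailyTotals {
    BigDecimal sale = BigDecimal.ZERO;
    BigDecimal tax = BigDecimal.ZERO;
    BigDecimal discount = BigDecimal.ZERO;
}

final class SalesAggregator {
    private static final Set<Integer> KEPT_CODES = Set.of(4, 5, 6); // sale, tax, discount

    private SalesAggregator() {}

    /** Drops other transaction codes and sums amounts per "storeId|itemId|date" key. */
    static Map<String, DailyTotals> aggregate(List<Txn> txns) {
        Map<String, DailyTotals> totals = new TreeMap<>();
        for (Txn t : txns) {
            if (!KEPT_CODES.contains(t.code)) {
                continue;
            }
            String key = t.storeId + "|" + t.itemId + "|" + t.date;
            DailyTotals d = totals.computeIfAbsent(key, k -> new DailyTotals());
            if (t.code == 4) {
                d.sale = d.sale.add(t.amount);
            } else if (t.code == 5) {
                d.tax = d.tax.add(t.amount);
            } else {
                d.discount = d.discount.add(t.amount);
            }
        }
        return totals;
    }
}
```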
A complete end-to-end Java implementation of the above use case is available in the aws-blog-event-driven-batch-analytics GitHub repo. Follow the instructions in the Setup.md file to set up and run the code. The high-level cost estimate for the infrastructure to test this code is around $1.10 per hour.
The “Input Validation/Conversion” layer eliminates any bad data in the input files and converts the tab-delimited .tdf files to .csv files.
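A minimal Java sketch of this conversion is shown below. TdfToCsv is a hypothetical name. The sketch assumes that junk lines start with four hyphens and that fields containing commas, quotes, or line breaks should be quoted in the RFC 4180 style. The actual implementation may clean the data differently.

```java
import java.util.ArrayList;
import java.util.List;

final class TdfToCsv {
    /** Assumed prefix of the extra lines some stores append. */
    static final String JUNK_PREFIX = "----";

    private TdfToCsv() {}

    /** Drops empty and junk lines and converts each tab-delimited line to a CSV line. */
    static List<String> convert(List<String> tdfLines) {
        List<String> csv = new ArrayList<>();
        for (String line : tdfLines) {
            if (line.isEmpty() || line.startsWith(JUNK_PREFIX)) {
                continue;
            }
            String[] fields = line.split("\t", -1); // -1 keeps trailing empty fields
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < fields.length; i++) {
                if (i > 0) {
                    sb.append(',');
                }
                sb.append(quote(fields[i]));
            }
            csv.add(sb.toString());
        }
        return csv;
    }

    /** RFC 4180-style quoting: wrap fields containing commas, quotes, or line breaks. */
    static String quote(String field) {
        if (field.contains(",") || field.contains("\"") || field.contains("\n") || field.contains("\r")) {
            return "\"" + field.replace("\"", "\"\"") + "\"";
        }
        return field;
    }
}
```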
The “State Management Store” is modelled to store the status of ingested files (INGESTEDFILESTATUS) and the job configurations (AGGRJOBCONFIGURATION). The job configurations include preconditions such as waiting until the fixed number of vendor files for a province has been received and verifying that the item master data has been posted.
The “Input Tracking” layer records the last validated timestamp of the input file in the file status table (INGESTEDFILESTATUS) within the “State Management Store”.
The “Aggregation Job Submission” layer submits a job when the preconditions configured for that job in the “State Management Store” are satisfied.
Based on its input parameter, the EMR Spark job in the “Aggregation and Load” layer processes and aggregates the vendor transaction data and updates the Amazon Redshift data warehouse.
At a scheduled interval, the “Aggregation Job Monitoring” layer updates the status of each active “Running” job to either “Completed” or “Failed” for tracking purposes.
To avoid future charges, terminate the RDS instance, Amazon Redshift cluster, and EMR cluster provisioned as part of testing this code.
I’ve walked through an elastic and modular architecture approach that you can follow to build event-driven batch analytics on AWS. Consider this approach the next time you encounter ETL use cases that involve a series of steps for cleansing, standardizing, and consolidating data received at fixed intervals from a wide variety of internal and external sources.
This approach provides the following benefits:
- Divides the problem into small, decoupled components, with each component having the ability to fire off independently (useful in “replay” scenarios).
- Scales out or in dynamically with incoming loads.
- Lets you control when and how your EMR processing job should be triggered.
- Lets you track your event-driven batch analytic jobs and the respective input files.
- Incurs costs for only the processing used rather than for fixed software and hardware infrastructure.
If you have questions or suggestions, please comment below.