Toyota Connected optimizes EMR costs and improves resiliency of batch jobs
At Toyota Connected, data ingested from millions of connected vehicles is stored in our petabyte-scale Toyota Connected data lake with Amazon Simple Storage Service (Amazon S3), an object storage service built to store and retrieve any amount of data from anywhere, as its foundation. We use Amazon EMR, which is the industry-leading cloud big data platform for running large-scale distributed data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks, such as Apache Spark. Amazon EMR is used to curate these datasets and make them available for analytics and ML. We have millions of partitions in our Amazon S3 data lake, and each partition has a small Parquet file (3–5 MB in size). For these very large datasets, we were striving to find the right balance between cost, performance, and resiliency. In an ideal situation, you achieve the lowest possible cost with a good degree of performance and resiliency to meet the business service-level agreements.
In this blog post, we discuss the architecture for batch jobs, which use Amazon Managed Workflows for Apache Airflow (Amazon MWAA), a highly available, secure, and managed workflow orchestration service for Apache Airflow, for job scheduling. We then look at the levers available to optimize costs and improve resiliency. Finally, we dive deeper into the code optimizations that we put in place for Amazon EMR and Apache Spark, which reduced filtering time from 27 minutes to 30 seconds.
The key components of this architecture include aggregate/transform and consume layers.
Aggregate/Transform: Amazon MWAA is used to programmatically submit jobs to Amazon EMR via the RunJobFlow API. The aggregation jobs use the parallel processing capability of Amazon EMR to decrypt the encoded messages and convert the data to Apache Parquet. Apache Parquet is a columnar storage file format designed for querying large amounts of data, regardless of the data processing framework or programming language. Parquet facilitates better compression, which reduces the amount of storage required. It also reduces input/output because we can scan the data efficiently. The datasets are now available for analytics purposes, partitioned by the following:
- Masked identification numbers.
- Automotive models and dispatch type.
Consume: Data analytics can be directly performed off the Amazon S3 data lake by using serverless Amazon Athena, an interactive query service. We leverage AWS Identity and Access Management (IAM) policies to restrict access to Athena operations.
Dashboards can be developed rapidly using Amazon QuickSight, which helps everyone in an organization to understand the data. Data access is democratized and made available to data science groups, who build and test various models that provide value to our customers using Project Jupyter or Amazon SageMaker notebooks.
1. Optimize costs
Before we dive deeper into the different levers available to optimize costs and improve resilience in Amazon EMR, let’s discuss the building blocks for Amazon EMR clusters. An Amazon EMR cluster is composed of a master node, core nodes, and task nodes. The details of the node types are shown in Figure 2.
The majority of the costs come from the Amazon Elastic Compute Cloud (Amazon EC2) instances in the cluster (including Spot Instances, which let you take advantage of unused Amazon EC2 capacity in the AWS Cloud), Amazon S3 and Apache Hadoop Distributed File System (HDFS) storage, and the Amazon EMR software itself. We will look at the cost-saving opportunities through these lenses.
1.1 Use Spot Instances
Amazon EC2 instances contribute a significant portion of the costs, so cost reductions can be accomplished by using Spot Instances for your Amazon EMR workloads. Spot Instances are available at up to a 90 percent discount compared to Amazon EC2 On-Demand prices. Due to the fault-tolerant nature of big data workloads on Amazon EMR, these workloads can continue processing even when Spot Instances are interrupted. Running Amazon EMR on Spot Instances drastically reduces the cost of big data, facilitates significantly higher compute capacity, and reduces the time to process big datasets.
The Spot Instance Advisor helps you determine the pools with the least chance of interruption and shows the savings you get over On-Demand rates. You should weigh your application’s tolerance for interruption and your cost savings goals when selecting a Spot Instance. The lower the interruption rate, the longer your Spot Instances are likely to run.
When you create a cluster using the AWS Command Line Interface (AWS CLI)—a unified tool to manage your AWS services—or Amazon EMR API and an allocation strategy option, you can specify a maximum of 30 Amazon EC2 instance types per fleet as opposed to only five when you use the default Amazon EMR cluster instance fleet configuration. Make sure you use a variety of instance types for a high probability of fulfilling your Spot Instance requests.
1.2 Use new Graviton instances
Amazon EMR now supports Amazon EC2 M6g instances to deliver optimal price performance for cloud workloads. Amazon EC2 M6g instances are powered by AWS Graviton2 processors that are custom designed by AWS using 64-bit Arm Neoverse cores. Amazon EMR provides up to 35 percent lower cost and up to 15 percent improved performance for Apache Spark workloads on Graviton2-based instances versus previous generation instances. In addition, the combination of EMR runtime for Apache Spark and Amazon EC2 M6g instances offers up to 76 percent lower total cost and 3.6 times improved performance when compared to running open-source Apache Spark on previous generation instances.
1.3 Use Amazon S3 for storage
Use cases involving iterative reads over the same dataset are good candidates for HDFS. However, for several other use cases, when the dataset is read once per run, you can use EMR File System (EMRFS) with Amazon S3 to store your data and achieve up to 90 percent savings on storage costs.
While using Amazon S3:
- Partition your data for improved read performance. You can get 5,500 GET requests per second per prefix in an Amazon S3 bucket. Using 10 prefixes can give you 55,000 reads per second.
- Avoid small files—keep the file size greater than 128 MB.
- Use columnar formats, like Parquet, for improved read performance.
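The two numeric guidelines in this list can be turned into a quick back-of-the-envelope check. The sketch below uses the per-prefix read rate and the 128 MB floor quoted above. The function names and the example dataset (65,000 files of 4 MB each) are hypothetical and only serve as an illustration.

```python
GET_PER_SECOND_PER_PREFIX = 5_500     # per-prefix read rate quoted above
MIN_FILE_SIZE_BYTES = 128 * 1024**2   # 128 MB floor from the guidance above


def aggregate_read_rate(num_prefixes: int) -> int:
    """Upper bound on GET requests per second when reads are spread over prefixes."""
    return num_prefixes * GET_PER_SECOND_PER_PREFIX


def max_output_files(total_bytes: int, min_file_bytes: int = MIN_FILE_SIZE_BYTES) -> int:
    """Largest number of output files that keeps each file at or above the floor."""
    return max(1, total_bytes // min_file_bytes)


if __name__ == "__main__":
    print(aggregate_read_rate(10))
    # Hypothetical input: 65,000 small files of 4 MB each.
    print(max_output_files(65_000 * 4 * 1024**2))
```

In a Spark job, a number like the one returned by `max_output_files` could be passed to `DataFrame.coalesce` or `DataFrame.repartition` before writing, so that many small inputs are compacted into fewer, larger Parquet files.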
1.4 Use performance-optimized runtime for Apache Spark
The Amazon EMR runtime for Apache Spark is turned on by default if you use Amazon EMR 5.28 or later, and it is 100 percent compatible with Apache Spark APIs. It can boost performance by up to a factor of four.
1.5 Use a larger class of instances if your workloads can handle them
The Amazon EMR price typically scales proportionally to the number of vCPUs, as seen below in Figure 3 for the c5 family of instances, where the last column is the price for Amazon EMR. Here you will see that the EMR price for c5.2xlarge is $0.085 per hour, which is two times the price for c5.xlarge ($0.043 per hour) in us-east-1.
However, for larger c5-class instances, the price is the same at $0.27 per hour, even when your instances scale from 12xlarge to 24xlarge, as shown below in Figure 4:
If your processing can use larger vCPU instance classes, like .12xlarge, .18xlarge, or .24xlarge, then there is the potential to save on Amazon EMR costs. Refer to the Amazon EMR pricing page for the latest prices.
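The effect is easier to see as an Amazon EMR fee per vCPU. The sketch below uses the hourly EMR prices quoted above for us-east-1. The vCPU counts (4, 8, 48, and 96) are the standard c5 sizes and are added here as an assumption for illustration. Note that this covers only the Amazon EMR fee; the underlying Amazon EC2 price still grows with instance size.

```python
# (vCPUs, Amazon EMR fee in USD per hour). Fees are the figures quoted in
# the text; vCPU counts are assumed from the standard c5 instance sizes.
C5_EMR_FEE = {
    "c5.xlarge": (4, 0.043),
    "c5.2xlarge": (8, 0.085),
    "c5.12xlarge": (48, 0.27),
    "c5.24xlarge": (96, 0.27),
}


def emr_fee_per_vcpu(instance_type: str) -> float:
    """Amazon EMR hourly fee divided by the number of vCPUs."""
    vcpus, fee = C5_EMR_FEE[instance_type]
    return fee / vcpus


if __name__ == "__main__":
    for name in C5_EMR_FEE:
        print(f"{name:12s} {emr_fee_per_vcpu(name):.6f} USD per vCPU-hour")
```

With these figures, the EMR fee per vCPU for c5.24xlarge is roughly a quarter of the fee for c5.xlarge, which is where the potential savings come from.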
1.6 Use Amazon EMR managed scaling
When your cluster load varies over time, Amazon EMR managed scaling automatically resizes your cluster for optimal performance at the lowest possible cost, which removes the guesswork from capacity planning. Amazon EMR manages the automatic scaling activity by continuously evaluating cluster metrics and making optimized scaling decisions. Managed scaling is supported for Amazon EMR version 5.30.0 and later (except Amazon EMR version 6.0.0) and works with both instance groups and instance fleets. Only YARN applications, such as Spark, Hadoop, Hive, and Flink, are supported. For details on the node allocation strategy under different scenarios, refer to the Amazon EMR managed scaling documentation.
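As a minimal sketch of what this looks like in practice, the following builds a managed scaling policy in the shape accepted by the Amazon EMR `PutManagedScalingPolicy` API and attaches it with boto3. The capacity numbers, helper names, and default Region are hypothetical. `boto3` is imported lazily so that the policy builder can be run and checked without AWS credentials.

```python
def managed_scaling_policy(min_units, max_units, max_on_demand_units, max_core_units):
    """Build a ManagedScalingPolicy dict for a cluster that uses instance fleets."""
    if not 1 <= min_units <= max_units:
        raise ValueError("need 1 <= min_units <= max_units")
    return {
        "ComputeLimits": {
            "UnitType": "InstanceFleetUnits",
            "MinimumCapacityUnits": min_units,
            "MaximumCapacityUnits": max_units,
            # Cap On-Demand and core capacity so that growth beyond these
            # limits comes from Spot task nodes.
            "MaximumOnDemandCapacityUnits": max_on_demand_units,
            "MaximumCoreCapacityUnits": max_core_units,
        }
    }


def apply_policy(cluster_id, policy, region="us-east-1"):
    """Attach the policy to a running cluster (requires boto3 and credentials)."""
    import boto3  # imported here so the builder above has no AWS dependency

    emr = boto3.client("emr", region_name=region)
    emr.put_managed_scaling_policy(ClusterId=cluster_id, ManagedScalingPolicy=policy)


if __name__ == "__main__":
    # Hypothetical limits: scale between 10 and 100 units, at most 10 On-Demand.
    print(managed_scaling_policy(10, 100, 10, 10))
```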
1.7 Close out Amazon EMR clusters
Remember to close out your Amazon EMR clusters after the jobs have run to completion. A cluster with hundreds of nodes left running inadvertently over the weekend, when the jobs have already been completed, can incur steep costs. When you close out your cluster, you will no longer incur Amazon EC2 charges. However, data on core nodes in HDFS is lost when clusters are closed out, so make sure that you store any data from HDFS into Amazon S3 prior to closing the cluster.
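One way to automate the "copy, then close out" sequence is sketched below. It adds an `s3-dist-cp` step to copy an HDFS directory to Amazon S3, waits for the step to complete, and only then terminates the cluster. The helper names, paths, and default Region are hypothetical. `boto3` is imported lazily so that the step definition can be inspected without AWS access.

```python
def hdfs_to_s3_step(hdfs_path, s3_uri):
    """EMR step definition that copies an HDFS directory to Amazon S3."""
    if not s3_uri.startswith("s3://"):
        raise ValueError("destination must be an s3:// URI")
    return {
        "Name": "copy-hdfs-to-s3",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["s3-dist-cp", "--src", hdfs_path, "--dest", s3_uri],
        },
    }


def copy_then_terminate(cluster_id, hdfs_path, s3_uri, region="us-east-1"):
    """Copy HDFS output to S3, then terminate the cluster (requires boto3)."""
    import boto3  # imported here so hdfs_to_s3_step has no AWS dependency

    emr = boto3.client("emr", region_name=region)
    response = emr.add_job_flow_steps(
        JobFlowId=cluster_id, Steps=[hdfs_to_s3_step(hdfs_path, s3_uri)]
    )
    step_id = response["StepIds"][0]
    # The waiter raises if the step fails, so the cluster is not terminated
    # (and HDFS data is not lost) unless the copy succeeded.
    emr.get_waiter("step_complete").wait(ClusterId=cluster_id, StepId=step_id)
    emr.terminate_job_flows(JobFlowIds=[cluster_id])


if __name__ == "__main__":
    print(hdfs_to_s3_step("hdfs:///output", "s3://example-bucket/output/"))
```

For purely transient batch clusters, setting `KeepJobFlowAliveWhenNoSteps` to `False` when the cluster is created is another option, because the cluster then shuts down on its own after the last step.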
2. Improve resiliency
2.1 Use Amazon EMR multimaster
You can now provision three master nodes. Launching a cluster with three master nodes is supported only by Amazon EMR version 5.23.0 and later. The master node is no longer a single point of failure. If one of the master nodes fails, the cluster uses the other two master nodes and runs without interruption. In the meantime, Amazon EMR automatically replaces the failed master node with a new one that is provisioned with the same configuration and bootstrap actions. This has to be balanced with the cost consideration of running three master nodes instead of one and may be a requirement in critical business workloads.
2.2 Add task nodes to a cluster configuration
Master and core nodes in production Amazon EMR clusters should typically be set to On-Demand. Additional task nodes should be added to the cluster, and they can use Spot Instances. Core nodes store HDFS data, so if they are reclaimed, jobs can fail. Task nodes don’t store HDFS data, so the cluster can survive losing them. In a hundred-node cluster, if 90 percent of the nodes are task nodes, some of the task nodes can fail and the cluster can continue to function. You can also increase parallelization with task nodes and reduce the time it takes to complete a job.
2.3 Use Amazon EMR instance fleets and an allocation strategy
Amazon EMR now offers a capacity-optimized allocation strategy for provisioning Spot Instances in an Amazon EMR cluster. The capacity-optimized allocation strategy automatically makes the most efficient use of available spare capacity while taking advantage of the steep discounts offered by Spot Instances. By offering the possibility of fewer interruptions, the capacity-optimized strategy can lower the overall cost of your workload. You can choose up to 30 different instance types and sizes across multiple Availability Zones (AZ) in a single task node instance fleet.
Capacity-optimized allocation strategy uses near-real-time capacity data to allocate instances from the Spot Instance pools with the optimal capacity for the number of instances that are launching. This allocation strategy is appropriate for workloads that have a higher cost of interruption—for example, long running jobs and multitenant persistent clusters running Apache Spark, Apache Hive, and Presto.
The allocation strategy option also lets you specify up to 30 Amazon EC2 instance types per task instance fleet when you create your cluster with the AWS CLI or Amazon EMR API, compared with five in the default instance fleet configuration. This helps you diversify your Spot requests and get steep discounts.
- In Amazon EMR, under Hardware selection, click on Instance fleets. Then make sure to check the Apply Allocation Strategy box as shown in Figure 5. This provides for capacity-optimized Spot Instances.
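The same setup can be expressed through the API instead of the console. The sketch below builds the `Instances` argument of the `RunJobFlow` API with On-Demand master and core fleets, a Spot task fleet that uses the capacity-optimized allocation strategy, and several subnets so that the fleets can launch in more than one Availability Zone. The instance types, capacities, timeout values, and subnet IDs are hypothetical placeholders, not the values used at Toyota Connected.

```python
MAX_TYPES_PER_FLEET = 30  # limit quoted above for the CLI/API with an allocation strategy


def spot_task_fleet(instance_types, target_spot_units):
    """Task fleet on Spot Instances using the capacity-optimized strategy."""
    if not 1 <= len(instance_types) <= MAX_TYPES_PER_FLEET:
        raise ValueError(f"expected 1..{MAX_TYPES_PER_FLEET} instance types")
    return {
        "Name": "task-spot",
        "InstanceFleetType": "TASK",
        "TargetSpotCapacity": target_spot_units,
        "InstanceTypeConfigs": [
            {"InstanceType": t, "WeightedCapacity": 1} for t in instance_types
        ],
        "LaunchSpecifications": {
            "SpotSpecification": {
                "AllocationStrategy": "capacity-optimized",
                "TimeoutDurationMinutes": 30,
                "TimeoutAction": "SWITCH_TO_ON_DEMAND",
            }
        },
    }


def on_demand_fleet(fleet_type, instance_type, units):
    """Master or core fleet that runs only On-Demand capacity."""
    return {
        "Name": f"{fleet_type.lower()}-on-demand",
        "InstanceFleetType": fleet_type,
        "TargetOnDemandCapacity": units,
        "InstanceTypeConfigs": [{"InstanceType": instance_type}],
    }


def cluster_instances(subnet_ids, task_types, task_spot_units):
    """Value for the Instances parameter of RunJobFlow."""
    return {
        "InstanceFleets": [
            on_demand_fleet("MASTER", "m5.xlarge", 1),   # hypothetical types
            on_demand_fleet("CORE", "m5.2xlarge", 2),
            spot_task_fleet(task_types, task_spot_units),
        ],
        # One subnet per Availability Zone widens the pool of Spot capacity.
        "Ec2SubnetIds": list(subnet_ids),
        "KeepJobFlowAliveWhenNoSteps": False,
    }


if __name__ == "__main__":
    print(cluster_instances(["subnet-aaa", "subnet-bbb"],
                            ["m5.xlarge", "m5a.xlarge", "m4.xlarge"], 90))
```

The resulting dictionary could then be passed as `Instances=...` to `boto3.client("emr").run_job_flow(...)`, together with the cluster name, release label, and IAM roles.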
2.4 Add more Availability Zones
Adding more Availability Zones improves the chances of acquiring the capacity requested by your instance fleets.
2.5 Monitor and increase storage for core nodes, if required
Storage for core nodes should operate at less than 90 percent usage. Higher usage can lead to performance degradation, which results in longer-running jobs and higher costs.
3. Code optimization
From a coding perspective, we had to optimize for the following challenges:
- The Spark job must scan millions of prefixes in batches.
- The Spark job must read small Parquet files (3–5 MB) through each prefix.
Scanning millions of prefixes and reading millions of small Parquet files became a bottleneck for the Spark and Amazon EMR job.
First, we upgraded to Spark 3.1.1 because this version handles a large number of small files efficiently. Versions prior to Spark 3.1.0 were not optimized for this pattern. We also increased executor cores from two to five and executor memory from 15 GB to 36 GB.
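For illustration, the executor settings described above map onto the standard Spark properties `spark.executor.cores` and `spark.executor.memory`. The sketch below assembles a `spark-submit` command line with those values. The helper name, the deploy mode, and the application path are hypothetical.

```python
def spark_submit_args(app_path, executor_cores=5, executor_memory_gb=36):
    """Assemble a spark-submit command with the executor sizing from the text."""
    return [
        "spark-submit",
        "--deploy-mode", "cluster",
        "--conf", f"spark.executor.cores={executor_cores}",
        "--conf", f"spark.executor.memory={executor_memory_gb}g",
        app_path,
    ]


if __name__ == "__main__":
    # Hypothetical application location.
    print(" ".join(spark_submit_args("s3://example-bucket/jobs/filter_prefixes.py")))
```

On Amazon EMR, a list like this can be used as the `Args` of a step that runs through `command-runner.jar`.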
Second, we broke the job into multiple batches, and each batch application runs on 65,000 files (Amazon S3 prefixes). We arrived at this number so that the individual runs would complete in a short time (less than 10 minutes).
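The batching step can be sketched in a few lines of plain Python. The batch size of 65,000 comes from the text. The function name and the generated prefix format are hypothetical.

```python
from itertools import islice


def prefix_batches(prefixes, batch_size=65_000):
    """Yield successive lists of at most batch_size prefixes."""
    iterator = iter(prefixes)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    # Hypothetical prefixes; each batch would be handed to one Spark application run.
    all_prefixes = (f"data/id={i}/" for i in range(150_000))
    for number, batch in enumerate(prefix_batches(all_prefixes)):
        print(number, len(batch))
```

Because the function consumes an iterator lazily, the full list of millions of prefixes never has to be held in memory at once.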
Third, we spread the list operation across all the nodes in the cluster so that it wasn’t restricted to just the master node.
The following code snippet shows how we distributed the load across the cluster using the Spark transformation operation.
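As a minimal sketch of this idea, and not the production code, the example below parallelizes a list of prefixes into an RDD and checks each prefix for data inside `mapPartitions`, so the Amazon S3 list calls run on the executors rather than on the driver. It assumes PySpark and boto3 are available on the cluster. The function names, bucket, and number of slices are hypothetical.

```python
def prefix_has_data(bucket, prefix, s3_client):
    """Return True if at least one object exists under the prefix."""
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
    return response.get("KeyCount", 0) > 0


def filter_partition(bucket, prefixes, s3_client=None):
    """Runs on an executor: reuse one S3 client for every prefix in the partition."""
    if s3_client is None:
        import boto3  # imported on the executor; clients are not serializable

        s3_client = boto3.client("s3")
    return [p for p in prefixes if prefix_has_data(bucket, p, s3_client)]


def filter_prefixes(spark, bucket, prefixes, num_slices=200):
    """Distribute the prefix checks across the cluster and collect the survivors."""
    rdd = spark.sparkContext.parallelize(list(prefixes), num_slices)
    return rdd.mapPartitions(lambda part: filter_partition(bucket, part)).collect()
```

Creating the client inside `filter_partition` keeps it to one client per partition instead of one per prefix. The optional `s3_client` argument exists only so that the per-partition logic can be exercised with a stub outside of Spark.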
At Toyota Connected, we applied the strategies described above and saw a significant improvement in filtering and scanning the prefixes. Specifically, we:
- Use Spot Instances (100+ nodes).
- Use the capacity-optimized allocation strategy.
- Spread the workload across multiple Availability Zones.
- Use EMR v5.31+ and Spark v3.1.1.
- Add storage for core nodes and expand the fleet of task nodes.
- Use a larger class of instances.
- Use 10 different instance types to increase the chances of getting Spot Instances.
As a result, filtering 65,000 Amazon S3 prefixes is now about 54 times faster than before: runtime has been reduced from 27 minutes (1,620 seconds) to just 30 seconds. Using the Spark transformation to filter the Amazon S3 prefixes accounted for a significant part of this improvement.
- Spark small files read issue: https://issues.apache.org/jira/browse/SPARK-29089