AWS Architecture Blog
How Nielsen uses serverless concepts on Amazon EKS for big data processing with Spark workloads
Nielsen Marketing Cloud, a leading ad tech company, processes 25 TB of data and 30 billion events daily in one of their pipelines. As their data volumes grew, so did the challenges of scaling their Apache Spark workloads efficiently.
Nielsen’s team faced a scenario in which, as they scaled up their cluster by adding more instances, performance per instance degraded. Each instance did less work per hour, which drove up the cost per GB of data processed.
Furthermore, they encountered occasional data skew issues. Data skew, where data is unevenly distributed across partitions, created processing bottlenecks and further reduced cluster efficiency. In extreme cases, these combined factors led to cluster failures.
In this post, we follow Nielsen’s journey to build a robust and scalable architecture while enjoying linear scaling. We start by examining the initial challenges Nielsen faced and the root causes behind these issues. Then, we explore Nielsen’s solution: running Spark on Amazon Elastic Kubernetes Service (Amazon EKS) while adopting serverless concepts.
Evolving from a Spark cluster to Spark pods on Amazon EKS
Nielsen’s Marketing Cloud architecture began as a typical Spark cluster on Amazon EMR, receiving a constant stream of files of varying sizes to process. As both data volume and cluster size grew, the team noticed a degradation in performance per instance, as illustrated in the following graphs. Beyond the slower processing and the higher costs, Nielsen occasionally suffered production issues caused by data skew.
The team realized the problem was the growing number of remote shuffles between instances as the cluster grew. Remote shuffle, a process in Spark where data is redistributed across partitions, involves significant data transfer over the network and can become a major bottleneck. Because the data in their scenario arrives as a stream, Nielsen realized they could instead process it in smaller batches. This meant they didn’t have to lean on the distributed processing capabilities of large Spark clusters and could opt for small ones instead.
To address the performance degradation, the team decided to change its growth strategy: instead of scaling up their single Spark cluster, they scaled out using multiple local mode Spark clusters (single-node clusters) running on Amazon EKS. Compared to Spark cluster mode, local mode provides better performance for small analytics workloads. Each local mode cluster processes a smaller, bounded amount of data, requiring no remote shuffle and no interaction with other Spark instances.
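To make the local mode setup concrete, the following is a minimal sketch of how a single-node Spark session can be created. The application name and file paths are illustrative placeholders, not Nielsen's actual configuration.

```python
from pyspark.sql import SparkSession

# Minimal sketch of a local mode Spark session: all work runs in a single JVM
# on one node, so shuffles never cross the network.
spark = (
    SparkSession.builder
    .master("local[*]")               # use all cores of the single pod
    .appName("work-item-processor")   # illustrative name
    .getOrCreate()
)

# Process one bounded work item (paths are illustrative)
df = spark.read.parquet("s3://example-bucket/work-item/part-*.parquet")
df.groupBy("event_type").count().write.mode("overwrite").parquet("s3://example-bucket/output/")
```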
Moreover, the pods running on Amazon EKS can scale up and down based on the amount of pending work, meaning Nielsen could stop resources when they are not needed.
The new solution scales linearly, is 55% cheaper, and handles data faster, even under large burst conditions.
Why shuffle matters
Remote shuffle is triggered when data needs to be exchanged between Spark instances. Some transformations, like join or repartition, necessitate a shuffle of data. Remote shuffle is an order of magnitude slower than in-memory computations because it requires moving data over the network. It could slow down processing significantly, sometimes adding 100–200% to the total processing time.
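As an illustration, the following sketch shows wide transformations that trigger a shuffle. The datasets and column names are assumptions for the example, not Nielsen's actual schema.

```python
# Wide transformations that trigger a shuffle (illustrative datasets and columns):
events = spark.read.parquet("s3://example-bucket/events/")
users = spark.read.parquet("s3://example-bucket/users/")

# join: rows with the same user_id must land on the same partition,
# so Spark redistributes both datasets across the cluster.
enriched = events.join(users, on="user_id")

# repartition: explicitly reshuffles the data into 400 partitions.
rebalanced = enriched.repartition(400)
```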
The problem Nielsen ran into was that the amount of data shuffled grew in proportion to the cluster size. The following graph shows why this happens: it calculates the amount of data exchanged for a randomly distributed dataset as cluster size grows.
The following graph illustrates that the correlation is to the size of the cluster and not to the size of the data.
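One way to build intuition for this is a simplified model: if records are randomly (uniformly) distributed, a record's target partition lands on the same instance only 1/N of the time, so roughly (N − 1)/N of the shuffled data crosses the network, regardless of how much data there is. The following back-of-the-envelope sketch is a simplification of that idea, not the exact calculation behind the graphs.

```python
# Simplified estimate of the share of shuffled data that must cross the
# network when records are randomly distributed across N instances.
def remote_shuffle_fraction(num_instances: int) -> float:
    # A record's target partition is on the same instance with probability 1/N,
    # so (N - 1)/N of the shuffled data is exchanged remotely.
    return (num_instances - 1) / num_instances

for n in (1, 2, 4, 16, 64, 256):
    print(f"{n:>3} instances -> {remote_shuffle_fraction(n):.1%} of shuffled data moves over the network")
# 1 instance keeps everything local; by 64 instances, over 98% of shuffled
# data is remote -- the fraction depends on cluster size, not data size.
```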
Addressing shuffle
The team hypothesized that minimizing shuffle could lead to substantial performance improvements. Nielsen’s engineers decided to implement ideas from serverless patterns by drastically reducing the size of each cluster to a minimum while at the same time adding more of these smaller clusters to compensate for the lower capacity of each one. This approach promised to eliminate remote shuffle entirely for each data work item, as illustrated in the preceding graph.
Although this strategy promised performance gains, it also introduced a constraint: a limit on the amount of data per work item.
Designing the new system based on serverless patterns
Nielsen’s team developed a new architecture that uses two core concepts:
- A queue of work items to pull from
- A group of local mode Spark modules pulling work items from the queue
They had the following design goals:
- Keep the Spark modules busy at all times
- Stop modules when not needed
- Make sure all work items are processed successfully
The following diagram illustrates the workflow.
Final design
The final design includes the following components:
- File metadata storage – An Amazon Relational Database Service (Amazon RDS) cluster runs the PostgreSQL engine to store and manage statistics about each file entering the system.
- Work manager – An AWS Lambda function is used to periodically pull waiting files from the database, prepare work items comprised of one or multiple files, and publish the work items to an Amazon Simple Queue Service (Amazon SQS) message queue.
- Work queue – An SQS message queue is used for work items waiting to be pulled for processing.
- Processing units – Local mode Spark instances run as pods on an EKS cluster. They pull work items from the SQS queue (see the sketch after this list). As long as there are waiting work items in the queue, the pods are constantly busy.
- Metrics adaptor – An adaptor (Kubernetes-cloudwatch-adapter) provides Amazon CloudWatch metrics to the Kubernetes Horizontal Pod Autoscaler.
- Kubernetes Horizontal Pod Autoscaler – Horizontal Pod Autoscaler (HPA) uses a scaling rule to scale pods up or down based on the metrics from CloudWatch. It scales according to the number of messages (work items) visible in the queue, which is proportional to the work waiting to be processed. In Nielsen’s system, HPA scales the pods using targetValue = {SQS length/2}.
- Work completion queue – A second SQS message queue is used for reporting completion of work items. The completions get pulled by another Lambda function and get updated in the PostgreSQL database.
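To illustrate how these pieces fit together, the following is a minimal sketch of a processing unit's main loop. The queue URLs, the message schema (an id plus a list of file paths), and the processing logic are illustrative assumptions, not Nielsen's actual implementation.

```python
import json
import boto3
from pyspark.sql import SparkSession

# Illustrative queue URLs; the real names and message schema will differ.
WORK_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/work-items"
COMPLETION_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/work-completions"

sqs = boto3.client("sqs")
spark = SparkSession.builder.master("local[*]").appName("processing-unit").getOrCreate()

def process_work_item(paths):
    # Placeholder transformation: read the files in the work item and write
    # aggregated output; no remote shuffle because Spark runs in local mode.
    df = spark.read.parquet(*paths)
    df.groupBy("event_type").count().write.mode("append").parquet("s3://example-bucket/output/")

while True:
    # Long poll so idle pods don't spin; HPA scales pods down as the queue drains.
    response = sqs.receive_message(
        QueueUrl=WORK_QUEUE_URL, MaxNumberOfMessages=1, WaitTimeSeconds=20
    )
    for message in response.get("Messages", []):
        work_item = json.loads(message["Body"])
        process_work_item(work_item["files"])
        # Report completion so a Lambda function can update PostgreSQL,
        # then delete the message so it is not processed again.
        sqs.send_message(
            QueueUrl=COMPLETION_QUEUE_URL,
            MessageBody=json.dumps({"id": work_item["id"], "status": "done"}),
        )
        sqs.delete_message(QueueUrl=WORK_QUEUE_URL, ReceiptHandle=message["ReceiptHandle"])
```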
The following diagram illustrates the architecture of the final system.
Analyzing the results
The following graphs demonstrate the EKS pods scaling based on the number of work items. The active pods pick up new work items as soon as they finish their previous ones.
The following graph shows a large burst of data coming in. The system reacts quickly and scales up to process the added work. It quickly scales down when work is complete.
Analyzing the performance achieved per instance, the new system demonstrated a significant improvement. Performance per instance increased by approximately 130% while growing linearly and maintaining close to constant costs per GB processed.
The following graph compares the performance of the new system and the old system.
The new system’s costs are 55% lower for the same amount of data processed.
The following graphs compare the costs before and after the implementation.
Conclusion
Nielsen’s journey from a traditional architecture to a serverless-inspired architecture on Amazon EKS exemplifies the power of rethinking established patterns in big data processing.
By addressing the core challenges of data shuffle and scaling, Nielsen not only achieved performance gains and cost reductions, but also demonstrated the potential for linear scaling in large-scale data operations.
If you have big data processing jobs that can be broken down into many independent small parts, consider using similar ideas on Amazon EKS to achieve linear scaling and large cost savings.