AWS for Industries

Acxiom’s journey on R-based machine learning models (propensity scores) at scale with trillions of outputs

Introduction

Acxiom partners with the world’s leading brands to create customer intelligence, facilitating data-driven marketing experiences that generate value for customers and for brands. As experts in identity, ethical use of data, cloud-first customer-data management, and analytics solutions, Acxiom makes the complex marketing system work, applying customer intelligence wherever brands and customers meet. By helping brands genuinely understand people, Acxiom facilitates experiences that are relevant and respectful, resulting in customers who are willing to explore new brands and stay loyal to the ones they love.

Acxiom has been playing a critical role for its clients in helping them move away from traditional modeling and isolated decisions to systems that make decisions automatically and intelligently. Acxiom is one of the leading providers of decision intelligence within the ad-tech market, which is undergoing exponential growth. Per a Gartner study, by the end of 2023, more than 33 percent of large organizations will have decision-intelligence teams. Acxiom’s data assets, analytics services, and top-notch industry-focused resources are oriented to generate maximum impact for its clients. In this blog post, Acxiom shares how it creates industry-leading data assets at scale that power its decision intelligence, helping its numerous clients and brands unlock returns on their ad-tech investments.

Acxiom generates many audience-propensity scores for its brand and marketing clients using machine learning (ML) models. Acxiom’s clients use these ML-model-based propensity scores to choose audiences for their marketing campaigns. Some of these models are built in partnership between Acxiom and a client using the R programming language. Every month, new models are built by Acxiom’s partner and scored by Acxiom. Several times throughout the year, inference scores must be additionally generated for all these active models, creating over one trillion outputs from a catalog of over 600 models. This blog post talks about Acxiom’s journey (challenges and learning) in running R-based propensity models at scale with trillions of outputs in one month on Amazon Web Services (AWS).

Goals

Acxiom’s goals for migration to AWS were primarily centered around four objectives:

  1. Reduce the turnaround time to score these models
  2. Scale the model-scoring capability to handle more models per month
  3. Reduce operational overhead
  4. Support increased model complexity and a wider range of analytic techniques

The third objective was particularly important for the times that Acxiom scored its over 600 models on a fairly large universe file. To achieve these objectives, Acxiom’s solution uses a combination of Amazon EMR, an industry-leading cloud big data solution, Amazon Simple Storage Service (Amazon S3), an object storage service, and Amazon Redshift, which uses SQL to analyze structured and semi-structured data, with the bulk of the workload being implemented on Amazon EMR.

Challenges to Solve

The R language is designed primarily for single-threaded, in-memory workflows, and the models can only be scored using native R code, specifically by the caret R package, creating a fundamental scalability issue. Acxiom’s internal implementation used Apache Hadoop streaming and Apache MapReduce to orchestrate running native R processes across a cluster. Though this is a functional solution, this approach had several drawbacks:

  1. Acxiom’s internal infrastructure and the associated clusters were primarily built and tuned for data engineering workloads, not for memory-intensive R model-scoring workloads. This is a unique problem with specific needs that the on-premises infrastructure was not designed to handle.
  2. The clusters used in Acxiom’s implementation were shared resources that Acxiom’s other internal teams also used for different applications, and the cluster configurations were not optimized for running native R code. Consequently, the cluster was also limited in the resources that it could request per job, forcing Acxiom to break apart the R job workflows into many smaller, serially initiated batches. This created a large operational overhead that was difficult to automate and increased turnaround time.
  3. Operations that required more than one record at a time, such as grouping and aggregation, could not be easily implemented directly in the R code because each R process saw only one partition of data at a time. Instead, separate MapReduce and HiveQL jobs were used for these operations, increasing complexity and overhead.
  4. The environment supported a prior version of the operating system with isolated network access, which made it a labor-intensive process to compile and install R packages across the cluster nodes. This slowed down platform support for changing business requirements and prevented Acxiom from scoring models using new modeling techniques.
  5. It was possible for a user to submit large or multiple jobs to the cluster, overwhelming it. Because the cluster was shared, and therefore had varying usage at any given time, estimating the amount of work that was safe to submit was difficult.
  6. If any models within a batch failed, the log messages for all models in that batch had to be parsed to identify the issue.

To resolve these issues, Acxiom decided to reimplement the R-scoring solution using the Apache Spark framework on highly scalable and available Amazon EMR infrastructure. A job that would freeze up for no apparent reason in the internal environment, without any log messages to indicate the problem, would run without issue in Amazon EMR using an identical configuration. Upgrading the on-premises software would have required a large undertaking and IT investment that the Acxiom team could not wait for. To solve the challenges outlined, Acxiom decided to migrate the workload entirely from on premises to AWS.

Datasets

Input file

Acxiom scores all its models on a refreshed universe each month, using a bz2-compressed, pipe-separated values universe file. This universe file contains approximately 610 million records and over 2,500 columns. A 250-million-record subset of this universe is considered very high quality; these records are treated differently in some aspects of the business logic. This logic contributed to the operational overhead in Acxiom’s internal solution, which the company was able to simplify into a single process in Amazon EMR.

Preparing the input file for a model

The input has raw values that are not necessarily suitable for modeling or accepted by the model objects. Each model requires a subset of the over 2,500 columns. For any given column, there are multiple definitions for transforming that column into something suitable for a model, with the definition varying for each model. New definitions are created over time when new analytic or modeling techniques are applied. The definition that should be used for any given model is tied to that specific model and that specific model version; updates to that model create a new model version that might use a different set of transformations.

Because Acxiom’s process must also scale to handle these various transformations, it wraps each model object with the code that applies the specific transformations that the model needs. This is more operationally efficient and cost efficient than creating a separate pre-transformed file for each set of definitions every scoring cycle and mapping that file to the model versions. The trade-off is that the scoring process takes more time, and work can be duplicated if multiple models use the same column and the same transformation.
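The wrapping idea can be illustrated with a minimal Python sketch. This is not Acxiom’s R implementation: `WrappedModel`, `to_float_or_default`, the column names, and the toy `predict` function are all hypothetical, and the only point is to show a model version carrying its own column transformations so that raw string values are cast and defaulted at scoring time.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Mapping

# A transformation turns one raw string value into a model-ready feature.
Transform = Callable[[str], float]


def to_float_or_default(default: float) -> Transform:
    """Build a transform that casts to float and falls back to a default."""
    def transform(raw: str) -> float:
        try:
            return float(raw)
        except (TypeError, ValueError):
            return default
    return transform


@dataclass
class WrappedModel:
    """Hypothetical wrapper: a model version plus the transforms it needs."""
    name: str
    version: int
    transforms: Dict[str, Transform]               # column name -> transform
    predict: Callable[[Dict[str, float]], float]   # stand-in for the model object

    def score(self, record: Mapping[str, str]) -> float:
        # Only the columns this model version declares are read and transformed.
        features = {col: fn(record.get(col, "")) for col, fn in self.transforms.items()}
        return self.predict(features)


model = WrappedModel(
    name="example_model",
    version=2,
    transforms={"age": to_float_or_default(0.0), "income": to_float_or_default(-1.0)},
    predict=lambda f: 1.0 if f["age"] >= 18 and f["income"] > 0 else 0.0,
)

# The empty income string cannot be cast, so the default of -1.0 is used.
raw_record = {"id": "abc", "age": "42", "income": "", "unused": "x"}
score = model.score(raw_record)
```

Because each model version owns its transforms, two versions of the same model can apply different definitions to the same raw column without a separate pre-transformed file, at the cost of repeating shared transformations.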

To improve runtime efficiency, a Spark job first converts the universe file from text to Parquet format and stores it on Amazon S3. Because the transformation definitions vary, the infer schema option is set to False, which preserves all columns as strings so that the appropriate type casts and defaulting rules can be applied at scoring time.

Output file

Whether Acxiom is scoring its over 600 models or only a new batch of models received that month, it creates five outputs for each of the records in the universe. The first output is the probability score created by the model object. The remaining four outputs are quantiles, each of which is constructed in different ways on top of the output score distribution. Therefore, at full scale, 600 models x 5 outputs per model x 610 million records produces over 1.8 trillion outputs, not including the columns that are passed through from the input to the output, such as the primary key.
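The full-scale figure can be checked with a few lines of arithmetic. The variable names below are illustrative; the numbers are the ones quoted in the text.

```python
models = 600
outputs_per_model = 5          # one probability score plus four quantiles
records = 610_000_000          # approximate size of the universe file

output_columns = models * outputs_per_model
total_outputs = output_columns * records

print(f"{output_columns:,} output columns")
print(f"{total_outputs:,} total outputs")
```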

The output file needs to be in rectangular format, where each output record corresponds to a single input record and each column corresponds to one of the model outputs. This requirement creates an additional processing step of merging all the model outputs into a single file, which Acxiom addresses in a workflow that runs after model scoring is complete. This workflow takes advantage of Amazon Redshift.

Initial solution design

One of the first decisions one faces when migrating a workload to AWS is selecting which of the many AWS services to use. Considering Acxiom’s background in Hadoop and the ability of R to use Apache Spark, the natural choice was for Acxiom to use Amazon EMR.

The next question was how to determine an optimal way to orchestrate Acxiom’s scoring jobs within Amazon EMR. Each model is fully independent of the others; models can be scored in a parallel way with each model as its own Spark application. For job scheduling, Acxiom implemented each application as an Amazon EMR step, where the step would initiate the R code that was necessary to score that model. This allowed Acxiom to submit models to a cluster in a queue; the cluster terminates automatically when it finishes processing all the models assigned to it. Acxiom used Apache Airflow running on an instance in Amazon Elastic Compute Cloud (Amazon EC2), a broad and deep compute platform, as the orchestration layer for starting and submitting the steps to the cluster. Amazon Managed Workflows for Apache Airflow (Amazon MWAA), which orchestrates workflows using directed acyclic graphs (DAGs) written in Python, is a new offering that was unavailable when Acxiom began this journey, so it did not use the service for this solution.

The high-level architecture is diagrammed below: the scoring universe and models are uploaded to AWS, the models are scored using Amazon EMR, the separate models are joined back together to a single file using Amazon Redshift, and the output data is brought back for delivery to various platforms.

Amazon EMR uses three types of nodes: driver (called the primary node in Amazon EMR documentation), core, and task. Because of the memory requirements of Acxiom’s R-model implementation, it used memory-optimized instance types (for example, r5 and r5a) in its instance-fleet configurations. To save money, Amazon EC2 Spot Instances can be purchased instead of On-Demand Instances. Acxiom’s implementation used On-Demand Instances for the driver and core nodes and Spot Instances for the task nodes. This setup is successful and one that Acxiom continues to use today, but it contributed to some of the challenges in Acxiom’s initial design.

Successes of initial design

Although this solution was not optimal, it was successful in solving Acxiom’s short-term goals. Indeed, Acxiom implemented the above framework successfully for some time before running into difficulties. Acxiom was able to successfully:

  1. migrate workflows into AWS, reducing its on-premises compute footprint
  2. significantly reduce the operational overhead and eliminate several mistake-prone steps
  3. speed up the per-model scoring time by a factor of two, holding model complexity constant
  4. allow support for additional model types with increasing complexity
  5. add quality assurance checks at model scoring time, reducing the time to recovery in case of a failure or algorithmic error
  6. reduce the near-real-time consumption by over 70 percent, including file transfers and quality assurance

Challenges with initial design

Although the above high-level design is sound, the specific design choices that Acxiom made around orchestrating the Amazon EMR tasks proved to be more challenging. Specifically, it was critical to determine (1) the number of steps Acxiom should run in parallel on a cluster and (2) the size and hardware composition of the cluster. It was at this point that Acxiom decided upon a solution that is all too common in an on-premises environment but is not a cloud-native methodology: scaling vertically, not horizontally, to put as much workload on a fixed set of resources as possible. This crucial decision separates Acxiom’s initial design from what it is running today, and it is where many of Acxiom’s learnings originate.

Acxiom also had to answer the question of how the Spark applications should be configured in terms of resource usage, particularly memory. For example, some of the model types are simple and require a few dozen input columns; other models are complex and require hundreds of raw or transformed features, leading to vastly different memory profiles from Spark application to Spark application. Most of the models were of the simpler type at the beginning of Acxiom’s implementation journey, which made this difference go unnoticed, but as time went on, the average model complexity increased to provide better predictive power. This variation from model to model complicated enhancements to Acxiom’s architecture, because Acxiom had to separate issues related to design choices from issues related to Spark configurations. A cloud-native design from the start would have made this easier, perhaps even unnecessary.

This design fundamentally relies on a cluster that can optimize model throughput using a high-concurrency setting for the Amazon EMR steps. This creates a vertical scaling challenge: the bigger the cluster, the higher the throughput. To maximize throughput, Acxiom would set the concurrency to between 12 and 15 steps at a time. Each step needs a certain amount of computing and memory to score the full universe, so Acxiom would size the cluster between 38,000 and 48,000 cores in the task fleet. This resulted in numerous issues that were not easy to separate from each other until Acxiom understood the cause:

  1. At times, the multiple steps would coincidentally line up and perform similar work, such as shuffle operations, at the same time. The driver node could become overwhelmed with the workload, even when sized at 24xlarge, resulting in numerous failures.
    1. The YARN service could temporarily fail, canceling all jobs running on the cluster at that time and failing the running Amazon EMR steps.
    2. One or more of the steps could enter a zombie state, where the Spark application has either finished or failed, but the driver would never catch up to the real state of the application. The result was the Amazon EMR step being in running status indefinitely, even though the underlying job was done.
    3. The Spark Context for a step could be disconnected. Sometimes this occurred even if the R code and Spark app were both running, resulting in failed steps and orphaned applications that would not properly shut down without manual intervention.
    4. Ganglia would become overwhelmed, making it difficult to collect usage metrics.
  2. Segmentation faults could occur due to low memory. At first, this was believed to be related only to the model complexity and that increasing per-model memory would solve the issue. That helped but only some of the time. Because multiple models with different memory requirements could be running simultaneously, finding a configuration that worked across the lifetime of the cluster was difficult.
  3. Because Acxiom was reading and writing data from and to Amazon S3, it did not allocate much disk space or use disk-attached Amazon EC2 instances. Too many log messages could sometimes be generated, causing the driver to run out of disk space. Acxiom solved this by simply increasing the volume size of the Amazon Elastic Block Store (Amazon EBS)—an easy-to-use, scalable, high-performance block-storage service designed for Amazon EC2—but it was a difficult issue to discover due to the other issues usually occurring first.
  4. An Amazon EMR cluster resides in a single Availability Zone (AZ). Having such a large Spot Instance fleet made the cluster vulnerable to spot reclamations. Though Spark is resilient and could recover from this, a spot reclamation would set back all running models, increasing the likelihood of an overloaded driver. Additionally, because Acxiom was scaling vertically, the cluster would be running for longer periods to process more models, making it more likely that a spot reclamation would occur.

Though some of these issues appear serious, a common mechanism for resolving errors is to do a retry. Retrying allowed Acxiom to recover from many of the errors. However, retrying is meant to resolve sporadic issues, not systemic ones. Ultimately, the retry mechanism only hid some of these issues, and a retry would cost more time and money compared to a first-time successful implementation. During a particularly difficult cycle, Acxiom encountered all these challenges at once, increasing Acxiom’s per-model cost by a factor of three. It was this tipping point that made the company reevaluate the design.

Refining the design

A cloud-native design uses horizontal scaling whenever possible, and Acxiom believed this was the optimal approach: increasing the size of the cluster seemed equivalent to horizontal scaling. Indeed, increasing the size of an instance fleet is a form of horizontal scaling. However, this ignores the now-obvious limitations: the cluster is still a single entity, it resides within a single AZ, and the driver node is still a single point of contention. Amazon EMR does support a multidriver setup, and that might have been able to resolve some of Acxiom’s challenges, but it would still not fundamentally resolve the design issue of putting as much workload on a single cluster as possible.

Acxiom’s new design started by scaling back the initial design, reducing the concurrency to only a handful of steps. This resolved most of the issues but not all. During debugging, Acxiom had run a cluster with single-step concurrency, and apart from a few models that had a higher complexity and needed a new memory configuration, everything ran successfully. Recognizing the direction, Acxiom decided to abandon the concurrent-step solution altogether—a cluster would run one model at a time only.

The question was then how to maintain the model throughput to meet Acxiom’s service-level agreements (SLAs). A huge 48,000-core cluster theoretically can process each model 12–15 times faster with only one model running on it at a time, but in practice a single model couldn’t generate the workload necessary to saturate the entire cluster. With CPU underusage, Acxiom saw gains of only 3–5 times speedup—not enough to justify the high cost of a large cluster. A single large cluster would also not solve the Spot Instance or single-AZ limitations.
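The gap between the theoretical and observed speedups can be expressed as a rough utilization figure. The sketch below is only back-of-the-envelope arithmetic on the numbers quoted above; the `utilization` helper is a hypothetical name, and pairing the low ends and the high ends of the two ranges is an assumption rather than something reported in the text.

```python
def utilization(observed_speedup: float, theoretical_speedup: float) -> float:
    """Fraction of the theoretical speedup that was actually realized."""
    return observed_speedup / theoretical_speedup


low = utilization(3, 12)    # low end of both ranges
high = utilization(5, 15)   # high end of both ranges

print(f"Realized roughly {low:.0%} to {high:.0%} of the theoretical speedup")
```

Under these assumptions, a single model used only about a quarter to a third of what the large cluster could theoretically deliver, which is consistent with the conclusion that the cost was not justified.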

The solution Acxiom arrived at was running several smaller clusters in parallel. After some tuning, Acxiom settled on a size of about 8,000 cores per cluster. Because cluster creation was automated with Airflow, the operational overhead with this solution only minimally increased. The output location for multiple clusters could be configured to have the same Amazon S3 prefix, allowing for the downstream Amazon Redshift job to pick up the results from multiple clusters at a time. Using multiple clusters allows Acxiom to take advantage of multiple AZs and reduce the total time that each cluster is running, improving Spot Instance usage. The architecture is diagrammed below.

With this solution, Acxiom arrived at a fully scalable answer with only one of the challenges from before: some models are more complex than others and require a greater amount of memory per core. It isn’t trivial to determine how much memory a model needs: the model algorithm, number of features, and feature data types (numeric versus categorical) all contribute.

Acxiom wanted a Spark memory configuration that would work for all models. Too much memory per core and there is CPU underusage; too little and the application fails. The number of models with a high-memory requirement is small compared to the size of the catalog. To keep both operational overhead and cost low, Acxiom resolved the issue by implementing a retry mechanism: all models will initially run with a memory-to-CPU ratio that maximizes the CPU usage on the cluster. If a model fails, it is automatically restarted with a greater memory-to-CPU ratio. This results in the underusage of CPUs for those models, but it does not impact any other models, and crucially, all the models can run successfully without any human intervention.
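The escalation logic described above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not Acxiom’s Airflow code: `score_with_retry`, `ModelOutOfMemory`, `fake_run_step`, and the 4 and 8 GB-per-core values are all hypothetical stand-ins for submitting an Amazon EMR step with a given Spark memory configuration.

```python
from typing import Callable, List, Sequence, Tuple


class ModelOutOfMemory(Exception):
    """Raised by the (hypothetical) step runner when a model runs out of memory."""


def score_with_retry(
    model_id: str,
    run_step: Callable[[str, int], None],
    memory_gb_per_core: Sequence[int] = (4, 8),  # illustrative values only
) -> int:
    """Try each memory-to-CPU ratio in order and return the one that worked."""
    last_error = None
    for ratio in memory_gb_per_core:
        try:
            run_step(model_id, ratio)
            return ratio
        except ModelOutOfMemory as error:
            last_error = error  # escalate to the next, larger ratio
    raise RuntimeError(f"{model_id} failed at every memory setting") from last_error


# Stand-in for submitting a step: "complex" models need at least 8 GB per core.
attempts: List[Tuple[str, int]] = []


def fake_run_step(model_id: str, ratio: int) -> None:
    attempts.append((model_id, ratio))
    if model_id.startswith("complex") and ratio < 8:
        raise ModelOutOfMemory(model_id)


simple_ratio = score_with_retry("simple_model", fake_run_step)
complex_ratio = score_with_retry("complex_model", fake_run_step)
```

Only the memory-related failure triggers escalation here; any other exception propagates immediately, which keeps the retry from masking unrelated problems.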

Successes of the second design

With this refined architecture, Acxiom can retain all the advantages from before with virtually none of the challenges. The company has realized faster SLAs, predictable costs, and low operational overhead. With this new design, Acxiom observed the following improvements:

  1. The average scoring time per model was reduced by 50 percent compared to the initial design, including the time spent in retries.
  2. The maximum number of retries needed to score memory-intensive models was reduced from up to six retries to exactly one.
  3. Consistent runtimes prevented cost overruns from occurring and budgeting became more predictable and reliable.
  4. The necessity for human intervention for troubleshooting was reduced from at least several hours per month to near zero, with nearly all months needing no troubleshooting.
  5. The quality assurance jobs that Acxiom runs on a smaller file before scoring the full universe became reliable in predicting the success or failure of at-scale production jobs; no longer is the scale of the scoring a cause for unreliability.

Amazon Redshift

After all the Amazon EMR steps are complete and the models are scored, Acxiom has a separate Parquet file for each model saved in Amazon S3. The next challenge was to figure out a way to combine all this data back into a single file. If there are 600 models, that would be a 600-way inner join, which is not a computationally trivial task.

The solution is to not use a join operation at all. Instead, there are three other operations that can perform the same task:

  1. melting (also known as pivoting longer or unpivoting) each Parquet file from a wide format to a three-column long format with the schema: primary key (from the original wide file; keys are duplicated in the output), variable, value. In this schema, multiple records in the long format represent a single record in the wide format.
  2. unioning the tables together, because they now all share the same schema
  3. casting (also known as pivoting wider or pivoting) the whole table into a traditional one-column-per-field format
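The three operations can be shown on a toy example in plain Python. This is a minimal sketch of the technique only, not the Amazon Redshift SQL that Acxiom runs; the `melt` and `pivot` helpers, the `id` key, and the two tiny model outputs are hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

LongRow = Tuple[Any, str, Any]  # (primary key, variable, value)


def melt(wide_rows: List[Dict[str, Any]], key: str = "id") -> List[LongRow]:
    """Unpivot wide rows into (key, variable, value) records."""
    return [(row[key], col, val) for row in wide_rows for col, val in row.items() if col != key]


def pivot(long_rows: List[LongRow], key: str = "id") -> List[Dict[str, Any]]:
    """Group long records by key and spread variables back into columns."""
    wide: Dict[Any, Dict[str, Any]] = defaultdict(dict)
    for k, variable, value in long_rows:
        wide[k][variable] = value
    return [{key: k, **cols} for k, cols in sorted(wide.items())]


# Two per-model outputs that share a primary key but arrive in different orders.
model_a = [{"id": 1, "a_score": 0.9}, {"id": 2, "a_score": 0.1}]
model_b = [{"id": 2, "b_score": 0.4}, {"id": 1, "b_score": 0.7}]

long_rows = melt(model_a) + melt(model_b)   # union: both share the long schema
combined = pivot(long_rows)
```

No pairwise join is ever performed: every model contributes rows to one long table, and a single grouping pass by primary key rebuilds the wide record.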

Acxiom initially evaluated Spark for this operation, and though it can melt the data easily enough, it could not pivot the data back in a performant manner. Amazon Redshift, being a SQL warehouse engine, can run the pivot quickly. Using a cluster of 32 dc2.8xlarge nodes, the pivot operation can run in under two hours on a full universe file with a few hundred models. Amazon Redshift does have a hard limit of 1,600 columns per table, and 600 models at five outputs each produce 3,000 output columns, so not all 600 models can be pivoted at one time. However, the pivot can run in batches on Amazon Redshift to create two or three files that can still be joined quickly downstream.
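One way to plan the batches is to work out how many models fit under the column limit and split the catalog accordingly. The sketch below assumes a single primary-key column per table and five outputs per model; `batch_models` and the model identifiers are hypothetical, and Acxiom’s actual batching rules are not described in the text.

```python
from typing import List

REDSHIFT_MAX_COLUMNS = 1600  # per-table limit cited above


def batch_models(
    model_ids: List[str],
    outputs_per_model: int = 5,
    key_columns: int = 1,          # assumption: one primary-key column per table
    max_columns: int = REDSHIFT_MAX_COLUMNS,
) -> List[List[str]]:
    """Split models into batches whose pivoted table stays within the limit."""
    per_batch = (max_columns - key_columns) // outputs_per_model
    if per_batch < 1:
        raise ValueError("Not even one model fits within the column limit")
    return [model_ids[i:i + per_batch] for i in range(0, len(model_ids), per_batch)]


catalog = [f"model_{n:03d}" for n in range(600)]
batches = batch_models(catalog)
batch_sizes = [len(batch) for batch in batches]
```

With these assumptions, 319 models fit in one table (1,595 output columns plus the key), so a 600-model catalog splits into two batches.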

Key takeaways

Acxiom’s migration to AWS was not a lift-and-shift event, but a journey that taught the company many important lessons.

First, choosing the right AWS service for the job is only the beginning of the journey. Iterate on your design as you discover more about your workload, and choose the components or services that are best suited to each specific challenge. Choosing Amazon EMR and Amazon EMR steps to orchestrate Spark applications helped Acxiom meet or exceed nearly all SLAs and reduce operational overhead. Acxiom initially tried to use Spark for the pivot operation but discovered that Amazon Redshift was a much more performant solution for SQL-heavy pivot work. Because the architecture is decoupled using Airflow as an orchestration layer, extending the pipeline to include Amazon Redshift was a feasible development that Acxiom accomplished early in its migration journey.

Second, patterns from on premises can sneak into a cloud solution without one realizing it. Horizontal scaling is a cloud-native pattern, but Acxiom’s initial design put too much horizontal scaling in the wrong place, effectively turning it into vertical scaling. Switching from a single-cluster design to multiple clusters facilitated true horizontal scaling. Separating tasks that don’t interfere with each other is a crucial advantage of this design.

Third, retry mechanisms are a valid way of handling sporadic errors, but one must be careful not to overlook systemic problems that retries can partially cover up. If retries are occurring frequently or due to fundamentally different types of issues (network time-out versus segmentation faults, for example), that’s a sign of a larger issue that should be addressed.

Fourth, using an orchestration layer, such as Airflow, eases operational overhead considerably. Even though Acxiom changed how it used Amazon EMR during this process, neither the user application nor any of the automation had to be changed to facilitate this design refinement.

Conclusion

Using AWS services, Acxiom has transformed its model-scoring pipeline. Previously, it was a manual process involving several error-prone steps that could occasionally negatively impact resources and other competing applications on Acxiom’s shared clusters.

In AWS, Acxiom doubled its scoring speed, reduced the runtime for the pipeline from seven days to two days, and allowed its clients to build models using state-of-the-art algorithms.

Acxiom’s first solution might not have been perfect, but it provided a solid foundation for the truly cloud-native solution that the company uses today, which was reached through architectural iteration and experimentation.

James Palmer

James Palmer is a Sr. Data Scientist at Acxiom. He operationalizes partner-built models at scale, develops new model and data solutions, and supports engineering and deployment efforts across the team. In his free time, he enjoys hiking, bowling, playing video games, and listening to electronic music.

Varadarajan Srinivasan

Varadarajan "Raj" Srinivasan is a Vice President of Engineering at Acxiom. He leads the Data Science and ML engineering practice at Acxiom, focusing on developing large-scale ML solutions and data products. In his free time, he enjoys crossword puzzles, traveling, watching sci-fi movies, spending time with family, and listening to music.

Abhijit Rajeshirke

Abhijit Rajeshirke is a Solutions Architect for enterprise customers at AWS. He enjoys providing architectural guidance to AWS customers and building solutions. Outside of work, he enjoys taking long, mindful walks on any available track or trail.

Jitesh Kumar

Jitesh Kumar is a Senior Customer Solutions Manager with Amazon Web Services. In his role, Jitesh provides expert guidance and technical direction to deliver business value throughout customers’ cloud transformation journeys.

Praful Kava

Praful Kava is a Sr. Specialist Solutions Architect at AWS. He guides customers in designing and engineering cloud-scale analytics pipelines on AWS. Outside work, he enjoys travelling with his family and exploring new hiking trails.