AWS Compute Blog

Ad Hoc Big Data Processing Made Simple with Serverless MapReduce

September 8, 2021: Amazon Elasticsearch Service has been renamed to Amazon OpenSearch Service. See details.


Sunil Mallya
Solutions Architect

Big data processing solutions have been using AWS Lambda more lately; customers have been creating solutions such as building metadata indexes for Amazon S3 using Lambda and Amazon DynamoDB and stream processing of data in S3. In this post, I discuss a serverless architecture to run MapReduce jobs using Lambda and S3.

Big data AWS services

Lambda launched in 2015, enabling customers to execute code on demand without any dedicated infrastructure. Since then, customers have successfully used Lambda for various use cases like event-driven processing for data ingested into S3 or Amazon Kinesis, web API backends, and producer/consumer architectures, among others. Lambda has emerged as a key component in building serverless architectures for these architecture paradigms.

S3 integrates directly into other AWS services, such as providing an easy export facility into Amazon Redshift and Amazon Elasticsearch Service, and providing an underlying file system (EMRFS) for Amazon EMR, making it an ideal data lake in the AWS ecosystem.

Amazon Redshift, EMR and the Hadoop ecosystem offer comprehensive solutions for big data processing. While EMR makes it easy, fast and cost-effective to run Hadoop clusters, it still requires provisioning servers, as well as knowledge of Hadoop and its components.

Serverless MapReduce overview

I wanted to extend some of these solutions and present a serverless architecture along with a customizable framework that enables customers to run ad hoc map reduce jobs. Apart from the benefit of not having to manage any servers, this framework was significantly cheaper and, in some cases, faster than existing solutions when running the well-known big data processing Amplab benchmark.

This framework allows you to process big data with no operational effort and no prior Hadoop knowledge. While Hadoop is the most popular big data processing framework today, it does have a steep learning curve given the number of components and their inherent complexity. By minimizing the number of components in the framework and abstracting the infrastructure management, it simplifies data processing for developers or data scientists. They can focus on modeling the data processing problem and deriving value from the data stored in S3.

In addition to reduced complexity, this solution is much cheaper than existing solutions for ad hoc MapReduce workloads. Given that the solution is serverless, customers pay only when the MapReduce job is executed. The cost for the solution is the aggregate usage cost of Lambda and S3. Given that both the services are on-demand, you can compute the cost per query, a feature that’s unique to the solution. This is extremely useful as you can now budget your data processing needs precisely to the query.

For customers who want enhanced security, they can process the data in a VPC by configuring the Lambda functions to access resources in a VPC and creating a VPC endpoint for S3. There are no major performance implications of running multiple variants of the same job or different jobs on the same dataset concurrently. The Lambda function resources are independent, thus allowing for fast iterations and development. This technology is really exciting for our customers, as it enables a truly pay-for-what-you-use cost effective and secure model for big data processing.

Reference architecture for serverless MapReduce

The goals are to:

  • Abstract infrastructure management
  • Get close to a “zero” setup time
  • Provide a pay-per-execution model for every job
  • Be cheaper than other data processing solutions
  • Enable multiple executions on the same dataset

The architecture is composed of three main Lambda functions:

  • Mapper
  • Reducer
  • Coordinator

The MapReduce computing paradigm requires maintaining state, so architecturally you require a coordinator or a scheduler to connect the map phase of the processing to the reduce phase. In this architecture, you use a Lambda function as a coordinator to make this completely serverless. The coordinator maintains all its state information by persisting the state data in S3.

Execution workflow:

  • The workflow starts with the invocation of the driver script that reads in the config file, which includes the mapper and reducer function file paths and the S3 bucket or folder path.
  • The driver function finds the relevant objects in S3 to process by listing the keys and matching by prefix in the S3 bucket. The keys are aggregated to created batches, which are then passed to the mapper. The batch size is determined by a simple heuristic that tries to achieve maximum parallelism while optimizing the data fit based on the mapper memory size.
  • Mapper, reducer, and coordinator Lambda functions are created and the code is uploaded.
  • A S3 folder called job folder is created as a temporary workspace for the current job and is configured as an event source for the coordinator.
  • The mappers write their outputs to the job folder; after all the mappers finish, the coordinator uses the aforementioned logic to create batches and invoke the reducers.
  • The coordinator is notified through the S3 event source mapping when each of the reducers end and continues to create subsequent stages of reducers until a single reduced output is created.

The following diagram shows the overall architecture:

Getting started with serverless MapReduce

In this post, I show you how to build and deploy your first serverless MapReduce job with this framework for data stored in S3. The code used in this post, along with detailed instructions about how to set up and run the application, is available in the awslabs lambda-refarch-mapreduce GitHub repo.

Use the dataset generated by Intel’s Hadoop benchmark tools and data sampled from the Common Crawl document corpus also used by the Amplab benchmark.

For the first job, you compute an aggregation (i.e., sum of ad revenue for every source IP address) on the dataset of size 25.4GB and 155 million individual rows.

Dataset:

s3://big-data-benchmark/pavlo/text/1node/

Schema in CSV:

Each row of the Uservisits dataset is composed of the following:

sourceIP VARCHAR(116)
destURL VARCHAR(100)
visitDate DATE
adRevenue FLOAT
userAgent VARCHAR(256)
countryCode CHAR(3)
languageCode CHAR(6)
searchWord VARCHAR(32)
duration INT

SQL representation of the intended operation:

SELECT SUBSTR(sourceIP, 1, 8), SUM(adRevenue) FROM uservisits GROUP BY SUBSTR(sourceIP, 1, 8)

Prerequisites

You need the following prerequisites for working with this serverless MapReduce framework:

  • AWS account
  • S3 bucket
  • Lambda execution role
  • IAM credentials with access to create Lambda functions and list S3 buckets

Limits

In its current incarnation, this framework is best suited for ad hoc processing of data in S3. For sustained usage and time sensitive workloads, Amazon Redshift or EMR may be better suited. The framework has the following limits:

  • By default, each account has a concurrent Lambda execution limit of 100. This is a soft limit and can be increased by opening up a limit increase support ticket.
  • Lambda currently has a maximum container size limit of 1536 MB.

Components

The application has the following components:

  • Driver script and config file
  • Mapper Lambda function
  • Reducer Lambda function
  • Coordinator Lambda function

Driver script and config file

The driver script creates and configures the mapper, reducer, and coordinator Lambda functions in your account based on the config file. Here is an example configuration file that the driver script uses.

  sourceBucket: "big-data-benchmark",
  jobBucket: "my-job-bucket",
  prefix: "pavlo/text/1node/uservisits/",
  region: "us-east-1",
  lambdaMemory: 1536,
  concurrentLambdas: 100,
  mapper: {
        name: "mapper.py",
        handler: "mapper.handler",
        zip: "mapper.zip"
    },
  reducer:{
        name: "reducer.py",
        handler: "reducer.handler",
        zip: "reducer.zip"
    },
  reducerCoordinator:{
        name: "reducerCoordinator.py",
        handler: "reducerCoordinator.handler",
        zip: "reducerCor.zip"
    }

Mapper Lambda function

This is where you perform your map logic; the data is streamed line by line into your mapper function. In this function, you map individual records to a value or as an optimization; also, you need to perform the first reduce step especially when computing aggregations. This is efficient given that you may be reading from multiple input sources in the mapper. In this example, the mapper maps the sourceIP on to adRevenue and stores in a dictionary a running total of the adRevenue of every sourceIP.

    # Download and process all keys

    for key in src_keys:
        response = s3_client.get_object(Bucket=src_bucket, Key=key)
        contents = response['Body'].read()
        for line in contents.split('\n')[:-1]:
            line_count +=1
            try:
                data = line.split(',')
                srcIp = data[0][:8]
                if srcIp not in totals:
                    totals[srcIp] = 0
                totals[srcIp] += float(data[3])
            except Exception, e:
                print e

Reducer Lambda function

The mapper and reducer functions look identical structurally, but perform different operations on the data. The reducer keeps a dictionary of aggregate sums of adRevenue of every sourceIP. The reducer is also responsible for the termination of the job. When the final reducer runs, it then reduces the intermediate results into a single file stored in the job bucket.

    # Download and process all mapper output

    for key in reducer_keys:
        response = s3_client.get_object(Bucket=job_bucket, Key=key)
        contents = response['Body'].read()
        try:
            for srcIp, val in json.loads(contents).iteritems():
                line_count +=1
                if srcIp not in results:
                    results[srcIp] = 0
                results[srcIp] += float(val)
        except Exception, e:
            print e

Coordinator Lambda function

The coordinator function keeps track of the job state with the job bucket as a Lambda event source. It is invoked every time a mapper or reducer finishes. After all the mappers finish, it then invokes the reducers to compute the final output.

The pseudo code for the coordinator looks like the following:

If all mappers are done:
    If currently in the reduce phase:
            number_of_reducers = compute_number_of_reducers(previous_reducer_step_outputs)
Else:
    number_of_reducers = compute_number_of_reducers(mapper_outputs)
    If  number_of_reducers == 1:
       Invoke single reducer and write the results to S3
Job done;
    Else create event source for reducer
Else:
    Return

The coordinator doesn’t need wait for all the mappers to finish in order to start the reducers, but for simplicity, the first version of the framework chooses to wait for all mappers.

Running the job

The driver function is the interface to start the job. It reads the job details like the mapper and reducer code source to create the Lambda functions in your account for the serverless MapReduce job.

Execute the following command:

$ python driver.py

Intermediate outputs from the mappers and reducers are stored in the specified job bucket and the final result is stored as JobBucket/JobID/result. The contents of the job bucket look like the following:

smallya$ aws s3 ls s3://JobBucket/py-bl-1node-2 --recursive --human-readable --summarize

2016-09-26 15:01:17   69 Bytes py-bl-1node-2/jobdata
2016-09-26 15:02:04   74 Bytes py-bl-1node-2/reducerstate.1
2016-09-26 15:03:21   51.6 MiB py-bl-1node-2/result
2016-09-26 15:01:46   18.8 MiB py-bl-1node-2/task/
….
smallya$ head –n 3 result

67.23.87,5.874290244999999
30.94.22,96.25011190570001
25.77.91,14.262780186000002

Cost Analysis

The cost for this job is 2.49 cents, which processed over 25 GB of data and took less than 2 minutes. The majority of the cost can be attributed to the Lambda components and the mapper function in particular, given that the map phase takes the longest and is the most resource-intensive.

A component cost breakdown for the example above is plotted in the following chart.

The cost model is shown to scale almost linearly when the same job is run for a bigger dataset (126.8 GB, 775 million rows) for the Amplab benchmark (more details in the awslabs lambda-refarch-mapreduce GitHub repo), costing around 11 cents and executing in 3.5 minutes.

Summary

In this post, I showed you how to build a simple MapReduce task using a serverless framework for data stored in S3. If you have ad hoc data processing workloads, the cost effectiveness, speed, and price-per-query model makes it very suitable.

The code has been made available in the awslabs lambda-refarch-mapreduce GitHub repo . You can modify or extend this framework to build more complex applications for your data processing needs.

If you have questions or suggestions, please comment below.