AWS Big Data Blog

Optimizing Amazon S3 for High Concurrency in Distributed Workloads

Aaron Friedman is a Healthcare and Life Sciences Solution Architect with Amazon Web Services

The healthcare and life sciences landscape is being transformed rapidly by big data. By intersecting petabytes of genomic data with clinical information, AWS customers and partners are already changing healthcare as we know it.

One of the most important things in any type of data analysis is to represent data in a cost-optimized and performance-efficient manner. Before we can derive insights from the genomes of thousands of individuals, genomic data must first be transformed into a queryable format. This information often starts in a raw Variant Call Format (VCF) file stored in an S3 bucket. To make it queryable across many patients at once, the data can be stored as Apache Parquet files in a data lake built in either the same or a different S3 bucket.

Apache Parquet is a columnar storage file format that is designed for querying large amounts of data, regardless of the data processing framework, data model, or programming language. Amazon S3 is a secure, durable, and highly scalable home for Parquet files. When using computationally intensive algorithms, you can get maximum performance through small renaming optimizations of S3 objects. The extract, transform, load (ETL) processes occur in a write-once, read-many fashion and can produce many S3 objects that collectively are stored and referenced as a Parquet file. Then data scientists can query the Parquet file to identify trends.

In today’s blog post, I will discuss how to optimize Amazon S3 for an architecture commonly used to enable genomic data analyses. This optimization is important to my work in genomics because, as genome sequencing continues to drop in price, the rate at which data becomes available is accelerating.

Although the focus of this post is on genomic data analyses, the optimization can be used in any discipline that has individual source data that must be analyzed together at scale.

Architecture

The renaming portion of this architecture is serverless, so it has no servers to administer. In addition to being scalable, elastic, and automated, it handles errors and has no impact on downstream users who might be querying the data from S3.

S3 is a massively scalable key-based object store that is well-suited for storing and retrieving large datasets. Due to its underlying infrastructure, S3 is excellent for retrieving objects with known keys. S3 maintains an index of object keys in each region and partitions the index based on the key name. For best performance, keys that are often read together should not have sequential prefixes. Keys should be distributed across many partitions rather than on the same partition.

For large datasets like genomics, population-level analyses of these data can require many concurrent S3 reads by many Spark executors. To maximize performance of high-concurrency operations on S3, we need to introduce randomness into each of the Parquet object keys to increase the likelihood that the keys are distributed across many partitions.
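
As a minimal sketch of that idea in Python 3 (the function name randomize_key and the nbytes parameter are illustrative assumptions, not part of any AWS API), a random hexadecimal string can be prepended to the last path component of a key so that the common prefix stays intact:

```python
import binascii
import os


def randomize_key(orig_key, nbytes=6):
    """Return orig_key with a random hex string prepended to its last path component."""
    # rpartition yields ('', '', orig_key) when the key contains no '/'
    prefix, sep, name = orig_key.rpartition('/')
    # nbytes random bytes become 2 * nbytes hexadecimal characters
    random_hex = binascii.hexlify(os.urandom(nbytes)).decode('ascii')
    return prefix + sep + random_hex + name


if __name__ == '__main__':
    # The output differs on every run because the prefix is random
    print(randomize_key('prefix.parquet/part-00000.gz.parquet'))
```

Only the object name changes; the prefix.parquet/ path that readers reference is untouched.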

The following diagram shows the ETL process, S3 object naming, and error reporting and handling steps of genomic data. This post covers steps 3-5.

[Diagram: ParquetRename]

1. Previously generated VCF files are stored in an S3 bucket.

2. Using Spark on Amazon EMR, the VCF files are extracted, transformed, and loaded to Parquet.

3. During Hadoop-based ETL, the Parquet files are written to S3 to a common prefix that you specify. By default, Hadoop tools write Parquet data as objects with sequential keys, for example:

s3://mybucket/prefix.parquet/part-00000.gz.parquet

s3://mybucket/prefix.parquet/part-00001.gz.parquet

Because a Parquet file is represented as a collection of S3 objects, only the object names should be changed, not the name of the file itself. Yet such S3 optimizations are moot if they add extreme complexity to the system (software complexity to handle the data) and impact the ease with which data scientists can query the data.

4. AWS Lambda and an S3 PUT event trigger the renaming of the S3 objects. This occurs in the background so there is no overhead for the data scientist.

5. Errors are sent to Amazon CloudWatch so they can be investigated and fixed. A message that contains error information is sent to an Amazon SQS queue for downstream handling.

Set up the bucket and IAM roles and permissions

First, run the following command to create a bucket that will store the Parquet files. Choose a bucket name that is easy to remember. You will refer to it in later steps.

aws s3 mb s3://<yourtestbucketname>

To set up IAM roles and permissions

  1. Sign in to the Identity and Access Management (IAM) console at https://console.aws.amazon.com/iam/.
  2. Choose Roles.
  3. In Role Name, type test-lambda-s3rename-role.
  4. In Select Role Type, choose AWS Service Roles, and then choose AWS Lambda. This grants the AWS Lambda service permissions to assume the role.
  5. In Attach Policy, choose the preconfigured AmazonSQSFullAccess policy. This will allow Lambda to access SQS queues.
  6. Choose Create Role, and then choose Inline Policy to create a custom policy. Name your policy ParquetRenameCRUD. Attach and apply the following policy. (Be sure to replace <yourtestbucketname> with the name of the bucket you created earlier.)
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CRUDdatalake",
            "Effect": "Allow",
            "Action": [
                "s3:*"
            ],
            "Resource": [
                "arn:aws:s3:::<yourtestbucketname>/*"
            ]
        }
    ]
}

Create your Lambda function and set up logging

In Amazon S3, a rename operation requires two steps: you copy the object to a new key and then delete the old key. If you are familiar with the AWS CLI, this is similar to:

aws s3 mv s3://bucket/old_key s3://bucket/new_key
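
The same two steps can be sketched in Python 3. This is an illustration under assumptions rather than the function used later in this post: rename_object is a hypothetical helper, and s3_client is assumed to be a boto3 S3 client such as boto3.client('s3').

```python
def rename_object(s3_client, bucket, old_key, new_key):
    """Copy old_key to new_key in the same bucket, then delete old_key.

    s3_client is assumed to be a boto3 S3 client; any object that offers
    copy_object and delete_object with these keyword arguments also works.
    """
    # If the copy raises, the delete below never runs, so the original survives
    s3_client.copy_object(
        Bucket=bucket,
        Key=new_key,
        CopySource={'Bucket': bucket, 'Key': old_key},
    )
    s3_client.delete_object(Bucket=bucket, Key=old_key)
```

Ordering matters here: deleting only after a successful copy means a failure can leave a duplicate object but never loses data.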

The Lambda function must be able to receive metadata that comes from an S3 PUT event in the bucket. Amazon has provided many example templates for creating a Lambda function, and we will use the s3-get-object-python template as our base as it provides information on how to extract the appropriate key and bucket information from within the Lambda context.
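
To illustrate what that metadata looks like, here is a small Python 3 sketch that pulls the bucket and key out of an event. The helper name bucket_and_key and the trimmed sample_event are assumptions made for this example; a real S3 event record carries many more fields.

```python
from urllib.parse import unquote_plus


def bucket_and_key(event):
    """Extract the bucket name and decoded object key from an S3 event."""
    record = event['Records'][0]
    bucket = record['s3']['bucket']['name']
    # Object keys arrive URL-encoded in S3 event notifications
    key = unquote_plus(record['s3']['object']['key'])
    return bucket, key


# Trimmed, hypothetical event containing only the fields used above
sample_event = {
    'Records': [
        {
            's3': {
                'bucket': {'name': 'mybucket'},
                'object': {'key': 'prefix.parquet/part+00000.gz.parquet'},
            }
        }
    ]
}

if __name__ == '__main__':
    print(bucket_and_key(sample_event))
```

Decoding the key before using it matters because characters such as spaces are encoded in the notification and would otherwise point at a key that does not exist.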

  1. Open the AWS Lambda console at https://console.aws.amazon.com/lambda/.
  2. On the Select blueprint page, choose s3-get-object-python.
  3. You can ignore the fields on the Configure triggers page. Choose Next.
  4. On the Configure function page, in Role, choose the role you created earlier.
  5. In Role Name, type test-lambda-s3rename-role. (You can choose another name if you prefer.)
  6. In the Advanced settings section, set the timeout to 1 min 0 sec.
  7. Leave the rest of the fields at their defaults, and then choose Next.
  8. Choose Create Function.

The function executes the following steps:

  1. Gets the key and bucket information from the S3 PUT request.
  2. Generates a random hash to prepend to the key.
  3. Copies the object to a new key.
  4. Deletes the old key.

In very rare cases, a copy-delete function can fail. In this Lambda function, you’ll want to ensure that the appropriate error messages are surfaced so that the errors can be handled. You can do this in two ways:

  1. Raise an exception that will be caught in CloudWatch logs.
  2. Send a message to an SQS queue that includes the S3 key and failure type. (The handling of the SQS messages is out of the scope of this post.)

Now, you need to set up the SQS queue so that the Lambda function can write to it. (Save the queue URL for later.) You can use standard settings for the queue. Type the following into your terminal to create your queue:

aws sqs create-queue --queue-name TestRenameDeleteErrors

Here is the full code for the Lambda function. If you used an SQS queue name other than TestRenameDeleteErrors, be sure to specify it here in QUEUE_NAME:


from __future__ import print_function

import json
import urllib
import boto3
import os
import binascii
import time
import random

print('Loading function')

QUEUE_NAME='TestRenameDeleteErrors'

# define our S3 clients and resources
s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
sqs = boto3.resource('sqs')


def send_delete_message_to_queue(bucket, orig_key, delete_response):
    # Get the queue and then send the message
    queue = sqs.get_queue_by_name(QueueName=QUEUE_NAME)
    response = queue.send_message(MessageBody='Delete failed for bucket:%s and key:%s' % (bucket, orig_key),
        MessageAttributes={
            'Bucket': {
                'StringValue': bucket,
                'DataType': 'String'},
            'Key': {
                'StringValue': orig_key,
                'DataType': 'String'},
            'DeleteResponse': {
                'StringValue': json.dumps(delete_response),
                'DataType': 'String'},
        })
    return response


def rename_s3_object(bucket, orig_key):
    # First, we create the new random key
    split_key = orig_key.split('/')
    random_prepend_hash = binascii.hexlify(os.urandom(6))

    if len(split_key) ==1:
        new_key = random_prepend_hash + split_key[-1]
    else:
        new_key =  '/'.join(split_key[:-1]) + '/' + random_prepend_hash + split_key[-1]

    # Next, do a copy of the original key to the new key
    try: 
        s3_resource.Object(bucket, new_key).copy_from(CopySource='/'.join([bucket, orig_key]))
    except Exception as e:
        print('Copy of key failed. Raising exception.')
        print(e)
        raise e

    # We only reach this point if the copy succeeded
    # Now, execute the delete of the original object
    response = s3_resource.Object(bucket, orig_key).delete()
    # S3 returns 204 No Content for a successful delete
    if response['ResponseMetadata']['HTTPStatusCode'] not in (200, 204):
        queue_response = send_delete_message_to_queue(bucket, orig_key, response)
        e=Exception('Failed delete. Sent to queue with following response:\n' + json.dumps(queue_response))
        raise e

    return True
            

def lambda_handler(event, context):
    # Get the bucket name and object key from the S3 event record
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(
        event['Records'][0]['s3']['object']['key']).encode('utf8')
    try:
        rename_s3_object(bucket, key)
    except Exception as e:
        print(e)
        print('Error renaming object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e

Add S3 triggers

To add S3 triggers to invoke the Lambda function

  1. In the S3 console, navigate to the bucket you created earlier (<yourtestbucketname>) and choose Properties.
  2. Expand the Events section.
  3. Configure the event as follows:
  • For Name, type TestS3RenameTrigger.
  • For Events, select ObjectCreated:Put and ObjectCreated:CompleteMultipartUpload.
  • For Suffix, type .parquet. We use this filter because we are interested only in these objects.
  • For Send to, select Lambda function, and then type the name of the function you just created.
  4. Choose Save. Your event should be updated.
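
The effect of this configuration can be modeled with a short Python 3 sketch. It is a simplified model of the filter, not S3's implementation, and the names TRIGGER_EVENTS, SUFFIX, and would_trigger are assumptions made for this example.

```python
# Event names as they appear in the eventName field of an S3 event record
TRIGGER_EVENTS = ('ObjectCreated:Put', 'ObjectCreated:CompleteMultipartUpload')
SUFFIX = '.parquet'


def would_trigger(event_name, key):
    """Return True if this event type and key pass the configured filter."""
    return event_name in TRIGGER_EVENTS and key.endswith(SUFFIX)


if __name__ == '__main__':
    print(would_trigger('ObjectCreated:Put', 'test.parquet/part-001.gz.parquet'))
    print(would_trigger('ObjectCreated:Copy', 'test.parquet/part-001.gz.parquet'))
```

The second call matters for this design: the renamed object still ends in .parquet, but it is created by a copy, which S3 reports as ObjectCreated:Copy. Because that event type is not selected, the rename should not invoke the function a second time.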


To test that the event is configured correctly, upload a test object into the bucket:

touch part-001-test_upload.gz.parquet
aws s3 cp part-001-test_upload.gz.parquet \
  s3://<yourtestbucketname>/test.parquet/

If the object appears under test.parquet/ with a random string prepended to its file name, the trigger and the Lambda function are working.
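
If you prefer to check the result in code, the following Python 3 sketch tests whether a key's file name starts with 12 hexadecimal characters, which is what binascii.hexlify(os.urandom(6)) produces in the Lambda function. The names HEX_PREFIX and looks_renamed are assumptions made for this example.

```python
import re

# os.urandom(6) hexlified yields 12 lowercase hexadecimal characters
HEX_PREFIX = re.compile(r'^[0-9a-f]{12}')


def looks_renamed(key):
    """Return True if the last path component starts with 12 hex characters."""
    name = key.rsplit('/', 1)[-1]
    return HEX_PREFIX.match(name) is not None


if __name__ == '__main__':
    print(looks_renamed('test.parquet/part-001-test_upload.gz.parquet'))
```

This is a heuristic only: an object whose original name happens to begin with 12 hexadecimal characters would also match.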

Conclusion

Congratulations! You have built a solution that is automated, serverless, scalable, and elastic! Because only the objects, not the referenceable Parquet path, were changed, downstream users should not be affected.

Interested in taking the whole system for a test drive? Combine this solution with the VCF-to-Parquet ETL process described in this post and start analyzing thousands of genomes today!

If you have questions or suggestions, please leave a comment below.
