AWS Storage Blog

Analytical processing of millions of cell images using Amazon EFS and Amazon S3

Analytical workloads such as batch processing, high performance computing, or machine learning inference often have high IOPS and low latency requirements, but operate at irregular intervals on subsets of large datasets. Typically, data is copied manually between storage tiers in preparation for processing, which is cumbersome and error-prone. IT teams therefore want to store or archive data cost-effectively while retaining easy access to it on a file system for fast processing. They want an automated way to temporarily “push” object data to a transient file system, that is, a file system on which data is stored only for the duration of processing, and to propagate data updates back to the permanent object store once processing completes.

This push mechanism can be implemented easily with a serverless workflow that manages copy operations between Amazon Simple Storage Service (Amazon S3) as the permanent data store and Amazon Elastic File System (Amazon EFS) as the file system used for the duration of data processing.

In this blog, we discuss key considerations for building such a transient file system and provide a sample implementation. We demonstrate the solution with a reference implementation of a Cell Painting workflow used to process terabytes of cell microscopy images. Transient file systems combine the scale and price advantage of object storage with the higher IOPS and lower latency of a shared file system, creating a cost-efficient and easy-to-maintain data solution for your batch analytics use case.

Accelerating drug discovery and development with cell imaging at scale

We adopted Amazon EFS for a Cell Painting workflow with the research and development team of a global healthcare organization. Cell Painting is an image-based profiling technique used for generating morphological profiles of chemical and genetic perturbations of cells (for more details, consult this article). The Cell Painting process involves highlighting multiple organelles within a cell using fluorescent markers (hence the term “painting”) and collecting hundreds of cell images for each perturbation.

In our case, 1.6 million images were generated per week using high-throughput automated microscopy (3 TB of image data per week). Every set of new plates coming from the laboratory was first uploaded to Amazon S3, then transferred to a shared file system on Amazon EFS for analytic processing. For each image, a series of steps was performed using CellProfiler on Amazon Elastic Container Service (Amazon ECS). After processing, these images were copied back to Amazon S3 as the permanent data store. With Amazon EFS, we were able to extract and analyze approximately 250 million feature data points per week, with thousands of IOPS and low latency, while making use of Amazon S3 for queries and archiving (see Figure 1).

Figure 1: Overview of Cell Painting on AWS

Design pattern

The common requirements for a transient file system are as follows:

  • Perform consistent copying of all data from specified Amazon S3 prefixes, including all sub-prefixes, to a file system on Amazon EFS, with error handling. This is because incomplete datasets would distort the processing outcome.
  • Ability to clean up the file system once processing completes and data has been copied back to Amazon S3.

The orchestration of data transitions between Amazon S3 and Amazon EFS can be offloaded to a state machine implemented with AWS Step Functions:

We use AWS Lambda functions to crawl a user-provided Amazon S3 bucket or prefix and create a temporary index of all objects using the ListObjectsV2 API of Amazon S3.

  • The ListObjectsV2 API is suitable for programmatically creating on-demand indices of large numbers of object keys, whereas Amazon S3 Inventory provides a scheduled alternative (daily or weekly).
  • The index makes copy operations idempotent and enables parallel processing and graceful failure handling.
  • It also allows programmatic verification by comparing the target index to the source index and reporting on completion.
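The index-based approach reduces the copy and verification decisions to set operations on object keys. A minimal sketch with hypothetical keys (the plate and image names are our own examples, not from the workflow):

```python
# Hypothetical indices: object keys on the source (Amazon S3) and the
# files already present on the target (Amazon EFS).
source_index = {"plates/p1/img001.tiff", "plates/p1/img002.tiff", "plates/p2/img001.tiff"}
target_index = {"plates/p1/img001.tiff"}

# Only keys missing on the target are copied, so re-running the workflow
# after a partial failure never transfers a file twice (idempotency).
missing = source_index - target_index

# Verification: an empty difference after copying means the dataset is complete.
remaining = source_index - (target_index | missing)
```

Because each copy decision depends only on the indices, the missing keys can be transferred in parallel and any failed subset simply reappears in `missing` on the next run.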

The general design pattern is depicted in the following diagram:

Figure 2: Serverless workflow for transient file system

Implementation considerations

The definition of our Step Functions state machine is as follows:

  • For any specified Amazon S3 bucket or prefix and target Amazon EFS directory, trigger the copy, reverse copy, and cleanup activities.
  • Since copy operations are idempotent, we incorporate a parallel state and a Lambda retrier with MaxAttempts (default 3, 5 in our case, maximum 99999999) representing the maximum number of attempts before retries cease and normal error handling resumes.
  • Once the actual processing completes, Step Functions carries out the EFS-to-S3 copy operation and cleans up the EFS directory, or fails in case of a copy error (Figure 3).

Figure 3: AWS Step Functions definition for transient file system

The Step Functions definition example code is as follows:

{
   "StartAt":"Start",
   "States":{
      "Start":{
         "Next":"Execute S3->EFS Lambda",
         "Type":"Pass"
      },
      "Execute S3->EFS Lambda":{
         "Catch":[
            {
               "ErrorEquals":[
                  "States.TaskFailed"
               ],
               "Next":"Failure"
            }
         ],
         "Next":"Do Work",
         "Parameters":{
            "efs_root_directory.$":"$.efs_root_directory",
            "input_bucket.$":"$.input_bucket",
            "prefix.$":"$.prefix",
            "transfer_mode":0
         },
         "Resource":"<S3-EFS-TRANSFER-LAMBDA-ARN-HERE>",
         "ResultPath":"$.s3_to_efs_return_code",
         "Retry":[
            {
               "ErrorEquals":[
                  "States.Timeout"
               ],
               "IntervalSeconds":15,
               "MaxAttempts":5
            }
         ],
         "TimeoutSeconds":890,
         "Type":"Task"
      },
      "Do Work":{
         "Next":"Execute EFS->S3 Lambda",
         "Type":"Pass"
      },
      "Execute EFS->S3 Lambda":{
         "Catch":[
            {
               "ErrorEquals":[
                  "States.TaskFailed"
               ],
               "Next":"Failure"
            }
         ],
         "Next":"EFS Cleanup",
         "Parameters":{
            "efs_root_directory.$":"$.efs_root_directory",
            "output_bucket.$":"$.output_bucket",
            "prefix.$":"$.prefix",
            "transfer_mode":1
         },
         "Resource":"<S3-EFS-TRANSFER-LAMBDA-ARN-HERE>",
         "ResultPath":"$.efs_to_s3_return_code",
         "Retry":[
            {
               "ErrorEquals":[
                  "States.Timeout"
               ],
               "IntervalSeconds":15,
               "MaxAttempts":5
            }
         ],
         "TimeoutSeconds":890,
         "Type":"Task"
      },
      "EFS Cleanup":{
         "Catch":[
            {
               "ErrorEquals":[
                  "States.TaskFailed"
               ],
               "Next":"Failure"
            }
         ],
         "Next":"Finish",
         "Resource":"<EFS-CLEANUP-LAMBDA-ARN-HERE>",
         "ResultPath":"$.efs_cleanup_return_code",
         "Type":"Task"
      },
      "Failure":{
         "Cause":"Error",
         "Type":"Fail"
      },
      "Finish":{
         "End":true,
         "Type":"Pass"
      }
   }
}
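The state machine expects an input document carrying the buckets, prefix, and EFS directory referenced by the Parameters blocks above. The following sketch shows how an execution could be started with boto3; the bucket names, prefix, mount path, and state machine ARN are placeholders, not values from the sample:

```python
import json

# Input document consumed by the state machine; the keys match the
# "Parameters" blocks in the definition above.
execution_input = {
    "input_bucket": "my-cell-images",       # hypothetical source bucket
    "output_bucket": "my-cell-results",     # hypothetical target bucket
    "prefix": "plates/week-32/",            # hypothetical S3 prefix to process
    "efs_root_directory": "/mnt/efs",       # EFS mount path of the transfer Lambda
}

# Starting an execution requires AWS credentials; the ARN is a placeholder.
# import boto3
# sfn = boto3.client("stepfunctions")
# sfn.start_execution(
#     stateMachineArn="arn:aws:states:<REGION>:<ACCOUNT-ID>:stateMachine:<NAME>",
#     input=json.dumps(execution_input),
# )
```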

We define a Lambda function (see upcoming sample code) using boto3 to manage copy operations between Amazon S3 and Amazon EFS, as follows:

  • To create an inventory of the specified S3 bucket or prefix, we define the get_s3_keys function to retrieve all object keys. If no prefix is specified, the entire bucket is scanned. We use the list_objects_v2 boto3 call to return the objects in the designated bucket.
  • We define the get_efs_keys function to retrieve the keys of all Amazon EFS files and return them as a set of strings.
  • The file_transfer function is responsible for copy operations based on the index. Copy is supported in both directions via the transfer_mode argument (0 = Amazon S3 to EFS; 1 = Amazon EFS to S3). The function catches errors with specific files (from Amazon S3 or EFS) without stopping processing.
  • Pool connections are kept equal to the thread count so the S3 client can handle concurrent requests. The function creates an EFS directory (if it does not exist) and appends a download job to the list of threads (when copying S3 objects to the EFS directory) or an upload job for the reverse copy of EFS files to Amazon S3.
  • Because Lambda is serverless and abstracts CPU resource control, the function is designed with multi-threading rather than CPU-bound multi-processing.
import logging
import boto3
from botocore.config import Config
from pathlib import Path
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# Constants
DEBUG = False               # More logging
MAX_KEYS = 1024             # Number of keys to be retrieved at once
THREAD_COUNT = 20           # Max number of threads to be spawned
S3_TO_EFS_TRANSFER = 0      # Transfer from S3 to EFS
EFS_TO_S3_TRANSFER = 1      # Transfer from EFS to S3


# S3 Client - we keep pool connections equal to thread count
# so client can handle concurrent requests
s3_config = Config(max_pool_connections=THREAD_COUNT)
s3_client = boto3.client('s3', config = s3_config)

# Logging
logger = logging.getLogger()
if DEBUG:
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)

def get_s3_keys(s3_bucket, prefix, get_directories=True):

    """ 
    This function retrieves all the keys within a s3 bucket/prefix.
    If prefix is missing, entire bucket will be scanned. 
    """

    s3_request_kwargs = {
        'Bucket': s3_bucket,
        'MaxKeys': MAX_KEYS,
        'Prefix': prefix,
    }

    continuation_token = ''             # Start with an empty continuation token
    all_keys = set()                    # Set to store unique S3 keys

    # Loop until no continuation token is returned
    while True:
        if continuation_token:
            s3_request_kwargs['ContinuationToken'] = continuation_token
            logger.debug('Continuation token received, fetching more keys')

        response = s3_client.list_objects_v2(**s3_request_kwargs)
        
        if get_directories:
            keys = [k['Key'] for k in response.get('Contents', [])]
        else:
            keys = [k['Key'] for k in response.get('Contents', []) if not k['Key'].endswith(os.sep)]
            
        logger.debug(f'Processing {str(len(keys))} object keys.')

        all_keys.update(keys)
        continuation_token = response.get('NextContinuationToken', '')

        # Exit if there are no more keys
        if not continuation_token:
            logger.debug(f'Got total {str(len(all_keys))} object keys. Exiting...')
            return all_keys

def get_efs_keys(efs_root_directory, prefix):

    """ 
    This function retrieves all the files from EFS and 
    return them as a set of strings. 
    """
    
    # Root directory on EFS
    prefix_path = efs_root_directory + os.sep + prefix
    
    # If the root directory does not exist, return an empty set;
    # otherwise return the file paths relative to the root directory
    if os.path.exists(os.path.dirname(prefix_path)):
        result = list(Path(prefix_path).rglob("*"))
        efs_files = {str(file.relative_to(efs_root_directory)) for file in result}
        logger.debug(f'Total {str(len(efs_files))} files found on EFS.')
        return efs_files
    else:
        return set()    # Empty set

# Function to transfer a set of files between S3 and EFS
def file_transfer(efs_root_directory, file_list, transfer_mode, input_bucket=None, output_bucket=None):

    """ 
    This function can transfer file(s) between S3 and EFS.

    Argument transfer_mode dictates the direction of transfer.
        0 = S3 to EFS Transfer
        1 = EFS to S3 Transfer

    S3 to EFS Transfer
        Loop through the file set and perform following:
            1. Define the local filesystem path where the file will be copied
            2. Create the directory if it does not exist
            3. Add download job to the list 
    
    EFS to S3 Transfer
        Loop through the file set and perform following:
            1. Add upload job to the list 
    """

    all_threads = []

    with ThreadPoolExecutor(max_workers=THREAD_COUNT) as executor:
        for file in file_list:
            efs_file_path = efs_root_directory + os.sep + file
            
            if transfer_mode == S3_TO_EFS_TRANSFER:
                # Make the directory if it does not exist
                if not os.path.exists(os.path.dirname(efs_file_path)):
                    os.makedirs(os.path.dirname(efs_file_path))
        
                # Queue the file download if not a directory
                if not file.endswith(os.sep):
                    all_threads.append(executor.submit(download_file, input_bucket, file, efs_file_path))
            
            elif transfer_mode == EFS_TO_S3_TRANSFER:
                all_threads.append(executor.submit(upload_file, efs_file_path, output_bucket, file))

        logger.info(f'Total {len(all_threads)} transfer jobs queued on {THREAD_COUNT} threads')

        # check for successful completion of threads
        # we do not stop processing if one fails
        for completed_thread in as_completed(all_threads):
            try:
                result = completed_thread.result()
                logger.debug(f"File transferred: {result}")
            except Exception as e:
                logger.error(f"Exception while transferring file {e}")

# Template function to download files
def download_file(bucket, key, filename):
    s3_client.download_file(bucket, key, filename)

def upload_file(efs_file_path, bucket, file):
    s3_client.upload_file(efs_file_path, bucket, file)

def lambda_handler(event, context):
    try:                 
        start = time.time()

        # Extract event parameters
        prefix = event.get('prefix')
        efs_root_directory = event.get('efs_root_directory')
        transfer_mode = event.get('transfer_mode')
        assert(transfer_mode == S3_TO_EFS_TRANSFER or transfer_mode == EFS_TO_S3_TRANSFER)

        if transfer_mode == S3_TO_EFS_TRANSFER:
            input_bucket = event.get('input_bucket')
            missing_files = get_s3_keys(input_bucket, prefix, True) - get_efs_keys(efs_root_directory, prefix)
            file_transfer(efs_root_directory, missing_files, transfer_mode, input_bucket, None)
            sanity_check = get_s3_keys(input_bucket, prefix, False) - get_efs_keys(efs_root_directory, prefix)
        elif transfer_mode == EFS_TO_S3_TRANSFER:
            output_bucket = event.get('output_bucket')
            missing_files = get_efs_keys(efs_root_directory, prefix) - get_s3_keys(output_bucket, prefix, False)
            file_transfer(efs_root_directory, missing_files, transfer_mode, None, output_bucket)
            sanity_check = get_efs_keys(efs_root_directory, prefix) - get_s3_keys(output_bucket, prefix, False)

        logger.debug(f'Transferred {len(missing_files)} files. Seconds taken: {time.time() - start}')
        
        if len(sanity_check) > 0:
            raise Exception(f"Sanity check failed. Missing files: {sanity_check}")
        
        logger.info(f"Successfully transferred files, sanity check passed!")
        
        return {"file_transfer_status": "successful"}
    
    except Exception as e:
        logger.error(f"Error processing file transfer. Exception details: {e}")
        raise e

After the data is copied back to Amazon S3 from Amazon EFS, Step Functions triggers a separate AWS Lambda function to clean up EFS (see the following code example).

  • File deletion is performed using the shutil module.
  • If the EFS directory is multi-level, the function resolves the path back to the top-level directory under the EFS root to ensure that all subdirectories are removed.
import shutil
import os
import logging
import pathlib

# Set logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    try:             
        # Set the root directory within the EFS        
        root_directory = event.get('efs_root_directory')    
        prefix = event.get('prefix')

        # If multi-level directory, we need to remove from root
        prefix_path = pathlib.Path(prefix)
        root_prefix = prefix_path.parts[0]
        
        # Directory to remove on EFS
        path = os.path.join(root_directory, root_prefix)

        # Remove the directory tree and all its contents
        shutil.rmtree(path)
        
        logger.info(f"Successfully deleted prefix")
        
        return {"efs_cleanup_status": "successful"}
    
    except Exception as e:
        logger.error("Error processing object")
        raise e

Several limitations of the solution should be acknowledged:

  • The S3 index is generated once at workflow initiation, assuming no changes to the S3 bucket once copy operations start. S3 objects added after the initial index is created will not be processed.
  • Index comparison is based on file names only, validating that each file has been copied between Amazon S3 and Amazon EFS. Future adaptations may compare file sizes to detect parallel changes on the source.
  • Memory allocation for the Lambda functions must be sized according to the S3 objects being transferred.
  • Error handling in AWS Step Functions can be extended to cover runtime errors beyond Lambda timeouts.
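As a sketch of the size-comparison extension mentioned in the limitations, the check could compare the S3 object's ContentLength with the file size on EFS. The function name and injectable client are our own additions, not part of the sample code:

```python
import os

def sizes_match(bucket, key, efs_file_path, s3_client=None):
    """Return True if the S3 object and its EFS copy have the same size.

    A mismatch indicates the source changed while the workflow was running.
    The client is injectable so the check can be unit-tested with a stub.
    """
    if s3_client is None:
        import boto3  # deferred import; only needed against real S3
        s3_client = boto3.client("s3")
    s3_size = s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]
    return s3_size == os.path.getsize(efs_file_path)
```

Such a check could run inside the existing sanity-check step, at the cost of one HeadObject request per file.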

Cleaning up

Remember to delete example resources if you no longer need them, to avoid incurring future costs. This includes the Amazon S3 bucket, Amazon EFS directory, AWS Lambda functions, and AWS Step Functions workflow.

Conclusion

In this blog post, we demonstrated how you can run analytic batch processing on millions of files faster using a combination of Amazon S3 and Amazon EFS. We shared sample code that uses AWS Step Functions and AWS Lambda to orchestrate the state transitions.

The solution is particularly suitable for batch processing tasks on large datasets when a file system with high IOPS and low latency is only temporarily needed. It scales to large datasets and keeps Amazon S3 as your permanent data store, so you only incur file system costs during data processing. The solution is serverless, making it cost-efficient and easy to maintain.

Thank you for reading this blog post!

We are looking forward to reading your feedback and questions in the comments section.

Rostislav Markov

Rostislav Markov is principal architect with AWS Professional Services. As migrations leader on the global delivery team, he works with global and strategic customers and partners on their largest transformation programs. Outside of work, he enjoys spending time with his family outdoors and exploring New York City culture.

Ravindra Agrawal

Ravindra is a DevOps Consultant at AWS Professional Services. He loves automation and helps customers adopt DevOps culture. In his free time, he enjoys photography, travelling and watching movies.

Saswata Dash

Saswata Dash is a Cloud Developer in Amazon Web Services, specializing in DevOps and supporting enterprise-scale customers to design, automate and build solutions in AWS. Outside of work, she pursues her passion for photography and catching sunrises.