AWS Big Data Blog

Introducing AWS Glue for Ray: Scaling your data integration workloads using Python

AWS Glue is a serverless data integration service that makes it simple to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development. Today, AWS Glue processes customer jobs using either Apache Spark’s distributed processing engine for large workloads or Python’s single-node processing engine for smaller workloads. Customers like Python for its ease of use and rich collection of built-in data-processing libraries but might find it difficult for customers to scale Python beyond a single compute node. This limitation makes it difficult for customers to process large datasets. Customers want a solution that allows them to continue using familiar Python tools and AWS Glue jobs on data sets of all sizes, even those that can’t fit on a single instance.

We are happy to announce the release of a new AWS Glue job type: Ray. Ray is an open-source unified compute framework that makes it simple to scale AI and Python workloads. Ray started as an open-source project at RISELab in UC Berkeley. If your application is written in Python, you can scale it with Ray in a distributed cluster in a multi-node environment.  Ray is Python native and you can combine it with the AWS SDK for pandas to prepare, integrate and transform your data for running your data analytics and ML workloads in combination. You can use AWS Glue for Ray with Glue Studio Notebooks, SageMaker Studio Notebook, or a local notebook or IDE of your choice.

This post provides an introduction to AWS Glue for Ray and shows you how to start using Ray to distribute your Python workloads.

What is AWS Glue for Ray?

Customers like the serverless experience and fast start time offered by AWS Glue. With the introduction of Ray, we have ensured that you get the same experience. We have also ensured that you can use the AWS Glue job and AWS Glue interactive session primitives to access the Ray engine. AWS Glue jobs are fire-and-forget systems where customer submit their Ray code to the AWS Glue jobs API and AWS Glue automatically provisions the required compute resources and runs the job. AWS Glue interactive session APIs allow interactive exploration of the data for the purpose of job development. Regardless of the option used, you are only billed for the duration of the compute used. With AWS Glue for Ray, we are also introducing a new Graviton2 based worker (Z.2x) which offers 8 virtual CPUs and 64 GB of RAM.

AWS Glue for Ray consists of two major components:

  1. Ray Core – The distributed computing framework
  2. Ray Dataset – The distributed data framework based on Apache Arrow

When running a Ray job, AWS Glue provisions the Ray cluster for you and runs these distributed Python jobs on a serverless auto-scaling infrastructure.  The cluster in AWS Glue for Ray will consists of exactly one head node and one or more worker nodes.  

The head node is identical to the other worker nodes with the exception that it runs singleton processes for cluster management and the Ray driver process.  The driver is a special worker process in the head node that runs the top-level application in Python that starts the Ray job.  The worker node has processes that are responsible for submitting and running tasks.

The following figure provides a simple introduction to the Ray architecture.  The architecture illustrates how Ray is able to schedule jobs through processes called Raylets.  The Raylet manages the shared resources on each node and is shared between the concurrently running jobs.  For more information on how Ray works, see Ray.io.

The following figure shows the components of the worker node and the shared-memory object store:

There is a Global Control Store in the head node that can treat each separate machine as nodes, similar to how Apache Spark treats workers as nodes.  The following figure shows the components of the head node and the Global Control Store managing the cluster-level metadata.

AWS Glue for Ray comes included with Ray Core, Ray DatasetModin (distributed pandas) and the AWS SDK for pandas (on Modin) for seamless distributed integration into other AWS services.  Ray Core is the foundation of Ray and the basic framework for distributing Python functions and classes. Ray Dataset is a distributed data framework based on Apache Arrow and is most closely analogous to a dataframe in Apache Spark. Modin is a library designed to distribute pandas applications across a Ray cluster without any modification and is compatible with data in Ray Datasets. The included AWS SDK for pandas (formerly AWS Data Wrangler) is an abstraction layer on top of Modin to allow for the creation of pandas dataframes from (and writing to) many AWS sources such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon DynamoDB, Amazon OpenSearch Service, and others.

You can also install your own ARM compatible Python libraries via pip, either through Ray’s environmental configuration in @ray.remote or via --additional-python-modules.

To learn more about Ray, please visit the GitHub repo.

Why use AWS Glue for Ray?

Many of us start our data journey on AWS with Python, looking to prepare data for ML and data science, and move data at scale with AWS APIs and Boto3. Ray allows you to bring those familiar skills, paradigms, frameworks and libraries to AWS Glue and make them scale to handle massive datasets with minimal code changes. You can use the same data processing tools you currently have (such as Python libraries for data cleansing, computation, and ML) on datasets of all sizes. AWS Glue for Ray enables the distributed run of your Python scripts over multi-node clusters.

AWS Glue for Ray is designed for the following:

  • Task parallel applications (for example, when you want to apply multiple transforms in parallel)
  • Speeding up your Python workload as well as using Python native libraries.
  • Running the same workload across hundreds of data sources.
  • ML ingestion and parallel batch inference on data

Solution overview

For this post, you will use the Parquet Amazon Customer Reviews Dataset stored in the public S3 bucket. The objective is to perform transformations using the Ray dataset and then write it back to Amazon S3 in the Parquet file format.

Configure Amazon S3

The first step is to create an Amazon S3 bucket to store the transformed Parquet dataset as the end result.

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. For Bucket name, enter a name for your Amazon S3 bucket.
  4. Choose Create.

Set up a Jupyter notebook with an AWS Glue interactive session

For our development environment, we use a Jupyter notebook to run the code.

You’re required to install the AWS Glue interactive sessions locally or run interactive sessions with an AWS Glue Studio notebook. Using AWS Glue Interactive sessions will help you follow and run the series of demonstration steps.

Refer to Getting started with AWS Glue interactive sessions for instructions to spin up a notebook on an AWS Glue interactive session.

Run your code using Ray in a Jupyter notebook

This section walks you through several notebook paragraphs on how to use AWS Glue for Ray. In this exercise, we look at the customer reviews from the Amazon Customer Review Parquet dataset, perform some Ray transformations, and write the results to Amazon S3 in a Parquet format.

  1. On Jupyter console, under New, choose Glue Python.
  2. Signify you want to use Ray as the engine by using the %glue_ray magic.
  3. Import the Ray library along with additional Python libraries:
    %glue_ray
    
    import ray
    import pandas
    import pyarrow
    from ray import data
    import time
    from ray.data import ActorPoolStrategy
  4. Initialize a Ray Cluster with AWS Glue.
    ray.init('auto')
  5. Next, we read a single partition from the dataset, which is Parquet file format:
    start = time.time()
    ds = ray.data.read_parquet("s3://amazon-reviews-pds/parquet/product_category=Wireless/")
    end = time.time()
    print(f"Reading the data to dataframe: {end - start} seconds")

  6. Parquet files store the number of rows per file in the metadata, so we can get the total number of records in ds without performing a full data read:
    ds.count()

  7. Next , we can check the schema of this dataset. We don’t have to read the actual data to get the schema; we can read it from the metadata:
    ds.schema()

  8. We can check the total size in bytes for the full Ray dataset:
    #calculate the size in bytes of the full dataset,  Note that for Parquet files, this size-in-bytes will be pulled from the Parquet
    #  metadata (not triggering a data read).
    ds.size_bytes()

  9. We can see a sample record from the Ray dataset:
    #Show sample records from the underlying Parquet dataset  
    start = time.time()
    ds.show(1)
    end = time.time()
    print(f"Time taken to show the data from dataframe : {end - start} seconds")

Applying dataset transformations with Ray

There are primarily two types of transformations that can be applied to Ray datasets:

  • One-to-One transformations – Each input block will contributes to only one output block, such as add_column(), map_batches() and drop_column() , and so on.
  • All-to-All transformations – Input blocks can contribute to multiple output blocks such as sort() and groupby(), and so on.

In the next series of steps we will apply some of these transformations on our resultant Ray datasets from the previous section.

  1. We can add a new column and check the schema to verify the newly added column, followed by retrieving a sample record. This transformation is only available for the datasets that can be converted to pandas format.
    # Add the given new column to the dataset and show the sample record after adding a new column
    
    start = time.time()
    ds = ds.add_column( "helpful_votes_ratio", lambda df: df["helpful_votes"] / df["total_votes"])
    end = time.time()
    print(f"Time taken to Add a new columns : {end - start} seconds")
    ds.show(1)

  2. Let’s drop a few columns we don’t need using a drop_columns transformation and then check the schema to verify if those columns are dropped from the Ray dataset:
    # Dropping few columns from the underlying Dataset 
    start = time.time()
    ds = ds.drop_columns(["review_body", "vine", "product_parent", "verified_purchase", "review_headline"])
    end = time.time()
    print(f"Time taken to drop a few columns : {end - start} seconds")
    ds.schema()


    Ray datasets have built-in transformations such as sorting the dataset by the specified key column or key function.

  3. Next, we apply the sort transformation using one of the columns present in the dataset (total_votes):
    #Sort the dataset by total votes
    start = time.time()
    ds =ds.sort("total_votes")
    end = time.time()
    print(f"Time taken for sort operation  : {end - start} seconds")
    ds.show(3)

  4. Next, we will create a Python UDF function that allows you to write customized business logic in transformations. In our UDF we have written a logic to find out the products that are rated low (i.e. total votes less than 100).We create a UDF as a function on pandas DataFrame batches. For the supported input batch formats, see the UDF Input Batch Format. We also demonstrate using map_batches() which applies the given function to the batches of records of this dataset. Map_batches() uses the default compute strategy (tasks), which helps distribute the data processing to multiple Ray workers, which are used to run tasks. For more information on a map_batches() transformation, please see the following documentation.
    # UDF as a function on pandas DataFrame - To Find products with total_votes < 100 
    def low_rated_products(df: pandas.DataFrame) -> pandas.DataFrame:
        return df[(df["total_votes"] < 100)]
        
    #Calculate the number of products which are rated low in terms of low votes i.e. less than 100
    # This technique is called Batch inference processing with Ray tasks (the default compute strategy).
    ds = ds.map_batches(low_rated_products)
    
    #See sample records for the products which are rated low in terms of low votes i.e. less than 100
    ds.show(1)

    #Count total number of products which are rated low 
    ds.count()

  5. If you have complex transformations that require more resources for data processing, we recommend utilizing Ray actors using additional configurations with applicable transformations. We have demonstrated with map_batches() below:
    # Batch inference processing with Ray actors. Autoscale the actors between 2 and 4.
    
    class LowRatedProducts:
        def __init__(self):
            self._model = low_rated_products
    
        def __call__(self, batch: pandas.DataFrame) -> pandas.DataFrame:
            return self._model(batch)
    
    start = time.time()
    predicted = ds.map_batches(
        LowRatedProducts, compute=ActorPoolStrategy(2, 4), batch_size=4)
    end = time.time()
    
  6. Next, before writing the final resultant Ray dataset we will apply map_batches() transformations to filter out the customer reviews data where the total votes for a given product is greater than 0 and the reviews belongs to the “US” marketplace only. Using map_batches() for the filter operation is better in terms of performance in comparison to filter() transformation.
    # Filter our records with total_votes == 0
    ds = ds.map_batches(lambda df: df[df["total_votes"] > 0])
    
    # Filter and select records with marketplace equals US only
    ds = ds.map_batches(lambda df: df[df["marketplace"] == 'US'])
    
    ds.count()

  7. Finally, we write the resultant data to the S3 bucket you created in a Parquet file format. You can use different dataset APIs available, such as write_csv() or write_json() for different file formats.  Additionally, you can convert the resultant dataset to another DataFrame type such as Mars, Modin or pandas.
    ds.write_parquet("s3://<your-own-s3-bucket>/manta/Output/Raydemo/")

Clean up

To avoid incurring future charges, delete the Amazon S3 bucket and Jupyter notebook.

  1. On the Amazon S3 console, choose Buckets.
  2. Choose the bucket you created.
  3. Choose Empty and enter your bucket name.
  4. Choose Confirm.
  5. Choose Delete and enter your bucket name.
  6. Choose Delete bucket.
  7. On the AWS Glue console, choose Interactive Sessions
  8. Choose the interactive session you created.
  9. Choose Delete to remove the interactive session.

Conclusion

In this post, we demonstrated how you can use AWS Glue for Ray to run your Python code in a distributed environment.  You can now run your data and ML applications in a multi-node environment.

Refer to the Ray documentation for additional information and use cases.


About the authors

Zach Mitchell is a Sr. Big Data Architect. He works within the product team to enhance understanding between product engineers and their customers while guiding customers through their journey to develop data lakes and other data solutions on AWS analytics services.

Ishan Gaur works as Sr. Big Data Cloud Engineer ( ETL ) specialized in AWS Glue. He’s passionate about helping customers build out scalable distributed ETL workloads and implement scalable data processing and analytics pipelines on AWS. When not at work, Ishan likes to cook, travel with his family, or listen to music.

Derek Liu is a Solutions Architect on the Enterprise team based out of Vancouver, BC.  He is part of the AWS Analytics field community and enjoys helping customers solve big data challenges through AWS analytic services.

Kinshuk Pahare is a Principal Product Manager on AWS Glue.