AWS Big Data Blog

Streamline Spark application development on Amazon EMR with the Data Solutions Framework on AWS

Today, organizations rely heavily on Apache Spark for their big data processing needs. However, managing the entire development lifecycle of Spark applications—from local development to production deployment—can be complex and time-consuming. Managing the entire code base—including application code, infrastructure provisioning, and continuous integration and delivery (CI/CD) pipelines—is sometimes not fully automated and is a responsibility shared across multiple teams, which slows down release cycles. This undifferentiated heavy lifting diverts valuable resources away from the core business objective: deriving value from data.

In this post, we explore how to use Amazon EMR, the AWS Cloud Development Kit (AWS CDK), and the Data Solutions Framework (DSF) on AWS to streamline the development process, from setting up a local development environment to deploying serverless Spark infrastructure, and implementing a CI/CD pipeline for automated testing and deployment.

By adopting this approach, developers gain full control over their code and the infrastructure responsible for running it, alleviating the need for cross-team dependency. Developers can customize the infrastructure to meet specific business needs and optimize performance. Additionally, they can customize CI/CD stages to facilitate comprehensive testing, using the self-mutation capability of AWS CDK Pipelines to automatically update and refine the deployment process. This level of control not only accelerates development cycles but also enhances the reliability and efficiency of the entire application lifecycle, so developers can focus more on innovation and less on manual infrastructure management.

Solution overview

The solution consists of the following key components:

  • The local development environment to develop and test your Spark code locally
  • The infrastructure as code (IaC) that will run your Spark application in AWS environments
  • The CI/CD pipeline running end-to-end tests and deploying into the different AWS environments

In the following sections, we discuss how to set up these components.

Prerequisites

To set up this solution, you must have an AWS account with appropriate permissions, Docker, and the AWS CDK CLI installed.

Set up the local development environment

Developing Spark applications locally can be challenging because you need a consistent, efficient environment that mirrors your production setup. With Amazon EMR, Docker, and the Amazon EMR toolkit extension for Visual Studio Code, you can quickly set up a local development environment for Spark applications, develop and test Spark code locally, and seamlessly port it to the cloud.

The Amazon EMR toolkit for VS Code includes an “EMR: Create Local Spark Environment” command that generates a development container. This container is based on an Amazon EMR on Amazon EKS image corresponding to the Amazon EMR version you select. You can develop Spark and PySpark code locally, with full compatibility with your remote Amazon EMR environment. Additionally, the toolkit provides helpers to make it straightforward to connect to the AWS Cloud, including an Amazon EMR explorer, an AWS Glue Data Catalog explorer, and commands to run Amazon EMR Serverless jobs from VS Code.

To set up your local environment, complete the following steps:

  1. Install VS Code and the Amazon EMR Toolkit for VS Code.
  2. Install and launch Docker.
  3. Create a local Amazon EMR environment in your working directory using the command EMR: Create Local Spark Environment.

Amazon EMR Toolkit bootstrap

  4. Choose PySpark, Amazon EMR 7.5, and the AWS Region you want to use, and choose an authentication mechanism.

Amazon EMR toolkit local environment

  5. Log in to Amazon ECR with your AWS credentials using the following command so you can download the Amazon EMR image:
aws ecr get-login-password --region us-east-1 \
    | docker login \
    --username AWS \
    --password-stdin \
    12345678910.dkr.ecr.us-east-1.amazonaws.com
  6. Now you can launch your dev container using the VS Code command Dev Containers: Rebuild and Reopen in Container.

The container will install the latest operating system packages and run a local Spark history server on port 18080.

local Spark history server

The container provides spark-shell, spark-sql, and pyspark from the terminal and a Jupyter Python kernel for connecting a Jupyter notebook to execute interactive Spark code.

local Jupyter notebooks
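
For example, you can run a quick sanity check from the Jupyter kernel or the pyspark shell to confirm that the local Spark environment works. The following is a minimal sketch using only standard PySpark APIs:

from pyspark.sql import SparkSession

# Create (or reuse) a local Spark session inside the dev container
spark = SparkSession.builder.appName("local-sanity-check").getOrCreate()

# Print the Spark version and run a trivial SQL query to confirm the setup
print(spark.version)
spark.sql("SELECT 1 AS ok").show()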

Using the Amazon EMR Toolkit, you can develop your Spark application and test it locally using Pytest—for example, to validate the business logic. You can also connect to the AWS accounts that host your development environments.
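
The following is a minimal Pytest sketch for unit testing the business logic locally; the agg_trip_distance module and the aggregate_by_zone function are hypothetical placeholders for your own transformation code:

import pytest
from pyspark.sql import SparkSession

# Hypothetical import: replace with the module and function from your own application
from agg_trip_distance import aggregate_by_zone


@pytest.fixture(scope="session")
def spark():
    # Local Spark session used by the unit tests inside the dev container
    return SparkSession.builder.master("local[1]").appName("unit-tests").getOrCreate()


def test_aggregate_by_zone(spark):
    input_df = spark.createDataFrame(
        [("A", 1.5), ("A", 2.5), ("B", 3.0)],
        ["zone", "trip_distance"],
    )
    # Assumes the function returns one row per zone with a total_distance column
    result = {(r["zone"], r["total_distance"]) for r in aggregate_by_zone(input_df).collect()}
    assert result == {("A", 4.0), ("B", 3.0)}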

Build the AWS CDK application with DSF on AWS

After you validate the business logic in your local Spark application, you can implement the infrastructure responsible for running it. DSF provides AWS CDK L3 constructs that simplify the creation of Spark-based data pipelines on EMR Serverless or Amazon EMR on EKS.

DSF provides the capability to package your local PySpark application, including its Python dependencies, into artifacts that can be consumed by EMR Serverless jobs. The PySparkApplicationPackage construct uses a Dockerfile to package the dependencies into a Python virtual environment archive, then uploads the archive and the PySpark entrypoint file to a secured Amazon Simple Storage Service (Amazon S3) bucket. The following diagram illustrates this architecture.

PySparkApplicationPackage L3 construct

See the following example code:

spark_app = dsf.processing.PySparkApplicationPackage(
    self,
    "SparkApp",
    entrypoint_path="./../spark/src/agg_trip_distance.py",
    application_name="TaxiAggregation",
    # Path of the Dockerfile used to package the dependencies as a Python venv
    dependencies_folder='./../spark',
    # Path of the venv archive in the docker image
    venv_archive_path="/venv-package/pyspark-env.tar.gz",
    removal_policy=RemovalPolicy.DESTROY)

You just need to provide the paths for the following:

  • The PySpark entrypoint. This is the main Python script of your Spark application.
  • The Dockerfile containing the logic for packaging a virtual environment into an archive.
  • The path of the resulting archive in the container file system.

DSF provides helpers to connect the application package to the EMR Serverless job. The PySparkApplicationPackage construct exposes properties that can be used directly in the SparkEmrServerlessJob construct parameters. This construct simplifies the configuration of a batch job using an AWS Step Functions state machine. The following diagram illustrates this architecture.

EmrServerlessJob L3 construct

The following code is an example of an EMR Serverless job:

spark_job = dsf.processing.SparkEmrServerlessJob(
    self,
    "SparkProcessingJob",
    dsf.processing.SparkEmrServerlessJobProps(
        name=f"taxi-agg-job-{Names.unique_resource_name(self)}",
        # ID of the previously created EMR Serverless runtime
        application_id=spark_runtime.application.attr_application_id,
        # The IAM role used by the EMR Job with permissions required by the application
        execution_role=processing_exec_role,
        spark_submit_entry_point=spark_app.entrypoint_uri,
        # Add the Spark parameters from the PySpark package to configure the dependencies (using venv)
        spark_submit_parameters=spark_app.spark_venv_conf + spark_params,
        removal_policy=RemovalPolicy.DESTROY,
        schedule=schedule))

Note the two parameters of SparkEmrServerlessJob that are provided by PySparkApplicationPackage:

  • entrypoint_uri, which is the S3 URI of the entrypoint file
  • spark_venv_conf, which contains the Spark submit parameters for using the Python virtual environment

DSF also provides a SparkEmrServerlessRuntime to simplify the creation of the EMR Serverless application responsible for running the job.
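
The following is a minimal sketch of creating that runtime, which the earlier job example references as spark_runtime; the name parameter shown here is an assumption, so check the DSF documentation for the exact construct properties:

spark_runtime = dsf.processing.SparkEmrServerlessRuntime(
    self,
    "SparkProcessingRuntime",
    # Assumed property: the name of the EMR Serverless application to create
    name="TaxiAggregation",
)

# The job example above reads the application ID through
# spark_runtime.application.attr_application_id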

Deploy the Spark application using CI/CD

The final step is to implement a CI/CD pipeline that can test your Spark code and promote it across your dev, test, and staging environments and then to production. DSF provides an L3 construct that simplifies the creation of the CI/CD pipeline for your Spark applications. DSF’s implementation of the Spark CI/CD pipeline construct uses the AWS CDK built-in pipeline functionality. A key capability of an AWS CDK pipeline is that it is self-mutating: it can update itself whenever you change its definition, avoiding the traditional chicken-and-egg problem of pipeline updates and helping developers fully control their CI/CD pipeline.

When the pipeline runs, it follows a carefully orchestrated sequence. First, it retrieves your code from your repository and synthesizes it into AWS CloudFormation templates. Before doing anything else, it examines these templates to see if you’ve made any changes to the pipeline’s own structure. If the pipeline detects that its definition has changed, it will pause its normal operation and update itself first. After the pipeline has updated itself, it will continue with its regular stages, such as deploying your application.

DSF provides an opinionated implementation of CDK Pipelines for Spark applications, where the PySpark code is automatically unit tested using Pytest and where the configuration is simplified. You only need to configure four components:

  • The CI/CD stages (testing, staging, production, and so on). This includes the AWS account ID and Region where each environment resides.
  • The AWS CDK stack that is deployed in each environment.
  • (Optional) The integration test script that you want to run against the deployed stack.
  • The SparkEmrCICDPipeline AWS CDK construct.

The following diagram illustrates how everything works together.

SparkCICDPipeline L3 construct

Let’s dive into each of these components.

Define cross-account deployment and CI/CD stages

With the SparkEmrCICDPipeline construct, you can deploy your Spark application stack across different AWS accounts. For example, you can have a separate account for your CI/CD processes and different accounts for your staging and production environments. To set this up, first bootstrap the various AWS accounts (staging, production, and so on):

cdk bootstrap --profile <ENVIRONMENT_ACCOUNT_PROFILE> \
    aws://<ENVIRONMENT_ACCOUNT_ID>/<REGION> \
    --trust <CICD_ACCOUNT_ID> \
    --cloudformation-execution-policies "POLICY_ARN"

This step sets up the necessary resources in the environment accounts and creates a trust relationship between those accounts and the CI/CD account where the pipeline will run. Next, choose between two options to define the environments (both options require the relevant configuration in the cdk.context.json file). The first option is to use predefined environments, defined as follows:

{ 
    "staging": { 
        "account": "<STAGING_ACCOUNT_ID>", 
        "region": "<REGION>" 
    }, 
    "prod": { 
        "account": "<PROD_ACCOUNT_ID>", 
        "region": "<REGION>" 
    } 
}

Alternatively, you can use user-defined environments, defined as follows:

{
   "environments":[
      {
         "stageName":"<STAGE_NAME_1>",
         "account":"<STAGE_ACCOUNT_ID>",
         "region":"<REGION>",
         "triggerIntegTest":"<OPTIONAL_BOOLEAN_CAN_BE_OMMITTED>"
      },
      {
         "stageName":"<STAGE_NAME_2>",
         "account":"<STAGE_ACCOUNT_ID>",
         "region":"<REGION>",
         "triggerIntegTest":"<OPTIONAL_BOOLEAN_CAN_BE_OMMITTED>"
      },
      {
         "stageName":"<STAGE_NAME_3>",
         "account":"<STAGE_ACCOUNT_ID>",
         "region":"<REGION>",
         "triggerIntegTest":"<OPTIONAL_BOOLEAN_CAN_BE_OMMITTED>"
      }
   ]
}

Customize the stack to be deployed

Now that the environments have been bootstrapped and configured, let’s look at the actual stack that contains the resources that will be deployed in the various environments. Two classes must be implemented:

  • A class that extends the stack – This is where the resources that are going to be deployed in each of the environments are defined. This can be a normal AWS CDK stack, but it can be deployed in another AWS account depending on the environment configuration defined in the previous section.
  • A class that extends ApplicationStackFactory – This is DSF-specific and makes it possible to configure and then return the stack that is created.

The following code shows a full example:

class MyApplicationStack(cdk.Stack): 
    def __init__(self, scope, *, stage): 
        super().__init__(scope, "MyApplicationStack") 
        bucket = Bucket(self, "TestBucket",
                        auto_delete_objects=True, 
                        removal_policy=cdk.RemovalPolicy.DESTROY) 
        cdk.CfnOutput(self, "BucketName", value=bucket.bucket_name) 
        
class MyStackFactory(dsf.utils.ApplicationStackFactory): 
    def create_stack(self, scope, stage): 
        return MyApplicationStack(scope, stage=stage)

ApplicationStackFactory supports customizing the stack before returning the initialized object to be deployed by the CI/CD pipeline. You can adapt your stack’s behavior by passing the current stage to it. For example, you can skip scheduling the Spark application in the integration test stage, because the integration tests trigger it manually as part of the CI/CD pipeline. For the production stage, scheduling enables automatic execution of the Spark application.
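
The following is a minimal sketch of this pattern; it assumes the stage passed by the factory can be compared against a dsf.utils.CICDStage.PROD member and that the schedule parameter of SparkEmrServerlessJob is optional, as in the earlier example, so check the DSF documentation for the exact names:

import aws_cdk as cdk
from aws_cdk.aws_events import Schedule

# dsf refers to the DSF module imported as in the other snippets
class SparkApplicationStack(cdk.Stack):
    def __init__(self, scope, *, stage):
        super().__init__(scope, "SparkApplicationStack")
        # Only schedule the job in production; in the integration test stage the
        # CI/CD pipeline triggers the Step Functions state machine directly
        schedule = (
            Schedule.rate(cdk.Duration.days(1))
            if stage == dsf.utils.CICDStage.PROD  # assumed enumeration member
            else None
        )
        # ... create the EMR Serverless runtime, the PySpark package, and the
        # SparkEmrServerlessJob, passing schedule=schedule as shown earlier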

Write the integration test script

The integration test script is a bash script that is triggered after the main application stack has been deployed. Inputs to the bash script can come from the AWS CloudFormation outputs of the main application stack. These outputs are mapped into environment variables that the bash script can access directly.

In the Spark CI/CD example, the application stack uses the SparkEmrServerlessJob CDK construct. This construct uses a Step Functions state machine to manage the execution and monitoring of the Spark job. The following is an example integration test bash script that we use to verify that the deployed stack can run the associated Spark job successfully:

#!/bin/bash
EXECUTION_ARN=$(aws stepfunctions start-execution --state-machine-arn "$STEP_FUNCTION_ARN" | jq -r '.executionArn')

while true
do
    STATUS=$(aws stepfunctions describe-execution --execution-arn "$EXECUTION_ARN" | jq -r '.status')
    if [ "$STATUS" = "SUCCEEDED" ]; then
        exit 0
    elif [ "$STATUS" = "FAILED" ] || [ "$STATUS" = "TIMED_OUT" ] || [ "$STATUS" = "ABORTED" ]; then
        exit 1
    else
        sleep 10
    fi
done

The integration test scripts are executed within an AWS CodeBuild project. As part of the IntegrationTestStack, we’ve included a custom resource that periodically checks the status of the integration test script as it runs. Failure of the CodeBuild execution causes the parent pipeline (residing in the pipeline account) to fail. This helps teams only promote changes that pass all the required testing.

Bring all the components together

When you have your components ready, you can use the SparkEmrCICDPipeline to bring them together. See the following example code:

dsf.processing.SparkEmrCICDPipeline(
    self,
    "SparkCICDPipeline",
    spark_application_name="SparkTest",
    # The Spark image to use in the CICD unit tests
    spark_image=dsf.processing.SparkImage.EMR_7_5,
    # The factory class to dynamically pass the Application Stack
    application_stack_factory=SparkApplicationStackFactory(),
    # Path of the CDK python application to be used by the CICD build and deploy phases
    cdk_application_path="infra",
    # Path of the Spark application to be built and unit tested in the CICD
    spark_application_path="spark",
    # Path of the bash script responsible for running the integration tests
    integ_test_script='./infra/resources/integ-test.sh',
    # Environment variables used by the integration test script, value is the CFN output name
    integ_test_env={
        "STEP_FUNCTION_ARN": "ProcessingStateMachineArn"
    },
    # Additional permissions to give to the CICD to run the integration tests
    integ_test_permissions=[
        PolicyStatement(
            actions=["states:StartExecution", "states:DescribeExecution"
            ],
            resources=["*"]
        )
    ],
    source=CodePipelineSource.connection("your/repo", "branch",
        connection_arn="arn:aws:codeconnections:us-east-1:222222222222:connection/7d2469ff-514a-4e4f-9003-5ca4a43cdc41"
    ),
    removal_policy=RemovalPolicy.DESTROY,
)

The following elements of the code are worth highlighting:

  • With the integ_test_env parameter, you can map environment variables to the CloudFormation outputs of the application stack defined through the application_stack_factory parameter
  • The integ_test_permissions parameter specifies the AWS Identity and Access Management (IAM) permissions attached to the CodeBuild project where the integration test script runs
  • CDK Pipelines needs an AWS CodeConnections Amazon Resource Name (ARN) to connect to the Git repository where your code is hosted

Now you can deploy the stack containing the CI/CD pipeline. This is a one-time operation because the pipeline subsequently updates itself whenever code changes affect its own definition:

cd infra 
cdk deploy CICDPipeline

Then you can commit and push the code into the source code repository defined in the source parameter. This step triggers the pipeline and deploys the application in the configured environments. You can check the pipeline definition and status on the AWS CodePipeline console.

AWS CodePipeline

You can find the full example on the Data Solutions Framework GitHub repository.

Clean up

Follow the readme guide to delete the resources created by the solution.

Conclusion

By using Amazon EMR, the AWS CDK, DSF on AWS, and the Amazon EMR toolkit, developers can now streamline their Spark application development process. The solution described in this post helps developers gain full control over their code and infrastructure, making it possible to set up local development environments, implement automated CI/CD pipelines, and deploy serverless Spark infrastructure across multiple environments.

DSF supports other patterns, such as streaming, governance and data sharing, and Amazon Redshift data warehousing. The DSF roadmap is publicly available, and we look forward to your feature requests, contributions, and feedback. You can get started using DSF by following our Quick start guide.

 


About the authors

Jan Michael Go Tan

Jan is a Principal Solutions Architect for Amazon Web Services. He helps customers design scalable and innovative solutions with the AWS Cloud.

Vincent Gromakowski

Vincent is an Analytics Specialist Solutions Architect at AWS, where he enjoys solving customers’ analytics, NoSQL, and streaming challenges. He has strong expertise in distributed data processing engines and resource orchestration platforms.

Lotfi Mouhib

Lotfi is a Principal Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.