AWS Open Source Blog

Automating a DAG deployment with Amazon Managed Workflows for Apache Airflow

Many developers and data engineers use Amazon Managed Workflows for Apache Airflow (Amazon MWAA), a managed service for open source Apache Airflow, to programmatically author, schedule, and monitor workflows. With Amazon MWAA, you can focus on business logic and build workflows without worrying about management responsibilities such as setup, patching, upgrades, security, and scaling. Adopting CI/CD and the best practices from the Operational Excellence pillar of the AWS Well-Architected Framework can help address risks in an environment and limit errors from manual processes. More importantly, these practices free data scientists and data engineers from the related manual work, so they can focus on building workflows, differentiating an organization, and accelerating the flow of valuable changes into production.

In this blog post, we explain how to automate deploying workflows (Directed Acyclic Graphs, or DAGs) to an Amazon MWAA environment. First, we explore how to sanity-test DAGs, identify issues with dependencies, and apply unit tests so that you get feedback on workflows quickly and avoid unnecessary deployments. Second, we explore how to create and update the MWAA environment through infrastructure as code (IaC) as changes are detected.

Solution overview

The following diagram shows the high-level process flow for the CI/CD pipeline, which is built using AWS CodePipeline, a fully managed continuous delivery service. The pipeline automates the build, test, and deploy phases of the release process every time there is a code change, based on the release model you define. The build and test phase of the pipeline is handled by AWS CodeBuild, a fully managed continuous integration service that compiles source code, runs tests, and produces software packages that are ready to deploy using the build commands from buildspec.yaml.

Figure: High-level process flow for the CI/CD pipeline built using AWS CodePipeline.

Setting up the pipeline

The first step in automating the CI/CD process for the workflows is to build the pipeline as infrastructure as code. Create a stack using infra/pipeline.yaml by following the instructions in the GitHub repo to build the pipeline. It creates an AWS CodePipeline pipeline with GitHub as the source, an AWS CodeBuild job, an artifact bucket in Amazon Simple Storage Service (Amazon S3), and a few AWS Identity and Access Management (IAM) roles necessary for the pipeline to work.

After the pipeline is built, it is invoked every time you make a change in the GitHub repository. The pipeline downloads the source code from the GitHub repository, invokes the AWS CodeBuild job, and creates or updates the Amazon MWAA environment.

CI/CD steps

The five high-level steps run as part of the deployment process for a workflow are:

  1. Sanity checking DAGs, identifying dependency issues, and unit testing DAGs
  2. Identifying changes to the requirements file and plugins
  3. Copying the plugins and requirements file to the Amazon MWAA S3 bucket and copying the AWS CloudFormation templates and corresponding parameter file for creating the MWAA environment to the artifact bucket
  4. Building the Amazon MWAA environment as IaC
  5. Copying DAGs to the Amazon MWAA S3 bucket

The first three steps are covered in the build/buildspec.yaml. AWS CodeBuild runs this file as part of the build job. The CodeBuild job starts with testing the DAGs, plugins, and requirements.txt using MWAA local runner. When the tests are successful, it moves on to find whether there are changes in requirements.txt or plugins from the last commit. Then it moves on to copy the plugins and requirements.txt to the Amazon S3 bucket. Finally, it copies the template.yaml and the corresponding parameter file to the pipeline artifact bucket.

The fourth and the fifth steps in the CI/CD process are handled by AWS CodePipeline. The pipeline runs the change-set using the infra/template.yaml copied from the last step. It automatically handles creating or updating the MWAA environment. Remember, updates happen only when there are changes in the plugins or requirements file.

The fifth and final step is copying the DAGs to an Amazon S3 bucket. DAG changes are reflected in the MWAA environment immediately, whereas changes to plugins and the requirements file require an environment update. So, if your DAGs depend on the plugins or the requirements file, the DAGs will not function properly until the environment is updated. To avoid this issue, the deployment of DAGs to the Amazon S3 bucket is done as the last step in the pipeline.

In the CI/CD process described previously, the last three steps are common pipeline steps. Thus, I will next cover the first two steps, which are unique to Amazon MWAA and Apache Airflow.

Sanity-checking DAGs, identifying dependency issues, and unit testing DAGs

Whether you are building an application or a data pipeline, testing is an integral part of development. Tests should be done in layers. DAG integrity tests check the validity of all DAGs. Unit tests validate small individual units of work. Integration tests validate the pipeline as a whole.

Although you can build these layers with any CI/CD tooling, this blog post covers only DAG integrity tests and unit tests, which are written in the Python built-in unittest framework. You can choose to write in other testing frameworks, such as pytest.

Tests run in the MWAA local runner, which is similar to an MWAA production image, so passing tests give you confidence that your workflow will also run correctly in Amazon MWAA.

As a prerequisite, we need the Amazon MWAA local runner Docker image and the Postgres Docker image in a container registry. Follow the instructions in the MWAA local runner GitHub repo to build the image for Apache Airflow 2.0.

After you have built the image, you get an image ID, which you can use to tag and push the image to a container registry. I used Amazon Elastic Container Registry (Amazon ECR) to host the built images.

Use the following command for pushing to Amazon ECR. If you are using a different container registry, make sure to update the build/local-runner.py.

aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {account}.dkr.ecr.{region}.amazonaws.com
aws ecr create-repository --repository-name mwaa-local --image-tag-mutability IMMUTABLE --image-scanning-configuration scanOnPush=true
docker tag {imageid} {account}.dkr.ecr.{region}.amazonaws.com/mwaa-local:2.0.2
docker push {account}.dkr.ecr.{region}.amazonaws.com/mwaa-local:2.0.2

Let’s make a quick code change to dags/hello-mwaa.py by modifying the print_hello function:

def print_hello():
    return 'Hello World!!!'

This will kick off the pipeline in AWS CodePipeline. Once the source code is downloaded, AWS CodePipeline triggers the build job in AWS CodeBuild. The following line in build/buildspec.yaml launches the testing:

python3 build/local-runner.py $REGION $ACCOUNT_NUMBER `pwd` dags/requirements.txt ${PY_CONSTRAINTS}

Let’s review the details of the testing done by build/local-runner.py.

Dependency test

Because libraries are released in many versions, it is easy to introduce an incompatible package version in the requirements file and leave the Airflow environment stuck. Hence, Airflow recommends using a constraints file; learn more in the MWAA documentation.

Also, any update to the requirements file requires an MWAA environment update, which can take 20–30 minutes. Thus, identifying issues with dependencies during the build process is important.

I have a dependency test in test/dag-validation.py like the following, which checks whether the requirements file has a constraints file and then launches installation of requirements.txt in the MWAA local runner to identify issues. If there are issues or the constraints file is missing, it returns failure.

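# Read requirements.txt; the with block closes the file automatically.
with open(requirements, "r") as requirementFile:
    requirementFileContent = requirementFile.read()

if constraintFileName not in requirementFileContent:
    # Fail fast when the requirements file does not reference the constraints file.
    exit_code = 1
else:
    # Install the requirements inside the MWAA local runner to surface conflicts.
    exit_code, dag_test_output = running_container.exec_run(
        "/entrypoint.sh test-requirements"
    )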
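The constraints check can also be isolated into a small helper so that it is testable on its own. The following is a minimal sketch, not the code from the repository: the function name references_constraints and the sample file contents are assumptions made for illustration. Unlike a plain substring check on the whole file, it ignores commented-out lines.

```python
def references_constraints(requirements_text: str, constraint_file_name: str) -> bool:
    """Return True if a non-comment line of requirements.txt names the constraints file."""
    for line in requirements_text.splitlines():
        stripped = line.strip()
        # Skip blank lines and comments so a commented-out constraint does not count.
        if not stripped or stripped.startswith("#"):
            continue
        if constraint_file_name in stripped:
            return True
    return False


# Hypothetical file contents, for illustration only.
with_constraint = "--constraint constraints-3.7.txt\napache-airflow-providers-amazon\n"
commented_out = "# --constraint constraints-3.7.txt\nboto3\n"

print(references_constraints(with_constraint, "constraints-3.7.txt"))
print(references_constraints(commented_out, "constraints-3.7.txt"))
```

The first call reports that the constraints file is referenced; the second does not, because the only mention sits in a comment.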

DAG integrity test

The term integrity test was popularized by the blog post “Data’s Inferno: 7 Circles of Data Testing Hell with Airflow”. It is a simple and common test that helps you avoid unnecessary deployments of broken DAGs and provides a faster feedback loop. It confirms that DAGs are syntactically correct, that there are no Python dependency errors, and that there are no cycles in task relationships.

The following checks are performed in test/dag-validation.py:

  • Are there any import errors in DAGs?
  • Are there any cyclic dependencies? (Cyclic dependencies are easily introduced when you write complex relationships, and so DagBag’s check for cyclic dependency is useful.)
  • Are there DAGs that take longer than the expected time to parse? (The Airflow scheduler parses all DAGs periodically. This test makes sure that DAG parsing does not impact the performance of the scheduler.)

from airflow.models import DagBag
import unittest
import os
import json


class TestSanityOfDag(unittest.TestCase):
    EXPECTED_LOAD_TIME = 1

    def setUp(self):
        self.dagbag = DagBag(os.path.expanduser("/usr/local/airflow/dags"), include_examples=False)

    def test_import_dags(self):
        self.assertFalse(
            len(self.dagbag.import_errors),
            'DAG failed with import errors : {}'.format(
                self.dagbag.import_errors
            )
        )

    def test_parse_time(self):
        # dagbag_stats holds one entry per parsed DAG file.
        metric = self.dagbag.dagbag_stats
        for o in metric:
            self.assertLessEqual(
                o.duration,
                self.EXPECTED_LOAD_TIME,
                '{file} takes {actual_load_time}s to parse, which is more than {expected_load_time}s'.format(
                    expected_load_time=self.EXPECTED_LOAD_TIME,
                    actual_load_time=o.duration,
                    file=o.file[1:]
                )
            )

Additional helpful tests followed in the community during the integrity tests include:

  • If multiple Python files define the same DAG ID, only one of them shows up, because a DAG ID must be unique across all DAGs. One practice I have seen is to keep the DAG ID the same as the file name, which you can do with:
       dag_id=os.path.basename(__file__).replace(".py", ""),

    If you follow this practice, writing a test case for it would be best.

  • If you share your Airflow environment with multiple teams, testing whether there are owners defined in the DAG definition is a good practice.
  • If you are using the convenient way of sending email using the email_on_failure DAG argument, checking whether the field is present will be helpful.
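The three conventions above can be expressed as checks. The sketch below is one possible shape, assuming each DAG object exposes dag_id, fileloc, and default_args as Airflow's DAG class does. The helper name convention_violations is hypothetical, and SimpleNamespace stands in for a real DAG so that the example runs without Airflow installed.

```python
import os
from types import SimpleNamespace


def convention_violations(dag):
    """Return a list of human-readable convention violations for one DAG."""
    problems = []
    default_args = dag.default_args or {}

    # Convention 1: the DAG ID matches the name of the file that defines it.
    expected_id = os.path.splitext(os.path.basename(dag.fileloc))[0]
    if dag.dag_id != expected_id:
        problems.append(
            "dag_id '{}' does not match file name '{}'".format(dag.dag_id, expected_id)
        )

    # Convention 2: an owner is declared explicitly.
    if not default_args.get("owner"):
        problems.append("no owner defined in default_args")

    # Convention 3: email_on_failure is set explicitly.
    if "email_on_failure" not in default_args:
        problems.append("email_on_failure not set in default_args")

    return problems


# Stand-in objects for illustration; in a real test they would come from DagBag(...).dags.values().
good_dag = SimpleNamespace(
    dag_id="hello-mwaa",
    fileloc="/usr/local/airflow/dags/hello-mwaa.py",
    default_args={"owner": "data-team", "email_on_failure": True},
)
bad_dag = SimpleNamespace(
    dag_id="other-name",
    fileloc="/usr/local/airflow/dags/hello-mwaa.py",
    default_args={},
)

print(convention_violations(good_dag))
for problem in convention_violations(bad_dag):
    print(problem)
```

In a unittest test case, you could loop over the loaded DAGs and assert that the returned list is empty, using the list itself as the failure message.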

Unit test

Unit tests ensure that the smallest parts of the workflow are tested in isolation, which in the case of a DAG mostly means operators and sensors. You can also assert the number of tasks in the DAG, verify the relationships between the tasks, and validate the input and output of the tasks. If there are custom sensors or operators, you can verify that they function correctly.

Because the unit tests are done in isolation, when there are connections to the external services—for example, the DAG is performing a task in Amazon Elastic Kubernetes Service (Amazon EKS) or getting data from an Amazon Redshift cluster—you can use mock libraries to mock the external services. The Python unittest framework provides a mock library called unittest.mock with which you can create mocks or stubs and assert input arguments or return values to make sure that your code is integrating with the external service correctly.
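As an illustration of mocking an external service, here is a minimal sketch using unittest.mock. The function count_rows and the client method run_query are hypothetical stand-ins for a task callable and a service client; they are not part of Airflow or of the repository.

```python
import unittest
from unittest.mock import Mock


def count_rows(client, table):
    """Hypothetical task callable: asks an external service for a table's row count."""
    response = client.run_query("SELECT COUNT(*) FROM {}".format(table))
    return response["rows"][0][0]


class TestCountRows(unittest.TestCase):
    def test_count_rows_queries_expected_table(self):
        # The mock replaces the external service, so no network call is made.
        client = Mock()
        client.run_query.return_value = {"rows": [[42]]}

        # Assert on the return value handed back to the task.
        self.assertEqual(count_rows(client, "orders"), 42)
        # Assert on the input argument sent to the external service.
        client.run_query.assert_called_once_with("SELECT COUNT(*) FROM orders")


# Run the test case programmatically so the outcome can be inspected.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestCountRows)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Passing the client in as an argument keeps the callable easy to test; when the client is created inside the function instead, unittest.mock.patch can replace it at the point where it is looked up.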

Identifying changes to requirements file and plugins

Airflow has a built-in plugin manager that allows you to drop in an external component to customize an installation. With Amazon MWAA, the plugin is packaged as a ZIP file and dropped into the $AIRFLOW_HOME/plugins folder. The requirements file allows you to install Python dependencies that are not included in the Apache Airflow base install of your MWAA environment. Both the plugins file and the requirements file must have a version associated with them if they are configured in the MWAA environment.

MwaaEnvironment:
    Type: AWS::MWAA::Environment
    DependsOn: MwaaExecutionPolicy
    Properties:
      Name: !Sub "${AWS::StackName}-MwaaEnvironment"
      SourceBucketArn: !Sub "arn:aws:s3:::${S3Bucket}"
      ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn
      DagS3Path: dags
      RequirementsS3Path: requirements.txt
      RequirementsS3ObjectVersion: !Ref RequirementsFileVersion
      PluginsS3Path: plugins/plugins.zip
      PluginsS3ObjectVersion: !Ref PluginsVersion

Whenever the plugins or requirements files are updated in the Amazon S3 bucket, the new version ID of the files must be updated in the MWAA environment for the changes to take effect. This environment update takes 20–30 minutes. Thus, incorporating testing in your deployment to get faster feedback on issues and triggering the update only if the requirements or plugins are changed is important.

As I highlighted previously, we should not trigger an MWAA environment update for DAG changes. We must identify whether the plugins and requirements files have changed since the last time the build ran successfully. The high-level steps follow, and you can find them in infra/plugin-versioning.py:

  1. Using the git diff-tree command along with a previously successfully built commit ID, find out whether there are changes in the plugins or requirements files.
  2. If there are changes, copy the files to an Amazon S3 bucket and get the version ID of the new file.
  3. If there are no changes to the files, get the current version using the MWAA GetEnvironment API.
  4. Update the versions in the parameter file that is passed to infra/template.yaml.
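The four steps above can be sketched as follows. This is an illustration under stated assumptions, not the contents of infra/plugin-versioning.py: the function names, the bucket and environment names, and the repository layout (plugins under plugins/, a file named requirements.txt) are assumptions. The clients are passed in as arguments and replaced with stubs here so that the sketch runs without AWS credentials; in a real build they would be boto3 S3 and MWAA clients.

```python
import subprocess
from unittest.mock import Mock


def changed_files(last_good_commit, current_commit="HEAD"):
    """Step 1: return paths that differ between two commits, using git diff-tree."""
    result = subprocess.run(
        ["git", "diff-tree", "-r", "--name-only", last_good_commit, current_commit],
        check=True,
        capture_output=True,
        text=True,
    )
    return [line for line in result.stdout.splitlines() if line]


def detect_changes(paths):
    """Classify changed paths (assumed layout: plugins/ folder, requirements.txt file)."""
    return {
        "requirements": any(p.rsplit("/", 1)[-1] == "requirements.txt" for p in paths),
        "plugins": any(p.startswith("plugins/") for p in paths),
    }


def resolve_version(changed, s3, mwaa, bucket, key, body, env_name, env_field):
    """Steps 2 and 3: upload and return the new version ID, or reuse the current one."""
    if changed:
        # put_object returns VersionId only when versioning is enabled on the bucket.
        return s3.put_object(Bucket=bucket, Key=key, Body=body)["VersionId"]
    return mwaa.get_environment(Name=env_name)["Environment"][env_field]


# Dry run with stub clients and a hard-coded list of changed paths.
s3 = Mock()
s3.put_object.return_value = {"VersionId": "new-version-id"}
mwaa = Mock()
mwaa.get_environment.return_value = {
    "Environment": {"PluginsS3ObjectVersion": "current-version-id"}
}

changes = detect_changes(["dags/hello-mwaa.py", "dags/requirements.txt"])

# Step 4: values for the parameters referenced by the template.
parameters = {
    "RequirementsFileVersion": resolve_version(
        changes["requirements"], s3, mwaa, "example-mwaa-bucket",
        "requirements.txt", b"example", "example-env", "RequirementsS3ObjectVersion",
    ),
    "PluginsVersion": resolve_version(
        changes["plugins"], s3, mwaa, "example-mwaa-bucket",
        "plugins/plugins.zip", b"", "example-env", "PluginsS3ObjectVersion",
    ),
}
print(parameters)
```

In this dry run only the requirements file changed, so it is uploaded and gets a new version ID, while the plugins version is read back from the existing environment.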

Conclusion

Amazon MWAA takes away the operational responsibility so you can focus on building workflows. By following the guidelines described in this post to automate deploying your DAGs to an Amazon MWAA environment, you can build quality DAGs and deploy them faster.

In this post, we explained how to build an automated CI/CD pipeline to accelerate deploying workflows, and we described how to automate tasks, including updates to plugins and requirements files. You can deploy the CI/CD pipeline and related code to your own AWS account using instructions found in the GitHub repository.

Uma Ramadoss

Uma Ramadoss is a specialist Solutions Architect at Amazon Web Services, focused on the Serverless platform. She is responsible for helping customers design and operate event-driven cloud-native applications using services like Lambda, API Gateway, EventBridge, Step Functions, and SQS.