Automating a DAG deployment with Amazon Managed Workflows for Apache Airflow
Many developers and data engineers use Amazon Managed Workflows for Apache Airflow (Amazon MWAA), a managed service for open source Apache Airflow, to programmatically author, schedule, and monitor workflows. With Amazon MWAA, you can focus on business logic and build workflows without taking on management responsibilities such as setup, patching, upgrades, security, and scaling. Adopting CI/CD and the best practices from the Operational Excellence pillar can help address risks in an environment and limit errors from manual processes. More importantly, they free data scientists and data engineers from those manual processes, so they can focus on building workflows, differentiating an organization, and accelerating the flow of valuable changes into production.
In this blog post, we explain how to automate deploying workflows (Directed Acyclic Graphs, or DAGs) to an Amazon MWAA environment. First, we explore how to sanity-test DAGs, identify issues with dependencies, and apply unit tests so that you get feedback on workflows quickly and avoid unnecessary deployments. Second, we explore how to create and update the MWAA environment through infrastructure as code (IaC) as changes are detected.
The following diagram shows the high-level process flow for the CI/CD pipeline, which is built using AWS CodePipeline, a fully managed continuous delivery service. The pipeline automates the build, test, and deploy phases of the release process every time there is a code change, based on the release model you define. The build and test phase of the pipeline is handled by AWS CodeBuild, a fully managed continuous integration service that compiles source code, runs tests, and produces software packages that are ready to deploy using the build commands from
Setting up the pipeline
The first step in automating the CI/CD process for the workflows is to build the pipeline as infrastructure as code. Create a stack using infra/pipeline.yaml by following the instructions in the GitHub repo. The stack creates an AWS CodePipeline pipeline with GitHub as the source, an AWS CodeBuild job, an artifact bucket in Amazon Simple Storage Service (Amazon S3), and a few AWS Identity and Access Management (IAM) roles necessary for the pipeline to work.
After the pipeline is built, it is invoked every time you make a change in the GitHub repository. The pipeline downloads the source code from the GitHub repository, invokes the AWS CodeBuild job, and creates or updates the Amazon MWAA environment.
The five high-level steps run as part of the deployment process for a workflow are:
- Sanity checking DAGs, identifying dependency issues, and unit testing DAGs
- Identifying changes to the requirements file and plugins
- Copying the plugins and requirements file to the Amazon MWAA S3 bucket and copying the AWS CloudFormation templates and corresponding parameter file for creating the MWAA environment to the artifact bucket
- Building the Amazon MWAA environment as IaC
- Copying DAGs to the Amazon MWAA S3 bucket
The first three steps are covered in build/buildspec.yaml, which AWS CodeBuild runs as part of the build job. The CodeBuild job starts by testing the DAGs, plugins, and requirements.txt using the MWAA local runner. When the tests succeed, it checks whether requirements.txt or the plugins have changed since the last commit. Then it copies the plugins and requirements.txt to the Amazon S3 bucket. Finally, it copies template.yaml and the corresponding parameter file to the pipeline artifact bucket.
The fourth and fifth steps in the CI/CD process are handled by AWS CodePipeline. The pipeline runs the change set using the infra/template.yaml copied in the previous step. It automatically handles creating or updating the MWAA environment. Remember, updates happen only when there are changes in the plugins or requirements file.
The fifth and final step is copying the DAGs to an Amazon S3 bucket. DAG changes are reflected in the MWAA environment immediately, whereas changes to plugins and the requirements file require an environment update. So, if your DAGs depend on the plugins or the requirements file, the DAGs will not function properly until the environment is updated. To avoid this issue, the DAGs are deployed to the Amazon S3 bucket as the last step in the pipeline.
In the CI/CD process described previously, the last three steps are common pipeline steps. Thus, I will next cover the first two steps, which are unique to Amazon MWAA and Apache Airflow.
Sanity-checking DAGs, identifying dependency issues, and unit testing DAGs
Whether you are building an application or a data pipeline, testing is an integral part of development. Tests should be done in layers. DAG integrity tests check the validity of all DAGs. Unit tests validate small individual units of work. Integration tests validate the pipeline as a whole.
Although you can build these layers with any CI/CD tooling, this blog post covers only DAG integrity tests and unit tests, which are written in the Python built-in unittest framework. You can choose to write them in other testing frameworks, such as pytest.
Tests run in the MWAA local runner, which is similar to an MWAA production image, so passing tests give you confidence that the workflow will behave the same way in Amazon MWAA.
As a prerequisite, we need the Amazon MWAA local runner Docker image and the Postgres Docker image in a container registry. Follow the instructions in the MWAA local runner GitHub repo to build the image for Airflow 2.0.
After you have built the image, you get an image ID, which you can use to tag and push the image to a container registry. I used Amazon Elastic Container Registry (Amazon ECR) to host the built images.
Use the following command for pushing to Amazon ECR. If you are using a different container registry, make sure to update the
Let’s make a quick code change to
dags/hello-mwaa.py by modifying the
This will kick off the pipeline in AWS CodePipeline. Once the source code is downloaded, AWS CodePipeline triggers the build job in AWS CodeBuild. The following line in build/buildspec.yaml launches the testing:
Let’s review the details of the testing done by
With so many library versions available, it is easy to introduce an incompatible package version in the requirements file and leave the Airflow environment stuck. Hence, Airflow recommends using a constraints file; learn more in the MWAA documentation.
Also, any update to the requirements file requires an MWAA environment update, which can take 20–30 minutes. Thus, identifying issues with dependencies during the build process is important.
I have a dependency test in test/dag-validation.py that checks whether the requirements file references a constraints file and then installs requirements.txt in the MWAA local runner to identify issues. If the installation fails or the constraints file is missing, the test fails.
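As a rough illustration of the first half of that check, the sketch below looks for a pip constraint option in the text of a requirements file. It is not the code from test/dag-validation.py; the function names are hypothetical, and the install helper simply runs pip in the current interpreter as a stand-in for the MWAA local runner container.

```python
import re
import subprocess
import sys

# pip accepts "--constraint <file-or-url>", "--constraint=<file-or-url>",
# and the short form "-c <file-or-url>" inside a requirements file.
_CONSTRAINT_RE = re.compile(r"^(?:--constraint(?:\s+|=)|-c\s+)\S+")


def has_constraints(requirements_text):
    """Return True if any non-comment line declares a constraints file."""
    for line in requirements_text.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue  # skip blank lines and comments
        if _CONSTRAINT_RE.match(stripped):
            return True
    return False


def install_requirements(path):
    """Assumption: stand-in for installing inside the MWAA local runner.

    Runs pip against the requirements file and reports success or failure.
    """
    result = subprocess.run(
        [sys.executable, "-m", "pip", "install", "-r", path],
        capture_output=True,
        text=True,
    )
    return result.returncode == 0
```

A test case could then fail fast when has_constraints returns False, and only attempt the slower installation step when a constraints file is present.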
DAG integrity test
The term integrity test was popularized by the blog post “Data’s Inferno: 7 Circles of Data Testing Hell with Airflow”. It is a simple and common test that helps you avoid unnecessary deployments and provides a faster feedback loop. It confirms that DAGs are syntactically correct, there are no Python dependency errors, and there are no cycles in relationships.
The following checks are performed in
- Are there any import errors in DAGs?
- Are there any cyclic dependencies? (Cyclic dependencies are easily introduced when you write complex relationships, and so DagBag’s check for cyclic dependency is useful.)
- Are there DAGs that take longer than the expected time to parse? (Airflow scheduler parses all DAGs periodically. This test will make sure that DAG parsing does not impact the performance of the scheduler.)
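The checks above can be sketched with Airflow's DagBag and unittest. This is a minimal sketch, not the repository's test file: the dags folder name and the two-second parse limit are assumptions, and the slow_files helper is hypothetical. As far as I understand Airflow 2, a cycle detected while DagBag loads a file is recorded in import_errors, so the first test is meant to cover both import errors and cycles.

```python
import unittest
from datetime import timedelta

try:
    from airflow.models import DagBag
except ImportError:  # lets this module import where Airflow is not installed
    DagBag = None

DAG_FOLDER = "dags"                    # assumption: DAGs live in ./dags
MAX_PARSE_TIME = timedelta(seconds=2)  # assumption: pick a limit that suits you


def slow_files(stats, limit=MAX_PARSE_TIME):
    """Return (file, duration) pairs whose parse time exceeds the limit.

    `stats` is expected to look like DagBag.dagbag_stats: items with
    `file` and `duration` (a timedelta) attributes.
    """
    return [(s.file, s.duration) for s in stats if s.duration > limit]


@unittest.skipIf(DagBag is None, "Apache Airflow is not installed")
class TestDagIntegrity(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Parse every file in the DAG folder once for all tests.
        cls.dagbag = DagBag(dag_folder=DAG_FOLDER, include_examples=False)

    def test_no_import_errors(self):
        # import_errors maps file path -> error message for files that failed.
        self.assertEqual(self.dagbag.import_errors, {})

    def test_parse_time(self):
        self.assertEqual(slow_files(self.dagbag.dagbag_stats), [])
```

Keeping the parse-time logic in a plain function makes it possible to exercise it without an Airflow installation, while the test class itself only runs where Airflow is available.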
Additional helpful tests followed in the community during the integrity tests include:
- If multiple Python files define the same DAG ID, only one of them shows up in Airflow, so DAG IDs must be unique across all DAGs. One practice I have seen is to keep the DAG ID the same as the file name, which you can do with:
If you follow this practice, writing a test case for it would be best.
- If you share your Airflow environment with multiple teams, testing whether there are owners defined in the DAG definition is a good practice.
- If you are using the convenient way of sending email using the email_on_failure DAG argument, checking whether the field is present will be helpful.
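These community conventions can be checked with a small helper like the one sketched below. It is an illustration only: convention_problems is a hypothetical name, and it assumes the owner and email_on_failure settings are passed through the DAG's default_args. It reads dag_id, fileloc, and default_args, which Airflow DAG objects expose, so it can be called for every DAG in a DagBag.

```python
import os


def convention_problems(dag):
    """Return a list of convention violations for one DAG (empty if none).

    Assumption: owner and email_on_failure are set via default_args.
    """
    problems = []

    # Convention 1: the DAG ID matches the file name without its extension.
    # One common way to get this in a DAG file is
    # dag_id=os.path.splitext(os.path.basename(__file__))[0].
    expected_id = os.path.splitext(os.path.basename(dag.fileloc))[0]
    if dag.dag_id != expected_id:
        problems.append(
            f"dag_id {dag.dag_id!r} does not match file name {expected_id!r}"
        )

    default_args = dag.default_args or {}

    # Convention 2: an owner is defined, useful in shared environments.
    if not default_args.get("owner"):
        problems.append("no owner defined in default_args")

    # Convention 3: email_on_failure is set explicitly.
    if "email_on_failure" not in default_args:
        problems.append("email_on_failure is not set in default_args")

    return problems
```

In a test case you might loop over dagbag.dags.values() and assert that convention_problems returns an empty list for each DAG.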
Unit tests ensure that each smallest part of the workflow is tested in isolation, which for a DAG mostly means operators and sensors. You can also assert the number of tasks in the DAG, verify the relationships between the tasks, and validate the input and output of the tasks. If there are custom sensors or operators, you can verify that their functionality is correct.
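As one way to assert the task count and the relationships between tasks, the sketch below compares a DAG against an expected structure. The helper name and the task IDs in the usage note are hypothetical; the helper relies on the task_dict attribute of a DAG and the downstream_task_ids attribute of each task, which Airflow provides.

```python
def structure_problems(dag, expected):
    """Compare a DAG's tasks and edges with an expected structure.

    `expected` maps each task_id to the set of task_ids directly downstream.
    Returns a list of human-readable differences (empty if they match).
    """
    problems = []

    # Check that the DAG contains exactly the expected tasks.
    actual_ids = set(dag.task_dict)
    if actual_ids != set(expected):
        problems.append(
            f"task ids differ: expected {sorted(expected)}, got {sorted(actual_ids)}"
        )

    # Check the direct downstream relationships of each expected task.
    for task_id, downstream in expected.items():
        task = dag.task_dict.get(task_id)
        if task is None:
            continue  # already reported as a missing task above
        actual_downstream = set(task.downstream_task_ids)
        if actual_downstream != set(downstream):
            problems.append(
                f"{task_id}: expected downstream {sorted(downstream)}, "
                f"got {sorted(actual_downstream)}"
            )

    return problems
```

For a DAG with an extract task feeding a load task, a unit test could assert that structure_problems(dag, {"extract": {"load"}, "load": set()}) returns an empty list.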
Because the unit tests are done in isolation, when there are connections to external services (for example, the DAG is performing a task in Amazon Elastic Kubernetes Service (Amazon EKS) or getting data from an Amazon Redshift cluster), you can use mock libraries to mock the external services. The Python unittest framework provides a mock library called unittest.mock with which you can create mocks or stubs and assert input arguments or return values to make sure that your code is integrating with the external service correctly.
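Here is a minimal sketch of that pattern. The count_rows function and the client's execute method are hypothetical stand-ins for a task callable and a database client; they are not taken from the post's repository. The mock replaces the client, supplies a canned return value, and lets the test assert the arguments the code passed to it.

```python
import unittest
from unittest import mock


def count_rows(client, table):
    """Hypothetical task logic: ask a database client for a row count."""
    rows = client.execute(f"SELECT COUNT(*) FROM {table}")
    return rows[0][0]


class TestCountRows(unittest.TestCase):
    def test_queries_expected_table(self):
        # Stand-in for the real client; no network connection is made.
        client = mock.Mock()
        client.execute.return_value = [(42,)]

        # The return value flows through the code under test.
        self.assertEqual(count_rows(client, "sales"), 42)

        # The code called the external service with the expected query.
        client.execute.assert_called_once_with("SELECT COUNT(*) FROM sales")
```

Passing the client in as an argument keeps the function easy to test; when the client is created inside the code under test, mock.patch can replace it at the place where it is looked up.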
Identifying changes to requirements file and plugins
Airflow has a built-in plugin manager that allows you to drop in an external component to customize an installation. With Amazon MWAA, the plugin is packaged as a ZIP file and dropped into the $AIRFLOW_HOME/plugins folder. The requirements file allows you to install Python dependencies that are not included in the Apache Airflow base install of your MWAA environment. Both the plugins file and the requirements file must have version IDs associated with them if they are configured in the MWAA environment.
Whenever the plugins or requirements files are updated in the Amazon S3 bucket, the new version ID of the files must be updated in the MWAA environment for the changes to take effect. This environment update takes 20–30 minutes. Thus, incorporating testing in your deployment to get faster feedback on issues and triggering the update only if the requirements or plugins are changed is important.
As I highlighted previously, we should not trigger an MWAA environment update for DAG changes. We must identify whether the plugins and requirements files have changed since the last successful build. The high-level steps follow, and you can find them in
- Using the git diff-tree command along with the commit ID of the last successful build, find out whether there are changes in the plugins or requirements files.
- If there are changes, copy the files to an Amazon S3 bucket and get the version ID of the new file.
- If there are no changes to the files, get the current version from the GetEnvironment API of MWAA.
- Update the versions in the parameter file that is passed to
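The steps above could be sketched in Python roughly as follows. This is not the build script from the repository: the function names, the plugins/ and requirements.txt paths, and the uploads mapping are hypothetical. The s3 and mwaa arguments are assumed to be boto3 clients (boto3.client("s3") and boto3.client("mwaa")), and the S3 bucket is assumed to have versioning enabled so that put_object returns a VersionId.

```python
import subprocess


def changed_files(last_good_commit, head="HEAD"):
    """List files that differ between the last good commit and head."""
    result = subprocess.run(
        ["git", "diff-tree", "--name-only", "-r", last_good_commit, head],
        check=True,
        capture_output=True,
        text=True,
    )
    return result.stdout.splitlines()


def classify(paths, plugins_prefix="plugins/", requirements_path="requirements.txt"):
    """Report whether plugins or the requirements file changed.

    Assumption: the default paths are placeholders for your repo layout.
    """
    return {
        "plugins": any(p.startswith(plugins_prefix) for p in paths),
        "requirements": requirements_path in paths,
    }


def resolve_versions(changes, uploads, s3, mwaa, bucket, env_name):
    """Return the S3 version IDs to write into the parameter file.

    `uploads` maps "plugins"/"requirements" to (local_path, s3_key).
    Unchanged files keep the version the environment already uses.
    """
    env = mwaa.get_environment(Name=env_name)["Environment"]
    versions = {
        "plugins": env.get("PluginsS3ObjectVersion"),
        "requirements": env.get("RequirementsS3ObjectVersion"),
    }
    for name, (local_path, key) in uploads.items():
        if changes.get(name):
            # Upload the new file and record the version ID S3 assigns.
            with open(local_path, "rb") as body:
                response = s3.put_object(Bucket=bucket, Key=key, Body=body)
            versions[name] = response["VersionId"]
    return versions
```

The returned dictionary can then be written into the parameter file. Note that this sketch calls GetEnvironment unconditionally, so a first-time deployment, where the environment does not exist yet, would need separate handling.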
Amazon MWAA takes away the operational responsibility so you can focus on building workflows. By following guidelines described in this post to automate deploying your DAGs to an Amazon MWAA environment, you can build quality DAGs and deploy them faster.
In this post, we explained how to build an automated CI/CD pipeline to accelerate deploying workflows, and we described how to automate tasks, including updates to plugins and requirements files. You can deploy the CI/CD pipeline and related code to your own AWS account using instructions found in the GitHub repository.