AWS Big Data Blog

Introducing Amazon MWAA support for Apache Airflow version 2.8.1

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that makes it straightforward to set up and operate end-to-end data pipelines in the cloud.

Organizations use Amazon MWAA to enhance their business workflows. For example, C2i Genomics uses Amazon MWAA in their data platform to orchestrate the validation of algorithms processing cancer genomics data across billions of records. Twitch, a live streaming platform, manages and orchestrates the training and deployment of its recommendation models for over 140 million active users, using Amazon MWAA to scale while significantly improving security and reducing infrastructure management overhead.

Today, we are announcing the availability of Apache Airflow version 2.8.1 environments on Amazon MWAA. In this post, we walk you through some of the new features and capabilities of Airflow now available in Amazon MWAA, and how you can set up or upgrade your Amazon MWAA environment to version 2.8.1.

Object storage

As data pipelines scale, engineers struggle to manage storage across multiple systems with unique APIs, authentication methods, and conventions for accessing data, which requires custom logic and storage-specific operators. Airflow now offers a unified object storage abstraction layer that handles these details, letting engineers focus on their data pipelines. Airflow object storage is built on fsspec, so the same data access code works across different object storage systems, reducing infrastructure complexity.

The following are some of the feature’s key benefits:

  • Portable workflows – You can switch storage services with minimal changes in your Directed Acyclic Graphs (DAGs)
  • Efficient data transfers – You can stream data instead of loading it entirely into memory
  • Reduced maintenance – You don’t need separate operators, making your pipelines straightforward to maintain
  • Familiar programming experience – You can use Python modules, like shutil, for file operations
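The streaming behavior behind the "Efficient data transfers" and "Familiar programming experience" benefits can be illustrated with the standard library alone. The following minimal sketch is an illustration, not Airflow code: in-memory io.BytesIO buffers stand in for source and target storage (in a DAG these would be ObjectStoragePath objects), and the hypothetical RecordingSource class records the size of every read that shutil.copyfileobj performs, showing that no single read exceeds the chunk size.

```python
from __future__ import annotations

import io
import shutil


class RecordingSource(io.BytesIO):
    """In-memory stand-in for a remote object that records each read size."""

    def __init__(self, data: bytes) -> None:
        super().__init__(data)
        self.read_sizes: list[int] = []

    def read(self, size: int | None = -1) -> bytes:
        chunk = super().read(size)
        self.read_sizes.append(len(chunk))
        return chunk


def stream_copy(src, dst, chunk_size: int = 1024 * 1024) -> None:
    """Copy src to dst in chunks of at most chunk_size bytes."""
    # copyfileobj repeatedly calls src.read(chunk_size) until it returns b"".
    shutil.copyfileobj(src, dst, length=chunk_size)


if __name__ == "__main__":
    payload = b"x" * 10_000
    src = RecordingSource(payload)
    dst = io.BytesIO()
    stream_copy(src, dst, chunk_size=4096)
    print(src.read_sizes)  # [4096, 4096, 1808, 0]
```

Only one chunk is held in memory at a time, which is why the copy in the DAG below does not need to load whole objects before writing them.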

To use object storage with Amazon Simple Storage Service (Amazon S3), you need to install the s3fs extra of the Amazon provider package (apache-airflow-providers-amazon[s3fs]==x.x.x).

The following sample code shows how to copy data directly from Google Cloud Storage to Amazon S3. Because Airflow’s object storage uses shutil.copyfileobj, the objects’ data is read in chunks from gcs_data_source and streamed to amazon_s3_data_target.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.io.path import ObjectStoragePath

gcs_data_source = ObjectStoragePath("gcs://source-bucket/prefix/", conn_id="google_cloud_default")

amazon_s3_data_target = ObjectStoragePath("s3://target-bucket/prefix/", conn_id="aws_default")

with DAG(
    dag_id="copy_from_gcs_to_amazon_s3",
    start_date=datetime(2024, 2, 26),
    schedule="0 0 * * *",
    catchup=False,
    tags=["2.8", "ObjectStorage"],
) as dag:

    @task
    def list_objects(path: ObjectStoragePath) -> list[ObjectStoragePath]:
        # Keep only files (skip "directories") under the source prefix.
        objects = [f for f in path.iterdir() if f.is_file()]
        return objects

    @task
    def copy_object(path: ObjectStoragePath, object: ObjectStoragePath):
        # Stream a single object from the source to the target prefix.
        object.copy(dst=path)

    # Dynamic task mapping: one copy_object task instance per listed object.
    objects_list = list_objects(path=gcs_data_source)
    copy_object.partial(path=amazon_s3_data_target).expand(object=objects_list)
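The last line of the DAG relies on dynamic task mapping: partial() pins the argument shared by every copy (the target path), and expand() creates one mapped task instance per element of the listed objects at run time. As a rough mental model only (this is not how Airflow implements mapping), the fan-out resembles binding the constant argument with functools.partial and calling the result once per object. The helper names and object keys below are hypothetical.

```python
from functools import partial


def copy_object(path: str, obj: str) -> str:
    """Stand-in for the copy task: describe the copy instead of performing it."""
    return f"{obj} -> {path}"


def fan_out(target: str, objects: list) -> list:
    """Mimic copy_object.partial(path=...).expand(object=...) with one call per object."""
    # The shared argument is bound once, like partial(path=...) in the DAG.
    bound = partial(copy_object, path=target)
    # Each element yields one independent call, like one mapped task instance.
    return [bound(obj=obj) for obj in objects]


if __name__ == "__main__":
    # Hypothetical object keys listed from the GCS source prefix.
    source_objects = ["gcs://source-bucket/prefix/a.csv", "gcs://source-bucket/prefix/b.csv"]
    for line in fan_out("s3://target-bucket/prefix/", source_objects):
        print(line)
```

Because each mapped instance is a separate task, a failed copy can be retried on its own without re-copying the other objects.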

For more information on Airflow object storage, refer to Object Storage.

XCom UI

XCom (cross-communications) allows for the passing of data between tasks, facilitating communication and coordination between them. Previously, developers had to switch to a different view to see XComs related to a task. With Airflow 2.8, XCom key-value pairs are rendered directly on a tab within the Airflow Grid view, as shown in the following screenshot.

The new XCom tab provides the following benefits:

  • Improved XCom visibility – A dedicated tab in the UI provides a convenient and user-friendly way to see all XComs associated with a DAG or task.
  • Improved debugging – Being able to see XCom values directly in the UI is helpful for debugging DAGs. You can quickly see the output of upstream tasks without needing to manually pull and inspect them using Python code.

Task context logger

Managing task lifecycles is crucial for the smooth operation of data pipelines in Airflow. However, certain challenges have persisted, particularly in scenarios where tasks are unexpectedly stopped. This can occur due to various reasons, including scheduler timeouts, zombie tasks (tasks that remain in a running state without sending heartbeats), or instances where the worker runs out of memory.

Traditionally, such failures, particularly those triggered by core Airflow components like the scheduler or executor, weren’t recorded within the task logs. This limitation required users to troubleshoot outside the Airflow UI, complicating the process of pinpointing and resolving issues.

Airflow 2.8 introduced a significant improvement that addresses this problem. Airflow components, including the scheduler and executor, can now use the new TaskContextLogger to forward error messages directly to the task logs. This feature allows you to see all the relevant error messages related to a task’s run in one place. This simplifies the process of figuring out why a task failed, offering a complete perspective of what went wrong within a single log view.

The following screenshot shows a task detected as a zombie, with the scheduler log included as part of the task log.

To enable the feature, set the Airflow configuration option enable_task_context_logger in the [logging] section (logging.enable_task_context_logger in Amazon MWAA) to True. Once it’s enabled, Airflow can ship log messages from the scheduler, the executor, or callback execution contexts to the task logs, and make them available in the Airflow UI.
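In Amazon MWAA, Airflow configuration overrides are set on the environment rather than in airflow.cfg. The following minimal sketch assumes the option name logging.enable_task_context_logger and assumes that update_environment replaces the full AirflowConfigurationOptions map, so it merges the new option into the environment's existing overrides before calling the boto3 MWAA client. The client is passed in so the helper can be exercised without AWS credentials; in practice it would be boto3.client("mwaa").

```python
from typing import Any


def enable_task_context_logger(mwaa_client: Any, environment_name: str) -> dict:
    """Merge logging.enable_task_context_logger=True into an MWAA environment's options."""
    # Read the current overrides so the update does not drop them.
    environment = mwaa_client.get_environment(Name=environment_name)["Environment"]
    options = dict(environment.get("AirflowConfigurationOptions", {}))
    # MWAA configuration option values are passed as strings.
    options["logging.enable_task_context_logger"] = "True"
    return mwaa_client.update_environment(
        Name=environment_name,
        AirflowConfigurationOptions=options,
    )
```

For example, enable_task_context_logger(boto3.client("mwaa"), "my-mwaa-environment") would request the change for a hypothetical environment named my-mwaa-environment; the environment goes through an update before the setting takes effect.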

Listener hooks for datasets

Datasets were introduced in Airflow 2.4 as a logical grouping of data sources to create data-aware scheduling and dependencies between DAGs. For example, you can schedule a consumer DAG to run when a producer DAG updates a dataset. Listeners enable Airflow users to create subscriptions to certain events happening in the environment. In Airflow 2.8, listeners are added for two dataset events, on_dataset_created and on_dataset_changed, allowing Airflow users to write custom code that reacts to dataset management operations. For example, you can trigger an external system or send a notification.

Using listener hooks for datasets is straightforward. Complete the following steps to create a listener for on_dataset_changed:

  1. Create the listener (dataset_listener.py):
    from airflow import Dataset
    from airflow.listeners import hookimpl

    @hookimpl
    def on_dataset_changed(dataset: Dataset):
        """Run custom code when a dataset is changed."""
        print("Invoking external endpoint")

        # Take a different action for one specific dataset
        if dataset.uri == "s3://bucket-prefix/object-key.ext":
            print("Execute specific/different action for this dataset")
  2. Create a plugin to register the listener in your Airflow environment (dataset_listener_plugin.py):
    from airflow.plugins_manager import AirflowPlugin

    # Airflow adds the plugins folder to sys.path, so the listener module imports directly
    import dataset_listener

    class DatasetListenerPlugin(AirflowPlugin):
        name = "dataset_listener_plugin"
        listeners = [dataset_listener]
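As a listener grows beyond a single if statement, one option is to keep the dataset-matching logic in a plain function that on_dataset_changed calls with dataset.uri, so the logic can be unit tested without a running Airflow environment. The sketch below uses only the standard library; the function name, the action names, and the rule of grouping by URI scheme are hypothetical choices, not part of Airflow's listener API.

```python
from urllib.parse import urlparse

# Hypothetical URI that needs special handling, matching the listener example.
SPECIAL_URI = "s3://bucket-prefix/object-key.ext"


def choose_action(uri: str) -> str:
    """Return the name of the action a dataset listener should take for a URI."""
    # Exact matches take priority over broader rules.
    if uri == SPECIAL_URI:
        return "specific_action"
    # Otherwise route by storage scheme (s3, gcs, file, ...).
    if urlparse(uri).scheme == "s3":
        return "notify_s3_consumers"
    return "invoke_external_endpoint"
```

Inside dataset_listener.py, the hook body could then reduce to a call such as choose_action(dataset.uri), followed by whichever handler the returned name selects.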

For more information on how to install plugins in Amazon MWAA, refer to Installing custom plugins.

Set up a new Airflow 2.8.1 environment in Amazon MWAA

You can initiate the setup in your account and preferred Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.
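For the API route, the following minimal sketch calls the boto3 MWAA create_environment operation with AirflowVersion set to "2.8.1". It assumes the prerequisites already exist (an execution role, an S3 bucket holding a dags folder, and two private subnets with a security group); the client is passed in, and in practice it would be boto3.client("mwaa").

```python
from __future__ import annotations

from typing import Any


def create_airflow_281_environment(
    mwaa_client: Any,
    name: str,
    execution_role_arn: str,
    source_bucket_arn: str,
    subnet_ids: list[str],
    security_group_ids: list[str],
) -> dict[str, Any]:
    """Request a new Amazon MWAA environment running Apache Airflow 2.8.1."""
    return mwaa_client.create_environment(
        Name=name,
        AirflowVersion="2.8.1",
        ExecutionRoleArn=execution_role_arn,
        SourceBucketArn=source_bucket_arn,
        # Path of the DAGs folder relative to the root of the source bucket.
        DagS3Path="dags",
        NetworkConfiguration={
            "SubnetIds": subnet_ids,
            "SecurityGroupIds": security_group_ids,
        },
    )
```

A call might look like create_airflow_281_environment(boto3.client("mwaa"), "my-airflow-281", "arn:aws:iam::111122223333:role/my-mwaa-role", "arn:aws:s3:::my-mwaa-bucket", ["subnet-aaaa", "subnet-bbbb"], ["sg-cccc"]), where every name, ARN, and ID is a hypothetical placeholder.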

Upon successful creation of an Airflow version 2.8.1 environment in Amazon MWAA, certain packages are automatically installed on the scheduler and worker nodes. For a complete list of installed packages and their versions, refer to Apache Airflow provider packages installed on Amazon MWAA environments. You can install additional packages using a requirements file.

Upgrade from older versions of Airflow to version 2.8.1

You can take advantage of these latest capabilities by upgrading your older Airflow version 2.x-based environments to version 2.8.1 using in-place version upgrades. To learn more about in-place version upgrades, refer to Upgrading the Apache Airflow version or Introducing in-place version upgrades with Amazon MWAA.
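A minimal sketch of starting an in-place upgrade through the boto3 MWAA client follows. It assumes an existing Airflow 2.x environment whose DAGs, plugins, and requirements you have already validated against 2.8.1; the client is passed in (in practice boto3.client("mwaa")), and the version check only avoids a redundant update call rather than replacing the guidance in Upgrading the Apache Airflow version.

```python
from __future__ import annotations

from typing import Any

TARGET_VERSION = "2.8.1"


def upgrade_to_airflow_281(mwaa_client: Any, environment_name: str) -> dict[str, Any] | None:
    """Start an in-place upgrade to Airflow 2.8.1, or return None if already there."""
    environment = mwaa_client.get_environment(Name=environment_name)["Environment"]
    if environment.get("AirflowVersion") == TARGET_VERSION:
        # Skip the update call so no unnecessary environment update is started.
        return None
    # The in-place upgrade is requested by updating the environment's Airflow version.
    return mwaa_client.update_environment(
        Name=environment_name,
        AirflowVersion=TARGET_VERSION,
    )
```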

Conclusion

In this post, we discussed some important features introduced in Airflow version 2.8, such as object storage, the new XCom tab in the Grid view, task context logging, and listener hooks for datasets, and showed how you can start using them. We also provided sample code to show implementations in Amazon MWAA. For the complete list of changes, refer to Airflow’s release notes.

For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


About the Authors

Mansi Bhutada is an ISV Solutions Architect based in the Netherlands. She helps customers design and implement well-architected solutions in AWS that address their business problems. She is passionate about data analytics and networking. Beyond work, she enjoys experimenting with food, playing pickleball, and diving into fun board games.

Hernan Garcia is a Senior Solutions Architect at AWS based in the Netherlands. He works in the financial services industry, supporting enterprises in their cloud adoption. He is passionate about serverless technologies, security, and compliance. He enjoys spending time with family and friends, and trying out new dishes from different cuisines.