AWS Compute Blog

Orchestrating dependent file uploads with AWS Step Functions

This post is written by Nelson Assis, Enterprise Support Lead, Serverless, and Jevon Liburd, Technical Account Manager, Serverless

Amazon S3 is an object storage service that many customers use for file storage. With Amazon S3 Event Notifications or Amazon EventBridge, customers can create workloads with an event-driven architecture (EDA), which responds to events produced when objects in S3 buckets change.

EDA involves asynchronous communication between system components. This decouples the components, allowing each one to operate autonomously.

Some scenarios introduce coupling into the architecture because of dependencies between events. This blog post presents a common example of this coupling and shows how to handle it with AWS Step Functions.

Overview

In this example, an organization has two distributed autonomous teams, the Sales team and the Warehouse team. Each team is responsible for uploading a monthly data file to an S3 bucket so it can be processed.

The files generate events when they are uploaded, initiating downstream processes. The processing of the Warehouse file cleans the data and joins it with data from the Shipping team. The processing of the Sales file correlates the data with the combined Warehouse and Shipping data. This enables analysts to perform forecasting and gain other insights.

For this correlation to happen, the Warehouse file must be processed before the Sales file. As the two teams are autonomous, there is no coordination between them. This means that the files can be uploaded at any time, with no assurance that the Warehouse file is processed before the Sales file.

For scenarios like these, the Aggregator pattern can be used. The pattern collects and stores the events, and triggers a new event based on the combined events. In the described scenario, the combined events are the processed Warehouse file and the uploaded Sales file.

The requirements of the aggregator pattern are:

  1. Correlation – A way to group the related events. This is fulfilled by a unique identifier in the file name.
  2. Event aggregator – A stateful store for the events.
  3. Completion check and trigger – A condition that signals when all of the related events have been received, and a way to publish the resulting event.
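To make these requirements concrete, the following sketch shows how a file name could be turned into a correlation identifier, stored in a stateful store, and checked for completeness. The table name, attribute names, status values, and file naming convention are illustrative assumptions, not the identifiers used in the sample repository.

    import boto3

    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table("EventAggregator")  # assumed table name

    def record_event(file_name: str) -> None:
        """Store an uploaded file as an aggregator event keyed by its correlation id."""
        # Correlation: "warehouse_2023_09.csv" -> ("warehouse", "2023_09")
        file_type, _, remainder = file_name.partition("_")
        correlation_id = remainder.rsplit(".", 1)[0]

        # Event aggregator: persist the event in the stateful store
        table.put_item(Item={
            "correlation_id": correlation_id,
            "file_type": file_type,
            "status": "UPLOADED",
        })

    def is_complete(correlation_id: str) -> bool:
        """Completion check: the Warehouse file is processed and the Sales file has arrived."""
        warehouse = table.get_item(
            Key={"correlation_id": correlation_id, "file_type": "warehouse"}
        ).get("Item")
        sales = table.get_item(
            Key={"correlation_id": correlation_id, "file_type": "sales"}
        ).get("Item")
        return bool(warehouse and sales and warehouse["status"] == "COMPLETED")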

Architecture overview

The architecture uses the following AWS services and components:

  1. File upload: The Sales and Warehouse teams upload their respective files to S3.
  2. EventBridge: The Object Created event is sent to EventBridge, where a rule routes it to the main workflow.
  3. Main state machine: This state machine orchestrates the aggregator operations and the processing of the files. It encapsulates the workflows for each file to separate the aggregator logic from the files’ workflow logic.
  4. File parser and correlation: A Lambda function runs the business logic that identifies the file type and extracts the correlation identifier.
  5. Stateful store: A DynamoDB table stores information about the file such as the name, type, and processing status. The state machine reads from and writes to the DynamoDB table, and task tokens are also stored in this table (a table definition sketch follows this list).
  6. File processing: Depending on the file type and any preconditions, the state machine corresponding to that file type is run. These state machines contain the logic to process the specific file.
  7. Task Token & Callback: A task token is generated when the dependent file arrives for processing before the independent file has been processed. The Step Functions “Wait for a Callback” pattern resumes the execution of the dependent file's workflow after the independent file is processed.
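For reference, a stateful store shaped like the one in item 5 could be created as shown below. This is a sketch only: the sample application provisions its own table through AWS SAM, and the attribute names here are assumptions that match the illustrative code in this post rather than the repository.

    import boto3

    dynamodb = boto3.client("dynamodb")

    # Composite primary key: correlation identifier (partition key) and file type
    # (sort key), as described in the walkthrough below. Names are illustrative.
    dynamodb.create_table(
        TableName="EventAggregator",
        AttributeDefinitions=[
            {"AttributeName": "correlation_id", "AttributeType": "S"},
            {"AttributeName": "file_type", "AttributeType": "S"},
        ],
        KeySchema=[
            {"AttributeName": "correlation_id", "KeyType": "HASH"},
            {"AttributeName": "file_type", "KeyType": "RANGE"},
        ],
        BillingMode="PAY_PER_REQUEST",
    )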

Walkthrough

You need the following prerequisites:

  • AWS CLI and AWS SAM CLI installed.
  • An AWS account.
  • Sufficient permissions to manage the AWS resources.
  • Git installed.

To deploy the example, follow the instructions in the GitHub repo.

This walkthrough shows what happens if the dependent file (Sales file) is uploaded before the independent one (Warehouse file).

  1. The workflow starts when the Sales file is uploaded to the dedicated Sales S3 bucket. The example uses separate S3 buckets for the two files because the Sales and Warehouse teams are assumed to be distributed and autonomous. You can find sample files in the code repository.
  2. Uploading the file to S3 sends an event to EventBridge, which the aggregator state machine acts on. The event pattern used in the EventBridge rule is:
    {
      "detail-type": ["Object Created"],
      "source": ["aws.s3"],
      "detail": {
        "bucket": {
          "name": ["sales-mfu-eda-09092023", "warehouse-mfu-eda-09092023"]
        },
        "reason": ["PutObject"]
      }
    }
  3. The aggregator state machine starts by invoking the file parser Lambda function. This function parses the file type and uses the identifier to correlate the files. In this example, the name of the file contains both the file type and the correlation identifier (the year_month). To represent the file type and correlation identifier in other ways, modify this function to parse that information (a parser sketch follows this walkthrough).
  4. The next step in the state machine inserts a record for the event in the event aggregator DynamoDB table. The table has a composite primary key with the correlation identifier as the partition key and the file type as the sort key. The processing status of the file is tracked to give feedback on the state of the workflow.
  5. Based on the file type, the state machine determines which branch to follow. In the example, the Sales branch is run. The state machine tries to get the status of the (independent) Warehouse file from DynamoDB using the correlation identifier. Using the result of this query, the state machine determines whether the corresponding Warehouse file has already been processed.
  6. Since the Warehouse file has not been processed yet, the waitForTaskToken integration pattern is used. The state machine pauses at this step and creates a task token, which external services use to signal the state machine to continue its execution. The Sales record in the DynamoDB table is updated with the task token.
  7. Navigate to the S3 console and upload the sample Warehouse file to the Warehouse S3 bucket. This starts a new execution of the Step Functions workflow, which flows through the other branch after the file type choice step. In this branch, the Warehouse state machine is run and the processing status of the file is updated in DynamoDB.
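For reference, the file parser invoked in step 3 could look roughly like the sketch below. It assumes the state machine passes the EventBridge “Object Created” event straight to the function and that object keys follow a <file_type>_<year>_<month>.csv convention; the function in the repository may differ in both respects, including the names of the returned fields.

    import os

    def lambda_handler(event, context):
        """Derive the file type and correlation identifier from the uploaded object's key."""
        # EventBridge delivers the S3 "Object Created" notification in the "detail" field
        key = event["detail"]["object"]["key"]
        file_name = os.path.basename(key)

        # "sales_2023_09.csv" -> file_type "sales", correlation_id "2023_09"
        file_type, _, remainder = file_name.partition("_")
        correlation_id = os.path.splitext(remainder)[0]

        return {
            "fileType": file_type,            # "sales" or "warehouse"
            "correlationId": correlation_id,  # later used as the partition key
        }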

When the status of the Warehouse file changes to “Completed”, the Warehouse state machine checks DynamoDB for a pending Sales file. If there is one, it retrieves the task token and calls the SendTaskSuccess API action. This signals the waiting Sales state machine to resume; the Sales file is then processed and its status updated.
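The callback side of this hand-off comes down to a task token lookup and a single Step Functions API call. The sketch below reuses the same illustrative table and attribute names as the earlier snippets and assumes a “WAITING” status value; the repository's Warehouse workflow implements the equivalent logic in its own way.

    import boto3

    dynamodb = boto3.resource("dynamodb")
    stepfunctions = boto3.client("stepfunctions")
    table = dynamodb.Table("EventAggregator")  # assumed table name

    def resume_pending_sales(correlation_id: str) -> None:
        """After the Warehouse file completes, resume a Sales execution waiting on it."""
        sales = table.get_item(
            Key={"correlation_id": correlation_id, "file_type": "sales"}
        ).get("Item")

        # Only resume if a Sales file is waiting and has stored a task token
        if sales and sales.get("status") == "WAITING" and "task_token" in sales:
            stepfunctions.send_task_success(
                taskToken=sales["task_token"],
                output='{"warehouseStatus": "COMPLETED"}',
            )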

Conclusion

This blog post shows how to handle file dependencies in event-driven architectures. You can customize the sample provided in the code repository for your own use case.

This solution is specific to file dependencies in event-driven architectures. For more information on solving event dependencies and aggregators, read the blog post: Moving to event-driven architectures with serverless event aggregators.

To learn more about event-driven architectures, visit the event-driven architecture section on Serverless Land.